Springboot 集成Aliyun MQ消息队列,Aliyun 消息队列配置及代码实现

JSON 2018-04-21 01:38:52 15521

今天需要使用  Springboot  来集成  Aliyun   消息队列,看了官方的  Demo  ,都是  Spring  XML 方式配置。对照着做了一遍  Springboot  配置的方式。直接上代码+配置。

Aliyun   队列申请,我就不做叙述了。链接:https://ons.console.aliyun.com/

Springboot yml MQ配置:

aliyun:
  accessKey: LTAIlH****7IMXnuV
  accessSecretKey: T981eNFj****EycS0jq8xdThdfXCo

  queue:
      log:
        topic: ios-****-log-topic_dev
        producerId: PID_ios-****-log_dev
        consumerId: CID_ios-****-log_dev
      stock:
        topic: ios-****-stock-topic_dev
        producerId: PID_ios-****-stock_dev
        consumerId: CID_ios-****-stock_dev

****是我不让你们看见而已,别你也****,另外建议本地测试或者开发采用公网的  MQ  ,线上配置采用你对应的  ECS  或者对应的地区的  MQ  。这样速度会快一点。

Springboot @Bean Java代码

@Value("${aliyun.accessKey}")
private String accessKey;
@Value("${aliyun.accessSecretKey}")
private String secretKey;


@Value("${aliyun.queue.log.producerId}")
private String logPriducerId;
@Value("${aliyun.queue.log.consumerId}")
private String logConsumerId;
@Value("${aliyun.queue.log.topic}")
private String logTopic;

@Value("${aliyun.queue.stock.producerId}")
private String stockPriducerId;
@Value("${aliyun.queue.stock.consumerId}")
private String stockConsumerId;
@Value("${aliyun.queue.stock.topic}")
private String stockTopic;

@Bean(name="logProducerBean")
public ProducerBean initLogProducerBean(){
    ProducerBean producerBean = new ProducerBean();
    Properties properties = new Properties();
    // 您在控制台创建的 Producer ID
    properties.put(PropertyKeyConst.ProducerId, logPriducerId);
    // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
    properties.put(PropertyKeyConst.AccessKey, accessKey);
    // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
    properties.put(PropertyKeyConst.SecretKey, secretKey);

    producerBean.setProperties(properties);

    producerBean.start();
    return producerBean;
}
@Bean(name="stockProducerBean")
public ProducerBean initStockProducerBean(){
    ProducerBean producerBean = new ProducerBean();
    Properties properties = new Properties();
    properties.put(PropertyKeyConst.AccessKey, accessKey);
    properties.put(PropertyKeyConst.ProducerId, stockPriducerId);
    properties.put(PropertyKeyConst.SecretKey, secretKey);

    producerBean.setProperties(properties);
    producerBean.start();

    return producerBean;
}
@Bean(name="stockConsumerBean")
public ConsumerBean initStockConsumerBean(){
    ConsumerBean consumerBean = new ConsumerBean();
    Properties properties = new Properties();
    properties.put(PropertyKeyConst.ConsumerId, stockConsumerId);
    properties.put(PropertyKeyConst.AccessKey, accessKey);
    properties.put(PropertyKeyConst.SecretKey, secretKey);
    properties.put(PropertyKeyConst.ConsumeThreadNums, 50);

    consumerBean.setProperties(properties);

    //监听消息
    MessageListener messageListener = (message, context) -> {


        System.out.println("Receive2: " + message.getMsgID());
        try {

            return Action.CommitMessage;
        }catch (Exception e) {

            return Action.ReconsumeLater;
        }
    };
    Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();

    Subscription subscription = new Subscription();
    subscription.setTopic(stockTopic);
    subscription.setExpression("*");//tag 接受所有
    subscriptionTable.put(subscription,messageListener);
    consumerBean.setSubscriptionTable(subscriptionTable);
    consumerBean.start();




    return consumerBean;
}
@Bean(name="logConsumerBean")
public Consumer initLogConsumerBean(){
    Properties properties = new Properties();
    properties.put(PropertyKeyConst.ConsumerId, logConsumerId);
    properties.put(PropertyKeyConst.AccessKey, accessKey);
    properties.put(PropertyKeyConst.SecretKey, secretKey);
    properties.put(PropertyKeyConst.ConsumeThreadNums, 40);


    Consumer consumer = ONSFactory.createConsumer(properties);

    consumer.start();
    //订阅消息
    //订阅多个 Tag  TagA||TagB,如果模糊订阅 *
    consumer.subscribe(logTopic, "TagA||TagB", (message, context) -> {
        System.out.println("Receive: " + message);
        try {
            //处理你的业务
            return Action.CommitMessage;
        }catch (Exception e) {

            return Action.ReconsumeLater;
        }
    });


    return consumer;
}

阿里云 队列重试次数说明:

这种普通队列,保持着至少一次的规则,就是当你返回 ReconsumeLater 后,它会按你的 Delay 次数来计算具体的时间再次请求。默认是16次,对应的时间表如下:

第几次重试 每次重试间隔时间 第几次重试 每次重试间隔时间
1 10 秒 9 7 分钟
2 30 秒 10 8 分钟
3 1 分钟 11 9 分钟
4 2 分钟 12 10 分钟
5 3 分钟 13 20 分钟
6 4 分钟 14 30 分钟
7 5 分钟 15 1 小时
8 6 分钟 16 2 小时

自定义队列重试次数:

Properties properties = new Properties();
//配置对应 Consumer ID 的最大消息重试次数为20 次
properties.put(PropertyKeyConst.MaxReconsumeTimes,"20");
Consumer consumer =ONSFactory.createConsumer(properties);

队列重试规则的定义:

public class MessageListenerImpl implements MessageListener {

    @Override
    public Action consume(Message message, ConsumeContext context) {
        //方法3:消息处理逻辑抛出异常,消息将重试
        doConsumeMessage(message);
        //方式1:返回 Action.ReconsumeLater,消息将重试
        return Action.ReconsumeLater;
        //方式2:返回 null,消息将重试
        return null;
        //方式3:直接抛出异常, 消息将重试
        throw new RuntimeException("Consumer Message exceotion");
    }
}


版权所属:SO JSON在线解析

原文地址:https://www.sojson.com/blog/293.html

转载时必须以链接形式注明原始出处及本声明。

本文主题:

如果本文对你有帮助,那么请你赞助我,让我更有激情的写下去,帮助更多的人。

关于作者
一个低调而闷骚的男人。
相关文章
Java 集成阿里云消息队列,日志消息存储
Springboot + Freemarker 集成配置
Springboot 集成 Ehcache 代码讲解
Shiro教程(四)Shiro + Redis配置
Springboot + Mybatis +Maven 自动生成 Mapper.xml,Entity,Dao。 generator 配置
Springboot + Mybatis,数据库多数据源配置项目Demo【源码下载】
Elasticsearch教程,Elasticsearch配置文件 — elasticsearch.yml
Shiro教程(七)Shiro Session共享配置以及实现
Elasticsearch教程,Elasticsearch 设近义词搜索,IK分词器实现同义词搜索
Springboot HTTP Get/Post 请求讲解,Springboot几行代码完成Http请求
最新文章
PHP回显语句 36
Linux—文件树 83
C语言while循环和do while循环 127
Python元组剖析 151
MySQL触发器教程 262
sql使用布尔运算符和关系运算符 235
C语言的变量和常量 291
PHP变量剖析 193
SQL全外连接剖析 339
SQL自然连接剖析 223
最热文章
最新MyEclipse8.5注册码,有效期到2020年 (已经更新) 681687
苹果电脑Mac怎么恢复出厂系统?苹果系统怎么重装系统? 674703
免费天气API,全国天气 JSON API接口,可以获取五天的天气预报 601610
免费天气API,天气JSON API,不限次数获取十五天的天气预报 575081
Jackson 时间格式化,时间注解 @JsonFormat 用法、时差问题说明 552740
我为什么要选择RabbitMQ ,RabbitMQ简介,各种MQ选型对比 509309
Elasticsearch教程(四) elasticsearch head 插件安装和使用 479847
Jackson 美化输出JSON,优雅的输出JSON数据,格式化输出JSON数据... ... 264119
Java 信任所有SSL证书,HTTPS请求抛错,忽略证书请求完美解决 244246
Elasticsearch教程(一),全程直播(小白级别) 225502
支付扫码

所有赞助/开支都讲公开明细,用于网站维护:赞助名单查看

查看我的收藏

正在加载... ...