Springboot 集成Aliyun MQ消息队列,Aliyun 消息队列配置及代码实现

JSON 2018-04-21 01:38:52 15525

今天需要使用  Springboot  来集成  Aliyun   消息队列,看了官方的  Demo  ,都是  Spring  XML 方式配置。对照着做了一遍  Springboot  配置的方式。直接上代码+配置。

Aliyun   队列申请,我就不做叙述了。链接:https://ons.console.aliyun.com/

Springboot yml MQ配置:

aliyun:
  accessKey: LTAIlH****7IMXnuV
  accessSecretKey: T981eNFj****EycS0jq8xdThdfXCo

  queue:
      log:
        topic: ios-****-log-topic_dev
        producerId: PID_ios-****-log_dev
        consumerId: CID_ios-****-log_dev
      stock:
        topic: ios-****-stock-topic_dev
        producerId: PID_ios-****-stock_dev
        consumerId: CID_ios-****-stock_dev

****是我不让你们看见而已,别你也****,另外建议本地测试或者开发采用公网的  MQ  ,线上配置采用你对应的  ECS  或者对应的地区的  MQ  。这样速度会快一点。

Springboot @Bean Java代码

@Value("${aliyun.accessKey}")
private String accessKey;
@Value("${aliyun.accessSecretKey}")
private String secretKey;


@Value("${aliyun.queue.log.producerId}")
private String logPriducerId;
@Value("${aliyun.queue.log.consumerId}")
private String logConsumerId;
@Value("${aliyun.queue.log.topic}")
private String logTopic;

@Value("${aliyun.queue.stock.producerId}")
private String stockPriducerId;
@Value("${aliyun.queue.stock.consumerId}")
private String stockConsumerId;
@Value("${aliyun.queue.stock.topic}")
private String stockTopic;

@Bean(name="logProducerBean")
public ProducerBean initLogProducerBean(){
    ProducerBean producerBean = new ProducerBean();
    Properties properties = new Properties();
    // 您在控制台创建的 Producer ID
    properties.put(PropertyKeyConst.ProducerId, logPriducerId);
    // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
    properties.put(PropertyKeyConst.AccessKey, accessKey);
    // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
    properties.put(PropertyKeyConst.SecretKey, secretKey);

    producerBean.setProperties(properties);

    producerBean.start();
    return producerBean;
}
@Bean(name="stockProducerBean")
public ProducerBean initStockProducerBean(){
    ProducerBean producerBean = new ProducerBean();
    Properties properties = new Properties();
    properties.put(PropertyKeyConst.AccessKey, accessKey);
    properties.put(PropertyKeyConst.ProducerId, stockPriducerId);
    properties.put(PropertyKeyConst.SecretKey, secretKey);

    producerBean.setProperties(properties);
    producerBean.start();

    return producerBean;
}
@Bean(name="stockConsumerBean")
public ConsumerBean initStockConsumerBean(){
    ConsumerBean consumerBean = new ConsumerBean();
    Properties properties = new Properties();
    properties.put(PropertyKeyConst.ConsumerId, stockConsumerId);
    properties.put(PropertyKeyConst.AccessKey, accessKey);
    properties.put(PropertyKeyConst.SecretKey, secretKey);
    properties.put(PropertyKeyConst.ConsumeThreadNums, 50);

    consumerBean.setProperties(properties);

    //监听消息
    MessageListener messageListener = (message, context) -> {


        System.out.println("Receive2: " + message.getMsgID());
        try {

            return Action.CommitMessage;
        }catch (Exception e) {

            return Action.ReconsumeLater;
        }
    };
    Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();

    Subscription subscription = new Subscription();
    subscription.setTopic(stockTopic);
    subscription.setExpression("*");//tag 接受所有
    subscriptionTable.put(subscription,messageListener);
    consumerBean.setSubscriptionTable(subscriptionTable);
    consumerBean.start();




    return consumerBean;
}
@Bean(name="logConsumerBean")
public Consumer initLogConsumerBean(){
    Properties properties = new Properties();
    properties.put(PropertyKeyConst.ConsumerId, logConsumerId);
    properties.put(PropertyKeyConst.AccessKey, accessKey);
    properties.put(PropertyKeyConst.SecretKey, secretKey);
    properties.put(PropertyKeyConst.ConsumeThreadNums, 40);


    Consumer consumer = ONSFactory.createConsumer(properties);

    consumer.start();
    //订阅消息
    //订阅多个 Tag  TagA||TagB,如果模糊订阅 *
    consumer.subscribe(logTopic, "TagA||TagB", (message, context) -> {
        System.out.println("Receive: " + message);
        try {
            //处理你的业务
            return Action.CommitMessage;
        }catch (Exception e) {

            return Action.ReconsumeLater;
        }
    });


    return consumer;
}

阿里云 队列重试次数说明:

这种普通队列,保持着至少一次的规则,就是当你返回 ReconsumeLater 后,它会按你的 Delay 次数来计算具体的时间再次请求。默认是16次,对应的时间表如下:

第几次重试 每次重试间隔时间 第几次重试 每次重试间隔时间
1 10 秒 9 7 分钟
2 30 秒 10 8 分钟
3 1 分钟 11 9 分钟
4 2 分钟 12 10 分钟
5 3 分钟 13 20 分钟
6 4 分钟 14 30 分钟
7 5 分钟 15 1 小时
8 6 分钟 16 2 小时

自定义队列重试次数:

Properties properties = new Properties();
//配置对应 Consumer ID 的最大消息重试次数为20 次
properties.put(PropertyKeyConst.MaxReconsumeTimes,"20");
Consumer consumer =ONSFactory.createConsumer(properties);

队列重试规则的定义:

public class MessageListenerImpl implements MessageListener {

    @Override
    public Action consume(Message message, ConsumeContext context) {
        //方法3:消息处理逻辑抛出异常,消息将重试
        doConsumeMessage(message);
        //方式1:返回 Action.ReconsumeLater,消息将重试
        return Action.ReconsumeLater;
        //方式2:返回 null,消息将重试
        return null;
        //方式3:直接抛出异常, 消息将重试
        throw new RuntimeException("Consumer Message exceotion");
    }
}


版权所属:SO JSON在线解析

原文地址:https://www.sojson.com/blog/293.html

转载时必须以链接形式注明原始出处及本声明。

本文主题:

如果本文对你有帮助,那么请你赞助我,让我更有激情的写下去,帮助更多的人。

关于作者
一个低调而闷骚的男人。
相关文章
Java 集成阿里云消息队列,日志消息存储
Springboot + Freemarker 集成配置
Springboot 集成 Ehcache 代码讲解
Shiro教程(四)Shiro + Redis配置
Springboot + Mybatis +Maven 自动生成 Mapper.xml,Entity,Dao。 generator 配置
Elasticsearch教程,Elasticsearch配置文件 — elasticsearch.yml
Springboot + Mybatis,数据库多数据源配置项目Demo【源码下载】
Elasticsearch教程,Elasticsearch 设近义词搜索,IK分词器实现同义词搜索
Shiro教程(七)Shiro Session共享配置以及实现
Springboot HTTP Get/Post 请求讲解,Springboot几行代码完成Http请求
最新文章
Linux I/O重定向 1767
Ruby 循环 - while、for、until、break、redo 和 retry 711
Node.js:全局对象 517
如何使用终端检查Linux上的内存使用情况 635
JavaScript对象详细剖析 300
Python print() 函数 409
PHP if/else/elseif 语句 407
HTML5 Canvas弧线教程 387
Java赋值运算符 431
XML内部实体和外部实体 464
最热文章
最新MyEclipse8.5注册码,有效期到2020年 (已经更新) 686836
苹果电脑Mac怎么恢复出厂系统?苹果系统怎么重装系统? 675081
免费天气API,天气JSON API,不限次数获取十五天的天气预报 615785
免费天气API,全国天气 JSON API接口,可以获取五天的天气预报 611117
Jackson 时间格式化,时间注解 @JsonFormat 用法、时差问题说明 555623
我为什么要选择RabbitMQ ,RabbitMQ简介,各种MQ选型对比 510028
Elasticsearch教程(四) elasticsearch head 插件安装和使用 481399
Jackson 美化输出JSON,优雅的输出JSON数据,格式化输出JSON数据... ... 269208
Java 信任所有SSL证书,HTTPS请求抛错,忽略证书请求完美解决 244787
Elasticsearch教程(一),全程直播(小白级别) 227489
支付扫码

所有赞助/开支都讲公开明细,用于网站维护:赞助名单查看

查看我的收藏

正在加载... ...