Springboot 集成Aliyun MQ消息队列,Aliyun 消息队列配置及代码实现

soゝso 2018-04-21 01:38:52 8457

今天需要使用  Springboot  来集成  Aliyun   消息队列,看了官方的  Demo  ,都是  Spring  XML 方式配置。对照着做了一遍  Springboot  配置的方式。直接上代码+配置。

Aliyun   队列申请,我就不做叙述了。链接:https://ons.console.aliyun.com/

Springboot yml MQ配置:

aliyun:
  accessKey: LTAIlH****7IMXnuV
  accessSecretKey: T981eNFj****EycS0jq8xdThdfXCo

  queue:
      log:
        topic: ios-****-log-topic_dev
        producerId: PID_ios-****-log_dev
        consumerId: CID_ios-****-log_dev
      stock:
        topic: ios-****-stock-topic_dev
        producerId: PID_ios-****-stock_dev
        consumerId: CID_ios-****-stock_dev

****是我不让你们看见而已,别你也****,另外建议本地测试或者开发采用公网的  MQ  ,线上配置采用你对应的  ECS  或者对应的地区的  MQ  。这样速度会快一点。

Springboot @Bean Java代码

@Value("${aliyun.accessKey}")
private String accessKey;
@Value("${aliyun.accessSecretKey}")
private String secretKey;


@Value("${aliyun.queue.log.producerId}")
private String logPriducerId;
@Value("${aliyun.queue.log.consumerId}")
private String logConsumerId;
@Value("${aliyun.queue.log.topic}")
private String logTopic;

@Value("${aliyun.queue.stock.producerId}")
private String stockPriducerId;
@Value("${aliyun.queue.stock.consumerId}")
private String stockConsumerId;
@Value("${aliyun.queue.stock.topic}")
private String stockTopic;

@Bean(name="logProducerBean")
public ProducerBean initLogProducerBean(){
    ProducerBean producerBean = new ProducerBean();
    Properties properties = new Properties();
    // 您在控制台创建的 Producer ID
    properties.put(PropertyKeyConst.ProducerId, logPriducerId);
    // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
    properties.put(PropertyKeyConst.AccessKey, accessKey);
    // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
    properties.put(PropertyKeyConst.SecretKey, secretKey);

    producerBean.setProperties(properties);

    producerBean.start();
    return producerBean;
}
@Bean(name="stockProducerBean")
public ProducerBean initStockProducerBean(){
    ProducerBean producerBean = new ProducerBean();
    Properties properties = new Properties();
    properties.put(PropertyKeyConst.AccessKey, accessKey);
    properties.put(PropertyKeyConst.ProducerId, stockPriducerId);
    properties.put(PropertyKeyConst.SecretKey, secretKey);

    producerBean.setProperties(properties);
    producerBean.start();

    return producerBean;
}
@Bean(name="stockConsumerBean")
public ConsumerBean initStockConsumerBean(){
    ConsumerBean consumerBean = new ConsumerBean();
    Properties properties = new Properties();
    properties.put(PropertyKeyConst.ConsumerId, stockConsumerId);
    properties.put(PropertyKeyConst.AccessKey, accessKey);
    properties.put(PropertyKeyConst.SecretKey, secretKey);
    properties.put(PropertyKeyConst.ConsumeThreadNums, 50);

    consumerBean.setProperties(properties);

    //监听消息
    MessageListener messageListener = (message, context) -> {


        System.out.println("Receive2: " + message.getMsgID());
        try {

            return Action.CommitMessage;
        }catch (Exception e) {

            return Action.ReconsumeLater;
        }
    };
    Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();

    Subscription subscription = new Subscription();
    subscription.setTopic(stockTopic);
    subscription.setExpression("*");//tag 接受所有
    subscriptionTable.put(subscription,messageListener);
    consumerBean.setSubscriptionTable(subscriptionTable);
    consumerBean.start();




    return consumerBean;
}
@Bean(name="logConsumerBean")
public Consumer initLogConsumerBean(){
    Properties properties = new Properties();
    properties.put(PropertyKeyConst.ConsumerId, logConsumerId);
    properties.put(PropertyKeyConst.AccessKey, accessKey);
    properties.put(PropertyKeyConst.SecretKey, secretKey);
    properties.put(PropertyKeyConst.ConsumeThreadNums, 40);


    Consumer consumer = ONSFactory.createConsumer(properties);

    consumer.start();
    //订阅消息
    //订阅多个 Tag  TagA||TagB,如果模糊订阅 *
    consumer.subscribe(logTopic, "TagA||TagB", (message, context) -> {
        System.out.println("Receive: " + message);
        try {
            //处理你的业务
            return Action.CommitMessage;
        }catch (Exception e) {

            return Action.ReconsumeLater;
        }
    });


    return consumer;
}

阿里云 队列重试次数说明:

这种普通队列,保持着至少一次的规则,就是当你返回 ReconsumeLater 后,它会按你的 Delay 次数来计算具体的时间再次请求。默认是16次,对应的时间表如下:

第几次重试 每次重试间隔时间 第几次重试 每次重试间隔时间
1 10 秒 9 7 分钟
2 30 秒 10 8 分钟
3 1 分钟 11 9 分钟
4 2 分钟 12 10 分钟
5 3 分钟 13 20 分钟
6 4 分钟 14 30 分钟
7 5 分钟 15 1 小时
8 6 分钟 16 2 小时

自定义队列重试次数:

Properties properties = new Properties();
//配置对应 Consumer ID 的最大消息重试次数为20 次
properties.put(PropertyKeyConst.MaxReconsumeTimes,"20");
Consumer consumer =ONSFactory.createConsumer(properties);

队列重试规则的定义:

public class MessageListenerImpl implements MessageListener {

    @Override
    public Action consume(Message message, ConsumeContext context) {
        //方法3:消息处理逻辑抛出异常,消息将重试
        doConsumeMessage(message);
        //方式1:返回 Action.ReconsumeLater,消息将重试
        return Action.ReconsumeLater;
        //方式2:返回 null,消息将重试
        return null;
        //方式3:直接抛出异常, 消息将重试
        throw new RuntimeException("Consumer Message exceotion");
    }
}


版权所属:SO JSON在线解析

原文地址:https://www.sojson.com/blog/293.html

转载时必须以链接形式注明原始出处及本声明。

本文主题:

如果本文对你有帮助,那么请你赞助我,让我更有激情的写下去,帮助更多的人。

关于作者
一个低调而闷骚的男人。
相关文章
Java 集成阿里云消息队列,日志消息存储
Springboot + Freemarker 集成配置
Springboot + Mybatis +Maven 自动生成 Mapper.xml,Entity,Dao。 generator 配置
Shiro教程(四)Shiro + Redis配置
Springboot + Mybatis,数据库多数据源配置项目Demo【源码下载】
Shiro教程(七)Shiro Session共享配置以及实现
Elasticsearch教程,Elasticsearch配置文件 — elasticsearch.yml
SpringBoot 集成Spring-data-redis,redis对象序列化存储
Elasticsearch教程,Elasticsearch 设近义词搜索,IK分词器实现同义词搜索
MyEclipse8.5 注册码生成 Java代码实现方式。永久免费
最新文章
QUIC / HTTP3 协议详细分析讲解 966
恭喜那个做云存储的七牛云完成 F 轮 10 亿人民币的融资,开启新的云旅程 1664
Autojs怎么安全加密?Autojs在线加密工具注意事项。 2597
Java JSON 组件选型之 FastJson 为什么总有漏洞? 7482
使用七牛云存储实现图片API,自动删除图片方案合集 2374
神速ICP备案经验分享,ICP备案居然一天就通过了 3745
百度加强推送URL链接,百度SEO强行推送链接JavaScript代码案例讲解。 3976
SOJSON 拓展服务器被DDos攻击了一晚上,是如何解决的? 4654
湖南地区备案“新增网站需提交组网方案或解释说明”,关于备案做简单叙述 4945
企查查你是个什么企业,骗子的帮凶,诈骗的集中营,通过企查查骚扰企业电话不断,为所欲为的企查查 11717
最热文章
苹果电脑Mac怎么恢复出厂系统?苹果系统怎么重装系统? 463164
我为什么要选择RabbitMQ ,RabbitMQ简介,各种MQ选型对比 431603
免费天气API,全国天气 JSON API接口,可以获取五天的天气预报 363385
最新MyEclipse8.5注册码,有效期到2020年 (已经更新) 353913
免费天气API,天气JSON API,不限次数获取十五天的天气预报 318800
Elasticsearch教程(四) elasticsearch head 插件安装和使用 237662
Jackson 时间格式化,时间注解 @JsonFormat 用法、时差问题说明 201624
谈谈斐讯路由器劫持,你用斐讯路由器,你需要知道的事情 146175
Elasticsearch教程(一),全程直播(小白级别) 126538
Java 信任所有SSL证书,HTTPS请求抛错,忽略证书请求完美解决 103702

骚码加入我们 / 千人QQ群:259217951

入群需要5元为的是没有垃圾广告,如果没有QQ钱包,可以加群主拉进。

二维码生成 来自 >> 二维码生成器

支付扫码

所有赞助/开支都讲公开明细,用于网站维护:赞助名单查看

查看我的收藏

正在加载... ...