Springboot 集成Aliyun MQ消息队列,Aliyun 消息队列配置及代码实现
今天需要使用 Springboot 来集成 Aliyun 消息队列,看了官方的 Demo ,都是 Spring XML 方式配置。对照着做了一遍 Springboot 配置的方式。直接上代码+配置。
Aliyun 队列申请,我就不做叙述了。链接:https://ons.console.aliyun.com/
Springboot yml MQ配置:
aliyun:
accessKey: LTAIlH****7IMXnuV
accessSecretKey: T981eNFj****EycS0jq8xdThdfXCo
queue:
log:
topic: ios-****-log-topic_dev
producerId: PID_ios-****-log_dev
consumerId: CID_ios-****-log_dev
stock:
topic: ios-****-stock-topic_dev
producerId: PID_ios-****-stock_dev
consumerId: CID_ios-****-stock_dev
****是我不让你们看见而已,别你也****,另外建议本地测试或者开发采用公网的 MQ ,线上配置采用你对应的 ECS 或者对应的地区的 MQ 。这样速度会快一点。
Springboot @Bean Java代码
@Value("${aliyun.accessKey}")
private String accessKey;
@Value("${aliyun.accessSecretKey}")
private String secretKey;
@Value("${aliyun.queue.log.producerId}")
private String logPriducerId;
@Value("${aliyun.queue.log.consumerId}")
private String logConsumerId;
@Value("${aliyun.queue.log.topic}")
private String logTopic;
@Value("${aliyun.queue.stock.producerId}")
private String stockPriducerId;
@Value("${aliyun.queue.stock.consumerId}")
private String stockConsumerId;
@Value("${aliyun.queue.stock.topic}")
private String stockTopic;
@Bean(name="logProducerBean")
public ProducerBean initLogProducerBean(){
ProducerBean producerBean = new ProducerBean();
Properties properties = new Properties();
// 您在控制台创建的 Producer ID
properties.put(PropertyKeyConst.ProducerId, logPriducerId);
// AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
properties.put(PropertyKeyConst.AccessKey, accessKey);
// SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
properties.put(PropertyKeyConst.SecretKey, secretKey);
producerBean.setProperties(properties);
producerBean.start();
return producerBean;
}
@Bean(name="stockProducerBean")
public ProducerBean initStockProducerBean(){
ProducerBean producerBean = new ProducerBean();
Properties properties = new Properties();
properties.put(PropertyKeyConst.AccessKey, accessKey);
properties.put(PropertyKeyConst.ProducerId, stockPriducerId);
properties.put(PropertyKeyConst.SecretKey, secretKey);
producerBean.setProperties(properties);
producerBean.start();
return producerBean;
}
@Bean(name="stockConsumerBean")
public ConsumerBean initStockConsumerBean(){
ConsumerBean consumerBean = new ConsumerBean();
Properties properties = new Properties();
properties.put(PropertyKeyConst.ConsumerId, stockConsumerId);
properties.put(PropertyKeyConst.AccessKey, accessKey);
properties.put(PropertyKeyConst.SecretKey, secretKey);
properties.put(PropertyKeyConst.ConsumeThreadNums, 50);
consumerBean.setProperties(properties);
//监听消息
MessageListener messageListener = (message, context) -> {
System.out.println("Receive2: " + message.getMsgID());
try {
return Action.CommitMessage;
}catch (Exception e) {
return Action.ReconsumeLater;
}
};
Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();
Subscription subscription = new Subscription();
subscription.setTopic(stockTopic);
subscription.setExpression("*");//tag 接受所有
subscriptionTable.put(subscription,messageListener);
consumerBean.setSubscriptionTable(subscriptionTable);
consumerBean.start();
return consumerBean;
}
@Bean(name="logConsumerBean")
public Consumer initLogConsumerBean(){
Properties properties = new Properties();
properties.put(PropertyKeyConst.ConsumerId, logConsumerId);
properties.put(PropertyKeyConst.AccessKey, accessKey);
properties.put(PropertyKeyConst.SecretKey, secretKey);
properties.put(PropertyKeyConst.ConsumeThreadNums, 40);
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.start();
//订阅消息
//订阅多个 Tag TagA||TagB,如果模糊订阅 *
consumer.subscribe(logTopic, "TagA||TagB", (message, context) -> {
System.out.println("Receive: " + message);
try {
//处理你的业务
return Action.CommitMessage;
}catch (Exception e) {
return Action.ReconsumeLater;
}
});
return consumer;
}
阿里云 队列重试次数说明:
这种普通队列,保持着至少一次的规则,就是当你返回 ReconsumeLater 后,它会按你的 Delay 次数来计算具体的时间再次请求。默认是16次,对应的时间表如下:
第几次重试 | 每次重试间隔时间 | 第几次重试 | 每次重试间隔时间 |
---|---|---|---|
1 | 10 秒 | 9 | 7 分钟 |
2 | 30 秒 | 10 | 8 分钟 |
3 | 1 分钟 | 11 | 9 分钟 |
4 | 2 分钟 | 12 | 10 分钟 |
5 | 3 分钟 | 13 | 20 分钟 |
6 | 4 分钟 | 14 | 30 分钟 |
7 | 5 分钟 | 15 | 1 小时 |
8 | 6 分钟 | 16 | 2 小时 |
自定义队列重试次数:
Properties properties = new Properties();
//配置对应 Consumer ID 的最大消息重试次数为20 次
properties.put(PropertyKeyConst.MaxReconsumeTimes,"20");
Consumer consumer =ONSFactory.createConsumer(properties);
队列重试规则的定义:
public class MessageListenerImpl implements MessageListener {
@Override
public Action consume(Message message, ConsumeContext context) {
//方法3:消息处理逻辑抛出异常,消息将重试
doConsumeMessage(message);
//方式1:返回 Action.ReconsumeLater,消息将重试
return Action.ReconsumeLater;
//方式2:返回 null,消息将重试
return null;
//方式3:直接抛出异常, 消息将重试
throw new RuntimeException("Consumer Message exceotion");
}
}
版权所属:SO JSON在线解析
原文地址:https://www.sojson.com/blog/293.html
转载时必须以链接形式注明原始出处及本声明。
本文主题:
如果本文对你有帮助,那么请你赞助我,让我更有激情的写下去,帮助更多的人。