RocketMQ集成SpringBoot
我们创建两个SpringBoot项目,一个模拟生产者方的服务器,一个模拟消费者方的服务器
生产者方的application.yml文件
rocketmq:
name-server: 192.168.183.131:9876
producer:
group: boot-producer-group
消费者方的application.yml文件
server:
port: 8081
rocketmq:
name-server: 192.168.183.131:9876
# 一个boot项目中可以写很多个消费者程序,但是在一般开发中,一个boot项目只对应一个消费者
生产者发送消息
MsgModel
@Data
@AllArgsConstructor
@NoArgsConstructor
public class MsgModel {
private String orderSn;
private Integer userId;
private String desc;// 下单 短信 物流
}
通过RocketMQTemplate发送同步、异步、单向、延迟、顺序消息
// RocketMQTemplate
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Test
void contextLoads() {
// 同步消息
rocketMQTemplate.syncSend("bootTestTopic", "我是boot的一个消息");
// 异步消息
rocketMQTemplate.asyncSend("bootTestTopic", "我是boot的一个异步消息", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("成功");
}
@Override
public void onException(Throwable throwable) {
System.out.println("失败:" + throwable.getMessage());
}
});
// 单向消息
rocketMQTemplate.sendOneWay("bootOnewayTopic", "我是boot的一个单向消息");
Message<String> message = MessageBuilder.withPayload("我是boot的一个延迟消息").build();
// 延迟消息
rocketMQTemplate.syncSend("bootMsTopic", message, 3000, 3);
// 顺序消息
// 发送者方 需要将一组消息都发在同一个队列中
// 消费者方 需要单线程消费
List<MsgModel> msgModels = Arrays.asList(
new MsgModel("qwer", 1, "下单"),
new MsgModel("qwer", 1, "短信"),
new MsgModel("qwer", 1, "物流"),
new MsgModel("zxcv", 2, "下单"),
new MsgModel("zxcv", 2, "短信"),
new MsgModel("zxcv", 2, "物流")
);
msgModels.forEach(msgModel -> {
// 发送顺序消息 一般都是以json的格式进行处理
rocketMQTemplate.syncSendOrderly("bootOrderlyTopic", JSON.toJSONString(msgModel), msgModel.getOrderSn());
});
}
发送带Tag的消息
/**
* 带tag的消息
*
* @throws Exception
*/
@Test
void tagKeyTest() throws Exception {
// topic:tag
// rocketMQTemplate.syncSend("bootTagTopic:tagA", "我是一个带tag的消息");
// rocketMQTemplate.syncSend("bootTagTopic:3", "我是一个带tag的消息 a = 3");
// key是携带在消息头里的
Message<String> message = MessageBuilder.withPayload("我是一个带key的消息")
.setHeader(RocketMQHeaders.KEYS, "qwertyasdf")
.build();
rocketMQTemplate.syncSend("bootKeyTopic:tagA", message);
}
测试消息消费模式
消息消费模式:
- 集群模式
- 广播模式
/**
* 测试消息消费模式 集群模式、广播模式
*
* @throws Exception
*/
@Test
void modeTest() throws Exception {
for (int i = 0; i < 5; i++) {
rocketMQTemplate.syncSend("modeTopic", "我是第" + i + "条消息");
}
}
消费者消费信息
集成了SpringBoot后,生产者方发送消息几乎都是大差不差,真正有区别的是消费者方
消费同步、异步、单向、延迟消息
ABootSimpleMsgListener
@Component
@RocketMQMessageListener(
topic = "bootTestTopic",
consumerGroup = "boot-test-consumer-group"
)
public class ABootSimpleMsgListener implements RocketMQListener<MessageExt> {
/**
* 消费者方法
* 如果泛型制定了固定的类型 那么消息体就是我们的参数
*
* @param message 方法没有报错就证明消息被签收了
* 报错了就是拒收,会重试
*/
@Override
public void onMessage(MessageExt message) {
System.out.println(new String(message.getBody()));
}
}
消费单向消息和延迟消息同理
消费顺序消息
在非SpringBoot发送RocketMQ消息里面有提及过并发模式和顺序模式:
- MessageListenerConcurrently 并发模式 多线程的 重试16次
- MessageListenerOrderly 顺序模式 单线程
同理的,在集成了SpringBoot后写法如下
ConsumeMode:
- CONCURRENTLY
- ORDERLY
BOrderMsgListener
@Component
@RocketMQMessageListener(
topic = "bootOrderlyTopic",
consumerGroup = "boot-orderly-consumer-group",
consumeMode = ConsumeMode.ORDERLY, // 顺序消费模式 单线程
maxReconsumeTimes = 5 // 消费重试的次数
)
public class BOrderlyMsgListener implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
MsgModel msgModel = JSON.parseObject(new String(messageExt.getBody()), MsgModel.class);
System.out.println(msgModel);
}
}
消费带Tag的消息
注意一下,在selectorExpression里面的||,是并的意思,不是或,意为消费带tagA和tagB的消息
CTagMsgListener
@Component
@RocketMQMessageListener(
topic = "bootKeyTopic",
// topic = "bootTagTopic",
consumerGroup = "boot-tag-consumer-group",
selectorType = SelectorType.TAG, // tag过滤模式
selectorExpression = "tagA || tagB"
// selectorType = SelectorType.SQL92, // sql92过滤模式,官方默认不支持sql92,要去yml配置文件开启(不常用,了解即可)
// selectorExpression = "a < 5" // broker.conf中开启enablePropertyFilter=true
)
public class CTagMsgListener implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
System.out.println(messageExt.getKeys());
System.out.println(new String(messageExt.getBody()));
}
}
消费模式
CLUSTERING
集群模式,负载均衡模式,也是messageModel默认的模式
在此模式下,同一消费者组中的消费者会对所有的消息进行负载均衡消费,即多个消费者交替消费同一个主题里面的消息
例如:往某一主题发送了十条消息,则在该模式下,所有的消费者会均摊消费这十条消息。
DConsumerClustering1
/**
* [CLUSTERING] 集群模式下,队列会被消费者分摊,队列数量>=消费者数量,RocketMQ服务器会记录消息的消费位点并处理
* BROADCASTING 广播模式下,消息会被每一个消费者都处理一次,RcoketMQ服务器不会记录消费位点,也不会重试
*/
@Component
@RocketMQMessageListener(
topic = "modeTopic",
consumerGroup = "mode-consumer-group-a",
// messageModel默认是集群模式(CLUSTERING)负载均衡
messageModel = MessageModel.CLUSTERING
)
public class DConsumerClustering1 implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("我是mode-consumer-group-a组的第一个消费者: " + message);
}
}
DConsumerClustering2
@Component
@RocketMQMessageListener(
topic = "modeTopic",
consumerGroup = "mode-consumer-group-a",
// messageModel默认是集群模式(CLUSTERING)负载均衡
messageModel = MessageModel.CLUSTERING
)
public class DConsumerClustering2 implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("我是mode-consumer-group-a组的第二个消费者: " + message);
}
}
DConsumerClustering3
@Component
@RocketMQMessageListener(
topic = "modeTopic",
consumerGroup = "mode-consumer-group-a",
messageModel = MessageModel.CLUSTERING // 集群模式
)
public class DConsumerClustering3 implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("我是mode-consumer-group-a组的第三个消费者: " + message);
}
}
BROADCASTING
广播模式,表示每个消费者都消费一遍订阅的主题的消息
例如:有五条消息,则每个消费者各都会消费五条,而不是一共消费五条
DConsumerBroadcasting1
@Component
@RocketMQMessageListener(
topic = "modeTopic",
consumerGroup = "mode-consumer-group-b",
messageModel = MessageModel.BROADCASTING // 广播模式
)
public class DConsumerBroadcasting1 implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("我是mode-consumer-group-b组的第一个消费者: " + message);
}
}
DConsumerBroadcasting2
@Component
@RocketMQMessageListener(
topic = "modeTopic",
consumerGroup = "mode-consumer-group-b",
messageModel = MessageModel.BROADCASTING // 广播模式
)
public class DConsumerBroadcasting2 implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("我是mode-consumer-group-b组的第二个消费者: " + message);
}
}
DConsumerBroadcasting3
@Component
@RocketMQMessageListener(
topic = "modeTopic",
consumerGroup = "mode-consumer-group-b",
messageModel = MessageModel.BROADCASTING // 广播模式
)
public class DConsumerBroadcasting3 implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("我是mode-consumer-group-b组的第三个消费者: " + message);
}
}