发送一条消息
创建一个demo文件夹
创建ASimpleTest.java,pom包导入junit,用@Test的测试单例去模拟启动生产者和消费者
将RocketMQ在的服务器ip单独抽离,后面会重复使用
public interface MqConstant { String NAME_SRV_ADDR = "192.168.183.131:9876"; }生产者
// 生产者 @Test public void simpleProducer() throws Exception { // 创建一个生产者并指定组名 DefaultMQProducer producer = new DefaultMQProducer("test-producer-group"); // 连接namesrv producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR); // 启动 producer.start(); for (int i = 0; i < 10; i++) { // 创建一个消息 Message message = new Message("testTopic", "我是一条简单的消息".getBytes()); // 发送消息 SendResult sendResult = producer.send(message); System.out.println(sendResult.getSendStatus()); } // 关闭生产者 producer.shutdown(); }消费者
// 消费者 @Test public void simpleConsumer() throws Exception { // 不管是push还是pull,底层都是pull模式,通过长轮询的方案实现 // 创建一个消费者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-consumer-group"); // DefaultMQPullConsumer pullConsumer = new DefaultMQPullConsumer("test-consumer-group"); // 连接namesrv consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR); // 订阅一个主题 * 表示订阅这个主题中所有的消息,后期会有消息过滤 consumer.subscribe("testTopic", "*"); // 设置消息监听器(一直监听,异步回调方式) consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { // 这个就是消费的方法(业务处理) System.out.println("我是消费者"); System.out.println(list.get(0).toString()); System.out.println(new String(list.get(0).getBody())); System.out.println("消费的上下文:" + consumeConcurrentlyContext); // 返回值 CONSUME_SUCCESS成功,表示消息会从mq出队, // RECONSUME_LATER(报错/null)失败,消息会重新回到mq队列,过一会重新投递出来,给当前消费者或者其他消费者消费 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 启动 consumer.start(); // 挂起当前的jvm System.in.read(); }
发送一条异步消息
BSyncTest.java中添加生产异步消息的生产者
消费者
@Test public void asyncProducer() throws Exception { DefaultMQProducer producer = new DefaultMQProducer("async-producer-group"); producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR); producer.start(); Message message = new Message("asyncTopic", "我是一条异步消息".getBytes()); producer.send(message, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println("发送成功"); } @Override public void onException(Throwable throwable) { System.out.println("发送失败" + throwable); } }); System.out.println("先执行"); System.in.read(); }
发送一条单向消息
生产者
@Test
public void onewayProducer() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("oneway-producer-group");
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
producer.start();
Message message = new Message("onewayTopic", "日志xxx".getBytes());
producer.sendOneway(message);
System.out.println("成功发送一条单向消息");
producer.shutdown();
}
消费者
// 消费者
@Test
public void onewayConsumer() throws Exception {
// 不管是push还是pull,底层都是pull模式,通过长轮询的方案实现
// 创建一个消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("oneway-consumer-group");
// DefaultMQPullConsumer pullConsumer = new DefaultMQPullConsumer("test-consumer-group");
// 连接namesrv
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
// 订阅一个主题 * 表示订阅这个主题中所有的消息,后期会有消息过滤
consumer.subscribe("onewayTopic", "*");
// 设置消息监听器(一直监听,异步回调方式)
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
// 这个就是消费的方法(业务处理)
System.out.println("我是消费者");
System.out.println(list.get(0).toString());
System.out.println(new String(list.get(0).getBody()));
System.out.println("消费的上下文:" + consumeConcurrentlyContext);
// 返回值 CONSUME_SUCCESS成功,表示消息会从mq出队,
// RECONSUME_LATER(报错/null)失败,消息会重新回到mq队列,过一会重新投递出来,给当前消费者或者其他消费者消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动
consumer.start();
// 挂起当前的jvm
System.in.read();
}
发送一条延迟消息
message.setDelayTimeLevel(3);:
给消息设置一个延迟等级(一共有十八个等级)
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
生产者
// 生产者
@Test
public void msProducer() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("ms-producer-group");
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
producer.start();
Message message = new Message("orderMsTopic", "订单号, 座位号".getBytes());
// 给消息设置一个延迟等级(一共有十八个等级)
// 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
message.setDelayTimeLevel(3);
producer.send(message);
System.out.println(new Date());
producer.shutdown();
}
消费者
// 消费者
@Test
public void msConsumer() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ms-consumer-group");
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
consumer.subscribe("orderMsTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
// 消费的方法
System.out.println("我是消费者");
System.out.println(list.get(0).toString());
System.out.println(new String(list.get(0).getBody()));
System.out.println("消费的上下文:" + consumeConcurrentlyContext);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.in.read();
}
发送一组消息
生产者
@Test
public void testBatchProducer() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("batch-producer-group");
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
producer.start();
List<Message> messages = Arrays.asList(
new Message("BatchTopic", "一组消息里的A消息".getBytes()),
new Message("BatchTopic", "一组消息里的B消息".getBytes()),
new Message("BatchTopic", "一组消息里的C消息".getBytes()),
new Message("BatchTopic", "一组消息里的D消息".getBytes())
);
SendResult sendResult = producer.send(messages);
System.out.println(sendResult);
producer.shutdown();
}
消息监听里MessageListenerConcurrently是并发模式,多线程的,失败了会重试16次
消费者
@Test
public void testBatchConsumer() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("batch-consumer-group");
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
consumer.subscribe("BatchTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
// 这里执行消费的代码,默认是多线程消费
System.out.println(Thread.currentThread().getName() + "----" + new String(list.get(0).getBody()));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.in.read();
}
发送顺序消息
在发送消息的时候要按顺序发送
定义一个MsgModel
@Data
@AllArgsConstructor
@NoArgsConstructor
public class MsgModel {
private String orderSn;
private Integer userId;
private String desc;// 下单 短信 物流
}
yml配置连接数据库:
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
username: root
password: Hzx492357816
url: jdbc:mysql://localhost:3306/rcktest?serverTimezone=GMT%2B8
mysql数据库
/*
Navicat Premium Data Transfer
Source Server : localhost
Source Server Type : MySQL
Source Server Version : 80031
Source Host : localhost:3306
Source Schema : rcktest
Target Server Type : MySQL
Target Server Version : 80031
File Encoding : 65001
Date: 11/04/2024 10:46:34
*/
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for order_oper_log
-- ----------------------------
DROP TABLE IF EXISTS `order_oper_log`;
CREATE TABLE `order_oper_log` (
`id` int(0) NOT NULL AUTO_INCREMENT COMMENT 'id',
`type` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL,
`order_sn` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL,
`user` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE,
UNIQUE INDEX `order_unique_sn`(`order_sn`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 29 CHARACTER SET = utf8mb3 COLLATE = utf8mb3_general_ci ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of order_oper_log
-- ----------------------------
INSERT INTO `order_oper_log` VALUES (1, '1', '123', '111');
INSERT INTO `order_oper_log` VALUES (2, '1', '60f0e378-698e-45f5-ba5d-ef2c08efb9b7', '123');
INSERT INTO `order_oper_log` VALUES (5, '1', '80a2aecd-ec6e-4f09-a186-579ebd221bba', '123');
INSERT INTO `order_oper_log` VALUES (6, '1', 'e245548f-0e28-4f28-bc9f-1fa3f298dfac', '123');
INSERT INTO `order_oper_log` VALUES (9, '1', '3eaf357a-c753-4057-8b88-e7738eb37b48', '123');
SET FOREIGN_KEY_CHECKS = 1;
生产者:
@Test
public void orderlyProducer() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("orderly-producer-group");
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
producer.start();
// 发送顺序消息 发送时要确保有序 并且要发到同一个队列里面
msgModels.forEach(msgModel -> {
Message message = new Message("orderlyTopic", msgModel.toString().getBytes());
try {
// 发 相同的订单号去相同的队列
producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// 选择队列
int hashCode = arg.toString().hashCode();
// 取模
int i = hashCode % mqs.size();
return mqs.get(i);
}
}, msgModel.getOrderSn());
} catch (Exception e) {
e.printStackTrace();
}
});
producer.shutdown();
System.out.println("发送完成");
}
消费者
将消息监听设置为MessageListenerOrderly单线程消费
@Test
public void orderlyConsumer() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderly-consumer-group");
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
consumer.subscribe("orderlyTopic", "*");
// MessageListenerConcurrently 并发模式 多线程的 重试16次
// MessageListenerOrderly 顺序模式 单线程
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
System.out.println("线程id: " + Thread.currentThread().getId());
System.out.println(list.get(0).getBody());
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.in.read();
}
发送带Tag的消息
生产者
// 生产者
@Test
public void tagProducer() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("tag-producer-group");
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
producer.start();
Message message1 = new Message("tagTopic", "vip1", "我是v1的文章".getBytes());
Message message2 = new Message("tagTopic", "vip2", "我是v2的文章".getBytes());
producer.send(message1);
producer.send(message2);
System.out.println("发送成功");
producer.shutdown();
}
消费者在消费消息的时候可以加入过滤,指定哪些带了tag的消息会被消费,哪些不会
消费者消费tag为vip1的消息
// 消费者 vip1
@Test
public void tagConsumer1() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-group-a");
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
consumer.subscribe("tagTopic", "vip1");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
System.out.println("我是vip1的消费者" + new String(list.get(0).getBody()));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.in.read();
}
消费者消费vip1和vip2的消息
// 消费者 vip2
@Test
public void tagConsumer2() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-group-b");
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
// String[] tags = subString.split("\\|\\|"); 源码里面是通过||区分标签的
consumer.subscribe("tagTopic", "vip1 || vip2");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
System.out.println("我是vip2的消费者" + new String(list.get(0).getBody()));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.in.read();
}
发送带Key的消息
自定义唯一KeyString key = UUID.randomUUID().toString();
生产者
/**
* Key 业务参数 我们自身要确保唯一
* 为了查阅和去重
* @throws Exception
*/
// 生产者
@Test
public void keyProducer() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("key-producer-group");
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
producer.start();
String key = UUID.randomUUID().toString();
System.out.println(key);
Message message = new Message("keyTopic", "vip1", key, "我是vip1的文章".getBytes());
producer.send(message);
System.out.println("发送成功");
producer.shutdown();
}
消费者
// 消费者
@Test
public void keyConsumer() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("key-consumer-gruop");
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
consumer.subscribe("keyTopic", "vip1");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
MessageExt messageExt = list.get(0);
System.out.println("我是keyTopic的vip1消费者" + new String(messageExt.getBody()));
System.out.println("业务的标识: " + messageExt.getKeys());
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.in.read();
}
消息重试
生产者
/**
* 生产者
* @throws Exception
*/
@Test
public void retryProducer() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("retry-producer-group");
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
producer.start();
// 生产者发送消息 重试次数
producer.setRetryTimesWhenSendFailed(2);
producer.setRetryTimesWhenSendAsyncFailed(2);
String key = UUID.randomUUID().toString();
System.out.println(key);
Message message = new Message("retryTopic", "vip1", key, "我是vip1的文章".getBytes());
producer.send(message);
System.out.println("发送成功");
producer.shutdown();
}
消息重试的时间是根据延迟等级来推进的
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
- 延迟等级(一共有十八个等级)
- 默认重试16次
- 1.能否自定义重试次数?
- 2.如果重试了16次(并发模式)顺序模式下(int最大值次)还是失败?是一个死信消息 则会放在一个死信主题中去:%DLQ%retry-consumer-group
- 3.当消息处理失败的时候,该如何正确的处理?
消费者
/**
* 消息重试的时间是根据延迟等级来推进的
* 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
* 延迟等级(一共有十八个等级)
* 默认重试16次
* 1.能否自定义重试次数?
* 2.如果重试了16次(并发模式)顺序模式下(int最大值次)还是失败?是一个死信消息 则会放在一个死信主题中去:%DLQ%retry-consumer-group
* 3.当消息处理失败的时候,该如何正确的处理?
* @throws Exception
*/
@Test
public void retryConsumer() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-consumer-group");
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
consumer.subscribe("retryTopic", "*");
// 设定重试次数
consumer.setMaxReconsumeTimes(2);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
MessageExt messageExt = list.get(0);
System.out.println(new Date());
System.out.println(messageExt.getReconsumeTimes());
System.out.println(new String(messageExt.getBody()));
// 业务报错 返回null 返回RECONSUME_LATER 都会重试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
consumer.start();
System.in.read();
}
消费者自定义重试次数
/**
*
* @throws Exception
*/
@Test
public void retryDeadConsumer2() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-dead-consumer-group");
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
consumer.subscribe("retryTopic", "*");
// 设定重试次数
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
MessageExt messageExt = list.get(0);
System.out.println(new Date());
// 业务处理
try {
// 对业务处理的代码进行try-catch
handleDb();
} catch (Exception e) {
// 重试
int reconsumeTimes = messageExt.getReconsumeTimes();
if (reconsumeTimes >= 3) {
// 重试到一定次数就不要重试了
System.out.println("通知人工接入处理");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.in.read();
}
private void handleDb() {
int i = 10 / 0;
}
消费死信队列
死信队列topic:%DLQ%retry-consumer-group
消费者
/**
* 消费死信队列的消费者
* 直接监听死信主题的消息,记录并通知人工接入处理
* @throws Exception
*/
@Test
public void retryDeadConsumer() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-dead-consumer-group");
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
consumer.subscribe("%DLQ%retry-consumer-group", "*");
// 设定重试次数
consumer.setMaxReconsumeTimes(2);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
MessageExt messageExt = list.get(0);
System.out.println(new Date());
System.out.println("通知人工接入处理");
System.out.println(new String(messageExt.getBody()));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.in.read();
}