RocketMQ-2.0-非boot方式发送消息


发送一条消息

  1. 创建一个demo文件夹

  2. 创建ASimpleTest.java,pom包导入junit,用@Test的测试单例去模拟启动生产者和消费者

    image-20240411093540868

  3. 将RocketMQ在的服务器ip单独抽离,后面会重复使用

    public interface MqConstant {
        String NAME_SRV_ADDR = "192.168.183.131:9876";
    }
    
  4. 生产者

    // 生产者
    @Test
    public void simpleProducer() throws Exception {
        // 创建一个生产者并指定组名
        DefaultMQProducer producer = new DefaultMQProducer("test-producer-group");
        // 连接namesrv
        producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
        // 启动
        producer.start();
        for (int i = 0; i < 10; i++) {
            // 创建一个消息
            Message message = new Message("testTopic", "我是一条简单的消息".getBytes());
            // 发送消息
            SendResult sendResult = producer.send(message);
            System.out.println(sendResult.getSendStatus());
        }
    
        // 关闭生产者
        producer.shutdown();
    }
  5. 消费者

    // 消费者
    @Test
    public void simpleConsumer() throws Exception {
        // 不管是push还是pull,底层都是pull模式,通过长轮询的方案实现
        // 创建一个消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-consumer-group");
    //        DefaultMQPullConsumer pullConsumer = new DefaultMQPullConsumer("test-consumer-group");
        // 连接namesrv
        consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
        // 订阅一个主题   * 表示订阅这个主题中所有的消息,后期会有消息过滤
        consumer.subscribe("testTopic", "*");
        // 设置消息监听器(一直监听,异步回调方式)
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                // 这个就是消费的方法(业务处理)
                System.out.println("我是消费者");
                System.out.println(list.get(0).toString());
                System.out.println(new String(list.get(0).getBody()));
                System.out.println("消费的上下文:" + consumeConcurrentlyContext);
                // 返回值 CONSUME_SUCCESS成功,表示消息会从mq出队,
                // RECONSUME_LATER(报错/null)失败,消息会重新回到mq队列,过一会重新投递出来,给当前消费者或者其他消费者消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
    
        // 启动
        consumer.start();
        // 挂起当前的jvm
        System.in.read();
    }

发送一条异步消息

  1. BSyncTest.java中添加生产异步消息的生产者

    image-20240411100121420

  2. 消费者

    @Test
    public void asyncProducer() throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("async-producer-group");
        producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
        producer.start();
        Message message = new Message("asyncTopic", "我是一条异步消息".getBytes());
        producer.send(message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("发送成功");
            }
    
            @Override
            public void onException(Throwable throwable) {
                System.out.println("发送失败" + throwable);
            }
        });
        System.out.println("先执行");
        System.in.read();
    }

发送一条单向消息

image-20240411100153991

生产者

@Test
public void onewayProducer() throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("oneway-producer-group");
    producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    producer.start();
    Message message = new Message("onewayTopic", "日志xxx".getBytes());
    producer.sendOneway(message);
    System.out.println("成功发送一条单向消息");
    producer.shutdown();
}

消费者

// 消费者
@Test
public void onewayConsumer() throws Exception {
    // 不管是push还是pull,底层都是pull模式,通过长轮询的方案实现
    // 创建一个消费者
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("oneway-consumer-group");
//        DefaultMQPullConsumer pullConsumer = new DefaultMQPullConsumer("test-consumer-group");
    // 连接namesrv
    consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    // 订阅一个主题   * 表示订阅这个主题中所有的消息,后期会有消息过滤
    consumer.subscribe("onewayTopic", "*");
    // 设置消息监听器(一直监听,异步回调方式)
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            // 这个就是消费的方法(业务处理)
            System.out.println("我是消费者");
            System.out.println(list.get(0).toString());
            System.out.println(new String(list.get(0).getBody()));
            System.out.println("消费的上下文:" + consumeConcurrentlyContext);
            // 返回值 CONSUME_SUCCESS成功,表示消息会从mq出队,
            // RECONSUME_LATER(报错/null)失败,消息会重新回到mq队列,过一会重新投递出来,给当前消费者或者其他消费者消费
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });

    // 启动
    consumer.start();

    // 挂起当前的jvm
    System.in.read();
}

发送一条延迟消息

message.setDelayTimeLevel(3);

给消息设置一个延迟等级(一共有十八个等级)

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

生产者

// 生产者
@Test
public void msProducer() throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("ms-producer-group");
    producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    producer.start();
    Message message = new Message("orderMsTopic", "订单号, 座位号".getBytes());
    // 给消息设置一个延迟等级(一共有十八个等级)
    // 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
    message.setDelayTimeLevel(3);
    producer.send(message);
    System.out.println(new Date());
    producer.shutdown();
}

消费者

// 消费者
@Test
public void msConsumer() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ms-consumer-group");
    consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    consumer.subscribe("orderMsTopic", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            // 消费的方法
            System.out.println("我是消费者");
            System.out.println(list.get(0).toString());
            System.out.println(new String(list.get(0).getBody()));
            System.out.println("消费的上下文:" + consumeConcurrentlyContext);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

发送一组消息

生产者

@Test
public void testBatchProducer() throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("batch-producer-group");
    producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    producer.start();
    List<Message> messages = Arrays.asList(
            new Message("BatchTopic", "一组消息里的A消息".getBytes()),
            new Message("BatchTopic", "一组消息里的B消息".getBytes()),
            new Message("BatchTopic", "一组消息里的C消息".getBytes()),
            new Message("BatchTopic", "一组消息里的D消息".getBytes())
    );
    SendResult sendResult = producer.send(messages);
    System.out.println(sendResult);
    producer.shutdown();
}

消息监听里MessageListenerConcurrently是并发模式,多线程的,失败了会重试16次

消费者

@Test
public void testBatchConsumer() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("batch-consumer-group");
    consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    consumer.subscribe("BatchTopic", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            // 这里执行消费的代码,默认是多线程消费
            System.out.println(Thread.currentThread().getName() + "----" + new String(list.get(0).getBody()));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });

    consumer.start();
    System.in.read();
}

发送顺序消息

在发送消息的时候要按顺序发送

定义一个MsgModel

image-20240411102057125

@Data
@AllArgsConstructor
@NoArgsConstructor
public class MsgModel {
    private String orderSn;
    private Integer userId;
    private String desc;// 下单 短信 物流
}

yml配置连接数据库:

spring:
    datasource:
        driver-class-name: com.mysql.cj.jdbc.Driver
        username: root
        password: Hzx492357816
        url: jdbc:mysql://localhost:3306/rcktest?serverTimezone=GMT%2B8

mysql数据库

image-20240411104617625

/*
 Navicat Premium Data Transfer

 Source Server         : localhost
 Source Server Type    : MySQL
 Source Server Version : 80031
 Source Host           : localhost:3306
 Source Schema         : rcktest

 Target Server Type    : MySQL
 Target Server Version : 80031
 File Encoding         : 65001

 Date: 11/04/2024 10:46:34
*/

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for order_oper_log
-- ----------------------------
DROP TABLE IF EXISTS `order_oper_log`;
CREATE TABLE `order_oper_log`  (
  `id` int(0) NOT NULL AUTO_INCREMENT COMMENT 'id',
  `type` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL,
  `order_sn` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL,
  `user` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE,
  UNIQUE INDEX `order_unique_sn`(`order_sn`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 29 CHARACTER SET = utf8mb3 COLLATE = utf8mb3_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of order_oper_log
-- ----------------------------
INSERT INTO `order_oper_log` VALUES (1, '1', '123', '111');
INSERT INTO `order_oper_log` VALUES (2, '1', '60f0e378-698e-45f5-ba5d-ef2c08efb9b7', '123');
INSERT INTO `order_oper_log` VALUES (5, '1', '80a2aecd-ec6e-4f09-a186-579ebd221bba', '123');
INSERT INTO `order_oper_log` VALUES (6, '1', 'e245548f-0e28-4f28-bc9f-1fa3f298dfac', '123');
INSERT INTO `order_oper_log` VALUES (9, '1', '3eaf357a-c753-4057-8b88-e7738eb37b48', '123');

SET FOREIGN_KEY_CHECKS = 1;

生产者:

@Test
public void orderlyProducer() throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("orderly-producer-group");
    producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    producer.start();

    // 发送顺序消息 发送时要确保有序 并且要发到同一个队列里面
    msgModels.forEach(msgModel -> {
        Message message = new Message("orderlyTopic", msgModel.toString().getBytes());
        try {
            // 发 相同的订单号去相同的队列
            producer.send(message, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    // 选择队列
                    int hashCode = arg.toString().hashCode();
                    // 取模
                    int i = hashCode % mqs.size();
                    return mqs.get(i);
                }
            }, msgModel.getOrderSn());
        } catch (Exception e) {
            e.printStackTrace();
        }
    });

    producer.shutdown();
    System.out.println("发送完成");
}

消费者

将消息监听设置为MessageListenerOrderly单线程消费

@Test
public void orderlyConsumer() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderly-consumer-group");
    consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    consumer.subscribe("orderlyTopic", "*");
    // MessageListenerConcurrently 并发模式 多线程的 重试16次
    // MessageListenerOrderly 顺序模式 单线程
    consumer.registerMessageListener(new MessageListenerOrderly() {
        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
            System.out.println("线程id: " + Thread.currentThread().getId());
            System.out.println(list.get(0).getBody());
            return ConsumeOrderlyStatus.SUCCESS;
        }
    });

    consumer.start();
    System.in.read();
}

发送带Tag的消息

生产者

// 生产者
@Test
public void tagProducer() throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("tag-producer-group");
    producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    producer.start();
    Message message1 = new Message("tagTopic", "vip1", "我是v1的文章".getBytes());
    Message message2 = new Message("tagTopic", "vip2", "我是v2的文章".getBytes());
    producer.send(message1);
    producer.send(message2);
    System.out.println("发送成功");
    producer.shutdown();
}

消费者在消费消息的时候可以加入过滤,指定哪些带了tag的消息会被消费,哪些不会

消费者消费tag为vip1的消息

// 消费者 vip1
@Test
public void tagConsumer1() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-group-a");
    consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    consumer.subscribe("tagTopic", "vip1");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            System.out.println("我是vip1的消费者" + new String(list.get(0).getBody()));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });

    consumer.start();
    System.in.read();
}

消费者消费vip1和vip2的消息

// 消费者 vip2
@Test
public void tagConsumer2() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-group-b");
    consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    // String[] tags = subString.split("\\|\\|"); 源码里面是通过||区分标签的
    consumer.subscribe("tagTopic", "vip1 || vip2");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            System.out.println("我是vip2的消费者" + new String(list.get(0).getBody()));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

发送带Key的消息

自定义唯一KeyString key = UUID.randomUUID().toString();

生产者

/**
 * Key 业务参数 我们自身要确保唯一
 * 为了查阅和去重
 * @throws Exception
 */
// 生产者
@Test
public void keyProducer() throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("key-producer-group");
    producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    producer.start();
    String key = UUID.randomUUID().toString();
    System.out.println(key);
    Message message = new Message("keyTopic", "vip1", key, "我是vip1的文章".getBytes());
    producer.send(message);
    System.out.println("发送成功");
    producer.shutdown();
}

消费者

// 消费者
@Test
public void keyConsumer() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("key-consumer-gruop");
    consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    consumer.subscribe("keyTopic", "vip1");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            MessageExt messageExt = list.get(0);
            System.out.println("我是keyTopic的vip1消费者" + new String(messageExt.getBody()));
            System.out.println("业务的标识: " + messageExt.getKeys());
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

消息重试

生产者

/**
 * 生产者
 * @throws Exception
 */
@Test
public void retryProducer() throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("retry-producer-group");
    producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    producer.start();

    // 生产者发送消息 重试次数
    producer.setRetryTimesWhenSendFailed(2);
    producer.setRetryTimesWhenSendAsyncFailed(2);

    String key = UUID.randomUUID().toString();
    System.out.println(key);
    Message message = new Message("retryTopic", "vip1", key, "我是vip1的文章".getBytes());
    producer.send(message);
    System.out.println("发送成功");
    producer.shutdown();
}

消息重试的时间是根据延迟等级来推进的

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

  • 延迟等级(一共有十八个等级)
  • 默认重试16次
  • 1.能否自定义重试次数?
  • 2.如果重试了16次(并发模式)顺序模式下(int最大值次)还是失败?是一个死信消息 则会放在一个死信主题中去:%DLQ%retry-consumer-group
  • 3.当消息处理失败的时候,该如何正确的处理?

消费者

/**
 * 消息重试的时间是根据延迟等级来推进的
 * 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
 * 延迟等级(一共有十八个等级)
 * 默认重试16次
 * 1.能否自定义重试次数?
 * 2.如果重试了16次(并发模式)顺序模式下(int最大值次)还是失败?是一个死信消息 则会放在一个死信主题中去:%DLQ%retry-consumer-group
 * 3.当消息处理失败的时候,该如何正确的处理?
 * @throws Exception
 */
@Test
public void retryConsumer() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-consumer-group");
    consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    consumer.subscribe("retryTopic", "*");

    // 设定重试次数
    consumer.setMaxReconsumeTimes(2);

    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            MessageExt messageExt = list.get(0);
            System.out.println(new Date());
            System.out.println(messageExt.getReconsumeTimes());
            System.out.println(new String(messageExt.getBody()));
            // 业务报错 返回null 返回RECONSUME_LATER 都会重试
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    });
    consumer.start();
    System.in.read();
}

消费者自定义重试次数

/**
 *
 * @throws Exception
 */
@Test
public void retryDeadConsumer2() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-dead-consumer-group");
    consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    consumer.subscribe("retryTopic", "*");
    // 设定重试次数
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            MessageExt messageExt = list.get(0);
            System.out.println(new Date());
            // 业务处理
            try {
                // 对业务处理的代码进行try-catch
                handleDb();
            } catch (Exception e) {
                // 重试
                int reconsumeTimes = messageExt.getReconsumeTimes();
                if (reconsumeTimes >= 3) {
                    // 重试到一定次数就不要重试了
                    System.out.println("通知人工接入处理");
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

private void handleDb() {
    int i = 10 / 0;
}

消费死信队列

死信队列topic:%DLQ%retry-consumer-group

消费者

/**
 * 消费死信队列的消费者
 * 直接监听死信主题的消息,记录并通知人工接入处理
 * @throws Exception
 */
@Test
public void retryDeadConsumer() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-dead-consumer-group");
    consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    consumer.subscribe("%DLQ%retry-consumer-group", "*");
    // 设定重试次数
    consumer.setMaxReconsumeTimes(2);
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            MessageExt messageExt = list.get(0);
            System.out.println(new Date());
            System.out.println("通知人工接入处理");
            System.out.println(new String(messageExt.getBody()));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

文章作者: Feliks
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Feliks !
评论
  目录