RocketMQ-3.0-SpringBoot集成RocketocketMQ


RocketMQ集成SpringBoot

我们创建两个SpringBoot项目,一个模拟生产者方的服务器,一个模拟消费者方的服务器

生产者方的application.yml文件

rocketmq:
    name-server: 192.168.183.131:9876
    producer:
        group: boot-producer-group

消费者方的application.yml文件

server:
    port: 8081
rocketmq:
    name-server: 192.168.183.131:9876
    # 一个boot项目中可以写很多个消费者程序,但是在一般开发中,一个boot项目只对应一个消费者

生产者发送消息

MsgModel

@Data
@AllArgsConstructor
@NoArgsConstructor
public class MsgModel {
    private String orderSn;
    private Integer userId;
    private String desc;// 下单 短信 物流
}

通过RocketMQTemplate发送同步、异步、单向、延迟、顺序消息

// RocketMQTemplate
@Autowired
private RocketMQTemplate rocketMQTemplate;

@Test
void contextLoads() {
    // 同步消息
    rocketMQTemplate.syncSend("bootTestTopic", "我是boot的一个消息");

    // 异步消息
    rocketMQTemplate.asyncSend("bootTestTopic", "我是boot的一个异步消息", new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.println("成功");
        }

        @Override
        public void onException(Throwable throwable) {
            System.out.println("失败:" + throwable.getMessage());
        }
    });

    // 单向消息
    rocketMQTemplate.sendOneWay("bootOnewayTopic", "我是boot的一个单向消息");

    Message<String> message = MessageBuilder.withPayload("我是boot的一个延迟消息").build();
    // 延迟消息
    rocketMQTemplate.syncSend("bootMsTopic", message, 3000, 3);

    // 顺序消息
    // 发送者方 需要将一组消息都发在同一个队列中
    // 消费者方 需要单线程消费
    List<MsgModel> msgModels = Arrays.asList(
            new MsgModel("qwer", 1, "下单"),
            new MsgModel("qwer", 1, "短信"),
            new MsgModel("qwer", 1, "物流"),
            new MsgModel("zxcv", 2, "下单"),
            new MsgModel("zxcv", 2, "短信"),
            new MsgModel("zxcv", 2, "物流")
    );

    msgModels.forEach(msgModel -> {
        // 发送顺序消息 一般都是以json的格式进行处理
        rocketMQTemplate.syncSendOrderly("bootOrderlyTopic", JSON.toJSONString(msgModel), msgModel.getOrderSn());
    });
}

发送带Tag的消息

/**
 * 带tag的消息
 *
 * @throws Exception
 */
@Test
void tagKeyTest() throws Exception {
    // topic:tag
//        rocketMQTemplate.syncSend("bootTagTopic:tagA", "我是一个带tag的消息");
//        rocketMQTemplate.syncSend("bootTagTopic:3", "我是一个带tag的消息 a = 3");

    // key是携带在消息头里的
    Message<String> message = MessageBuilder.withPayload("我是一个带key的消息")
            .setHeader(RocketMQHeaders.KEYS, "qwertyasdf")
            .build();

    rocketMQTemplate.syncSend("bootKeyTopic:tagA", message);
}

测试消息消费模式

消息消费模式:

  • 集群模式
  • 广播模式
/**
 * 测试消息消费模式 集群模式、广播模式
 *
 * @throws Exception
 */
@Test
void modeTest() throws Exception {
    for (int i = 0; i < 5; i++) {
        rocketMQTemplate.syncSend("modeTopic", "我是第" + i + "条消息");
    }
}

消费者消费信息

集成了SpringBoot后,生产者方发送消息几乎都是大差不差,真正有区别的是消费者方

消费同步、异步、单向、延迟消息

ABootSimpleMsgListener

@Component
@RocketMQMessageListener(
        topic = "bootTestTopic",
        consumerGroup = "boot-test-consumer-group"
)
public class ABootSimpleMsgListener implements RocketMQListener<MessageExt> {

    /**
     * 消费者方法
     * 如果泛型制定了固定的类型 那么消息体就是我们的参数
     *
     * @param message 方法没有报错就证明消息被签收了
     * 报错了就是拒收,会重试
     */
    @Override
    public void onMessage(MessageExt message) {
        System.out.println(new String(message.getBody()));

    }
}

消费单向消息和延迟消息同理

消费顺序消息

在非SpringBoot发送RocketMQ消息里面有提及过并发模式和顺序模式:

  • MessageListenerConcurrently 并发模式 多线程的 重试16次
  • MessageListenerOrderly 顺序模式 单线程

同理的,在集成了SpringBoot后写法如下

ConsumeMode:

  • CONCURRENTLY
  • ORDERLY

BOrderMsgListener

@Component
@RocketMQMessageListener(
        topic = "bootOrderlyTopic",
        consumerGroup = "boot-orderly-consumer-group",
        consumeMode = ConsumeMode.ORDERLY, // 顺序消费模式 单线程
        maxReconsumeTimes = 5 // 消费重试的次数
)
public class BOrderlyMsgListener implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt messageExt) {
        MsgModel msgModel = JSON.parseObject(new String(messageExt.getBody()), MsgModel.class);
        System.out.println(msgModel);
    }
}

消费带Tag的消息

注意一下,在selectorExpression里面的||,是并的意思,不是或,意为消费带tagA和tagB的消息

CTagMsgListener

@Component
@RocketMQMessageListener(
        topic = "bootKeyTopic",
//        topic = "bootTagTopic",
        consumerGroup = "boot-tag-consumer-group",
        selectorType = SelectorType.TAG, // tag过滤模式
        selectorExpression = "tagA || tagB"
//        selectorType = SelectorType.SQL92, // sql92过滤模式,官方默认不支持sql92,要去yml配置文件开启(不常用,了解即可)
//        selectorExpression = "a < 5" // broker.conf中开启enablePropertyFilter=true
)
public class CTagMsgListener implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt messageExt) {
        System.out.println(messageExt.getKeys());
        System.out.println(new String(messageExt.getBody()));
    }
}

消费模式

CLUSTERING

集群模式,负载均衡模式,也是messageModel默认的模式

在此模式下,同一消费者组中的消费者会对所有的消息进行负载均衡消费,即多个消费者交替消费同一个主题里面的消息

例如:往某一主题发送了十条消息,则在该模式下,所有的消费者会均摊消费这十条消息。

DConsumerClustering1

/**
 * [CLUSTERING] 集群模式下,队列会被消费者分摊,队列数量>=消费者数量,RocketMQ服务器会记录消息的消费位点并处理
 * BROADCASTING 广播模式下,消息会被每一个消费者都处理一次,RcoketMQ服务器不会记录消费位点,也不会重试
 */
@Component
@RocketMQMessageListener(
        topic = "modeTopic",
        consumerGroup = "mode-consumer-group-a",
        // messageModel默认是集群模式(CLUSTERING)负载均衡
        messageModel = MessageModel.CLUSTERING
)
public class DConsumerClustering1 implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        System.out.println("我是mode-consumer-group-a组的第一个消费者: " + message);
    }
}

DConsumerClustering2

@Component
@RocketMQMessageListener(
        topic = "modeTopic",
        consumerGroup = "mode-consumer-group-a",
        // messageModel默认是集群模式(CLUSTERING)负载均衡
        messageModel = MessageModel.CLUSTERING
)
public class DConsumerClustering2 implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        System.out.println("我是mode-consumer-group-a组的第二个消费者: " + message);
    }
}

DConsumerClustering3

@Component
@RocketMQMessageListener(
        topic = "modeTopic",
        consumerGroup = "mode-consumer-group-a",
        messageModel = MessageModel.CLUSTERING // 集群模式
)
public class DConsumerClustering3 implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("我是mode-consumer-group-a组的第三个消费者: " + message);
    }
}

BROADCASTING

广播模式,表示每个消费者都消费一遍订阅的主题的消息

例如:有五条消息,则每个消费者各都会消费五条,而不是一共消费五条

DConsumerBroadcasting1

@Component
@RocketMQMessageListener(
        topic = "modeTopic",
        consumerGroup = "mode-consumer-group-b",
        messageModel = MessageModel.BROADCASTING // 广播模式
)
public class DConsumerBroadcasting1 implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("我是mode-consumer-group-b组的第一个消费者: " + message);
    }
}

DConsumerBroadcasting2

@Component
@RocketMQMessageListener(
        topic = "modeTopic",
        consumerGroup = "mode-consumer-group-b",
        messageModel = MessageModel.BROADCASTING // 广播模式
)
public class DConsumerBroadcasting2 implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("我是mode-consumer-group-b组的第二个消费者: " + message);
    }
}

DConsumerBroadcasting3

@Component
@RocketMQMessageListener(
        topic = "modeTopic",
        consumerGroup = "mode-consumer-group-b",
        messageModel = MessageModel.BROADCASTING // 广播模式
)
public class DConsumerBroadcasting3 implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("我是mode-consumer-group-b组的第三个消费者: " + message);
    }
}

文章作者: Feliks
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Feliks !
评论
  目录