當前位置: 妍妍網 > 碼農

Spring Boot+RocketMQ 實作多例項分布式環境下的事件驅動

2024-01-26碼農

來自: blog.csdn.net/m0_55712478/article/details/135242345

1為什麽要使用MQ?

Spring Boot Event這篇文章中 已經透過Guava或者SpringBoot自身的Listener實作了事件驅動,已經做到了對業務的解耦。為什麽還要用到MQ來進行業務解耦呢?

首先無論是透過Guava還是Spring Boot自身提供的監聽註解來實作的事件驅動他都是處於同一行程中的,意思就是當前事件推播後只有當前的行程可以進行消費。

透過MQ可以實作將事件推播到行程外的Broker中,在多例項/分布式環境下,其他的服務在訂閱同一事件(Topic)時,可以在各自的服務中進行消費,最大化空閑服務的利用。

圖片

源碼地址:

https://gitee.com/sparkle3021/springboot3-study

2整合RocketMQ

依賴版本
  • JDK 17

  • Spring Boot 3.2.0

  • RocketMQ-Client 5.0.4

  • RocketMQ-Starter 2.2.0

  • Spring Boot 3.0+ 取消了對spring.factories的支持。所以在匯入時需要手動引入RocketMQ的配置類。

    引入RocketMQ依賴

    <dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client-java</artifactId>
    <version>5.0.4</version>
    </dependency>
    <dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.0</version>
    </dependency>

    解決Spring Boot3+不相容 spring.factories

    rocketmq-spring-boot-starter:2.2.2版本中:

    圖片

    參考配置檔

    # RocketMQ 配置
    rocketmq:
    name-server: 127.0.0.1:9876
    consumer:
    group: event-mq-group
    # 一次拉取訊息最大值,註意是拉取訊息的最大值而非消費最大值
    pull-batch-size: 1
    producer:
    # 發送同一類訊息的設定為同一個group,保證唯一
    group: event-mq-group
    # 發送訊息超時時間,預設3000
    sendMessageTimeout: 10000
    # 發送訊息失敗重試次數,預設2
    retryTimesWhenSendFailed: 2
    # 異步訊息重試此處,預設2
    retryTimesWhenSendAsyncFailed: 2
    # 訊息最大長度,預設1024 * 1024 * 4(預設4M)
    maxMessageSize: 4096
    # 壓縮訊息閾值,預設4k(1024 * 4)
    compressMessageBodyThreshold: 4096
    # 是否在內部發送失敗時重試另一個broker,預設false
    retryNextServer: false

    參考Issue

  • 方法一 :透過 @Import(RocketMQAutoConfiguration. class) 在配置類中引入

  • 方法二:在resources資源目錄下建立資料夾及檔 META-INF/spring,org.springframework.boot.autoconfigure.AutoConfiguration.imports

  • 檔內容為RocketMQ自動配置類路徑: org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration

    3RocketMQ 使用

    解決Spring Boot3+不支持spring.factories的問題

    import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Import;
    /**
     * 啟動類
     */
    @Import(RocketMQAutoConfiguration. class)
    @SpringBootApplication
    public class MQEventApplication {
    public static void main(String[] args) {
    SpringApplication.run(MQEventApplication. class, args);
    }
    }

    RocketMQ操作工具

    RocketMQ Message實體

    import cn.hutool.core.util.IdUtil;
    import jakarta.validation.constraints.NotBlank;
    import lombok.AllArgsConstructor;
    import lombok.Builder;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import org.apache.commons.collections.CollectionUtils;
    import org.apache.commons.lang3.ObjectUtils;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import java.io.Serializable;
    import java.util.List;
    /**
     * RocketMQ 訊息
     */
    @Data
    @Builder
    @AllArgsConstructor
    @NoArgsConstructor
    public class RocketMQMessage<T> implements Serializable {
    /**
    * 訊息佇列主題
    */
    @NotBlank(message = "MQ Topic 不能為空")
    private String topic;
    /**
    * 延遲級別
    */
    @Builder.Default
    private DelayLevel delayLevel = DelayLevel.OFF;
    /**
    * 訊息體
    */
    private T message;
    /**
    * 訊息體
    */
    private List<T> messages;
    /**
    * 使用有序訊息發送時,指定發送到佇列
    */
    private String hashKey;
    /**
    * 任務Id,用於日誌打印相關資訊
    */
    @Builder.Default
    private String taskId = IdUtil.fastSimpleUUID();
    }






    RocketMQTemplate 二次封裝

    import com.yiyan.study.domain.RocketMQMessage;
    import jakarta.annotation.Resource;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.lang3.StringUtils;
    import org.apache.rocketmq.client.producer.SendCallback;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    /**
     * RocketMQ 訊息工具類
     */
    @Slf4j
    @Component
    public class RocketMQService {
    @Resource
    private RocketMQTemplate rocketMQTemplate;
    @Value("${rocketmq.producer.sendMessageTimeout}")
    private int sendMessageTimeout;
    /**
    * 異步發送訊息回呼
    *
    * @param taskId 任務Id
    * @param topic 訊息主題
    * @return the send callback
    */
    private static SendCallback asyncSendCallback(String taskId, String topic) {
    return new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
    log.info("ROCKETMQ 異步訊息發送成功 : [TaskId:{}] - [Topic:{}] - [SendStatus:{}]", taskId, topic, sendResult.getSendStatus());
    }
    @Override
    public void onException(Throwable throwable) {
    log.error("ROCKETMQ 異步訊息發送失敗 : [TaskId:{}] - [Topic:{}] - [ErrorMessage:{}]", taskId, topic, throwable.getMessage());
    }
    };
    }
    /**
    * 發送同步訊息,使用有序發送請設定HashKey
    *
    * @param message 訊息參數
    */
    public <T> void syncSend(RocketMQMessage<T> message) {
    log.info("ROCKETMQ 同步訊息發送 : [TaskId:{}] - [Topic:{}]", message.getTaskId(), message.getTopic());
    SendResult sendResult;
    if (StringUtils.isNotBlank(message.getHashKey())) {
    sendResult = rocketMQTemplate.syncSendOrderly(message.getTopic(), message.getMessage(), message.getHashKey());
    else {
    sendResult = rocketMQTemplate.syncSend(message.getTopic(), message.getMessage(), sendMessageTimeout, message.getDelayLevel().getLevel());
    }
    log.info("ROCKETMQ 同步訊息發送結果 : [TaskId:{}] - [Topic:{}] - [MessageId:{}] - [SendStatus:{}]",
    message.getTaskId(), message.getTopic(), sendResult.getMsgId(), sendResult.getSendStatus());
    }
    /**
    * 批次發送同步訊息
    *
    * @param message 訊息參數
    */
    public <T> void syncSendBatch(RocketMQMessage<T> message) {
    log.info("ROCKETMQ 同步訊息-批次發送 : [TaskId:{}] - [Topic:{}] - [MessageCount:{}]",
    message.getTaskId(), message.getTopic(), message.getMessages().size());
    SendResult sendResult;
    if (StringUtils.isNotBlank(message.getHashKey())) {
    sendResult = rocketMQTemplate.syncSendOrderly(message.getTopic(), message.getMessages(), message.getHashKey());
    else {
    sendResult = rocketMQTemplate.syncSend(message.getTopic(), message.getMessages());
    }
    log.info("ROCKETMQ 同步訊息-批次發送結果 : [TaskId:{}] - [Topic:{}] - [MessageId:{}] - [SendStatus:{}]",
    message.getTaskId(), message.getTopic(), sendResult.getMsgId(), sendResult.getSendStatus());
    }
    /**
    * 異步發送訊息,異步返回訊息結果
    *
    * @param message 訊息參數
    */
    public <T> void asyncSend(RocketMQMessage<T> message) {
    log.info("ROCKETMQ 異步訊息發送 : [TaskId:{}] - [Topic:{}]", message.getTaskId(), message.getTopic());
    if (StringUtils.isNotBlank(message.getHashKey())) {
    rocketMQTemplate.asyncSendOrderly(message.getTopic(), message.getMessage(), message.getHashKey(),
    asyncSendCallback(message.getTaskId(), message.getTopic()));
    else {
    rocketMQTemplate.asyncSend(message.getTopic(), message.getMessage(),
    asyncSendCallback(message.getTaskId(), message.getTopic()), sendMessageTimeout, message.getDelayLevel().getLevel());
    }
    }
    /**
    * 批次異步發送訊息
    *
    * @param message 訊息參數
    */
    public <T> void asyncSendBatch(RocketMQMessage<T> message) {
    log.info("ROCKETMQ 異步訊息-批次發送 : [TaskId:{}] - [Topic:{}] - [MessageCount:{}]",
    message.getTaskId(), message.getTopic(), message.getMessages().size());
    if (StringUtils.isNotBlank(message.getHashKey())) {
    rocketMQTemplate.asyncSendOrderly(message.getTopic(), message.getMessages(), message.getHashKey(),
    asyncSendCallback(message.getTaskId(), message.getTopic()));
    else {
    rocketMQTemplate.asyncSend(message.getTopic(), message.getMessages(),
    asyncSendCallback(message.getTaskId(), message.getTopic()));
    }
    }
    /**
    * 單向發送訊息,不關心返回結果,容易訊息遺失,適合日誌收集、不精確統計等訊息發送;
    *
    * @param message 訊息參數
    */
    public <T> void sendOneWay(RocketMQMessage<T> message) {
    sendOneWay(message, false);
    }
    /**
    * 單向訊息 - 批次發送
    *
    * @param message 訊息體
    * @param batch 是否為批次操作
    */
    public <T> void sendOneWay(RocketMQMessage<T> message, boolean batch) {
    log.info((batch ? "ROCKETMQ 單向訊息發送 : [TaskId:{}] - [Topic:{}]"
    "ROCKETMQ 單向訊息-批次發送 : [TaskId:{}] - [Topic:{}] - [MessageCount{}]"),
    message.getTaskId(), message.getTopic(), message.getMessages().size());
    if (StringUtils.isNotBlank(message.getHashKey())) {
    if (batch) {
    message.getMessages().
    forEach(msg -> rocketMQTemplate.sendOneWayOrderly(message.getTopic(), msg, message.getHashKey()));
    else {
    rocketMQTemplate.sendOneWayOrderly(message.getTopic(), message.getMessage(), message.getHashKey());
    }
    else {
    if (batch) {
    message.getMessages().forEach(msg -> rocketMQTemplate.sendOneWay(message.getTopic(), msg));
    else {
    rocketMQTemplate.sendOneWay(message.getTopic(), message.getMessage());
    }
    }
    }
    }









    定義RocketMQ消費者

    import com.yiyan.study.constants.MQConfig;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Component;
    /**
     * MQ訊息監聽
     */
    @Component
    @Slf4j
    @RocketMQMessageListener(topic = MQConfig.EVENT_TOPIC,
    consumerGroup = MQConfig.EVENT_CONSUMER_GROUP)
    public class MQListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
    log.info("MQListener 接收訊息 : {}", message);
    }
    }

    定義測試類發送訊息

    import cn.hutool.core.thread.ThreadUtil;
    import com.yiyan.study.constants.MQConfig;
    import com.yiyan.study.domain.RocketMQMessage;
    import com.yiyan.study.utils.RocketMQService;
    import jakarta.annotation.Resource;
    import org.junit.jupiter.api.Test;
    import org.springframework.boot.test.context.SpringBootTest;
    /**
     * MQ測試
     */
    @SpringBootTest
    public class MQTest {
    @Resource
    private RocketMQService rocketMQService;
    @Test
    public void sendMessage() {
    int count = 1;
    while (count <= 50) {
    rocketMQService.syncSend(RocketMQMessage.builder()
    .topic(MQConfig.EVENT_TOPIC)
    .message(count++)
    .build());
    }
    // 休眠等待消費訊息
    ThreadUtil.sleep(2000L);
    }
    }

    4測試

    <END>