來自: blog.csdn.net/m0_55712478/article/details/135242345
1為什麽要使用MQ?
在 Spring Boot Event這篇文章中 已經透過Guava或者SpringBoot自身的Listener實作了事件驅動,已經做到了對業務的解耦。為什麽還要用到MQ來進行業務解耦呢?
首先,無論是透過Guava還是Spring Boot自身提供的監聽註解來實作事件驅動,它們都處於同一行程中,也就是說事件推播後只有當前行程可以進行消費。
透過MQ可以實作將事件推播到行程外的Broker中,在多例項/分布式環境下,其他的服務在訂閱同一事件(Topic)時,可以在各自的服務中進行消費,最大化空閑服務的利用。
源碼地址:
https://gitee.com/sparkle3021/springboot3-study
2整合RocketMQ
依賴版本
JDK 17
Spring Boot 3.2.0
RocketMQ-Client 5.0.4
RocketMQ-Starter 2.2.0
Spring Boot 3.0+ 取消了透過spring.factories註冊自動配置類的支持,所以在匯入時需要手動引入RocketMQ的配置類。
引入RocketMQ依賴
<!-- RocketMQ Java client -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client-java</artifactId>
    <version>5.0.4</version>
</dependency>
<!-- RocketMQ Spring Boot starter -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.0</version>
</dependency>
解決Spring Boot3+不相容 spring.factories
rocketmq-spring-boot-starter:2.2.2版本中:
參考配置檔
# RocketMQ configuration
rocketmq:
  name-server: 127.0.0.1:9876
  consumer:
    group: event-mq-group
    # Maximum number of messages fetched per pull; this limits pulling, not consumption
    pull-batch-size: 1
  producer:
    # Producers sending the same kind of message use the same group; keep it unique
    group: event-mq-group
    # Send timeout (default 3000)
    sendMessageTimeout: 10000
    # Retry count when a synchronous send fails (default 2)
    retryTimesWhenSendFailed: 2
    # Retry count when an asynchronous send fails (default 2)
    retryTimesWhenSendAsyncFailed: 2
    # Maximum message size (default 1024 * 1024 * 4, i.e. 4M)
    # NOTE(review): 4096 is far below the default quoted above - confirm this limit is intended
    maxMessageSize: 4096
    # Message body compression threshold (default 4k, 1024 * 4)
    compressMessageBodyThreshold: 4096
    # Whether to retry on another broker when an internal send fails (default false)
    retryNextServer: false
參考Issue
方法一:在配置類上透過 @Import(RocketMQAutoConfiguration.class) 引入RocketMQ的自動配置類。
方法二:在resources資源目錄下建立資料夾 META-INF/spring,並在其中建立檔 org.springframework.boot.autoconfigure.AutoConfiguration.imports。
檔內容為RocketMQ自動配置類路徑:
org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration
3RocketMQ 使用
解決Spring Boot3+不支持spring.factories的問題
import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Import;
/**
 * Application entry point.
 *
 * <p>Imports {@link RocketMQAutoConfiguration} explicitly so that the RocketMQ
 * configuration class is registered with the application context.
 */
@SpringBootApplication
@Import(RocketMQAutoConfiguration.class)
public class MQEventApplication {

    /**
     * Boots the Spring application.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(MQEventApplication.class, args);
    }
}
RocketMQ操作工具
RocketMQ Message實體
import cn.hutool.core.util.IdUtil;
import jakarta.validation.constraints.NotBlank;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import java.io.Serializable;
import java.util.List;
/**
 * Envelope describing a message (or a list of messages) to be sent through RocketMQ.
 *
 * @param <T> type of the message body
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class RocketMQMessage<T> implements Serializable {

    /** Topic of the message queue. */
    @NotBlank(message = "MQ Topic 不能為空")
    private String topic;

    /** Delay level; {@link DelayLevel#OFF} unless set explicitly. */
    @Builder.Default
    private DelayLevel delayLevel = DelayLevel.OFF;

    /** Single message body. */
    private T message;

    /** List of message bodies. */
    private List<T> messages;

    /** When sending ordered messages, designates the queue to send to. */
    private String hashKey;

    /** Task id, used when printing related log information; a generated UUID unless set explicitly. */
    @Builder.Default
    private String taskId = IdUtil.fastSimpleUUID();
}
RocketMQTemplate 二次封裝
import com.yiyan.study.domain.RocketMQMessage;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
* RocketMQ 訊息工具類
*/
@Slf4j
@Component
public class RocketMQService {
@Resource
private RocketMQTemplate rocketMQTemplate;
@Value("${rocketmq.producer.sendMessageTimeout}")
private int sendMessageTimeout;
/**
* 異步發送訊息回呼
*
* @param taskId 任務Id
* @param topic 訊息主題
* @return the send callback
*/
private static SendCallback asyncSendCallback(String taskId, String topic) {
return new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("ROCKETMQ 異步訊息發送成功 : [TaskId:{}] - [Topic:{}] - [SendStatus:{}]", taskId, topic, sendResult.getSendStatus());
}
@Override
public void onException(Throwable throwable) {
log.error("ROCKETMQ 異步訊息發送失敗 : [TaskId:{}] - [Topic:{}] - [ErrorMessage:{}]", taskId, topic, throwable.getMessage());
}
};
}
/**
* 發送同步訊息,使用有序發送請設定HashKey
*
* @param message 訊息參數
*/
public <T> void syncSend(RocketMQMessage<T> message) {
log.info("ROCKETMQ 同步訊息發送 : [TaskId:{}] - [Topic:{}]", message.getTaskId(), message.getTopic());
SendResult sendResult;
if (StringUtils.isNotBlank(message.getHashKey())) {
sendResult = rocketMQTemplate.syncSendOrderly(message.getTopic(), message.getMessage(), message.getHashKey());
} else {
sendResult = rocketMQTemplate.syncSend(message.getTopic(), message.getMessage(), sendMessageTimeout, message.getDelayLevel().getLevel());
}
log.info("ROCKETMQ 同步訊息發送結果 : [TaskId:{}] - [Topic:{}] - [MessageId:{}] - [SendStatus:{}]",
message.getTaskId(), message.getTopic(), sendResult.getMsgId(), sendResult.getSendStatus());
}
/**
* 批次發送同步訊息
*
* @param message 訊息參數
*/
public <T> void syncSendBatch(RocketMQMessage<T> message) {
log.info("ROCKETMQ 同步訊息-批次發送 : [TaskId:{}] - [Topic:{}] - [MessageCount:{}]",
message.getTaskId(), message.getTopic(), message.getMessages().size());
SendResult sendResult;
if (StringUtils.isNotBlank(message.getHashKey())) {
sendResult = rocketMQTemplate.syncSendOrderly(message.getTopic(), message.getMessages(), message.getHashKey());
} else {
sendResult = rocketMQTemplate.syncSend(message.getTopic(), message.getMessages());
}
log.info("ROCKETMQ 同步訊息-批次發送結果 : [TaskId:{}] - [Topic:{}] - [MessageId:{}] - [SendStatus:{}]",
message.getTaskId(), message.getTopic(), sendResult.getMsgId(), sendResult.getSendStatus());
}
/**
* 異步發送訊息,異步返回訊息結果
*
* @param message 訊息參數
*/
public <T> void asyncSend(RocketMQMessage<T> message) {
log.info("ROCKETMQ 異步訊息發送 : [TaskId:{}] - [Topic:{}]", message.getTaskId(), message.getTopic());
if (StringUtils.isNotBlank(message.getHashKey())) {
rocketMQTemplate.asyncSendOrderly(message.getTopic(), message.getMessage(), message.getHashKey(),
asyncSendCallback(message.getTaskId(), message.getTopic()));
} else {
rocketMQTemplate.asyncSend(message.getTopic(), message.getMessage(),
asyncSendCallback(message.getTaskId(), message.getTopic()), sendMessageTimeout, message.getDelayLevel().getLevel());
}
}
/**
* 批次異步發送訊息
*
* @param message 訊息參數
*/
public <T> void asyncSendBatch(RocketMQMessage<T> message) {
log.info("ROCKETMQ 異步訊息-批次發送 : [TaskId:{}] - [Topic:{}] - [MessageCount:{}]",
message.getTaskId(), message.getTopic(), message.getMessages().size());
if (StringUtils.isNotBlank(message.getHashKey())) {
rocketMQTemplate.asyncSendOrderly(message.getTopic(), message.getMessages(), message.getHashKey(),
asyncSendCallback(message.getTaskId(), message.getTopic()));
} else {
rocketMQTemplate.asyncSend(message.getTopic(), message.getMessages(),
asyncSendCallback(message.getTaskId(), message.getTopic()));
}
}
/**
* 單向發送訊息,不關心返回結果,容易訊息遺失,適合日誌收集、不精確統計等訊息發送;
*
* @param message 訊息參數
*/
public <T> void sendOneWay(RocketMQMessage<T> message) {
sendOneWay(message, false);
}
/**
* 單向訊息 - 批次發送
*
* @param message 訊息體
* @param batch 是否為批次操作
*/
public <T> void sendOneWay(RocketMQMessage<T> message, boolean batch) {
log.info((batch ? "ROCKETMQ 單向訊息發送 : [TaskId:{}] - [Topic:{}]"
: "ROCKETMQ 單向訊息-批次發送 : [TaskId:{}] - [Topic:{}] - [MessageCount{}]"),
message.getTaskId(), message.getTopic(), message.getMessages().size());
if (StringUtils.isNotBlank(message.getHashKey())) {
if (batch) {
message.getMessages().
forEach(msg -> rocketMQTemplate.sendOneWayOrderly(message.getTopic(), msg, message.getHashKey()));
} else {
rocketMQTemplate.sendOneWayOrderly(message.getTopic(), message.getMessage(), message.getHashKey());
}
} else {
if (batch) {
message.getMessages().forEach(msg -> rocketMQTemplate.sendOneWay(message.getTopic(), msg));
} else {
rocketMQTemplate.sendOneWay(message.getTopic(), message.getMessage());
}
}
}
}
定義RocketMQ消費者
import com.yiyan.study.constants.MQConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
 * RocketMQ message listener subscribed to {@code MQConfig.EVENT_TOPIC} under the
 * {@code MQConfig.EVENT_CONSUMER_GROUP} consumer group.
 */
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = MQConfig.EVENT_CONSUMER_GROUP, topic = MQConfig.EVENT_TOPIC)
public class MQListener implements RocketMQListener<String> {

    /**
     * Logs each received message.
     *
     * @param message the received message body
     */
    @Override
    public void onMessage(String message) {
        log.info("MQListener 接收訊息 : {}", message);
    }
}
定義測試類發送訊息
import cn.hutool.core.thread.ThreadUtil;
import com.yiyan.study.constants.MQConfig;
import com.yiyan.study.domain.RocketMQMessage;
import com.yiyan.study.utils.RocketMQService;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
/**
 * MQ test: sends messages through {@link RocketMQService}.
 */
@SpringBootTest
public class MQTest {

    @Resource
    private RocketMQService rocketMQService;

    /**
     * Synchronously sends the integers 1 through 50 to {@code MQConfig.EVENT_TOPIC},
     * then sleeps so the messages can be consumed.
     */
    @Test
    public void sendMessage() {
        for (int seq = 1; seq <= 50; seq++) {
            rocketMQService.syncSend(RocketMQMessage.builder()
                    .topic(MQConfig.EVENT_TOPIC)
                    .message(seq)
                    .build());
        }
        // Sleep to wait for the messages to be consumed.
        ThreadUtil.sleep(2000L);
    }
}
4測試
<END>