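Message Queue (RocketMQ)
1. Overview
RocketMQ is the distributed message queue middleware used in the 梵医云 (Fanyi Cloud) system. The system integrates it through rocketmq-spring-boot-starter.
Its main characteristics are:
- High throughput: a single node can reach the order of 100,000 TPS
- High availability: master-slave architecture with failover (automatic master election depends on the deployment mode, for example DLedger)
- Low latency: millisecond-level message latency
- Reliable messaging: message persistence, retry, and replay
- Distributed: suited to distributed environments
- Multiple consumption modes: clustering and broadcasting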
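2. Architecture
2.1 RocketMQ architecture
Producer -> NameServer -> Broker -> Consumer Group 1 -> Consumer 1
                                                     -> Consumer 2
                                 -> Consumer Group 2 -> Consumer 3
Producers and consumers only look up routing information from the NameServer; the messages themselves are sent to and pulled from the Broker.
Core components:
- NameServer: route registry that manages Broker information
- Broker: stores messages and delivers them to consumers
- Producer: sends messages
- Consumer: consumes messages
- Topic: the category a message belongs to
- Tag: a label on a message, used for filtering
- Consumer Group: consumers in the same group share the consumption load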
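As a rough illustration of how a Tag narrows a subscription, the sketch below mimics a tag selector expression in which `*` matches every tag and `||` separates alternatives. `TagSelector` is a hypothetical helper written for this document, not part of the RocketMQ client; the real filtering is done by the Broker and the client library.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper for illustration only; not a RocketMQ API.
final class TagSelector {
    private final boolean matchAll;
    private final Set<String> tags;

    TagSelector(String expression) {
        String trimmed = expression == null ? "" : expression.trim();
        // An empty expression or "*" subscribes to every tag of the topic.
        this.matchAll = trimmed.isEmpty() || trimmed.equals("*");
        // "a || b" subscribes to messages tagged either a or b.
        this.tags = Arrays.stream(trimmed.split("\\|\\|"))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toSet());
    }

    boolean matches(String messageTag) {
        return matchAll || (messageTag != null && tags.contains(messageTag));
    }
}
```

For example, `new TagSelector("paid || shipped").matches("paid")` is true, while a message tagged `cancelled` would be filtered out.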
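2.2 Message models
2.2.1 Clustering consumption (Clustering)
Topic -> Consumer Group -> Consumer1
                        -> Consumer2
                        -> Consumer3
Characteristics:
- Consumers in the same consumer group share the consumption load
- Each message is consumed by only one consumer in the group
- Suited to load-balancing scenarios
2.2.2 Broadcasting consumption (Broadcasting)
Topic -> Consumer1
      -> Consumer2
      -> Consumer3
Characteristics:
- Every consumer receives every message
- Suited to broadcast notification scenarios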
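The difference between the two models can be sketched with a small simulation. This is a toy model under simplifying assumptions: `DeliverySimulation` is a hypothetical class, and it hands out individual messages round-robin, whereas a real RocketMQ client balances whole message queues across the consumers of a group.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model for illustration only; real delivery is handled by the Broker and the client.
final class DeliverySimulation {

    // Clustering: each message goes to exactly one consumer of the group (round-robin here).
    static Map<String, List<String>> clustering(List<String> consumers, List<String> messages) {
        Map<String, List<String>> received = emptyInboxes(consumers);
        for (int i = 0; i < messages.size(); i++) {
            String consumer = consumers.get(i % consumers.size());
            received.get(consumer).add(messages.get(i));
        }
        return received;
    }

    // Broadcasting: every consumer receives every message.
    static Map<String, List<String>> broadcasting(List<String> consumers, List<String> messages) {
        Map<String, List<String>> received = emptyInboxes(consumers);
        for (String consumer : consumers) {
            received.get(consumer).addAll(messages);
        }
        return received;
    }

    private static Map<String, List<String>> emptyInboxes(List<String> consumers) {
        Map<String, List<String>> inboxes = new LinkedHashMap<>();
        for (String consumer : consumers) {
            inboxes.put(consumer, new ArrayList<>());
        }
        return inboxes;
    }
}
```

With three consumers and six messages, clustering gives each consumer two messages, while broadcasting gives each consumer all six.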
2.3 Core classes
| Component | Description | Location |
|---|---|---|
| RocketMQTemplate | Template class for RocketMQ operations | rocketmq-spring-boot-starter |
| @RocketMQMessageListener | Annotation that declares a RocketMQ message listener | rocketmq-spring-boot-starter |
| RocketMQListener | RocketMQ listener interface | rocketmq-spring-boot-starter |
| TenantRocketMQSendMessageHook | Multi-tenant hook for sending messages | fanyi-spring-boot-starter-biz-tenant |
| TenantRocketMQConsumeMessageHook | Multi-tenant hook for consuming messages | fanyi-spring-boot-starter-biz-tenant |
| TenantRocketMQInitializer | Multi-tenant RocketMQ initializer | fanyi-spring-boot-starter-biz-tenant |
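The idea behind the two tenant hooks in the table can be sketched without any RocketMQ dependency: the sending side copies the current tenant ID into a message property, and the consuming side restores it before the handler runs and clears it afterwards. Everything in this sketch (`TenantPropagationSketch`, its nested `Message` class, the `ThreadLocal` holder) is a stand-in written for illustration; the project's real hooks use TenantContextHolder and the RocketMQ hook interfaces.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-ins for illustration only; not the project's tenant starter or the RocketMQ client.
final class TenantPropagationSketch {
    static final String TENANT_KEY = "tenant-id";
    private static final ThreadLocal<Long> TENANT = new ThreadLocal<>();

    // Minimal message model: a body plus string properties.
    static final class Message {
        final String body;
        final Map<String, String> properties = new HashMap<>();

        Message(String body) {
            this.body = body;
        }
    }

    static void setTenant(Long tenantId) { TENANT.set(tenantId); }
    static Long currentTenant() { return TENANT.get(); }
    static void clearTenant() { TENANT.remove(); }

    // Producer side: copy the current tenant ID into the message before it is sent.
    static void beforeSend(Message message) {
        Long tenantId = currentTenant();
        if (tenantId != null) {
            message.properties.put(TENANT_KEY, tenantId.toString());
        }
    }

    // Consumer side: restore the tenant for the handler, then always clear it.
    static Long consume(Message message) {
        String tenantId = message.properties.get(TENANT_KEY);
        try {
            if (tenantId != null && !tenantId.isEmpty()) {
                setTenant(Long.parseLong(tenantId));
            }
            return currentTenant(); // the business handler would run here
        } finally {
            clearTenant();
        }
    }
}
```

Clearing the context in a `finally` block matters because consumer threads are pooled, so a leftover tenant ID could leak into the next message handled by the same thread.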
3. Configuration
3.1 Maven dependency
Add the dependency to pom.xml:
xml
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.3</version>
</dependency>
3.2 RocketMQ configuration
Configure RocketMQ in application.yaml:
yaml
rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: fanyi-producer-group
    send-message-timeout: 3000
    retry-times-when-send-failed: 2
  consumer:
    group: fanyi-consumer-group
    consume-thread-min: 5
    consume-thread-max: 10
3.3 WebSocket RocketMQ configuration
Configure WebSocket over RocketMQ in application.yaml:
yaml
fanyi:
  websocket:
    sender-type: rocketmq
    sender-rocketmq:
      topic: websocket-topic
      consumer-group: websocket-consumer-group
3.4 Configuration parameters
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| rocketmq.name-server | String | Yes | - | NameServer address; separate multiple addresses with semicolons |
| rocketmq.producer.group | String | Yes | - | Producer group name |
| rocketmq.producer.send-message-timeout | Integer | No | 3000 | Timeout for sending a message (milliseconds) |
| rocketmq.producer.retry-times-when-send-failed | Integer | No | 2 | Number of retries after a failed send |
| rocketmq.consumer.group | String | Yes | - | Consumer group name |
| rocketmq.consumer.consume-thread-min | Integer | No | 5 | Minimum number of consumer threads |
| rocketmq.consumer.consume-thread-max | Integer | No | 10 | Maximum number of consumer threads |
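To make retry-times-when-send-failed concrete, the sketch below models a synchronous send that is attempted once and then retried up to the configured number of times, so a value of 2 allows at most three attempts. `SendRetrySketch` and its `Sender` interface are hypothetical; the actual retry logic lives inside the RocketMQ producer and also involves Broker selection and timeouts that are not modeled here.

```java
// Hypothetical model for illustration only; not the RocketMQ producer implementation.
final class SendRetrySketch {

    interface Sender {
        void send(String payload);
    }

    // Returns the number of attempts used; rethrows the last failure if every attempt fails.
    static int sendWithRetry(Sender sender, String payload, int retryTimesWhenSendFailed) {
        if (retryTimesWhenSendFailed < 0) {
            throw new IllegalArgumentException("retryTimesWhenSendFailed must not be negative");
        }
        int maxAttempts = 1 + retryTimesWhenSendFailed; // first attempt plus retries
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                sender.send(payload);
                return attempt;
            } catch (RuntimeException ex) {
                last = ex; // remember the failure and try again
            }
        }
        throw last;
    }
}
```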
4. Usage
4.1 Defining a message
Define the message entity class:
java
import lombok.Data;

@Data
public class DemoMessage {
    private String messageId;
    private String messageType;
    private String content;
    private Long userId;
    private Long timestamp;
}
4.2 Sending messages
Send messages with RocketMQTemplate:
java
import java.util.UUID;

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.stereotype.Service;

@Service
public class DemoMessageProducer {
    private final RocketMQTemplate rocketMQTemplate;
    private final String topic = "demo-topic";

    public DemoMessageProducer(RocketMQTemplate rocketMQTemplate) {
        this.rocketMQTemplate = rocketMQTemplate;
    }

    public void sendDemoMessage(String content) {
        DemoMessage message = new DemoMessage();
        message.setMessageId(UUID.randomUUID().toString());
        message.setMessageType("DEMO_MESSAGE");
        message.setContent(content);
        message.setUserId(1L);
        message.setTimestamp(System.currentTimeMillis());
        // Synchronous send: blocks until the Broker acknowledges or the send times out
        rocketMQTemplate.syncSend(topic, message);
    }
}
4.3 Listening for messages (clustering consumption)
Listen for messages with the @RocketMQMessageListener annotation:
java
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RocketMQMessageListener(
        topic = "demo-topic",
        consumerGroup = "demo-consumer-group",
        messageModel = MessageModel.CLUSTERING
)
public class DemoMessageConsumer implements RocketMQListener<DemoMessage> {
    @Override
    public void onMessage(DemoMessage message) {
        log.info("[onMessage][Message received, message ID: {}, type: {}, content: {}]",
                message.getMessageId(), message.getMessageType(), message.getContent());
    }
}
4.4 Listening for messages (broadcasting consumption)
java
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

// Uses its own class name and consumer group so it can coexist with the clustering
// listener; consumers that share a group should share the same message model.
@Slf4j
@Component
@RocketMQMessageListener(
        topic = "demo-topic",
        consumerGroup = "demo-broadcast-consumer-group",
        messageModel = MessageModel.BROADCASTING
)
public class DemoBroadcastMessageConsumer implements RocketMQListener<DemoMessage> {
    @Override
    public void onMessage(DemoMessage message) {
        log.info("[onMessage][Message received, message ID: {}, type: {}, content: {}]",
                message.getMessageId(), message.getMessageType(), message.getContent());
    }
}
4.5 Messages with tags
Send a message with a tag:
java
import java.util.UUID;

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.stereotype.Service;

@Service
public class DemoMessageProducer {
    private final RocketMQTemplate rocketMQTemplate;
    private final String topic = "demo-topic";
    private final String tag = "demo-tag";

    public DemoMessageProducer(RocketMQTemplate rocketMQTemplate) {
        this.rocketMQTemplate = rocketMQTemplate;
    }

    public void sendDemoMessage(String content) {
        DemoMessage message = new DemoMessage();
        message.setMessageId(UUID.randomUUID().toString());
        message.setMessageType("DEMO_MESSAGE");
        message.setContent(content);
        // RocketMQTemplate takes the tag as part of the destination, in the form "topic:tag"
        rocketMQTemplate.syncSend(topic + ":" + tag, message);
    }
}
Listen for messages with a tag:
java
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

// Uses its own consumer group: consumers in one group should subscribe with the same
// topic and tag expression.
@Slf4j
@Component
@RocketMQMessageListener(
        topic = "demo-topic",
        selectorExpression = "demo-tag",
        consumerGroup = "demo-tag-consumer-group",
        messageModel = MessageModel.CLUSTERING
)
public class DemoTagMessageConsumer implements RocketMQListener<DemoMessage> {
    @Override
    public void onMessage(DemoMessage message) {
        log.info("[onMessage][Message received, message ID: {}, type: {}, content: {}]",
                message.getMessageId(), message.getMessageType(), message.getContent());
    }
}
5. Business scenario examples
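5.1 WebSocket message broadcasting
5.1.1 WebSocket message definition
java
import lombok.Data;
import lombok.experimental.Accessors;

// chain = true makes the setters return this, which the sender in 5.1.2 relies on;
// a project-wide lombok.config setting would have the same effect.
@Data
@Accessors(chain = true)
public class RocketMQWebSocketMessage {
    private String sessionId;
    private Integer userType;
    private Long userId;
    private String messageType;
    private String messageContent;
}
5.1.2 WebSocket message sender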
java
import com.fanyi.cloud.framework.websocket.core.sender.AbstractWebSocketMessageSender;
import com.fanyi.cloud.framework.websocket.core.session.WebSocketSessionManager;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;

@Slf4j
public class RocketMQWebSocketMessageSender extends AbstractWebSocketMessageSender {
    private final RocketMQTemplate rocketMQTemplate;
    private final String topic;

    public RocketMQWebSocketMessageSender(WebSocketSessionManager sessionManager,
                                          RocketMQTemplate rocketMQTemplate,
                                          String topic) {
        super(sessionManager);
        this.rocketMQTemplate = rocketMQTemplate;
        this.topic = topic;
    }

    // Send to one user of the given user type
    @Override
    public void send(Integer userType, Long userId, String messageType, String messageContent) {
        sendRocketMQMessage(null, userId, userType, messageType, messageContent);
    }

    // Send to all users of the given user type
    @Override
    public void send(Integer userType, String messageType, String messageContent) {
        sendRocketMQMessage(null, null, userType, messageType, messageContent);
    }

    // Send to a single session
    @Override
    public void send(String sessionId, String messageType, String messageContent) {
        sendRocketMQMessage(sessionId, null, null, messageType, messageContent);
    }

    // Publish the message to the topic so that every application node receives it
    private void sendRocketMQMessage(String sessionId, Long userId, Integer userType,
                                     String messageType, String messageContent) {
        RocketMQWebSocketMessage mqMessage = new RocketMQWebSocketMessage()
                .setSessionId(sessionId)
                .setUserId(userId)
                .setUserType(userType)
                .setMessageType(messageType)
                .setMessageContent(messageContent);
        rocketMQTemplate.syncSend(topic, mqMessage);
    }
}
5.1.3 WebSocket message consumer
java
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;

// Broadcasting: every node receives the message and pushes it to its own local sessions
@RocketMQMessageListener(
        topic = "${fanyi.websocket.sender-rocketmq.topic}",
        consumerGroup = "${fanyi.websocket.sender-rocketmq.consumer-group}",
        messageModel = MessageModel.BROADCASTING
)
@RequiredArgsConstructor
public class RocketMQWebSocketMessageConsumer implements RocketMQListener<RocketMQWebSocketMessage> {
    private final RocketMQWebSocketMessageSender rocketMQWebSocketMessageSender;

    @Override
    public void onMessage(RocketMQWebSocketMessage message) {
        // Calls the five-argument send(...) that is expected to be inherited from
        // AbstractWebSocketMessageSender (not shown in this document)
        rocketMQWebSocketMessageSender.send(message.getSessionId(),
                message.getUserType(), message.getUserId(),
                message.getMessageType(), message.getMessageContent());
    }
}
5.2 Order status change notification
5.2.1 Order status message definition
java
import lombok.Data;

@Data
public class OrderStatusChangeMessage {
    private Long orderId;
    private String orderNo;
    private Integer oldStatus;
    private Integer newStatus;
    private Long userId;
    private Long timestamp;
}
5.2.2 Sending the order status message
java
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

// OrderMapper and OrderDO are project classes that are not shown in this document
@Service
public class OrderService {
    private final RocketMQTemplate rocketMQTemplate;
    private final OrderMapper orderMapper;
    private final String topic = "order-status-change-topic";

    public OrderService(RocketMQTemplate rocketMQTemplate, OrderMapper orderMapper) {
        this.rocketMQTemplate = rocketMQTemplate;
        this.orderMapper = orderMapper;
    }

    @Transactional
    public void updateOrderStatus(Long orderId, Integer newStatus) {
        OrderDO order = orderMapper.selectById(orderId);
        if (order == null) {
            throw new IllegalArgumentException("Order not found: " + orderId);
        }
        Integer oldStatus = order.getStatus();
        order.setStatus(newStatus);
        orderMapper.updateById(order);
        OrderStatusChangeMessage message = new OrderStatusChangeMessage();
        message.setOrderId(orderId);
        message.setOrderNo(order.getOrderNo());
        message.setOldStatus(oldStatus);
        message.setNewStatus(newStatus);
        message.setUserId(order.getUserId());
        message.setTimestamp(System.currentTimeMillis());
        // Note: the message is sent before the transaction commits. If the commit fails
        // afterwards, consumers have already received the message.
        rocketMQTemplate.syncSend(topic, message);
    }
}
5.2.3 Listening for the order status message
java
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RocketMQMessageListener(
        topic = "order-status-change-topic",
        consumerGroup = "order-status-consumer-group",
        messageModel = MessageModel.CLUSTERING
)
public class OrderStatusChangeListener implements RocketMQListener<OrderStatusChangeMessage> {
    @Override
    public void onMessage(OrderStatusChangeMessage message) {
        log.info("[onMessage][Order status changed, order no: {}, old status: {}, new status: {}]",
                message.getOrderNo(), message.getOldStatus(), message.getNewStatus());
        Integer newStatus = message.getNewStatus();
        if (newStatus == null) {
            // Guard against unboxing a null status in the switch below
            log.warn("[onMessage][Missing new status, order no: {}]", message.getOrderNo());
            return;
        }
        switch (newStatus) {
            case 10: // paid
                handleOrderPaid(message);
                break;
            case 20: // shipped
                handleOrderShipped(message);
                break;
            case 30: // completed
                handleOrderCompleted(message);
                break;
            default:
                log.warn("[onMessage][Unhandled order status: {}]", newStatus);
                break;
        }
    }

    private void handleOrderPaid(OrderStatusChangeMessage message) {
        log.info("[handleOrderPaid][Order paid, order no: {}]", message.getOrderNo());
    }

    private void handleOrderShipped(OrderStatusChangeMessage message) {
        log.info("[handleOrderShipped][Order shipped, order no: {}]", message.getOrderNo());
    }

    private void handleOrderCompleted(OrderStatusChangeMessage message) {
        log.info("[handleOrderCompleted][Order completed, order no: {}]", message.getOrderNo());
    }
}
6. Multi-tenant support
6.1 Send message hook
Implement the SendMessageHook interface to attach the tenant information when a message is sent:
java
import com.fanyi.cloud.framework.tenant.core.context.TenantContextHolder;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.hook.SendMessageHook;

public class TenantRocketMQSendMessageHook implements SendMessageHook {
    @Override
    public String hookName() {
        return getClass().getSimpleName();
    }

    @Override
    public void sendMessageBefore(SendMessageContext sendMessageContext) {
        Long tenantId = TenantContextHolder.getTenantId();
        if (tenantId == null) {
            return;
        }
        // Carry the tenant ID as a user property of the message
        sendMessageContext.getMessage().putUserProperty("tenant-id", tenantId.toString());
    }

    @Override
    public void sendMessageAfter(SendMessageContext sendMessageContext) {
        // Nothing to do after sending
    }
}
6.2 Consume message hook
Implement the ConsumeMessageHook interface to set the tenant information when a message is consumed:
java
import java.util.List;

import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.fanyi.cloud.framework.tenant.core.context.TenantContextHolder;
import org.apache.rocketmq.client.hook.ConsumeMessageContext;
import org.apache.rocketmq.client.hook.ConsumeMessageHook;
import org.apache.rocketmq.common.message.MessageExt;

public class TenantRocketMQConsumeMessageHook implements ConsumeMessageHook {
    @Override
    public String hookName() {
        return getClass().getSimpleName();
    }

    @Override
    public void consumeMessageBefore(ConsumeMessageContext context) {
        // The hook expects exactly one message per consume call
        List<MessageExt> messages = context.getMsgList();
        Assert.isTrue(messages.size() == 1, "Unexpected number of messages ({})", messages.size());
        String tenantId = messages.get(0).getUserProperty("tenant-id");
        if (StrUtil.isNotEmpty(tenantId)) {
            TenantContextHolder.setTenantId(Long.parseLong(tenantId));
        }
    }

    @Override
    public void consumeMessageAfter(ConsumeMessageContext context) {
        // Always clear the context so the tenant does not leak to the next message
        TenantContextHolder.clear();
    }
}
6.3 Multi-tenant initializer
Implement the BeanPostProcessor interface to register the multi-tenant hooks when the RocketMQ beans are initialized:
java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;

public class TenantRocketMQInitializer implements BeanPostProcessor {
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if (bean instanceof DefaultRocketMQListenerContainer) {
            // Listener containers wrap a push consumer
            DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean;
            initTenantConsumer(container.getConsumer());
        } else if (bean instanceof RocketMQTemplate) {
            // The template wraps the producer
            RocketMQTemplate template = (RocketMQTemplate) bean;
            initTenantProducer(template.getProducer());
        }
        return bean;
    }

    private void initTenantProducer(DefaultMQProducer producer) {
        if (producer == null) {
            return;
        }
        DefaultMQProducerImpl producerImpl = producer.getDefaultMQProducerImpl();
        if (producerImpl == null) {
            return;
        }
        producerImpl.registerSendMessageHook(new TenantRocketMQSendMessageHook());
    }

    private void initTenantConsumer(DefaultMQPushConsumer consumer) {
        if (consumer == null) {
            return;
        }
        DefaultMQPushConsumerImpl consumerImpl = consumer.getDefaultMQPushConsumerImpl();
        if (consumerImpl == null) {
            return;
        }
        consumerImpl.registerConsumeMessageHook(new TenantRocketMQConsumeMessageHook());
    }
}
7. Best practices
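7.1 Message naming conventions
- Message class names end with Message
- A message class name should clearly describe what the message means
- Examples: OrderStatusChangeMessage, RocketMQWebSocketMessage, DemoMessage
7.2 Topic naming conventions
- Topics use lowercase letters and hyphens
- A topic name should clearly describe the purpose of its messages
- Examples: order-status-change-topic, websocket-topic, demo-topic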
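The topic convention can be checked mechanically, for example in a unit test over all topic constants. The sketch below is one possible reading of the rule: `TopicNames` is a hypothetical helper, and it assumes that digits and leading, trailing, or doubled hyphens are not allowed, which the convention itself does not state.

```java
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; encodes one reading of the naming convention.
final class TopicNames {
    // Lowercase words separated by single hyphens, e.g. "order-status-change-topic".
    private static final Pattern CONVENTION = Pattern.compile("[a-z]+(-[a-z]+)*");

    static boolean followsConvention(String topic) {
        return topic != null && CONVENTION.matcher(topic).matches();
    }
}
```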
7.3 Message data design
java
import lombok.Data;

// OrderDO is a project class that is not shown in this document
@Data
public class OrderStatusChangeMessage {
    private Long orderId;
    private String orderNo;
    private Integer oldStatus;
    private Integer newStatus;
    private Long userId;
    private Long timestamp;

    // Fills the message from the order; call it before the order's status is updated,
    // because the old status is read from the order itself
    public OrderStatusChangeMessage setOrder(OrderDO order, Integer newStatus) {
        this.orderId = order.getId();
        this.orderNo = order.getOrderNo();
        this.oldStatus = order.getStatus();
        this.newStatus = newStatus;
        this.userId = order.getUserId();
        this.timestamp = System.currentTimeMillis();
        return this;
    }
}
7.4 Sending messages
java
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class OrderMessageProducer {
    private final RocketMQTemplate rocketMQTemplate;
    private final String topic = "order-status-change-topic";

    public OrderMessageProducer(RocketMQTemplate rocketMQTemplate) {
        this.rocketMQTemplate = rocketMQTemplate;
    }

    public void sendOrderStatusChange(OrderDO order, Integer newStatus) {
        try {
            OrderStatusChangeMessage message = new OrderStatusChangeMessage()
                    .setOrder(order, newStatus);
            rocketMQTemplate.syncSend(topic, message);
            log.info("[sendOrderStatusChange][Order status change message sent, order no: {}, new status: {}]",
                    order.getOrderNo(), newStatus);
        } catch (Exception ex) {
            // The failure is only logged here; rethrow or record it for compensation
            // if the caller must know that the message was not sent
            log.error("[sendOrderStatusChange][Failed to send order status change message, order no: {}]",
                    order.getOrderNo(), ex);
        }
    }
}
7.5 Listening for messages
java
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RocketMQMessageListener(
        topic = "order-status-change-topic",
        consumerGroup = "order-status-consumer-group",
        messageModel = MessageModel.CLUSTERING
)
public class OrderStatusChangeListener implements RocketMQListener<OrderStatusChangeMessage> {
    @Override
    public void onMessage(OrderStatusChangeMessage message) {
        log.info("[onMessage][Order status change message received, order no: {}, new status: {}]",
                message.getOrderNo(), message.getNewStatus());
        try {
            processOrderStatusChange(message);
            log.info("[onMessage][Order status change message processed, order no: {}]",
                    message.getOrderNo());
        } catch (RuntimeException ex) {
            log.error("[onMessage][Failed to process order status change message, order no: {}]",
                    message.getOrderNo(), ex);
            // Rethrow so that the message is redelivered; a swallowed exception counts as success
            throw ex;
        }
    }

    private void processOrderStatusChange(OrderStatusChangeMessage message) {
        // Business logic goes here
    }
}
7.6 Message idempotency
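java
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RequiredArgsConstructor
@RocketMQMessageListener(
        topic = "order-status-change-topic",
        consumerGroup = "order-status-consumer-group",
        messageModel = MessageModel.CLUSTERING
)
public class OrderStatusChangeListener implements RocketMQListener<OrderStatusChangeMessage> {
    // Injected collaborators; OrderMapper and OrderDO are project classes not shown here
    private final RedissonClient redissonClient;
    private final OrderMapper orderMapper;

    @Override
    public void onMessage(OrderStatusChangeMessage message) {
        String lockKey = "order:status:change:" + message.getOrderId();
        RLock lock = redissonClient.getLock(lockKey);
        // If another consumer holds the lock, fail so that the message is redelivered
        // later instead of being acknowledged without processing
        if (!lock.tryLock()) {
            throw new IllegalStateException("Order is being processed: " + message.getOrderId());
        }
        try {
            processOrderStatusChange(message);
        } finally {
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }

    private void processOrderStatusChange(OrderStatusChangeMessage message) {
        OrderDO order = orderMapper.selectById(message.getOrderId());
        if (order == null) {
            log.warn("[processOrderStatusChange][Order not found, order no: {}]", message.getOrderNo());
            return;
        }
        // State check: skip if the order already carries the new status
        if (order.getStatus().equals(message.getNewStatus())) {
            log.info("[processOrderStatusChange][Order status already updated, skipping, order no: {}]",
                    message.getOrderNo());
            return;
        }
        // Apply the status change here
    }
}
7.7 Error handling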
java
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

// BusinessException is a project class (assumed to be unchecked) that is not shown here
@Slf4j
@Component
@RocketMQMessageListener(
        topic = "order-status-change-topic",
        consumerGroup = "order-status-consumer-group",
        messageModel = MessageModel.CLUSTERING
)
public class OrderStatusChangeListener implements RocketMQListener<OrderStatusChangeMessage> {
    @Override
    public void onMessage(OrderStatusChangeMessage message) {
        try {
            processOrderStatusChange(message);
        } catch (BusinessException ex) {
            // A business error will not succeed on retry, so log it and acknowledge the message
            log.warn("[onMessage][Business exception, order no: {}, error: {}]",
                    message.getOrderNo(), ex.getMessage());
        } catch (RuntimeException ex) {
            log.error("[onMessage][System exception, order no: {}]",
                    message.getOrderNo(), ex);
            // Rethrow so that the message is redelivered
            throw ex;
        }
    }

    private void processOrderStatusChange(OrderStatusChangeMessage message) {
        // Business logic goes here
    }
}
7.8 Logging
java
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RocketMQMessageListener(
        topic = "order-status-change-topic",
        consumerGroup = "order-status-consumer-group",
        messageModel = MessageModel.CLUSTERING
)
public class OrderStatusChangeListener implements RocketMQListener<OrderStatusChangeMessage> {
    @Override
    public void onMessage(OrderStatusChangeMessage message) {
        long startTime = System.currentTimeMillis();
        log.info("[onMessage][Start processing order status change message, order no: {}, new status: {}]",
                message.getOrderNo(), message.getNewStatus());
        try {
            processOrderStatusChange(message);
            long endTime = System.currentTimeMillis();
            log.info("[onMessage][Order status change message processed, order no: {}, elapsed: {}ms]",
                    message.getOrderNo(), endTime - startTime);
        } catch (RuntimeException ex) {
            long endTime = System.currentTimeMillis();
            log.error("[onMessage][Failed to process order status change message, order no: {}, elapsed: {}ms]",
                    message.getOrderNo(), endTime - startTime, ex);
            // Rethrow so that the message is redelivered
            throw ex;
        }
    }

    private void processOrderStatusChange(OrderStatusChangeMessage message) {
        // Business logic goes here
    }
}
8. Frequently asked questions
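8.1 Messages are not consumed
Problem: the listener does not consume messages after they are sent
Solutions:
- Check that the listener class is annotated with @Component
- Check that the listener is on the Spring container's scan path
- Check that the RocketMQ connection is healthy
- Check that the Topic and Consumer Group are configured correctly
- Check that the message model (clustering/broadcasting) matches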
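8.2 Duplicate consumption
Problem: a message is consumed more than once
Solutions:
- Use a distributed lock to prevent concurrent duplicate processing
- Make message handling idempotent
- Check the consumer group configuration
- Check the message retry configuration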
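Idempotent handling can be sketched as "remember a business key and process it only once". The example below keeps the keys in memory, which is only enough for a single process; `IdempotentHandler` is a hypothetical class, and a real deployment would need a shared store (for example a database unique key or Redis) so that every consumer instance sees the same keys.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// In-memory sketch for illustration only; not suitable for multiple consumer instances.
final class IdempotentHandler {
    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();

    // Runs the action only for the first delivery of a key.
    // Returns true if the action ran, false if the delivery was a duplicate.
    boolean handleOnce(String businessKey, Runnable action) {
        // add() is atomic: only the first caller for a given key gets true.
        if (!processedKeys.add(businessKey)) {
            return false;
        }
        try {
            action.run();
            return true;
        } catch (RuntimeException ex) {
            // Forget the key so that a redelivery can try again.
            processedKeys.remove(businessKey);
            throw ex;
        }
    }
}
```

A key such as the order ID combined with the new status is one plausible choice, since the same status change should take effect only once.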
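8.3 Message loss
Problem: a message is lost after being sent
Solutions:
- Check the Broker storage configuration
- Check whether the message was sent successfully
- Check the consumer group configuration
- Check the message retry configuration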
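8.4 Consumption failures
Problem: a message is not retried after its consumption fails
Solutions:
- Check the message retry configuration
- Check the error logs
- Check the consumer group configuration
- Check the message model configuration (broadcasting consumption does not redeliver failed messages)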
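One cause worth ruling out for this symptom: with a RocketMQListener, redelivery is triggered when onMessage throws, so a listener that catches and swallows every exception looks like a success to the container. The toy container below mimics only that rule; `RetryOnExceptionSketch` is hypothetical, and it leaves out retry delays, retry limits configured on the Broker, and dead-letter queues.

```java
import java.util.function.Consumer;

// Toy container for illustration only; it mimics "exception means redeliver" and nothing else.
final class RetryOnExceptionSketch {

    // Delivers the message until the listener returns normally or maxDeliveries is reached.
    // Returns the number of deliveries that were made.
    static int deliver(String message, Consumer<String> listener, int maxDeliveries) {
        int deliveries = 0;
        while (deliveries < maxDeliveries) {
            deliveries++;
            try {
                listener.accept(message);
                return deliveries; // a normal return is treated as successful consumption
            } catch (RuntimeException ex) {
                // a thrown exception is treated as "consume later" and causes another delivery
            }
        }
        return deliveries;
    }
}
```

A listener that fails twice and rethrows is delivered three times, while a listener that swallows its exception is delivered once and never retried.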
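8.5 Performance problems
Problem: messages are consumed slowly
Solutions:
- Add more consumers
- Optimize the consumption logic to shorten processing time
- Increase the number of consumer threads
- Check Broker performance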
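The effect of adding consumer threads can be pictured as handing messages to a larger worker pool. The sketch below is a plain JDK thread pool, not the RocketMQ consumer's own pool; `ConcurrentConsumeSketch` and its parameters are hypothetical names chosen for illustration.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Toy model for illustration only; the real consumer manages its own thread pool.
final class ConcurrentConsumeSketch {

    // Hands every message to a fixed pool of worker threads and returns how many were handled.
    static int consumeAll(List<String> messages, int consumeThreads, Consumer<String> handler) {
        ExecutorService pool = Executors.newFixedThreadPool(consumeThreads);
        AtomicInteger handled = new AtomicInteger();
        for (String message : messages) {
            pool.submit(() -> {
                handler.accept(message);
                handled.incrementAndGet();
            });
        }
        pool.shutdown(); // stop accepting work and let the queued messages finish
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return handled.get();
    }
}
```

More threads only help when the handler is the bottleneck and is safe to run concurrently; ordering between messages is not preserved in this model.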
9. Monitoring and debugging
9.1 Monitoring message sending
java
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class MonitoredMessageProducer {
    private final RocketMQTemplate rocketMQTemplate;
    private final String topic = "demo-topic";

    public MonitoredMessageProducer(RocketMQTemplate rocketMQTemplate) {
        this.rocketMQTemplate = rocketMQTemplate;
    }

    public void sendMessage(DemoMessage message) {
        long startTime = System.currentTimeMillis();
        log.info("[sendMessage][Start sending message, topic: {}, message ID: {}]", topic, message.getMessageId());
        try {
            rocketMQTemplate.syncSend(topic, message);
            long endTime = System.currentTimeMillis();
            log.info("[sendMessage][Message sent, topic: {}, message ID: {}, elapsed: {}ms]",
                    topic, message.getMessageId(), endTime - startTime);
        } catch (Exception ex) {
            long endTime = System.currentTimeMillis();
            // The failure is only logged here; rethrow it if the caller must react
            log.error("[sendMessage][Failed to send message, topic: {}, message ID: {}, elapsed: {}ms]",
                    topic, message.getMessageId(), endTime - startTime, ex);
        }
    }
}
9.2 Monitoring message consumption
java
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RocketMQMessageListener(
        topic = "demo-topic",
        consumerGroup = "demo-consumer-group",
        messageModel = MessageModel.CLUSTERING
)
public class MonitoredMessageConsumer implements RocketMQListener<DemoMessage> {
    @Override
    public void onMessage(DemoMessage message) {
        long startTime = System.currentTimeMillis();
        log.info("[onMessage][Start processing message, topic: {}, message ID: {}]",
                "demo-topic", message.getMessageId());
        try {
            processMessage(message);
            long endTime = System.currentTimeMillis();
            log.info("[onMessage][Message processed, topic: {}, message ID: {}, elapsed: {}ms]",
                    "demo-topic", message.getMessageId(), endTime - startTime);
        } catch (RuntimeException ex) {
            long endTime = System.currentTimeMillis();
            log.error("[onMessage][Failed to process message, topic: {}, message ID: {}, elapsed: {}ms]",
                    "demo-topic", message.getMessageId(), endTime - startTime, ex);
            // Rethrow so that the message is redelivered
            throw ex;
        }
    }

    private void processMessage(DemoMessage message) {
        // Business logic goes here
    }
}
10. Notes
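- NameServer configuration: make sure the NameServer address is configured correctly
- Topic naming: use lowercase letters and hyphens
- Consumer Group: consumers in the same group share the consumption load
- Message model: choose clustering or broadcasting consumption according to the business scenario
- Message idempotency: make sure a message can safely be consumed more than once
- Distributed locks: in a distributed environment, use a distributed lock to prevent concurrent duplicate processing
- Error handling: handle exceptions raised during consumption correctly
- Logging: log message sending and consumption to make troubleshooting easier
- Performance tuning: for large volumes, add consumers and consumer threads
- Monitoring and alerting: set up monitoring and alerts for message sending and consumption
- Multi-tenant support: use the multi-tenant hooks to pass tenant information along with messages
- Message retry: configure the retry count and retry interval sensibly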
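11. Comparison with other message queues
| Feature | RocketMQ | Redis message queue | In-memory message queue | RabbitMQ | Kafka |
|---|---|---|---|---|---|
| Use case | Distributed | Single node/cluster | Single-node application | Distributed | Distributed |
| Execution | Asynchronous | Asynchronous | Synchronous | Asynchronous | Asynchronous |
| Complexity | High | Medium | Low | High | High |
| Performance | High | High | High | Medium | High |
| Reliability | High | Medium | Low | High | High |
| Persistence | Supported | Supported by Stream | Not supported | Supported | Supported |
| Message acknowledgment | Supported | Supported by Stream | Not supported | Supported | Supported |
| Message retry | Supported | Supported | Not supported | Supported | Supported |
| Broadcasting consumption | Supported | Supported by Pub/Sub | Supported | Supported | Supported |
| Clustering consumption | Supported | Supported by Stream | Not supported | Supported | Supported |
| Message replay | Supported | Supported | Not supported | Supported | Supported |
| Throughput | 100k-level TPS | 10k-level TPS | 10k-level TPS | 10k-level TPS | 100k-level TPS |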
