消息队列
1. 概述
RabbitMQ 是梵医云系统中使用的分布式消息队列中间件,基于 AMQP 协议实现,具有高可靠性、高可用性、灵活的路由等特点。系统基于 spring-boot-starter-amqp 实现 RabbitMQ 的集成。
RabbitMQ 消息队列具有以下特点:
- 高可靠性:支持消息持久化、消息确认、消息重试
- 高可用性:支持集群部署,故障自动切换
- 灵活路由:支持多种交换机类型和路由键
- 分布式支持:适用于分布式环境
- 多种消费模式:支持直接交换机、主题交换机、扇出交换机等
2. 技术架构
2.1 RabbitMQ 架构
生产者 -> 交换机 -> 队列 -> 消费者1
-> 队列 -> 消费者2
-> 队列 -> 消费者3核心组件:
- Producer:消息生产者,负责发送消息
- Exchange:交换机,负责接收消息并路由到队列
- Queue:队列,负责存储消息
- Consumer:消息消费者,负责消费消息
- Binding:绑定,交换机和队列之间的绑定关系
- Routing Key:路由键,用于消息路由
2.2 交换机类型
2.2.1 直接交换机(Direct Exchange)
生产者 -> Direct Exchange -> 队列1 (routing key: "order.paid")
-> 队列2 (routing key: "order.shipped")特点:
- 根据路由键精确匹配队列
- 适用于点对点消息传递
2.2.2 主题交换机(Topic Exchange)
生产者 -> Topic Exchange -> 队列1 (routing key: "order.*")
-> 队列2 (routing key: "*.paid")
-> 队列3 (routing key: "order.paid")特点:
- 支持通配符匹配(* 和 #)
- 适用于多主题消息传递
2.2.3 扇出交换机(Fanout Exchange)
生产者 -> Fanout Exchange -> 队列1
-> 队列2
-> 队列3特点:
- 忽略路由键,将消息发送到所有绑定的队列
- 适用于广播消息
2.3 核心组件
| 组件 | 说明 | 位置 |
|---|---|---|
| RabbitTemplate | RabbitMQ 操作模板类 | spring-boot-starter-amqp |
| @RabbitListener | RabbitMQ 消息监听器注解 | spring-boot-starter-amqp |
| @RabbitHandler | RabbitMQ 消息处理器注解 | spring-boot-starter-amqp |
| CommonMqMessage | 通用消息类 | fanyi-spring-boot-starter-mq |
| RabbitmqService | RabbitMQ 服务类 | fanyi-spring-boot-starter-mq |
| TenantRabbitMQMessagePostProcessor | 多租户消息后处理器 | fanyi-spring-boot-starter-biz-tenant |
| TenantRabbitMQInitializer | 多租户 RabbitMQ 初始化器 | fanyi-spring-boot-starter-biz-tenant |
3. 配置说明
3.1 Maven 依赖
在 pom.xml 中添加依赖:
xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>3.2 RabbitMQ 配置
在 application.yaml 中配置 RabbitMQ:
yaml
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /
publisher-confirm-type: correlated
publisher-returns: true
listener:
simple:
acknowledge-mode: manual
prefetch: 1
concurrency: 5
max-concurrency: 103.3 WebSocket RabbitMQ 配置
在 application.yaml 中配置 WebSocket RabbitMQ:
yaml
fanyi:
websocket:
sender-type: rabbitmq
sender-rabbitmq:
exchange: websocket-exchange
queue: websocket-queue3.4 配置参数说明
| 参数 | 类型 | 必填 | 默认值 | 说明 |
|---|---|---|---|---|
| spring.rabbitmq.host | String | 是 | - | RabbitMQ 服务器地址 |
| spring.rabbitmq.port | Integer | 否 | 5672 | RabbitMQ 服务器端口 |
| spring.rabbitmq.username | String | 是 | - | RabbitMQ 用户名 |
| spring.rabbitmq.password | String | 是 | - | RabbitMQ 密码 |
| spring.rabbitmq.virtual-host | String | 否 | / | RabbitMQ 虚拟主机 |
| spring.rabbitmq.publisher-confirm-type | String | 否 | - | 发布确认类型 |
| spring.rabbitmq.publisher-returns | Boolean | 否 | - | 发布返回 |
| spring.rabbitmq.listener.simple.acknowledge-mode | String | 否 | auto | 确认模式(manual/auto/none) |
| spring.rabbitmq.listener.simple.prefetch | Integer | 否 | 1 | 预取消息数 |
| spring.rabbitmq.listener.simple.concurrency | Integer | 否 | 5 | 最小并发数 |
| spring.rabbitmq.listener.simple.max-concurrency | Integer | 否 | 10 | 最大并发数 |
4. 使用方式
4.1 定义消息
定义消息实体类:
java
import lombok.Data;
@Data
public class DemoMessage {
private Long id;
private String messageType;
private String content;
private Long userId;
private Long timestamp;
}4.2 发送消息
使用 RabbitTemplate 发送消息:
java
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
@Service
public class DemoMessageProducer {
private final RabbitTemplate rabbitTemplate;
private final TopicExchange topicExchange;
public DemoMessageProducer(RabbitTemplate rabbitTemplate, TopicExchange topicExchange) {
this.rabbitTemplate = rabbitTemplate;
this.topicExchange = topicExchange;
}
public void sendDemoMessage(String content) {
DemoMessage message = new DemoMessage();
message.setMessageType("DEMO_MESSAGE");
message.setContent(content);
message.setUserId(1L);
message.setTimestamp(System.currentTimeMillis());
rabbitTemplate.convertAndSend(topicExchange.getName(), null, message);
}
}4.3 监听消息
使用 @RabbitListener 注解监听消息:
java
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(
name = "demo-queue",
durable = "true"
),
exchange = @Exchange(
name = "demo-exchange",
type = ExchangeTypes.TOPIC,
durable = "true"
),
key = "demo-routing-key"
)
)
public class DemoMessageConsumer {
@RabbitHandler
public void onMessage(DemoMessage message) {
log.info("[onMessage][接收到消息,消息类型: {},内容: {}]",
message.getMessageType(), message.getContent());
}
}4.4 广播消息
使用 Fanout Exchange 实现广播:
java
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(
// 在 Queue 的名字上,使用 UUID 生成其后缀。这样,启动的 Consumer 的 Queue 不同,以达到广播消费的目的
name = "demo-queue" + "-" + "#{T(java.util.UUID).randomUUID()}",
// Consumer 关闭时,该队列就可以被自动删除了
autoDelete = "true"
),
exchange = @Exchange(
name = "demo-exchange",
type = ExchangeTypes.FANOUT,
durable = "false"
)
)
)
public class DemoMessageConsumer {
@RabbitHandler
public void onMessage(DemoMessage message) {
log.info("[onMessage][接收到消息,消息类型: {},内容: {}]",
message.getMessageType(), message.getContent());
}
}5. 业务场景示例
5.1 WebSocket 消息广播
5.1.1 WebSocket 消息定义
java
import lombok.Data;
import java.io.Serializable;
@Data
public class RabbitMQWebSocketMessage implements Serializable {
private String sessionId;
private Integer userType;
private Long userId;
private String messageType;
private String messageContent;
}5.1.2 WebSocket 消息发送者
java
import com.fanyi.cloud.framework.websocket.core.sender.AbstractWebSocketMessageSender;
import com.fanyi.cloud.framework.websocket.core.sender.WebSocketMessageSender;
import com.fanyi.cloud.framework.websocket.core.session.WebSocketSessionManager;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
@Slf4j
public class RabbitMQWebSocketMessageSender extends AbstractWebSocketMessageSender {
private final RabbitTemplate rabbitTemplate;
private final TopicExchange topicExchange;
public RabbitMQWebSocketMessageSender(WebSocketSessionManager sessionManager,
RabbitTemplate rabbitTemplate,
TopicExchange topicExchange) {
super(sessionManager);
this.rabbitTemplate = rabbitTemplate;
this.topicExchange = topicExchange;
}
@Override
public void send(Integer userType, Long userId, String messageType, String messageContent) {
sendRabbitMQMessage(null, userId, userType, messageType, messageContent);
}
@Override
public void send(Integer userType, String messageType, String messageContent) {
sendRabbitMQMessage(null, null, userType, messageType, messageContent);
}
@Override
public void send(String sessionId, String messageType, String messageContent) {
sendRabbitMQMessage(sessionId, null, null, messageType, messageContent);
}
private void sendRabbitMQMessage(String sessionId, Long userId, Integer userType,
String messageType, String messageContent) {
RabbitMQWebSocketMessage mqMessage = new RabbitMQWebSocketMessage()
.setSessionId(sessionId)
.setUserId(userId)
.setUserType(userType)
.setMessageType(messageType)
.setMessageContent(messageContent);
rabbitTemplate.convertAndSend(topicExchange.getName(), null, mqMessage);
}
}5.1.3 WebSocket 消息消费者
java
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(
// 在 Queue 的名字上,使用 UUID 生成其后缀。这样,启动的 Consumer 的 Queue 不同,以达到广播消费的目的
name = "${fanyi.websocket.sender-rabbitmq.queue}" + "-" + "#{T(java.util.UUID).randomUUID()}",
// Consumer 关闭时,该队列就可以被自动删除了
autoDelete = "true"
),
exchange = @Exchange(
name = "${fanyi.websocket.sender-rabbitmq.exchange}",
type = ExchangeTypes.TOPIC,
declare = "false"
)
)
)
@RequiredArgsConstructor
public class RabbitMQWebSocketMessageConsumer {
private final RabbitMQWebSocketMessageSender rabbitMQWebSocketMessageSender;
@RabbitHandler
public void onMessage(RabbitMQWebSocketMessage message) {
rabbitMQWebSocketMessageSender.send(message.getSessionId(),
message.getUserType(), message.getUserId(),
message.getMessageType(), message.getMessageContent());
}
}5.2 订单状态变更通知
5.2.1 订单状态消息定义
java
import lombok.Data;
@Data
public class OrderStatusChangeMessage {
private Long orderId;
private String orderNo;
private Integer oldStatus;
private Integer newStatus;
private Long userId;
private Long timestamp;
}5.2.2 订单状态消息发送
java
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
public class OrderService {
private final RabbitTemplate rabbitTemplate;
private final TopicExchange topicExchange;
public OrderService(RabbitTemplate rabbitTemplate, TopicExchange topicExchange) {
this.rabbitTemplate = rabbitTemplate;
this.topicExchange = topicExchange;
}
@Transactional
public void updateOrderStatus(Long orderId, Integer newStatus) {
OrderDO order = orderMapper.selectById(orderId);
Integer oldStatus = order.getStatus();
order.setStatus(newStatus);
orderMapper.updateById(order);
OrderStatusChangeMessage message = new OrderStatusChangeMessage();
message.setOrderId(orderId);
message.setOrderNo(order.getOrderNo());
message.setOldStatus(oldStatus);
message.setNewStatus(newStatus);
message.setUserId(order.getUserId());
message.setTimestamp(System.currentTimeMillis());
rabbitTemplate.convertAndSend(topicExchange.getName(), null, message);
}
}5.2.3 订单状态消息监听
java
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(
name = "order-status-change-queue",
durable = "true"
),
exchange = @Exchange(
name = "order-status-change-exchange",
type = ExchangeTypes.TOPIC,
durable = "true"
),
key = "order.status.change"
)
)
public class OrderStatusChangeListener {
@RabbitHandler
public void onMessage(OrderStatusChangeMessage message) {
log.info("[onMessage][订单状态变更,订单号: {},旧状态: {},新状态: {}]",
message.getOrderNo(), message.getOldStatus(), message.getNewStatus());
switch (message.getNewStatus()) {
case 10:
handleOrderPaid(message);
break;
case 20:
handleOrderShipped(message);
break;
case 30:
handleOrderCompleted(message);
break;
}
}
private void handleOrderPaid(OrderStatusChangeMessage message) {
log.info("[handleOrderPaid][订单已支付,订单号: {}]", message.getOrderNo());
}
private void handleOrderShipped(OrderStatusChangeMessage message) {
log.info("[handleOrderShipped][订单已发货,订单号: {}]", message.getOrderNo());
}
private void handleOrderCompleted(OrderStatusChangeMessage message) {
log.info("[handleOrderCompleted][订单已完成,订单号: {}]", message.getOrderNo());
}
}6. 多租户支持
6.1 消息发送后处理器
实现 MessagePostProcessor 接口,在发送消息时添加租户信息:
java
import com.fanyi.cloud.framework.tenant.core.context.TenantContextHolder;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
public class TenantRabbitMQMessagePostProcessor implements MessagePostProcessor {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
Long tenantId = TenantContextHolder.getTenantId();
if (tenantId != null) {
message.getMessageProperties().getHeaders().put("tenant-id", tenantId);
}
return message;
}
}6.2 多租户初始化器
实现 BeanPostProcessor 接口,在 RabbitMQ 初始化时注册多租户后处理器:
java
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
public class TenantRabbitMQInitializer implements BeanPostProcessor {
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof RabbitTemplate) {
RabbitTemplate rabbitTemplate = (RabbitTemplate) bean;
rabbitTemplate.addBeforePublishPostProcessors(new TenantRabbitMQMessagePostProcessor());
}
return bean;
}
}7. 最佳实践
7.1 消息命名规范
- 消息类名以
Message结尾 - 消息类名应该能够清晰描述消息的含义
- 示例:
OrderStatusChangeMessage、RabbitMQWebSocketMessage、DemoMessage
7.2 Exchange 命名规范
- Exchange 使用小写字母和连字符
- Exchange 应该能够清晰描述消息的用途
- 示例:
order-status-change-exchange、websocket-exchange、demo-exchange
7.3 Queue 命名规范
- Queue 使用小写字母和连字符
- Queue 应该能够清晰描述消息的用途
- 示例:
order-status-change-queue、websocket-queue、demo-queue
7.4 消息数据设计
java
import lombok.Data;
@Data
public class OrderStatusChangeMessage {
private Long orderId;
private String orderNo;
private Integer oldStatus;
private Integer newStatus;
private Long userId;
private Long timestamp;
public OrderStatusChangeMessage setOrder(OrderDO order, Integer newStatus) {
this.orderId = order.getId();
this.orderNo = order.getOrderNo();
this.oldStatus = order.getStatus();
this.newStatus = newStatus;
this.userId = order.getUserId();
this.timestamp = System.currentTimeMillis();
return this;
}
}7.5 消息发送
java
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class OrderMessageProducer {
private final RabbitTemplate rabbitTemplate;
private final TopicExchange topicExchange;
public OrderMessageProducer(RabbitTemplate rabbitTemplate, TopicExchange topicExchange) {
this.rabbitTemplate = rabbitTemplate;
this.topicExchange = topicExchange;
}
public void sendOrderStatusChange(OrderDO order, Integer newStatus) {
try {
OrderStatusChangeMessage message = new OrderStatusChangeMessage()
.setOrder(order, newStatus);
rabbitTemplate.convertAndSend(topicExchange.getName(), null, message);
log.info("[sendOrderStatusChange][订单状态变更消息发送成功,订单号: {},新状态: {}]",
order.getOrderNo(), newStatus);
} catch (Exception ex) {
log.error("[sendOrderStatusChange][订单状态变更消息发送失败,订单号: {}]",
order.getOrderNo(), ex);
}
}
}7.6 消息监听
java
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(
name = "order-status-change-queue",
durable = "true"
),
exchange = @Exchange(
name = "order-status-change-exchange",
type = ExchangeTypes.TOPIC,
durable = "true"
),
key = "order.status.change"
)
)
public class OrderStatusChangeListener {
@RabbitHandler
public void onMessage(OrderStatusChangeMessage message) {
log.info("[onMessage][接收到订单状态变更消息,订单号: {},新状态: {}]",
message.getOrderNo(), message.getNewStatus());
try {
processOrderStatusChange(message);
log.info("[onMessage][订单状态变更消息处理完成,订单号: {}]",
message.getOrderNo());
} catch (Exception ex) {
log.error("[onMessage][订单状态变更消息处理失败,订单号: {}]",
message.getOrderNo(), ex);
}
}
private void processOrderStatusChange(OrderStatusChangeMessage message) {
}
}7.7 消息幂等性
java
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(
name = "order-status-change-queue",
durable = "true"
),
exchange = @Exchange(
name = "order-status-change-exchange",
type = ExchangeTypes.TOPIC,
durable = "true"
),
key = "order.status.change"
)
)
public class OrderStatusChangeListener {
@RabbitHandler
public void onMessage(OrderStatusChangeMessage message) {
String lockKey = "order:status:change:" + message.getOrderId();
RLock lock = redissonClient.getLock(lockKey);
try {
if (lock.tryLock()) {
processOrderStatusChange(message);
}
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
private void processOrderStatusChange(OrderStatusChangeMessage message) {
OrderDO order = orderMapper.selectById(message.getOrderId());
if (order.getStatus().equals(message.getNewStatus())) {
log.info("[processOrderStatusChange][订单状态已更新,跳过处理,订单号: {}]",
message.getOrderNo());
return;
}
}
}7.8 错误处理
java
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(
name = "order-status-change-queue",
durable = "true"
),
exchange = @Exchange(
name = "order-status-change-exchange",
type = ExchangeTypes.TOPIC,
durable = "true"
),
key = "order.status.change"
)
)
public class OrderStatusChangeListener {
@RabbitHandler
public void onMessage(OrderStatusChangeMessage message) {
try {
processOrderStatusChange(message);
} catch (BusinessException ex) {
log.warn("[onMessage][业务异常,订单号: {},错误: {}]",
message.getOrderNo(), ex.getMessage());
} catch (Exception ex) {
log.error("[onMessage][系统异常,订单号: {}]",
message.getOrderNo(), ex);
}
}
private void processOrderStatusChange(OrderStatusChangeMessage message) {
}
}7.9 日志记录
java
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(
name = "order-status-change-queue",
durable = "true"
),
exchange = @Exchange(
name = "order-status-change-exchange",
type = ExchangeTypes.TOPIC,
durable = "true"
),
key = "order.status.change"
)
)
public class OrderStatusChangeListener {
@RabbitHandler
public void onMessage(OrderStatusChangeMessage message) {
long startTime = System.currentTimeMillis();
log.info("[onMessage][开始处理订单状态变更消息,订单号: {},新状态: {}]",
message.getOrderNo(), message.getNewStatus());
try {
processOrderStatusChange(message);
long endTime = System.currentTimeMillis();
log.info("[onMessage][订单状态变更消息处理完成,订单号: {},耗时: {}ms]",
message.getOrderNo(), endTime - startTime);
} catch (Exception ex) {
long endTime = System.currentTimeMillis();
log.error("[onMessage][订单状态变更消息处理失败,订单号: {},耗时: {}ms]",
message.getOrderNo(), endTime - startTime, ex);
}
}
private void processOrderStatusChange(OrderStatusChangeMessage message) {
}
}8. 常见问题
8.1 消息不消费
问题:消息发送后监听器不消费
解决方案:
- 检查监听器类是否添加了
@Component注解 - 检查监听器是否在 Spring 容器的扫描路径下
- 检查 RabbitMQ 连接是否正常
- 检查 Exchange 和 Queue 配置是否正确
- 检查 Routing Key 是否匹配
8.2 消息重复消费
问题:消息被重复消费
解决方案:
- 使用分布式锁防止重复消费
- 实现消息幂等性
- 检查消息确认配置
- 检查消费者配置
8.3 消息丢失
问题:消息发送后丢失
解决方案:
- 检查消息持久化配置
- 检查消息发送是否成功
- 检查 Exchange 和 Queue 配置
- 检查消息确认配置
8.4 消息消费失败
问题:消息消费失败后不重试
解决方案:
- 检查消息重试配置
- 检查错误日志
- 检查消费者配置
- 检查消息确认配置
8.5 性能问题
问题:消息消费速度慢
解决方案:
- 增加消费者数量
- 优化消费逻辑,减少处理时间
- 增加并发数
- 检查 RabbitMQ 性能
9. 监控和调试
9.1 消息发送监控
java
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class MonitoredMessageProducer {
private final RabbitTemplate rabbitTemplate;
private final TopicExchange topicExchange;
public MonitoredMessageProducer(RabbitTemplate rabbitTemplate, TopicExchange topicExchange) {
this.rabbitTemplate = rabbitTemplate;
this.topicExchange = topicExchange;
}
public void sendMessage(DemoMessage message) {
long startTime = System.currentTimeMillis();
log.info("[sendMessage][开始发送消息,Exchange: {},消息类型: {}]",
topicExchange.getName(), message.getMessageType());
try {
rabbitTemplate.convertAndSend(topicExchange.getName(), null, message);
long endTime = System.currentTimeMillis();
log.info("[sendMessage][消息发送完成,Exchange: {},消息类型: {},耗时: {}ms]",
topicExchange.getName(), message.getMessageType(), endTime - startTime);
} catch (Exception ex) {
long endTime = System.currentTimeMillis();
log.error("[sendMessage][消息发送失败,Exchange: {},消息类型: {},耗时: {}ms]",
topicExchange.getName(), message.getMessageType(), endTime - startTime, ex);
}
}
}9.2 消息消费监控
java
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(
name = "demo-queue",
durable = "true"
),
exchange = @Exchange(
name = "demo-exchange",
type = ExchangeTypes.TOPIC,
durable = "true"
),
key = "demo-routing-key"
)
)
public class MonitoredMessageConsumer {
@RabbitHandler
public void onMessage(DemoMessage message) {
long startTime = System.currentTimeMillis();
log.info("[onMessage][开始处理消息,Exchange: {},消息类型: {}]",
"demo-exchange", message.getMessageType());
try {
processMessage(message);
long endTime = System.currentTimeMillis();
log.info("[onMessage][消息处理完成,Exchange: {},消息类型: {},耗时: {}ms]",
"demo-exchange", message.getMessageType(), endTime - startTime);
} catch (Exception ex) {
long endTime = System.currentTimeMillis();
log.error("[onMessage][消息处理失败,Exchange: {},消息类型: {},耗时: {}ms]",
"demo-exchange", message.getMessageType(), endTime - startTime, ex);
}
}
private void processMessage(DemoMessage message) {
}
}10. 注意事项
- RabbitMQ 配置:确保 RabbitMQ 地址、端口、用户名、密码配置正确
- Exchange 命名:使用小写字母和连字符
- Queue 命名:使用小写字母和连字符
- Routing Key:根据 Exchange 类型选择合适的 Routing Key
- 消息持久化:确保 Exchange 和 Queue 设置为持久化
- 消息确认:根据业务场景选择合适的确认模式
- 消息幂等性:确保消息可以安全地重复消费
- 分布式锁:在分布式环境中使用分布式锁防止重复消费
- 错误处理:正确处理消息消费过程中的异常
- 日志记录:记录消息的发送和消费情况,便于问题排查
- 性能优化:对于大数据量的处理,增加消费者数量和并发数
- 监控告警:配置消息发送和消费的监控告警
- 多租户支持:使用消息后处理器实现租户信息的传递
- 消息重试:合理配置消息重试次数和重试间隔
11. 与其他消息队列的对比
| 特性 | RabbitMQ | Redis 消息队列 | 内存消息队列 | RocketMQ | Kafka |
|---|---|---|---|---|---|
| 适用场景 | 分布式 | 单机/集群 | 单机应用 | 分布式 | 分布式 |
| 执行方式 | 异步 | 异步 | 同步 | 异步 | 异步 |
| 复杂度 | 中 | 中 | 低 | 高 | 高 |
| 性能 | 中 | 高 | 高 | 高 | 高 |
| 可靠性 | 高 | 中 | 低 | 高 | 高 |
| 持久化 | 支持 | Stream 支持 | 不支持 | 支持 | 支持 |
| 消息确认 | 支持 | Stream 支持 | 不支持 | 支持 | 支持 |
| 消息重试 | 支持 | 支持 | 不支持 | 支持 | 支持 |
| 广播消费 | 支持 | Pub/Sub 支持 | 支持 | 支持 | 支持 |
| 集群消费 | 支持 | Stream 支持 | 不支持 | 支持 | 支持 |
| 消息回溯 | 支持 | 支持 | 不支持 | 支持 | 支持 |
| 路由灵活 | 高 | 低 | 低 | 中 | 低 |
| 吞吐量 | 万级 TPS | 万级 TPS | 万级 TPS | 十万级 TPS | 十万级 TPS |
