Skip to content

消息队列

1. 概述

RabbitMQ 是梵医云系统中使用的分布式消息队列中间件,基于 AMQP 协议实现,具有高可靠性、高可用性、灵活的路由等特点。系统基于 spring-boot-starter-amqp 实现 RabbitMQ 的集成。

RabbitMQ 消息队列具有以下特点:

  • 高可靠性:支持消息持久化、消息确认、消息重试
  • 高可用性:支持集群部署,故障自动切换
  • 灵活路由:支持多种交换机类型和路由键
  • 分布式支持:适用于分布式环境
  • 多种消费模式:支持直接交换机、主题交换机、扇出交换机等

2. 技术架构

2.1 RabbitMQ 架构

生产者 -> 交换机 -> 队列 -> 消费者1
                    -> 队列 -> 消费者2
                    -> 队列 -> 消费者3

核心组件

  • Producer:消息生产者,负责发送消息
  • Exchange:交换机,负责接收消息并路由到队列
  • Queue:队列,负责存储消息
  • Consumer:消息消费者,负责消费消息
  • Binding:绑定,交换机和队列之间的绑定关系
  • Routing Key:路由键,用于消息路由

2.2 交换机类型

2.2.1 直接交换机(Direct Exchange)

生产者 -> Direct Exchange -> 队列1 (routing key: "order.paid")
                         -> 队列2 (routing key: "order.shipped")

特点:

  • 根据路由键精确匹配队列
  • 适用于点对点消息传递

2.2.2 主题交换机(Topic Exchange)

生产者 -> Topic Exchange -> 队列1 (routing key: "order.*")
                       -> 队列2 (routing key: "*.paid")
                       -> 队列3 (routing key: "order.paid")

特点:

  • 支持通配符匹配(* 和 #)
  • 适用于多主题消息传递

2.2.3 扇出交换机(Fanout Exchange)

生产者 -> Fanout Exchange -> 队列1
                       -> 队列2
                       -> 队列3

特点:

  • 忽略路由键,将消息发送到所有绑定的队列
  • 适用于广播消息

2.3 核心组件

组件说明位置
RabbitTemplateRabbitMQ 操作模板类spring-boot-starter-amqp
@RabbitListenerRabbitMQ 消息监听器注解spring-boot-starter-amqp
@RabbitHandlerRabbitMQ 消息处理器注解spring-boot-starter-amqp
CommonMqMessage通用消息类fanyi-spring-boot-starter-mq
RabbitmqServiceRabbitMQ 服务类fanyi-spring-boot-starter-mq
TenantRabbitMQMessagePostProcessor多租户消息后处理器fanyi-spring-boot-starter-biz-tenant
TenantRabbitMQInitializer多租户 RabbitMQ 初始化器fanyi-spring-boot-starter-biz-tenant

3. 配置说明

3.1 Maven 依赖

pom.xml 中添加依赖:

xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

3.2 RabbitMQ 配置

application.yaml 中配置 RabbitMQ:

yaml
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    publisher-confirm-type: correlated
    publisher-returns: true
    listener:
      simple:
        acknowledge-mode: manual
        prefetch: 1
        concurrency: 5
        max-concurrency: 10

3.3 WebSocket RabbitMQ 配置

application.yaml 中配置 WebSocket RabbitMQ:

yaml
fanyi:
  websocket:
    sender-type: rabbitmq
    sender-rabbitmq:
      exchange: websocket-exchange
      queue: websocket-queue

3.4 配置参数说明

参数类型必填默认值说明
spring.rabbitmq.hostString-RabbitMQ 服务器地址
spring.rabbitmq.portInteger5672RabbitMQ 服务器端口
spring.rabbitmq.usernameString-RabbitMQ 用户名
spring.rabbitmq.passwordString-RabbitMQ 密码
spring.rabbitmq.virtual-hostString/RabbitMQ 虚拟主机
spring.rabbitmq.publisher-confirm-typeString-发布确认类型
spring.rabbitmq.publisher-returnsBoolean-发布返回
spring.rabbitmq.listener.simple.acknowledge-modeStringauto确认模式(manual/auto/none)
spring.rabbitmq.listener.simple.prefetchInteger1预取消息数
spring.rabbitmq.listener.simple.concurrencyInteger5最小并发数
spring.rabbitmq.listener.simple.max-concurrencyInteger10最大并发数

4. 使用方式

4.1 定义消息

定义消息实体类:

java
import lombok.Data;

@Data
public class DemoMessage {

    private Long id;
    private String messageType;
    private String content;
    private Long userId;
    private Long timestamp;
}

4.2 发送消息

使用 RabbitTemplate 发送消息:

java
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

@Service
public class DemoMessageProducer {

    private final RabbitTemplate rabbitTemplate;
    private final TopicExchange topicExchange;

    public DemoMessageProducer(RabbitTemplate rabbitTemplate, TopicExchange topicExchange) {
        this.rabbitTemplate = rabbitTemplate;
        this.topicExchange = topicExchange;
    }

    public void sendDemoMessage(String content) {
        DemoMessage message = new DemoMessage();
        message.setMessageType("DEMO_MESSAGE");
        message.setContent(content);
        message.setUserId(1L);
        message.setTimestamp(System.currentTimeMillis());
        rabbitTemplate.convertAndSend(topicExchange.getName(), null, message);
    }
}

4.3 监听消息

使用 @RabbitListener 注解监听消息:

java
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(
                        name = "demo-queue",
                        durable = "true"
                ),
                exchange = @Exchange(
                        name = "demo-exchange",
                        type = ExchangeTypes.TOPIC,
                        durable = "true"
                ),
                key = "demo-routing-key"
        )
)
public class DemoMessageConsumer {

    @RabbitHandler
    public void onMessage(DemoMessage message) {
        log.info("[onMessage][接收到消息,消息类型: {},内容: {}]",
                message.getMessageType(), message.getContent());
    }
}

4.4 广播消息

使用 Fanout Exchange 实现广播:

java
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(
                        // 在 Queue 的名字上,使用 UUID 生成其后缀。这样,启动的 Consumer 的 Queue 不同,以达到广播消费的目的
                        name = "demo-queue" + "-" + "#{T(java.util.UUID).randomUUID()}",
                        // Consumer 关闭时,该队列就可以被自动删除了
                        autoDelete = "true"
                ),
                exchange = @Exchange(
                        name = "demo-exchange",
                        type = ExchangeTypes.FANOUT,
                        durable = "false"
                )
        )
)
public class DemoMessageConsumer {

    @RabbitHandler
    public void onMessage(DemoMessage message) {
        log.info("[onMessage][接收到消息,消息类型: {},内容: {}]",
                message.getMessageType(), message.getContent());
    }
}

5. 业务场景示例

5.1 WebSocket 消息广播

5.1.1 WebSocket 消息定义

java
import lombok.Data;

import java.io.Serializable;

@Data
public class RabbitMQWebSocketMessage implements Serializable {

    private String sessionId;
    private Integer userType;
    private Long userId;
    private String messageType;
    private String messageContent;
}

5.1.2 WebSocket 消息发送者

java
import com.fanyi.cloud.framework.websocket.core.sender.AbstractWebSocketMessageSender;
import com.fanyi.cloud.framework.websocket.core.sender.WebSocketMessageSender;
import com.fanyi.cloud.framework.websocket.core.session.WebSocketSessionManager;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

@Slf4j
public class RabbitMQWebSocketMessageSender extends AbstractWebSocketMessageSender {

    private final RabbitTemplate rabbitTemplate;
    private final TopicExchange topicExchange;

    public RabbitMQWebSocketMessageSender(WebSocketSessionManager sessionManager,
                                          RabbitTemplate rabbitTemplate,
                                          TopicExchange topicExchange) {
        super(sessionManager);
        this.rabbitTemplate = rabbitTemplate;
        this.topicExchange = topicExchange;
    }

    @Override
    public void send(Integer userType, Long userId, String messageType, String messageContent) {
        sendRabbitMQMessage(null, userId, userType, messageType, messageContent);
    }

    @Override
    public void send(Integer userType, String messageType, String messageContent) {
        sendRabbitMQMessage(null, null, userType, messageType, messageContent);
    }

    @Override
    public void send(String sessionId, String messageType, String messageContent) {
        sendRabbitMQMessage(sessionId, null, null, messageType, messageContent);
    }

    private void sendRabbitMQMessage(String sessionId, Long userId, Integer userType,
                                     String messageType, String messageContent) {
        RabbitMQWebSocketMessage mqMessage = new RabbitMQWebSocketMessage()
                .setSessionId(sessionId)
                .setUserId(userId)
                .setUserType(userType)
                .setMessageType(messageType)
                .setMessageContent(messageContent);
        rabbitTemplate.convertAndSend(topicExchange.getName(), null, mqMessage);
    }
}

5.1.3 WebSocket 消息消费者

java
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;

@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(
                        // 在 Queue 的名字上,使用 UUID 生成其后缀。这样,启动的 Consumer 的 Queue 不同,以达到广播消费的目的
                        name = "${fanyi.websocket.sender-rabbitmq.queue}" + "-" + "#{T(java.util.UUID).randomUUID()}",
                        // Consumer 关闭时,该队列就可以被自动删除了
                        autoDelete = "true"
                ),
                exchange = @Exchange(
                        name = "${fanyi.websocket.sender-rabbitmq.exchange}",
                        type = ExchangeTypes.TOPIC,
                        declare = "false"
                )
        )
)
@RequiredArgsConstructor
public class RabbitMQWebSocketMessageConsumer {

    private final RabbitMQWebSocketMessageSender rabbitMQWebSocketMessageSender;

    @RabbitHandler
    public void onMessage(RabbitMQWebSocketMessage message) {
        rabbitMQWebSocketMessageSender.send(message.getSessionId(),
                message.getUserType(), message.getUserId(),
                message.getMessageType(), message.getMessageContent());
    }
}

5.2 订单状态变更通知

5.2.1 订单状态消息定义

java
import lombok.Data;

@Data
public class OrderStatusChangeMessage {

    private Long orderId;
    private String orderNo;
    private Integer oldStatus;
    private Integer newStatus;
    private Long userId;
    private Long timestamp;
}

5.2.2 订单状态消息发送

java
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class OrderService {

    private final RabbitTemplate rabbitTemplate;
    private final TopicExchange topicExchange;

    public OrderService(RabbitTemplate rabbitTemplate, TopicExchange topicExchange) {
        this.rabbitTemplate = rabbitTemplate;
        this.topicExchange = topicExchange;
    }

    @Transactional
    public void updateOrderStatus(Long orderId, Integer newStatus) {
        OrderDO order = orderMapper.selectById(orderId);
        Integer oldStatus = order.getStatus();
        order.setStatus(newStatus);
        orderMapper.updateById(order);

        OrderStatusChangeMessage message = new OrderStatusChangeMessage();
        message.setOrderId(orderId);
        message.setOrderNo(order.getOrderNo());
        message.setOldStatus(oldStatus);
        message.setNewStatus(newStatus);
        message.setUserId(order.getUserId());
        message.setTimestamp(System.currentTimeMillis());
        rabbitTemplate.convertAndSend(topicExchange.getName(), null, message);
    }
}

5.2.3 订单状态消息监听

java
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(
                        name = "order-status-change-queue",
                        durable = "true"
                ),
                exchange = @Exchange(
                        name = "order-status-change-exchange",
                        type = ExchangeTypes.TOPIC,
                        durable = "true"
                ),
                key = "order.status.change"
        )
)
public class OrderStatusChangeListener {

    @RabbitHandler
    public void onMessage(OrderStatusChangeMessage message) {
        log.info("[onMessage][订单状态变更,订单号: {},旧状态: {},新状态: {}]",
                message.getOrderNo(), message.getOldStatus(), message.getNewStatus());
        
        switch (message.getNewStatus()) {
            case 10:
                handleOrderPaid(message);
                break;
            case 20:
                handleOrderShipped(message);
                break;
            case 30:
                handleOrderCompleted(message);
                break;
        }
    }

    private void handleOrderPaid(OrderStatusChangeMessage message) {
        log.info("[handleOrderPaid][订单已支付,订单号: {}]", message.getOrderNo());
    }

    private void handleOrderShipped(OrderStatusChangeMessage message) {
        log.info("[handleOrderShipped][订单已发货,订单号: {}]", message.getOrderNo());
    }

    private void handleOrderCompleted(OrderStatusChangeMessage message) {
        log.info("[handleOrderCompleted][订单已完成,订单号: {}]", message.getOrderNo());
    }
}

6. 多租户支持

6.1 消息发送后处理器

实现 MessagePostProcessor 接口,在发送消息时添加租户信息:

java
import com.fanyi.cloud.framework.tenant.core.context.TenantContextHolder;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;

public class TenantRabbitMQMessagePostProcessor implements MessagePostProcessor {

    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        Long tenantId = TenantContextHolder.getTenantId();
        if (tenantId != null) {
            message.getMessageProperties().getHeaders().put("tenant-id", tenantId);
        }
        return message;
    }
}

6.2 多租户初始化器

实现 BeanPostProcessor 接口,在 RabbitMQ 初始化时注册多租户后处理器:

java
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;

public class TenantRabbitMQInitializer implements BeanPostProcessor {

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if (bean instanceof RabbitTemplate) {
            RabbitTemplate rabbitTemplate = (RabbitTemplate) bean;
            rabbitTemplate.addBeforePublishPostProcessors(new TenantRabbitMQMessagePostProcessor());
        }
        return bean;
    }
}

7. 最佳实践

7.1 消息命名规范

  • 消息类名以 Message 结尾
  • 消息类名应该能够清晰描述消息的含义
  • 示例:OrderStatusChangeMessageRabbitMQWebSocketMessageDemoMessage

7.2 Exchange 命名规范

  • Exchange 使用小写字母和连字符
  • Exchange 应该能够清晰描述消息的用途
  • 示例:order-status-change-exchangewebsocket-exchangedemo-exchange

7.3 Queue 命名规范

  • Queue 使用小写字母和连字符
  • Queue 应该能够清晰描述消息的用途
  • 示例:order-status-change-queuewebsocket-queuedemo-queue

7.4 消息数据设计

java
import lombok.Data;

@Data
public class OrderStatusChangeMessage {

    private Long orderId;
    private String orderNo;
    private Integer oldStatus;
    private Integer newStatus;
    private Long userId;
    private Long timestamp;

    public OrderStatusChangeMessage setOrder(OrderDO order, Integer newStatus) {
        this.orderId = order.getId();
        this.orderNo = order.getOrderNo();
        this.oldStatus = order.getStatus();
        this.newStatus = newStatus;
        this.userId = order.getUserId();
        this.timestamp = System.currentTimeMillis();
        return this;
    }
}

7.5 消息发送

java
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class OrderMessageProducer {

    private final RabbitTemplate rabbitTemplate;
    private final TopicExchange topicExchange;

    public OrderMessageProducer(RabbitTemplate rabbitTemplate, TopicExchange topicExchange) {
        this.rabbitTemplate = rabbitTemplate;
        this.topicExchange = topicExchange;
    }

    public void sendOrderStatusChange(OrderDO order, Integer newStatus) {
        try {
            OrderStatusChangeMessage message = new OrderStatusChangeMessage()
                    .setOrder(order, newStatus);
            rabbitTemplate.convertAndSend(topicExchange.getName(), null, message);
            log.info("[sendOrderStatusChange][订单状态变更消息发送成功,订单号: {},新状态: {}]",
                    order.getOrderNo(), newStatus);
        } catch (Exception ex) {
            log.error("[sendOrderStatusChange][订单状态变更消息发送失败,订单号: {}]",
                    order.getOrderNo(), ex);
        }
    }
}

7.6 消息监听

java
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(
                        name = "order-status-change-queue",
                        durable = "true"
                ),
                exchange = @Exchange(
                        name = "order-status-change-exchange",
                        type = ExchangeTypes.TOPIC,
                        durable = "true"
                ),
                key = "order.status.change"
        )
)
public class OrderStatusChangeListener {

    @RabbitHandler
    public void onMessage(OrderStatusChangeMessage message) {
        log.info("[onMessage][接收到订单状态变更消息,订单号: {},新状态: {}]",
                message.getOrderNo(), message.getNewStatus());
        try {
            processOrderStatusChange(message);
            log.info("[onMessage][订单状态变更消息处理完成,订单号: {}]",
                    message.getOrderNo());
        } catch (Exception ex) {
            log.error("[onMessage][订单状态变更消息处理失败,订单号: {}]",
                    message.getOrderNo(), ex);
        }
    }

    private void processOrderStatusChange(OrderStatusChangeMessage message) {
    }
}

7.7 消息幂等性

java
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(
                        name = "order-status-change-queue",
                        durable = "true"
                ),
                exchange = @Exchange(
                        name = "order-status-change-exchange",
                        type = ExchangeTypes.TOPIC,
                        durable = "true"
                ),
                key = "order.status.change"
        )
)
public class OrderStatusChangeListener {

    @RabbitHandler
    public void onMessage(OrderStatusChangeMessage message) {
        String lockKey = "order:status:change:" + message.getOrderId();
        RLock lock = redissonClient.getLock(lockKey);
        try {
            if (lock.tryLock()) {
                processOrderStatusChange(message);
            }
        } finally {
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }

    private void processOrderStatusChange(OrderStatusChangeMessage message) {
        OrderDO order = orderMapper.selectById(message.getOrderId());
        if (order.getStatus().equals(message.getNewStatus())) {
            log.info("[processOrderStatusChange][订单状态已更新,跳过处理,订单号: {}]",
                    message.getOrderNo());
            return;
        }
    }
}

7.8 错误处理

java
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(
                        name = "order-status-change-queue",
                        durable = "true"
                ),
                exchange = @Exchange(
                        name = "order-status-change-exchange",
                        type = ExchangeTypes.TOPIC,
                        durable = "true"
                ),
                key = "order.status.change"
        )
)
public class OrderStatusChangeListener {

    @RabbitHandler
    public void onMessage(OrderStatusChangeMessage message) {
        try {
            processOrderStatusChange(message);
        } catch (BusinessException ex) {
            log.warn("[onMessage][业务异常,订单号: {},错误: {}]",
                    message.getOrderNo(), ex.getMessage());
        } catch (Exception ex) {
            log.error("[onMessage][系统异常,订单号: {}]",
                    message.getOrderNo(), ex);
        }
    }

    private void processOrderStatusChange(OrderStatusChangeMessage message) {
    }
}

7.9 日志记录

java
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(
                        name = "order-status-change-queue",
                        durable = "true"
                ),
                exchange = @Exchange(
                        name = "order-status-change-exchange",
                        type = ExchangeTypes.TOPIC,
                        durable = "true"
                ),
                key = "order.status.change"
        )
)
public class OrderStatusChangeListener {

    @RabbitHandler
    public void onMessage(OrderStatusChangeMessage message) {
        long startTime = System.currentTimeMillis();
        log.info("[onMessage][开始处理订单状态变更消息,订单号: {},新状态: {}]",
                message.getOrderNo(), message.getNewStatus());
        try {
            processOrderStatusChange(message);
            long endTime = System.currentTimeMillis();
            log.info("[onMessage][订单状态变更消息处理完成,订单号: {},耗时: {}ms]",
                    message.getOrderNo(), endTime - startTime);
        } catch (Exception ex) {
            long endTime = System.currentTimeMillis();
            log.error("[onMessage][订单状态变更消息处理失败,订单号: {},耗时: {}ms]",
                    message.getOrderNo(), endTime - startTime, ex);
        }
    }

    private void processOrderStatusChange(OrderStatusChangeMessage message) {
    }
}

8. 常见问题

8.1 消息不消费

问题:消息发送后监听器不消费

解决方案

  1. 检查监听器类是否添加了 @Component 注解
  2. 检查监听器是否在 Spring 容器的扫描路径下
  3. 检查 RabbitMQ 连接是否正常
  4. 检查 Exchange 和 Queue 配置是否正确
  5. 检查 Routing Key 是否匹配

8.2 消息重复消费

问题:消息被重复消费

解决方案

  1. 使用分布式锁防止重复消费
  2. 实现消息幂等性
  3. 检查消息确认配置
  4. 检查消费者配置

8.3 消息丢失

问题:消息发送后丢失

解决方案

  1. 检查消息持久化配置
  2. 检查消息发送是否成功
  3. 检查 Exchange 和 Queue 配置
  4. 检查消息确认配置

8.4 消息消费失败

问题:消息消费失败后不重试

解决方案

  1. 检查消息重试配置
  2. 检查错误日志
  3. 检查消费者配置
  4. 检查消息确认配置

8.5 性能问题

问题:消息消费速度慢

解决方案

  1. 增加消费者数量
  2. 优化消费逻辑,减少处理时间
  3. 增加并发数
  4. 检查 RabbitMQ 性能

9. 监控和调试

9.1 消息发送监控

java
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class MonitoredMessageProducer {

    private final RabbitTemplate rabbitTemplate;
    private final TopicExchange topicExchange;

    public MonitoredMessageProducer(RabbitTemplate rabbitTemplate, TopicExchange topicExchange) {
        this.rabbitTemplate = rabbitTemplate;
        this.topicExchange = topicExchange;
    }

    public void sendMessage(DemoMessage message) {
        long startTime = System.currentTimeMillis();
        log.info("[sendMessage][开始发送消息,Exchange: {},消息类型: {}]",
                topicExchange.getName(), message.getMessageType());
        try {
            rabbitTemplate.convertAndSend(topicExchange.getName(), null, message);
            long endTime = System.currentTimeMillis();
            log.info("[sendMessage][消息发送完成,Exchange: {},消息类型: {},耗时: {}ms]",
                    topicExchange.getName(), message.getMessageType(), endTime - startTime);
        } catch (Exception ex) {
            long endTime = System.currentTimeMillis();
            log.error("[sendMessage][消息发送失败,Exchange: {},消息类型: {},耗时: {}ms]",
                    topicExchange.getName(), message.getMessageType(), endTime - startTime, ex);
        }
    }
}

9.2 消息消费监控

java
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(
                        name = "demo-queue",
                        durable = "true"
                ),
                exchange = @Exchange(
                        name = "demo-exchange",
                        type = ExchangeTypes.TOPIC,
                        durable = "true"
                ),
                key = "demo-routing-key"
        )
)
public class MonitoredMessageConsumer {

    @RabbitHandler
    public void onMessage(DemoMessage message) {
        long startTime = System.currentTimeMillis();
        log.info("[onMessage][开始处理消息,Exchange: {},消息类型: {}]",
                "demo-exchange", message.getMessageType());
        try {
            processMessage(message);
            long endTime = System.currentTimeMillis();
            log.info("[onMessage][消息处理完成,Exchange: {},消息类型: {},耗时: {}ms]",
                    "demo-exchange", message.getMessageType(), endTime - startTime);
        } catch (Exception ex) {
            long endTime = System.currentTimeMillis();
            log.error("[onMessage][消息处理失败,Exchange: {},消息类型: {},耗时: {}ms]",
                    "demo-exchange", message.getMessageType(), endTime - startTime, ex);
        }
    }

    private void processMessage(DemoMessage message) {
    }
}

10. 注意事项

  1. RabbitMQ 配置:确保 RabbitMQ 地址、端口、用户名、密码配置正确
  2. Exchange 命名:使用小写字母和连字符
  3. Queue 命名:使用小写字母和连字符
  4. Routing Key:根据 Exchange 类型选择合适的 Routing Key
  5. 消息持久化:确保 Exchange 和 Queue 设置为持久化
  6. 消息确认:根据业务场景选择合适的确认模式
  7. 消息幂等性:确保消息可以安全地重复消费
  8. 分布式锁:在分布式环境中使用分布式锁防止重复消费
  9. 错误处理:正确处理消息消费过程中的异常
  10. 日志记录:记录消息的发送和消费情况,便于问题排查
  11. 性能优化:对于大数据量的处理,增加消费者数量和并发数
  12. 监控告警:配置消息发送和消费的监控告警
  13. 多租户支持:使用消息后处理器实现租户信息的传递
  14. 消息重试:合理配置消息重试次数和重试间隔

11. 与其他消息队列的对比

特性RabbitMQRedis 消息队列内存消息队列RocketMQKafka
适用场景分布式单机/集群单机应用分布式分布式
执行方式异步异步同步异步异步
复杂度
性能
可靠性
持久化支持Stream 支持不支持支持支持
消息确认支持Stream 支持不支持支持支持
消息重试支持支持不支持支持支持
广播消费支持Pub/Sub 支持支持支持支持
集群消费支持Stream 支持不支持支持支持
消息回溯支持支持不支持支持支持
路由灵活
吞吐量万级 TPS万级 TPS万级 TPS十万级 TPS十万级 TPS

12. 相关文档