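Message Queue (Redis)

1. Overview

The Redis message queue is the Redis-based message queue used in the Fanyi Cloud (梵医云) system. It supports two consumption modes:

  1. Redis Pub/Sub (publish/subscribe): broadcast consumption; every subscriber receives each message
  2. Redis Stream: cluster consumption, with advanced features such as message acknowledgment and consumer groups

The Redis message queue has the following characteristics:

  • High performance: Redis keeps data in memory, so reads and writes are fast
  • Cluster support: suitable for distributed environments
  • Multiple modes: supports both broadcast consumption and cluster consumption
  • Message persistence: Stream mode persists messages
  • Message acknowledgment: Stream mode supports an acknowledgment mechanism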

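2. Technical Architecture

2.1 Redis Pub/Sub Architecture

Producer -> Redis Channel -> Subscriber 1
                          -> Subscriber 2
                          -> Subscriber 3

Characteristics

  • Broadcast mode: every subscriber receives each message
  • Messages are not persisted; a message published while a subscriber is not subscribed is lost for that subscriber
  • Suited to real-time notification scenarios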
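
The broadcast and no-persistence behavior listed above can be illustrated without a Redis server. The following is a minimal in-memory sketch, not the framework's implementation; InMemoryChannel is a hypothetical class introduced only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// In-memory stand-in for a Redis channel: fire-and-forget broadcast, nothing is stored.
class InMemoryChannel {

    private final List<Consumer<String>> subscribers = new ArrayList<>();

    // Register a subscriber; it only sees messages published from now on.
    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    // Deliver the message to every current subscriber and return how many received it.
    int publish(String message) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(message);
        }
        return subscribers.size();
    }
}
```

Calling publish before any subscribe returns 0 and the message is gone, which is why Pub/Sub fits real-time notifications better than reliable delivery.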

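2.2 Redis Stream Architecture

Producer -> Redis Stream -> Consumer Group 1 -> Consumer 1
                                             -> Consumer 2
                         -> Consumer Group 2 -> Consumer 3

Characteristics

  • Cluster consumption with consumer group support
  • Messages are persisted, so earlier messages can be read again
  • Supports a message acknowledgment mechanism
  • Suited to reliable message delivery scenarios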
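
The consumer group semantics above (each group sees every entry, each entry goes to one consumer within a group, and an entry stays pending until acknowledged) can be modelled in a few lines. This is an in-memory sketch for illustration only; InMemoryStream and its integer IDs are assumptions, not how Redis stores or identifies entries.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory model of a stream with consumer groups (illustration only).
class InMemoryStream {

    private final List<String> entries = new ArrayList<>();                     // the persisted log
    private final Map<String, Integer> nextIndex = new HashMap<>();             // read position per group
    private final Map<String, Map<Integer, String>> pending = new HashMap<>();  // unacknowledged entries per group

    // Append an entry and return its id (here simply its index in the log).
    int add(String value) {
        entries.add(value);
        return entries.size() - 1;
    }

    // Create a group that starts reading from the beginning of the log.
    void createGroup(String group) {
        nextIndex.putIfAbsent(group, 0);
        pending.putIfAbsent(group, new LinkedHashMap<>());
    }

    // Hand the next undelivered entry to one consumer of the group; returns its id, or -1 if none is left.
    int readNext(String group, String consumer) {
        int index = nextIndex.get(group);
        if (index >= entries.size()) {
            return -1;
        }
        nextIndex.put(group, index + 1);
        pending.get(group).put(index, consumer); // stays pending until acknowledged
        return index;
    }

    // Acknowledge an entry; returns true if it was pending for that group.
    boolean ack(String group, int id) {
        return pending.get(group).remove(id) != null;
    }

    int pendingCount(String group) {
        return pending.get(group).size();
    }

    String get(int id) {
        return entries.get(id);
    }
}
```

Because the log itself is never consumed, a second group created later can still read the same entries, which is the property the list above calls persistence.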

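2.3 Core Components

| Component | Description | Location |
| --- | --- | --- |
| RedisMQTemplate | Redis MQ operation template class | fanyi-spring-boot-starter-mq |
| AbstractRedisChannelMessage | Abstract class for Redis Channel messages | fanyi-spring-boot-starter-mq |
| AbstractRedisStreamMessage | Abstract class for Redis Stream messages | fanyi-spring-boot-starter-mq |
| AbstractRedisChannelMessageListener | Abstract class for Redis Pub/Sub listeners | fanyi-spring-boot-starter-mq |
| AbstractRedisStreamMessageListener | Abstract class for Redis Stream listeners | fanyi-spring-boot-starter-mq |
| RedisMessageInterceptor | Redis message interceptor | fanyi-spring-boot-starter-mq |
| RedisPendingMessageResendJob | Job that redelivers pending Redis messages | fanyi-spring-boot-starter-mq |
| FanyiRedisMQProducerAutoConfiguration | Producer configuration class for the Redis message queue | fanyi-spring-boot-starter-mq |
| FanyiRedisMQConsumerAutoConfiguration | Consumer configuration class for the Redis message queue | fanyi-spring-boot-starter-mq |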

3. Configuration

3.1 Redis Configuration

Configure Redis in application.yaml:

yaml
spring:
  data:
    redis:
      host: 127.0.0.1
      port: 6379
      database: 0
      password: your-password
      timeout: 3000
      lettuce:
        pool:
          max-active: 8
          max-idle: 8
          min-idle: 0
          max-wait: -1ms

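3.2 Redis Version Requirements

  • Redis Pub/Sub: Redis 2.0+
  • Redis Stream: Redis 5.0+

The system checks the Redis version automatically when the Stream listener container is created (see checkRedisVersion in section 6.3) and throws an exception if the version is lower than 5.0.0.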
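
The body of the version check is not shown in this document. A minimal sketch of such a check, assuming the server version is available as a dotted string such as "6.2.14", could look as follows. RedisVersionCheck and both method names are hypothetical.

```java
// Hypothetical helper: compares a Redis version string with a required minimum.
class RedisVersionCheck {

    // Returns true if version >= minimum, comparing numeric components left to right.
    static boolean isAtLeast(String version, String minimum) {
        String[] actual = version.trim().split("\\.");
        String[] required = minimum.trim().split("\\.");
        int length = Math.max(actual.length, required.length);
        for (int i = 0; i < length; i++) {
            // Missing components count as 0, so "10.0" is treated like "10.0.0"
            int a = i < actual.length ? Integer.parseInt(actual[i]) : 0;
            int r = i < required.length ? Integer.parseInt(required[i]) : 0;
            if (a != r) {
                return a > r;
            }
        }
        return true;
    }

    // Mirrors the documented behavior: fail fast when the server is older than 5.0.0.
    static void requireStreamSupport(String version) {
        if (!isAtLeast(version, "5.0.0")) {
            throw new IllegalStateException(
                    "Redis " + version + " is too old; Redis Stream requires 5.0.0 or later");
        }
    }
}
```

Comparing the components as numbers rather than as text matters: a plain string comparison would rank "10.0" below "5.0.0".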

4. Usage

4.1 Redis Pub/Sub Message Queue

4.1.1 Defining a Channel Message

Extend AbstractRedisChannelMessage to define a message:

java
import com.fanyi.cloud.framework.mq.redis.core.pubsub.AbstractRedisChannelMessage;
import lombok.Data;

@Data
public class DemoChannelMessage extends AbstractRedisChannelMessage {

    private String messageId;
    private String messageType;
    private String content;
    private Long timestamp;
}

4.1.2 Sending a Channel Message

Send the message with RedisMQTemplate:

java
import com.fanyi.cloud.framework.mq.redis.core.RedisMQTemplate;
import org.springframework.stereotype.Service;

import java.util.UUID;

@Service
public class DemoMessageProducer {

    private final RedisMQTemplate redisMQTemplate;

    public DemoMessageProducer(RedisMQTemplate redisMQTemplate) {
        this.redisMQTemplate = redisMQTemplate;
    }

    public void sendDemoMessage(String content) {
        DemoChannelMessage message = new DemoChannelMessage();
        message.setMessageId(UUID.randomUUID().toString());
        message.setMessageType("DEMO_MESSAGE");
        message.setContent(content);
        message.setTimestamp(System.currentTimeMillis());
        redisMQTemplate.send(message);
    }
}

4.1.3 Listening for Channel Messages

Extend AbstractRedisChannelMessageListener to receive messages:

java
import com.fanyi.cloud.framework.mq.redis.core.pubsub.AbstractRedisChannelMessageListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class DemoChannelMessageListener extends AbstractRedisChannelMessageListener<DemoChannelMessage> {

    @Override
    public void onMessage(DemoChannelMessage message) {
        log.info("[onMessage][Message received, id: {}, type: {}, content: {}]",
                message.getMessageId(), message.getMessageType(), message.getContent());
    }
}

4.2 Redis Stream Message Queue

4.2.1 Defining a Stream Message

Extend AbstractRedisStreamMessage to define a message:

java
import com.fanyi.cloud.framework.mq.redis.core.stream.AbstractRedisStreamMessage;
import lombok.Data;

@Data
public class DemoStreamMessage extends AbstractRedisStreamMessage {

    private String messageId;
    private String messageType;
    private String content;
    private Long timestamp;
}

4.2.2 Sending a Stream Message

Send the message with RedisMQTemplate:

java
import com.fanyi.cloud.framework.mq.redis.core.RedisMQTemplate;
import org.springframework.stereotype.Service;

import java.util.UUID;

@Service
public class DemoMessageProducer {

    private final RedisMQTemplate redisMQTemplate;

    public DemoMessageProducer(RedisMQTemplate redisMQTemplate) {
        this.redisMQTemplate = redisMQTemplate;
    }

    public void sendDemoMessage(String content) {
        DemoStreamMessage message = new DemoStreamMessage();
        message.setMessageId(UUID.randomUUID().toString());
        message.setMessageType("DEMO_MESSAGE");
        message.setContent(content);
        message.setTimestamp(System.currentTimeMillis());
        redisMQTemplate.send(message);
    }
}

4.2.3 Listening for Stream Messages

Extend AbstractRedisStreamMessageListener to receive messages:

java
import com.fanyi.cloud.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class DemoStreamMessageListener extends AbstractRedisStreamMessageListener<DemoStreamMessage> {

    @Override
    public void onMessage(DemoStreamMessage message) {
        log.info("[onMessage][Message received, id: {}, type: {}, content: {}]",
                message.getMessageId(), message.getMessageType(), message.getContent());
    }
}

4.3 Message Interceptors

Implement the RedisMessageInterceptor interface to add an interceptor:

java
import com.fanyi.cloud.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
import com.fanyi.cloud.framework.mq.redis.core.message.AbstractRedisMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class DemoRedisMessageInterceptor implements RedisMessageInterceptor {

    @Override
    public void sendMessageBefore(AbstractRedisMessage message) {
        log.info("[sendMessageBefore][Before sending, message type: {}]", message.getClass().getSimpleName());
    }

    @Override
    public void sendMessageAfter(AbstractRedisMessage message) {
        log.info("[sendMessageAfter][After sending, message type: {}]", message.getClass().getSimpleName());
    }

    @Override
    public void consumeMessageBefore(AbstractRedisMessage message) {
        log.info("[consumeMessageBefore][Before consuming, message type: {}]", message.getClass().getSimpleName());
    }

    @Override
    public void consumeMessageAfter(AbstractRedisMessage message) {
        log.info("[consumeMessageAfter][After consuming, message type: {}]", message.getClass().getSimpleName());
    }
}
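
How the template invokes these hooks is not shown in this document. The sketch below shows one plausible arrangement, assuming "before" hooks run in registration order and "after" hooks run in reverse order inside a finally block. SendInterceptor and InterceptingSender are simplified, hypothetical stand-ins, and the list named sent replaces the actual Redis call.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for RedisMessageInterceptor (send-side hooks only).
interface SendInterceptor {
    void sendMessageBefore(String message);
    void sendMessageAfter(String message);
}

// Hypothetical template showing one way to run interceptors around a send.
class InterceptingSender {

    private final List<SendInterceptor> interceptors = new ArrayList<>();
    private final List<String> sent = new ArrayList<>();

    void addInterceptor(SendInterceptor interceptor) {
        interceptors.add(interceptor);
    }

    void send(String message) {
        try {
            // "before" hooks run in registration order
            interceptors.forEach(interceptor -> interceptor.sendMessageBefore(message));
            sent.add(message); // stands in for the actual Redis call
        } finally {
            // "after" hooks run in reverse order, even if the send failed
            for (int i = interceptors.size() - 1; i >= 0; i--) {
                interceptors.get(i).sendMessageAfter(message);
            }
        }
    }

    List<String> sent() {
        return sent;
    }
}
```

Running the "after" hooks in reverse order lets each interceptor undo its own setup in the opposite order to which it was applied, much like nested try/finally blocks.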

4.4 Message Headers

Use message headers to carry additional information:

java
import com.fanyi.cloud.framework.mq.redis.core.pubsub.AbstractRedisChannelMessage;
import lombok.Data;

@Data
public class DemoChannelMessage extends AbstractRedisChannelMessage {

    private String messageId;
    private String messageType;
    private String content;

    public DemoChannelMessage addTenantId(Long tenantId) {
        addHeader("tenantId", String.valueOf(tenantId));
        return this;
    }

    public Long getTenantId() {
        String tenantId = getHeader("tenantId");
        return tenantId != null ? Long.parseLong(tenantId) : null;
    }
}
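
A header such as tenantId is most useful when it is filled in and read back automatically. The sketch below shows one way to do this with before/after hooks like those in section 4.3. Every name in it (HeaderCarrier, TenantContext, TenantHeaderPropagation) is hypothetical, and the thread-local tenant context is an assumption about how the application tracks the current tenant.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for a message that carries string headers.
class HeaderCarrier {
    private final Map<String, String> headers = new HashMap<>();

    void addHeader(String key, String value) {
        headers.put(key, value);
    }

    String getHeader(String key) {
        return headers.get(key);
    }
}

// Hypothetical per-thread tenant context.
class TenantContext {
    private static final ThreadLocal<Long> TENANT_ID = new ThreadLocal<>();

    static void set(Long tenantId) { TENANT_ID.set(tenantId); }
    static Long get() { return TENANT_ID.get(); }
    static void clear() { TENANT_ID.remove(); }
}

class TenantHeaderPropagation {

    static final String HEADER = "tenantId";

    // Producer side: copy the current tenant into the header before sending.
    static void beforeSend(HeaderCarrier message) {
        Long tenantId = TenantContext.get();
        if (tenantId != null) {
            message.addHeader(HEADER, String.valueOf(tenantId));
        }
    }

    // Consumer side: restore the tenant from the header before the handler runs.
    static void beforeConsume(HeaderCarrier message) {
        String tenantId = message.getHeader(HEADER);
        if (tenantId != null) {
            TenantContext.set(Long.parseLong(tenantId));
        }
    }

    // Consumer side: always clear the context so it does not leak to the next message.
    static void afterConsume(HeaderCarrier message) {
        TenantContext.clear();
    }
}
```

With this arrangement the business code never touches the header directly; the producer and consumer simply see the same tenant in their context.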

5. Business Scenario Examples

5.1 WebSocket Message Broadcast

5.1.1 WebSocket Message Definition

java
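import com.fanyi.cloud.framework.mq.redis.core.pubsub.AbstractRedisChannelMessage;
import lombok.Data;
import lombok.experimental.Accessors;

@Data
@Accessors(chain = true) // setters return this, as the chained calls in the sender (5.1.2) require
public class RedisWebSocketMessage extends AbstractRedisChannelMessage {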

    private String sessionId;
    private Integer userType;
    private Long userId;
    private String messageType;
    private String messageContent;
}

5.1.2 WebSocket Message Sender

java
import com.fanyi.cloud.framework.mq.redis.core.RedisMQTemplate;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class RedisWebSocketMessageSender extends AbstractWebSocketMessageSender {

    private final RedisMQTemplate redisMQTemplate;

    public RedisWebSocketMessageSender(WebSocketSessionManager sessionManager,
                                       RedisMQTemplate redisMQTemplate) {
        super(sessionManager);
        this.redisMQTemplate = redisMQTemplate;
    }

    @Override
    public void send(Integer userType, Long userId, String messageType, String messageContent) {
        sendRedisMessage(null, userId, userType, messageType, messageContent);
    }

    @Override
    public void send(Integer userType, String messageType, String messageContent) {
        sendRedisMessage(null, null, userType, messageType, messageContent);
    }

    @Override
    public void send(String sessionId, String messageType, String messageContent) {
        sendRedisMessage(sessionId, null, null, messageType, messageContent);
    }

    private void sendRedisMessage(String sessionId, Long userId, Integer userType,
                                  String messageType, String messageContent) {
        RedisWebSocketMessage mqMessage = new RedisWebSocketMessage()
                .setSessionId(sessionId)
                .setUserId(userId)
                .setUserType(userType)
                .setMessageType(messageType)
                .setMessageContent(messageContent);
        redisMQTemplate.send(mqMessage);
    }
}

5.1.3 WebSocket Message Consumer

java
import com.fanyi.cloud.framework.mq.redis.core.pubsub.AbstractRedisChannelMessageListener;
import lombok.RequiredArgsConstructor;

@RequiredArgsConstructor
public class RedisWebSocketMessageConsumer extends AbstractRedisChannelMessageListener<RedisWebSocketMessage> {

    private final RedisWebSocketMessageSender redisWebSocketMessageSender;

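    @Override
    public void onMessage(RedisWebSocketMessage message) {
        // The five-argument send(...) is not declared in RedisWebSocketMessageSender (5.1.2);
        // it is assumed to be inherited from AbstractWebSocketMessageSender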
        redisWebSocketMessageSender.send(message.getSessionId(),
                message.getUserType(), message.getUserId(),
                message.getMessageType(), message.getMessageContent());
    }
}

5.2 Order Status Change Notification

5.2.1 Order Status Message Definition

java
import com.fanyi.cloud.framework.mq.redis.core.stream.AbstractRedisStreamMessage;
import lombok.Data;

@Data
public class OrderStatusChangeMessage extends AbstractRedisStreamMessage {

    private Long orderId;
    private String orderNo;
    private Integer oldStatus;
    private Integer newStatus;
    private Long userId;
    private Long timestamp;
}

5.2.2 Sending the Order Status Message

java
import com.fanyi.cloud.framework.mq.redis.core.RedisMQTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class OrderService {

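    // OrderMapper and OrderDO are the application's own persistence types (imports omitted)
    private final OrderMapper orderMapper;
    private final RedisMQTemplate redisMQTemplate;

    public OrderService(OrderMapper orderMapper, RedisMQTemplate redisMQTemplate) {
        this.orderMapper = orderMapper;
        this.redisMQTemplate = redisMQTemplate;
    }

    @Transactional
    public void updateOrderStatus(Long orderId, Integer newStatus) {
        // Update the order and remember its previous status
        OrderDO order = orderMapper.selectById(orderId);
        Integer oldStatus = order.getStatus();
        order.setStatus(newStatus);
        orderMapper.updateById(order);

        // Publish the change. This runs before the transaction commits, so a consumer
        // may read the message before the new status is visible in the database.
        OrderStatusChangeMessage message = new OrderStatusChangeMessage();
        message.setOrderId(orderId);
        message.setOrderNo(order.getOrderNo());
        message.setOldStatus(oldStatus);
        message.setNewStatus(newStatus);
        message.setUserId(order.getUserId());
        message.setTimestamp(System.currentTimeMillis());
        redisMQTemplate.send(message);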
    }
}

5.2.3 Listening for the Order Status Message

java
import com.fanyi.cloud.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class OrderStatusChangeListener extends AbstractRedisStreamMessageListener<OrderStatusChangeMessage> {

    @Override
    public void onMessage(OrderStatusChangeMessage message) {
        log.info("[onMessage][Order status changed, order no: {}, old status: {}, new status: {}]",
                message.getOrderNo(), message.getOldStatus(), message.getNewStatus());

        // Dispatch on the new status (10 = paid, 20 = shipped, 30 = completed)
        switch (message.getNewStatus()) {
            case 10:
                handleOrderPaid(message);
                break;
            case 20:
                handleOrderShipped(message);
                break;
            case 30:
                handleOrderCompleted(message);
                break;
            default:
                // Other statuses need no follow-up handling
                break;
        }
    }

    private void handleOrderPaid(OrderStatusChangeMessage message) {
        log.info("[handleOrderPaid][Order paid, order no: {}]", message.getOrderNo());
    }

    private void handleOrderShipped(OrderStatusChangeMessage message) {
        log.info("[handleOrderShipped][Order shipped, order no: {}]", message.getOrderNo());
    }

    private void handleOrderCompleted(OrderStatusChangeMessage message) {
        log.info("[handleOrderCompleted][Order completed, order no: {}]", message.getOrderNo());
    }
}

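6. Advanced Features

6.1 Message Redelivery

The system provides the scheduled job RedisPendingMessageResendJob, which redelivers pending messages that have timed out. Once a minute it scans each consumer's pending list; a message whose last delivery was at least EXPIRE_TIME (5 minutes) ago is added to the stream again as a new record, and the old record is acknowledged: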

java
@Slf4j
@AllArgsConstructor
public class RedisPendingMessageResendJob {

    private static final String LOCK_KEY = "redis:pending:msg:lock";
    // Resend threshold in seconds (5 minutes), measured from the last delivery
    private static final int EXPIRE_TIME = 5 * 60;

    private final List<AbstractRedisStreamMessageListener<?>> listeners;
    private final RedisMQTemplate redisTemplate;
    private final String groupName;
    private final RedissonClient redissonClient;

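    // Runs at second 35 of every minute; the Redisson lock lets only one instance execute at a time
    @Scheduled(cron = "35 * * * * ?")
    public void messageResend() {
        RLock lock = redissonClient.getLock(LOCK_KEY);
        if (lock.tryLock()) {
            try {
                execute();
            } catch (Exception ex) {
                log.error("[messageResend][Execution failed]", ex);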
            } finally {
                lock.unlock();
            }
        }
    }

    private void execute() {
        StreamOperations<String, Object, Object> ops = redisTemplate.getRedisTemplate().opsForStream();
        listeners.forEach(listener -> {
            PendingMessagesSummary pendingMessagesSummary = Objects.requireNonNull(ops.pending(listener.getStreamKey(), groupName));
            Map<String, Long> pendingMessagesPerConsumer = pendingMessagesSummary.getPendingMessagesPerConsumer();
            pendingMessagesPerConsumer.forEach((consumerName, pendingMessageCount) -> {
                PendingMessages pendingMessages = ops.pending(listener.getStreamKey(), Consumer.from(groupName, consumerName), Range.unbounded(), pendingMessageCount);
                if (pendingMessages.isEmpty()) {
                    return;
                }
                pendingMessages.forEach(pendingMessage -> {
                    long lastDelivery = pendingMessage.getElapsedTimeSinceLastDelivery().getSeconds();
                    if (lastDelivery < EXPIRE_TIME) {
                        return;
                    }
                    List<MapRecord<String, Object, Object>> records = ops.range(listener.getStreamKey(),
                            Range.of(Range.Bound.inclusive(pendingMessage.getIdAsString()), Range.Bound.inclusive(pendingMessage.getIdAsString())));
                    if (CollUtil.isEmpty(records)) {
                        return;
                    }
                    // Add the payload again as a new record, then acknowledge the old record
                    // so that it leaves the pending list
                    redisTemplate.getRedisTemplate().opsForStream().add(StreamRecords.newRecord()
                            .ofObject(records.get(0).getValue())
                            .withStreamKey(listener.getStreamKey()));
                    redisTemplate.getRedisTemplate().opsForStream().acknowledge(groupName, records.get(0));
                    log.info("[processPendingMessage][Message ({}) redelivered successfully]", records.get(0).getId());
                });
            });
        });
    }
}
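
The selection rule inside execute() can be isolated from the Redis calls. The sketch below returns the record IDs whose time since last delivery has reached the 5-minute threshold. PendingResendSelector is a hypothetical class, and a plain map stands in for the pending-message list that the job reads from Redis.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class PendingResendSelector {

    // Same threshold as EXPIRE_TIME in the job above: 5 minutes.
    static final Duration EXPIRE_TIME = Duration.ofSeconds(5 * 60);

    // Given record id -> time since last delivery, return the ids that should be resent.
    static List<String> selectExpired(Map<String, Duration> idleTimes) {
        List<String> expired = new ArrayList<>();
        idleTimes.forEach((id, idle) -> {
            // Records delivered less than EXPIRE_TIME ago may still be in progress, so skip them
            if (idle.compareTo(EXPIRE_TIME) >= 0) {
                expired.add(id);
            }
        });
        return expired;
    }
}
```

A record that has been idle for exactly 300 seconds is resent, matching the lastDelivery < EXPIRE_TIME early return in the job.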

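6.2 Message Acknowledgment

Redis Stream supports message acknowledgment. The listener base class acknowledges a record only after onMessage returns normally; if onMessage throws, the record is not acknowledged and stays in the pending list, where RedisPendingMessageResendJob can pick it up later: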

java
@Override
public void onMessage(ObjectRecord<String, String> message) {
    T messageObj = JsonUtils.parseObject(message.getValue(), messageType);
    try {
        consumeMessageBefore(messageObj);
        this.onMessage(messageObj);
        redisMQTemplate.getRedisTemplate().opsForStream().acknowledge(group, message);
    } finally {
        consumeMessageAfter(messageObj);
    }
}
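
The rule in the method above, acknowledge only after the handler returns normally, can be modelled in plain Java. This is a sketch under simplifying assumptions: AckOnSuccessDispatcher is hypothetical, it tracks pending and acknowledged IDs in memory instead of calling Redis, and it catches the handler's exception to report the outcome, whereas the framework method lets the exception propagate.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

// Minimal model of manual acknowledgment.
class AckOnSuccessDispatcher {

    private final Set<String> pendingIds = new HashSet<>();
    private final Set<String> ackedIds = new HashSet<>();

    // Deliver one record to the handler; returns true if it was acknowledged.
    boolean dispatch(String recordId, String payload, Consumer<String> handler) {
        pendingIds.add(recordId); // delivered but not yet acknowledged
        try {
            handler.accept(payload);
        } catch (RuntimeException ex) {
            return false; // no ack: the record stays pending and can be redelivered later
        }
        pendingIds.remove(recordId);
        ackedIds.add(recordId);
        return true;
    }

    boolean isPending(String recordId) {
        return pendingIds.contains(recordId);
    }

    boolean isAcked(String recordId) {
        return ackedIds.contains(recordId);
    }
}
```

One consequence is that a handler which catches and swallows its own exceptions looks successful to the dispatcher, so the record is acknowledged and never retried.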

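6.3 Consumer Groups

Redis Stream supports consumer groups, which enable cluster consumption. The container below registers every listener under its own group, with automatic acknowledgment turned off: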

java
@Bean
public StreamMessageListenerContainer<String, ObjectRecord<String, String>> redisStreamMessageListenerContainer(
        RedisMQTemplate redisMQTemplate, List<AbstractRedisStreamMessageListener<?>> listeners) {
    RedisTemplate<String, ?> redisTemplate = redisMQTemplate.getRedisTemplate();
    checkRedisVersion(redisTemplate);
    
    StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> containerOptions =
            StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
                    .batchSize(10)
                    .targetType(String.class)
                    .build();
    
    StreamMessageListenerContainer<String, ObjectRecord<String, String>> container =
            StreamMessageListenerContainer.create(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory(), containerOptions);

    String consumerName = buildConsumerName();
    listeners.parallelStream().forEach(listener -> {
        try {
            redisTemplate.opsForStream().createGroup(listener.getStreamKey(), listener.getGroup());
        } catch (Exception ignore) {
            // Ignored on purpose: the usual cause is that the group already exists
        }
        listener.setRedisMQTemplate(redisMQTemplate);
        Consumer consumer = Consumer.from(listener.getGroup(), consumerName);
        StreamOffset<String> streamOffset = StreamOffset.create(listener.getStreamKey(), ReadOffset.lastConsumed());
        StreamMessageListenerContainer.StreamReadRequestBuilder<String> builder = StreamMessageListenerContainer.StreamReadRequest
                .builder(streamOffset).consumer(consumer)
                .autoAcknowledge(false)
                .cancelOnError(throwable -> false);
        container.register(builder.build(), listener);
    });
    return container;
}
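
The container above calls buildConsumerName(), whose body is not shown in this document. A plausible sketch, assuming the goal is a name that is unique per running instance, combines the host address with the process ID. ConsumerNames is a hypothetical helper, and the actual framework method may build the name differently.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

class ConsumerNames {

    // Builds "<host address>@<pid>"; falls back to "unknown" if the local address cannot be resolved.
    static String build() {
        String host;
        try {
            host = InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException ex) {
            host = "unknown";
        }
        return host + "@" + ProcessHandle.current().pid();
    }
}
```

Redis tracks pending entries per consumer name, so two instances that shared a name would also share one pending list. A per-instance name avoids that.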

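7. Best Practices

7.1 Message Naming Conventions

  • End message class names with Message
  • Choose a class name that clearly describes what the message means
  • Examples: OrderStatusChangeMessage, RedisWebSocketMessage, DemoChannelMessage

7.2 Message Data Design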

java
import com.fanyi.cloud.framework.mq.redis.core.stream.AbstractRedisStreamMessage;
import lombok.Data;

@Data
public class OrderStatusChangeMessage extends AbstractRedisStreamMessage {

    private Long orderId;
    private String orderNo;
    private Integer oldStatus;
    private Integer newStatus;
    private Long userId;
    private Long timestamp;

    // Call this before changing the order's status: order.getStatus() is captured as oldStatus
    public OrderStatusChangeMessage setOrder(OrderDO order, Integer newStatus) {
        this.orderId = order.getId();
        this.orderNo = order.getOrderNo();
        this.oldStatus = order.getStatus();
        this.newStatus = newStatus;
        this.userId = order.getUserId();
        this.timestamp = System.currentTimeMillis();
        return this;
    }
}

7.3 Sending Messages

java
import com.fanyi.cloud.framework.mq.redis.core.RedisMQTemplate;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class OrderMessageProducer {

    private final RedisMQTemplate redisMQTemplate;

    public OrderMessageProducer(RedisMQTemplate redisMQTemplate) {
        this.redisMQTemplate = redisMQTemplate;
    }

    public void sendOrderStatusChange(OrderDO order, Integer newStatus) {
        try {
            OrderStatusChangeMessage message = new OrderStatusChangeMessage()
                    .setOrder(order, newStatus);
            redisMQTemplate.send(message);
            log.info("[sendOrderStatusChange][Order status change message sent, order no: {}, new status: {}]",
                    order.getOrderNo(), newStatus);
        } catch (Exception ex) {
            log.error("[sendOrderStatusChange][Failed to send order status change message, order no: {}]",
                    order.getOrderNo(), ex);
        }
    }
}

7.4 Listening for Messages

java
import com.fanyi.cloud.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class OrderStatusChangeListener extends AbstractRedisStreamMessageListener<OrderStatusChangeMessage> {

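    @Override
    public void onMessage(OrderStatusChangeMessage message) {
        log.info("[onMessage][Received order status change message, order no: {}, new status: {}]",
                message.getOrderNo(), message.getNewStatus());
        try {
            processOrderStatusChange(message);
            log.info("[onMessage][Order status change message processed, order no: {}]",
                    message.getOrderNo());
        } catch (Exception ex) {
            log.error("[onMessage][Failed to process order status change message, order no: {}]",
                    message.getOrderNo(), ex);
            // Rethrow so the record is not acknowledged and stays pending for redelivery (section 6.2)
            throw ex;
        }
    }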

    private void processOrderStatusChange(OrderStatusChangeMessage message) {
    }
}

7.5 Message Idempotency

java
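import com.fanyi.cloud.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RequiredArgsConstructor
public class OrderStatusChangeListener extends AbstractRedisStreamMessageListener<OrderStatusChangeMessage> {

    private final RedissonClient redissonClient;
    // OrderMapper and OrderDO are the application's own persistence types (imports omitted)
    private final OrderMapper orderMapper;

    @Override
    public void onMessage(OrderStatusChangeMessage message) {
        // One lock per order serializes concurrent handling of the same order
        String lockKey = "order:status:change:" + message.getOrderId();
        RLock lock = redissonClient.getLock(lockKey);
        try {
            if (lock.tryLock()) {
                processOrderStatusChange(message);
            } else {
                // Another consumer is handling this order: treat this delivery as a duplicate
                log.info("[onMessage][Order is being processed elsewhere, skipping, order no: {}]",
                        message.getOrderNo());
            }
        } finally {
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }

    private void processOrderStatusChange(OrderStatusChangeMessage message) {
        OrderDO order = orderMapper.selectById(message.getOrderId());
        // Idempotency check: skip if the status change has already been applied
        if (order.getStatus().equals(message.getNewStatus())) {
            log.info("[processOrderStatusChange][Order status already updated, skipping, order no: {}]",
                    message.getOrderNo());
            return;
        }
        // Apply the status change here
    }
}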

7.6 Error Handling

java
import com.fanyi.cloud.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class OrderStatusChangeListener extends AbstractRedisStreamMessageListener<OrderStatusChangeMessage> {

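    @Override
    public void onMessage(OrderStatusChangeMessage message) {
        try {
            processOrderStatusChange(message);
        } catch (BusinessException ex) {
            // Business errors are not retryable: returning normally lets the record be acknowledged
            log.warn("[onMessage][Business exception, order no: {}, error: {}]",
                    message.getOrderNo(), ex.getMessage());
        } catch (Exception ex) {
            log.error("[onMessage][System exception, order no: {}]",
                    message.getOrderNo(), ex);
            // Rethrow so the record is not acknowledged and stays pending for redelivery (section 6.2)
            throw ex;
        }
    }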

    private void processOrderStatusChange(OrderStatusChangeMessage message) {
    }
}

7.7 Logging

java
import com.fanyi.cloud.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class OrderStatusChangeListener extends AbstractRedisStreamMessageListener<OrderStatusChangeMessage> {

    @Override
    public void onMessage(OrderStatusChangeMessage message) {
        long startTime = System.currentTimeMillis();
        log.info("[onMessage][Start processing order status change message, order no: {}, new status: {}]",
                message.getOrderNo(), message.getNewStatus());
        try {
            processOrderStatusChange(message);
            long endTime = System.currentTimeMillis();
            log.info("[onMessage][Order status change message processed, order no: {}, elapsed: {}ms]",
                    message.getOrderNo(), endTime - startTime);
        } catch (Exception ex) {
            long endTime = System.currentTimeMillis();
            log.error("[onMessage][Failed to process order status change message, order no: {}, elapsed: {}ms]",
                    message.getOrderNo(), endTime - startTime, ex);
            // Rethrow so the record is not acknowledged and stays pending for redelivery (section 6.2)
            throw ex;
        }
    }

    private void processOrderStatusChange(OrderStatusChangeMessage message) {
    }
}

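8. FAQ

8.1 Messages Are Not Consumed

Problem: the listener does not consume messages after they are sent

Solutions

  1. Check that the listener class is annotated with @Component
  2. Check that the listener is inside the Spring container's component-scan path
  3. Check that the Redis connection is working
  4. Check that the listener's message type matches the type being sent

8.2 Messages Are Consumed More Than Once

Problem: a message is consumed repeatedly

Solutions

  1. Use a distributed lock to prevent duplicate consumption
  2. Make message handling idempotent
  3. Check the consumer group configuration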
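
One common way to make a consumer idempotent is to remember a business key for every message that was handled and to skip keys that were seen before. The sketch below illustrates the idea under stated assumptions: IdempotentConsumer is hypothetical, and its in-memory set stands in for a shared store, which a multi-instance deployment would need instead.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Wraps a handler so that each business key is processed at most once.
class IdempotentConsumer<T> {

    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();
    private final Consumer<T> delegate;

    IdempotentConsumer(Consumer<T> delegate) {
        this.delegate = delegate;
    }

    // Returns true if the message was handled now, false if its key was already processed.
    boolean consume(String key, T message) {
        if (!processedKeys.add(key)) {
            return false; // duplicate delivery: skip
        }
        try {
            delegate.accept(message);
            return true;
        } catch (RuntimeException ex) {
            processedKeys.remove(key); // allow a retry after a failure
            throw ex;
        }
    }
}
```

A key built from the order ID and the new status, for example "order:1:20", makes a redelivered copy of the same status change a no-op while still letting later changes to the same order through.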

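8.3 Messages Are Lost

Problem: a message is lost after being sent

Solutions

  1. Use Redis Stream mode, which persists messages
  2. Check the Redis memory configuration
  3. Check the message acknowledgment mechanism

8.4 Message Consumption Fails

Problem: a message is not retried after consumption fails

Solutions

  1. Check that the RedisPendingMessageResendJob scheduled job is running normally
  2. Check the message timeout setting (EXPIRE_TIME, 5 minutes in section 6.1)
  3. Check the error logs

8.5 Performance Problems

Problem: messages are consumed slowly

Solutions

  1. Add more consumers
  2. Optimize the consumption logic to shorten processing time
  3. Increase the batch size (batchSize, set to 10 in the container configuration in section 6.3)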

9. Monitoring and Debugging

9.1 Monitoring Message Sending

java
import com.fanyi.cloud.framework.mq.redis.core.RedisMQTemplate;
import com.fanyi.cloud.framework.mq.redis.core.message.AbstractRedisMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class MonitoredMessageProducer {

    private final RedisMQTemplate redisMQTemplate;

    public MonitoredMessageProducer(RedisMQTemplate redisMQTemplate) {
        this.redisMQTemplate = redisMQTemplate;
    }

    public void sendMessage(AbstractRedisMessage message) {
        long startTime = System.currentTimeMillis();
        log.info("[sendMessage][Start sending message, type: {}]", message.getClass().getSimpleName());
        try {
            redisMQTemplate.send(message);
            long endTime = System.currentTimeMillis();
            log.info("[sendMessage][Message sent, type: {}, elapsed: {}ms]",
                    message.getClass().getSimpleName(), endTime - startTime);
        } catch (Exception ex) {
            long endTime = System.currentTimeMillis();
            log.error("[sendMessage][Failed to send message, type: {}, elapsed: {}ms]",
                    message.getClass().getSimpleName(), endTime - startTime, ex);
        }
    }
}

9.2 Monitoring Message Consumption

java
import com.fanyi.cloud.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class MonitoredMessageListener extends AbstractRedisStreamMessageListener<DemoStreamMessage> {

    @Override
    public void onMessage(DemoStreamMessage message) {
        long startTime = System.currentTimeMillis();
        log.info("[onMessage][Start processing message, id: {}, type: {}]",
                message.getMessageId(), message.getClass().getSimpleName());
        try {
            processMessage(message);
            long endTime = System.currentTimeMillis();
            log.info("[onMessage][Message processed, id: {}, elapsed: {}ms]",
                    message.getMessageId(), endTime - startTime);
        } catch (Exception ex) {
            long endTime = System.currentTimeMillis();
            log.error("[onMessage][Failed to process message, id: {}, elapsed: {}ms]",
                    message.getMessageId(), endTime - startTime, ex);
            // Rethrow so the record is not acknowledged and stays pending for redelivery (section 6.2)
            throw ex;
        }
    }

    private void processMessage(DemoStreamMessage message) {
    }
}

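10. Notes

  1. Redis version: Redis Stream requires Redis 5.0 or later
  2. Message persistence: Pub/Sub mode does not persist messages; Stream mode does
  3. Message acknowledgment: Stream mode supports acknowledgment; Pub/Sub mode does not
  4. Consumption mode: Pub/Sub suits broadcast consumption; Stream suits cluster consumption
  5. Message idempotency: make sure a message can safely be consumed more than once
  6. Distributed locks: in a distributed environment, use a distributed lock to prevent duplicate consumption
  7. Error handling: handle exceptions raised during message consumption correctly
  8. Logging: log message sending and consumption to make troubleshooting easier
  9. Performance: for large data volumes, use batch processing to improve throughput
  10. Monitoring and alerting: set up monitoring and alerts for message sending and consumption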

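11. Comparison with Other Message Queues

| Feature | Redis MQ | In-memory MQ | RabbitMQ | RocketMQ | Kafka |
| --- | --- | --- | --- | --- | --- |
| Use case | Single node / cluster | Single-node application | Distributed | Distributed | Distributed |
| Execution | Asynchronous | Synchronous | Asynchronous | Asynchronous | Asynchronous |
| Complexity | | | | | |
| Performance | | | | | |
| Reliability | | | | | |
| Persistence | Yes (Stream) | No | Yes | Yes | Yes |
| Message acknowledgment | Yes (Stream) | No | Yes | Yes | Yes |
| Message retry | Yes | No | Yes | Yes | Yes |
| Broadcast consumption | Yes (Pub/Sub) | Yes | Yes | Yes | Yes |
| Cluster consumption | Yes (Stream) | No | Yes | Yes | Yes |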

12. Related Documentation