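Message Queue (Redis)
1. Overview
The Redis message queue is the Redis-based message queue of the 梵医云 system. It supports two consumption modes:
- Redis Pub/Sub (publish/subscribe): broadcast consumption, where every active subscriber receives each message
- Redis Stream: clustering consumption, where each message is handled by one consumer in a group, with consumer groups and message acknowledgment
The Redis message queue has the following characteristics:
- High performance: Redis keeps data in memory, so reads and writes are fast
- Cluster support: suitable for distributed environments
- Multiple modes: supports both broadcast and clustering consumption
- Message persistence: Stream mode persists messages
- Message acknowledgment: Stream mode supports message acknowledgment (ACK)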
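2. Technical architecture
2.1 Redis Pub/Sub architecture
```
Producer -> Redis Channel -> Subscriber 1
                          -> Subscriber 2
                          -> Subscriber 3
```
Characteristics:
- Broadcast mode: every subscriber receives each message
- Messages are not persisted; a message published while a subscriber is not subscribed is lost for that subscriber
- Suited to real-time notification scenarios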
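The broadcast and no-persistence behaviour listed above can be illustrated with a small in-memory model. This is only a sketch: `PubSubSketch` is a hypothetical stand-in for a Redis channel and does not use Redis or the framework classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** In-memory stand-in for a Redis channel; illustrative only, not the framework API. */
public class PubSubSketch {

    private final List<Consumer<String>> subscribers = new ArrayList<>();

    /** Registers a subscriber; it only sees messages published from now on. */
    public void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    /** Delivers the message to every current subscriber and returns how many received it. */
    public int publish(String message) {
        subscribers.forEach(subscriber -> subscriber.accept(message));
        return subscribers.size();
    }

    /** Runs the scenario and returns what each of the two subscribers received. */
    public static List<List<String>> demo() {
        PubSubSketch channel = new PubSubSketch();
        channel.publish("lost");          // no subscribers yet: nothing stores the message
        List<String> first = new ArrayList<>();
        List<String> second = new ArrayList<>();
        channel.subscribe(first::add);
        channel.subscribe(second::add);
        channel.publish("hello");         // broadcast: both subscribers receive it
        return List.of(first, second);
    }

    public static void main(String[] args) {
        System.out.println(demo());       // prints [[hello], [hello]]
    }
}
```

The message published before anyone subscribed never reaches either subscriber, which is the reason Pub/Sub is reserved for notifications that can tolerate loss.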
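2.2 Redis Stream architecture
```
Producer -> Redis Stream -> Consumer group 1 -> Consumer 1
                                             -> Consumer 2
                         -> Consumer group 2 -> Consumer 3
```
Characteristics:
- Clustering consumption with consumer groups
- Messages are persisted and can be replayed
- Supports message acknowledgment
- Suited to reliable message delivery scenarios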
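As a rough illustration of the consumer-group semantics above, the following in-memory model keeps one read position and one pending list per group. `StreamGroupSketch` and its methods are hypothetical names for this sketch; real Redis Stream IDs, blocking reads and the framework classes are deliberately left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of a stream with consumer groups; illustrative only. */
public class StreamGroupSketch {

    private final List<String> log = new ArrayList<>();                  // persisted entries, index = ID
    private final Map<String, Integer> nextIndex = new HashMap<>();      // read position per group
    private final Map<String, List<Integer>> pending = new HashMap<>();  // delivered but unacknowledged IDs per group

    /** Appends a message and returns its ID. */
    public int add(String message) {
        log.add(message);
        return log.size() - 1;
    }

    /** Creates a group that starts reading from the beginning of the stream. */
    public void createGroup(String group) {
        nextIndex.putIfAbsent(group, 0);
        pending.putIfAbsent(group, new ArrayList<>());
    }

    /** Hands the group's next unread entry to the calling consumer, or null if there is none. */
    public Integer read(String group) {
        int index = nextIndex.get(group);
        if (index >= log.size()) {
            return null;
        }
        nextIndex.put(group, index + 1);
        pending.get(group).add(index);   // stays pending until acknowledged
        return index;
    }

    /** Acknowledges an entry for the group, removing it from the pending list. */
    public void ack(String group, int id) {
        pending.get(group).remove(Integer.valueOf(id));
    }

    public List<Integer> pending(String group) {
        return List.copyOf(pending.get(group));
    }

    public String get(int id) {
        return log.get(id);
    }
}
```

Because the read position belongs to the group, two consumers in the same group never receive the same entry, while a second group sees every entry again. Entries that were read but not acknowledged remain in the pending list, which is what a redelivery job can later inspect.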
2.3 Core components
| Component | Description | Module |
|---|---|---|
| RedisMQTemplate | Redis MQ operation template class | fanyi-spring-boot-starter-mq |
| AbstractRedisChannelMessage | Abstract base class for Redis Channel messages | fanyi-spring-boot-starter-mq |
| AbstractRedisStreamMessage | Abstract base class for Redis Stream messages | fanyi-spring-boot-starter-mq |
| AbstractRedisChannelMessageListener | Abstract base class for Redis Pub/Sub listeners | fanyi-spring-boot-starter-mq |
| AbstractRedisStreamMessageListener | Abstract base class for Redis Stream listeners | fanyi-spring-boot-starter-mq |
| RedisMessageInterceptor | Redis message interceptor | fanyi-spring-boot-starter-mq |
| RedisPendingMessageResendJob | Job that redelivers pending Redis messages | fanyi-spring-boot-starter-mq |
| FanyiRedisMQProducerAutoConfiguration | Auto-configuration for the Redis message queue producer | fanyi-spring-boot-starter-mq |
| FanyiRedisMQConsumerAutoConfiguration | Auto-configuration for the Redis message queue consumer | fanyi-spring-boot-starter-mq |
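3. Configuration
3.1 Redis configuration
Configure Redis in application.yaml:
```yaml
spring:
  data:
    redis:
      host: 127.0.0.1
      port: 6379
      database: 0
      password: your-password
      timeout: 3000
      lettuce:
        pool:
          max-active: 8
          max-idle: 8
          min-idle: 0
          max-wait: -1ms  # negative value: wait without a time limit when the pool is exhausted
```
3.2 Redis version requirements
- Redis Pub/Sub: Redis 2.0+
- Redis Stream: Redis 5.0+
The system checks the Redis version automatically (see the checkRedisVersion call in section 6.3) and throws an exception if the version is lower than 5.0.0.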
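A version check of this kind could look roughly like the sketch below. It is not the framework's checkRedisVersion implementation: `RedisVersionCheck` and its methods are hypothetical, and the version string is assumed to have already been read from the server (for example from the `redis_version` field of `INFO server`).

```java
/** Minimal version gate for Redis Stream support; illustrative only. */
public class RedisVersionCheck {

    /** Returns true when the major version in a string such as "6.2.14" is at least 5. */
    public static boolean supportsStream(String redisVersion) {
        String major = redisVersion.trim().split("\\.")[0];
        return Integer.parseInt(major) >= 5;
    }

    /** Fails fast when the server is too old for Redis Stream. */
    public static void requireStreamSupport(String redisVersion) {
        if (!supportsStream(redisVersion)) {
            throw new IllegalStateException(
                    "Redis " + redisVersion + " is too old; Redis Stream requires 5.0.0 or later");
        }
    }

    public static void main(String[] args) {
        requireStreamSupport("7.2.4");                  // passes silently
        System.out.println(supportsStream("4.0.14"));   // prints false
    }
}
```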
4. Usage
4.1 Redis Pub/Sub message queue
4.1.1 Defining a Channel message
Extend AbstractRedisChannelMessage to define the message:
```java
import com.fanyi.cloud.framework.mq.redis.core.pubsub.AbstractRedisChannelMessage;
import lombok.Data;

@Data
public class DemoChannelMessage extends AbstractRedisChannelMessage {

    private String messageId;    // unique ID assigned by the producer
    private String messageType;
    private String content;
    private Long timestamp;      // send time in epoch milliseconds
}
```
4.1.2 Sending a Channel message
Use RedisMQTemplate to send the message:
```java
import com.fanyi.cloud.framework.mq.redis.core.RedisMQTemplate;
import org.springframework.stereotype.Service;

import java.util.UUID;

@Service
public class DemoMessageProducer {

    private final RedisMQTemplate redisMQTemplate;

    public DemoMessageProducer(RedisMQTemplate redisMQTemplate) {
        this.redisMQTemplate = redisMQTemplate;
    }

    public void sendDemoMessage(String content) {
        DemoChannelMessage message = new DemoChannelMessage();
        message.setMessageId(UUID.randomUUID().toString());
        message.setMessageType("DEMO_MESSAGE");
        message.setContent(content);
        message.setTimestamp(System.currentTimeMillis());
        // Publishes to the channel; every active subscriber receives the message
        redisMQTemplate.send(message);
    }
}
```
4.1.3 Listening for a Channel message
Extend AbstractRedisChannelMessageListener to listen for the message:
```java
import com.fanyi.cloud.framework.mq.redis.core.pubsub.AbstractRedisChannelMessageListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class DemoChannelMessageListener extends AbstractRedisChannelMessageListener<DemoChannelMessage> {

    // Called for every DemoChannelMessage published to the channel
    @Override
    public void onMessage(DemoChannelMessage message) {
        log.info("[onMessage][Received message, ID: {}, type: {}, content: {}]",
                message.getMessageId(), message.getMessageType(), message.getContent());
    }
}
```
4.2 Redis Stream message queue
4.2.1 Defining a Stream message
Extend AbstractRedisStreamMessage to define the message:
```java
import com.fanyi.cloud.framework.mq.redis.core.stream.AbstractRedisStreamMessage;
import lombok.Data;

@Data
public class DemoStreamMessage extends AbstractRedisStreamMessage {

    private String messageId;    // unique ID assigned by the producer
    private String messageType;
    private String content;
    private Long timestamp;      // send time in epoch milliseconds
}
```
4.2.2 Sending a Stream message
Use RedisMQTemplate to send the message:
```java
import com.fanyi.cloud.framework.mq.redis.core.RedisMQTemplate;
import org.springframework.stereotype.Service;

import java.util.UUID;

@Service
public class DemoMessageProducer {

    private final RedisMQTemplate redisMQTemplate;

    public DemoMessageProducer(RedisMQTemplate redisMQTemplate) {
        this.redisMQTemplate = redisMQTemplate;
    }

    public void sendDemoMessage(String content) {
        DemoStreamMessage message = new DemoStreamMessage();
        message.setMessageId(UUID.randomUUID().toString());
        message.setMessageType("DEMO_MESSAGE");
        message.setContent(content);
        message.setTimestamp(System.currentTimeMillis());
        // Appends to the stream; one consumer in each consumer group handles the message
        redisMQTemplate.send(message);
    }
}
```
4.2.3 Listening for a Stream message
Extend AbstractRedisStreamMessageListener to listen for the message:
```java
import com.fanyi.cloud.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class DemoStreamMessageListener extends AbstractRedisStreamMessageListener<DemoStreamMessage> {

    // Called once per message for this listener's consumer group
    @Override
    public void onMessage(DemoStreamMessage message) {
        log.info("[onMessage][Received message, ID: {}, type: {}, content: {}]",
                message.getMessageId(), message.getMessageType(), message.getContent());
    }
}
```
4.3 Message interceptors
Implement the RedisMessageInterceptor interface to add an interceptor:
```java
import com.fanyi.cloud.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
import com.fanyi.cloud.framework.mq.redis.core.message.AbstractRedisMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class DemoRedisMessageInterceptor implements RedisMessageInterceptor {

    @Override
    public void sendMessageBefore(AbstractRedisMessage message) {
        log.info("[sendMessageBefore][Before send, message type: {}]", message.getClass().getSimpleName());
    }

    @Override
    public void sendMessageAfter(AbstractRedisMessage message) {
        log.info("[sendMessageAfter][After send, message type: {}]", message.getClass().getSimpleName());
    }

    @Override
    public void consumeMessageBefore(AbstractRedisMessage message) {
        log.info("[consumeMessageBefore][Before consume, message type: {}]", message.getClass().getSimpleName());
    }

    @Override
    public void consumeMessageAfter(AbstractRedisMessage message) {
        log.info("[consumeMessageAfter][After consume, message type: {}]", message.getClass().getSimpleName());
    }
}
```
4.4 Message headers
Use message headers to pass additional information:
```java
import com.fanyi.cloud.framework.mq.redis.core.pubsub.AbstractRedisChannelMessage;
import lombok.Data;

@Data
public class DemoChannelMessage extends AbstractRedisChannelMessage {

    private String messageId;
    private String messageType;
    private String content;

    // Stores the tenant ID as a string header and returns this for chaining
    public DemoChannelMessage addTenantId(Long tenantId) {
        addHeader("tenantId", String.valueOf(tenantId));
        return this;
    }

    // Reads the tenant ID back; returns null when the header is absent
    public Long getTenantId() {
        String tenantId = getHeader("tenantId");
        return tenantId != null ? Long.parseLong(tenantId) : null;
    }
}
```
5. Business scenario examples
5.1 WebSocket message broadcast
5.1.1 WebSocket message definition
```java
import com.fanyi.cloud.framework.mq.redis.core.pubsub.AbstractRedisChannelMessage;
import lombok.Data;

@Data
public class RedisWebSocketMessage extends AbstractRedisChannelMessage {

    private String sessionId;       // target session, or null
    private Integer userType;       // target user type, or null
    private Long userId;            // target user, or null
    private String messageType;
    private String messageContent;
}
```
5.1.2 WebSocket message sender
```java
import com.fanyi.cloud.framework.mq.redis.core.RedisMQTemplate;
import lombok.extern.slf4j.Slf4j;

// AbstractWebSocketMessageSender and WebSocketSessionManager belong to the project's
// WebSocket module; their imports are omitted here.
@Slf4j
public class RedisWebSocketMessageSender extends AbstractWebSocketMessageSender {

    private final RedisMQTemplate redisMQTemplate;

    public RedisWebSocketMessageSender(WebSocketSessionManager sessionManager,
                                       RedisMQTemplate redisMQTemplate) {
        super(sessionManager);
        this.redisMQTemplate = redisMQTemplate;
    }

    // Send to one user
    @Override
    public void send(Integer userType, Long userId, String messageType, String messageContent) {
        sendRedisMessage(null, userId, userType, messageType, messageContent);
    }

    // Send to every user of a user type
    @Override
    public void send(Integer userType, String messageType, String messageContent) {
        sendRedisMessage(null, null, userType, messageType, messageContent);
    }

    // Send to one session
    @Override
    public void send(String sessionId, String messageType, String messageContent) {
        sendRedisMessage(sessionId, null, null, messageType, messageContent);
    }

    // Broadcasts through Redis so that the node holding the session can deliver the message
    private void sendRedisMessage(String sessionId, Long userId, Integer userType,
                                  String messageType, String messageContent) {
        // Chained setters require Lombok chain accessors, e.g. lombok.accessors.chain=true
        // in lombok.config or @Accessors(chain = true) on RedisWebSocketMessage
        RedisWebSocketMessage mqMessage = new RedisWebSocketMessage()
                .setSessionId(sessionId)
                .setUserId(userId)
                .setUserType(userType)
                .setMessageType(messageType)
                .setMessageContent(messageContent);
        redisMQTemplate.send(mqMessage);
    }
}
```
5.1.3 WebSocket message consumer
```java
import com.fanyi.cloud.framework.mq.redis.core.pubsub.AbstractRedisChannelMessageListener;
import lombok.RequiredArgsConstructor;

@RequiredArgsConstructor
public class RedisWebSocketMessageConsumer extends AbstractRedisChannelMessageListener<RedisWebSocketMessage> {

    private final RedisWebSocketMessageSender redisWebSocketMessageSender;

    @Override
    public void onMessage(RedisWebSocketMessage message) {
        // The five-argument send is not declared in RedisWebSocketMessageSender above; it is
        // presumably inherited from AbstractWebSocketMessageSender and delivers to local sessions
        redisWebSocketMessageSender.send(message.getSessionId(),
                message.getUserType(), message.getUserId(),
                message.getMessageType(), message.getMessageContent());
    }
}
```
5.2 Order status change notification
5.2.1 Order status message definition
```java
import com.fanyi.cloud.framework.mq.redis.core.stream.AbstractRedisStreamMessage;
import lombok.Data;

@Data
public class OrderStatusChangeMessage extends AbstractRedisStreamMessage {

    private Long orderId;
    private String orderNo;
    private Integer oldStatus;
    private Integer newStatus;
    private Long userId;
    private Long timestamp;    // change time in epoch milliseconds
}
```
5.2.2 Sending the order status message
```java
import com.fanyi.cloud.framework.mq.redis.core.RedisMQTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class OrderService {

    private final RedisMQTemplate redisMQTemplate;
    // OrderMapper and OrderDO are the project's own persistence types (imports omitted)
    private final OrderMapper orderMapper;

    public OrderService(RedisMQTemplate redisMQTemplate, OrderMapper orderMapper) {
        this.redisMQTemplate = redisMQTemplate;
        this.orderMapper = orderMapper;
    }

    @Transactional
    public void updateOrderStatus(Long orderId, Integer newStatus) {
        // Update the order, remembering the previous status for the message
        OrderDO order = orderMapper.selectById(orderId);
        Integer oldStatus = order.getStatus();
        order.setStatus(newStatus);
        orderMapper.updateById(order);

        OrderStatusChangeMessage message = new OrderStatusChangeMessage();
        message.setOrderId(orderId);
        message.setOrderNo(order.getOrderNo());
        message.setOldStatus(oldStatus);
        message.setNewStatus(newStatus);
        message.setUserId(order.getUserId());
        message.setTimestamp(System.currentTimeMillis());
        // Note: the message is sent before the transaction commits, so consumers
        // may see it even if the commit later fails
        redisMQTemplate.send(message);
    }
}
```
5.2.3 Listening for the order status message
```java
import com.fanyi.cloud.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class OrderStatusChangeListener extends AbstractRedisStreamMessageListener<OrderStatusChangeMessage> {

    @Override
    public void onMessage(OrderStatusChangeMessage message) {
        log.info("[onMessage][Order status changed, order no: {}, old status: {}, new status: {}]",
                message.getOrderNo(), message.getOldStatus(), message.getNewStatus());
        // Dispatch on the new status: 10 = paid, 20 = shipped, 30 = completed
        switch (message.getNewStatus()) {
            case 10:
                handleOrderPaid(message);
                break;
            case 20:
                handleOrderShipped(message);
                break;
            case 30:
                handleOrderCompleted(message);
                break;
            default:
                // Other statuses need no follow-up in this example
                break;
        }
    }

    private void handleOrderPaid(OrderStatusChangeMessage message) {
        log.info("[handleOrderPaid][Order paid, order no: {}]", message.getOrderNo());
    }

    private void handleOrderShipped(OrderStatusChangeMessage message) {
        log.info("[handleOrderShipped][Order shipped, order no: {}]", message.getOrderNo());
    }

    private void handleOrderCompleted(OrderStatusChangeMessage message) {
        log.info("[handleOrderCompleted][Order completed, order no: {}]", message.getOrderNo());
    }
}
```
6. Advanced features
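6.1 Message redelivery
The system provides the RedisPendingMessageResendJob scheduled job, which redelivers pending messages that have timed out:
```java
import cn.hutool.core.collection.CollUtil; // assumed to be Hutool's CollUtil
import com.fanyi.cloud.framework.mq.redis.core.RedisMQTemplate;
import com.fanyi.cloud.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.PendingMessages;
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.StreamOperations;
import org.springframework.scheduling.annotation.Scheduled;

import java.util.List;
import java.util.Map;
import java.util.Objects;

@Slf4j
@AllArgsConstructor
public class RedisPendingMessageResendJob {

    private static final String LOCK_KEY = "redis:pending:msg:lock";
    /** Timeout after which a pending message is redelivered: 5 minutes, in seconds */
    private static final int EXPIRE_TIME = 5 * 60;

    private final List<AbstractRedisStreamMessageListener<?>> listeners;
    private final RedisMQTemplate redisTemplate;
    private final String groupName;
    private final RedissonClient redissonClient;

    // Runs at second 35 of every minute; the distributed lock lets only one node execute
    @Scheduled(cron = "35 * * * * ?")
    public void messageResend() {
        RLock lock = redissonClient.getLock(LOCK_KEY);
        if (lock.tryLock()) {
            try {
                execute();
            } catch (Exception ex) {
                log.error("[messageResend][Execution failed]", ex);
            } finally {
                lock.unlock();
            }
        }
    }

    private void execute() {
        StreamOperations<String, Object, Object> ops = redisTemplate.getRedisTemplate().opsForStream();
        listeners.forEach(listener -> {
            // Pending summary of the group, broken down by consumer
            PendingMessagesSummary pendingMessagesSummary = Objects.requireNonNull(ops.pending(listener.getStreamKey(), groupName));
            Map<String, Long> pendingMessagesPerConsumer = pendingMessagesSummary.getPendingMessagesPerConsumer();
            pendingMessagesPerConsumer.forEach((consumerName, pendingMessageCount) -> {
                // Pending message details of each consumer
                PendingMessages pendingMessages = ops.pending(listener.getStreamKey(), Consumer.from(groupName, consumerName), Range.unbounded(), pendingMessageCount);
                if (pendingMessages.isEmpty()) {
                    return;
                }
                pendingMessages.forEach(pendingMessage -> {
                    // Skip messages that have not yet timed out
                    long lastDelivery = pendingMessage.getElapsedTimeSinceLastDelivery().getSeconds();
                    if (lastDelivery < EXPIRE_TIME) {
                        return;
                    }
                    // Load the original record by its ID
                    List<MapRecord<String, Object, Object>> records = ops.range(listener.getStreamKey(),
                            Range.of(Range.Bound.inclusive(pendingMessage.getIdAsString()), Range.Bound.inclusive(pendingMessage.getIdAsString())));
                    if (CollUtil.isEmpty(records)) {
                        return;
                    }
                    // Re-add the payload as a new record, then acknowledge the old one
                    redisTemplate.getRedisTemplate().opsForStream().add(StreamRecords.newRecord()
                            .ofObject(records.get(0).getValue())
                            .withStreamKey(listener.getStreamKey()));
                    redisTemplate.getRedisTemplate().opsForStream().acknowledge(groupName, records.get(0));
                    log.info("[processPendingMessage][Message ({}) redelivered]", records.get(0).getId());
                });
            });
        });
    }
}
```
6.2 Message acknowledgment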
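Redis Stream supports message acknowledgment. The listener acknowledges a record only after the business onMessage method returns normally:
```java
@Override
public void onMessage(ObjectRecord<String, String> message) {
    // Deserialize the JSON payload into the listener's message type
    T messageObj = JsonUtils.parseObject(message.getValue(), messageType);
    try {
        consumeMessageBefore(messageObj);
        // Run the business logic
        this.onMessage(messageObj);
        // Acknowledge the record; if the business logic throws, this line is skipped
        // and the record stays in the pending list
        redisMQTemplate.getRedisTemplate().opsForStream().acknowledge(group, message);
    } finally {
        consumeMessageAfter(messageObj);
    }
}
```
6.3 Consumer groups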
Redis Stream supports consumer groups, which provide clustering consumption:
```java
@Bean
public StreamMessageListenerContainer<String, ObjectRecord<String, String>> redisStreamMessageListenerContainer(
        RedisMQTemplate redisMQTemplate, List<AbstractRedisStreamMessageListener<?>> listeners) {
    RedisTemplate<String, ?> redisTemplate = redisMQTemplate.getRedisTemplate();
    // Fail early if the Redis server does not support Stream
    checkRedisVersion(redisTemplate);
    // Container options: read up to 10 records per poll, payload delivered as String
    StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> containerOptions =
            StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
                    .batchSize(10)
                    .targetType(String.class)
                    .build();
    StreamMessageListenerContainer<String, ObjectRecord<String, String>> container =
            StreamMessageListenerContainer.create(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory(), containerOptions);
    // One consumer name per application instance
    String consumerName = buildConsumerName();
    listeners.parallelStream().forEach(listener -> {
        try {
            // Create the consumer group; an exception (e.g. the group already exists) is ignored
            redisTemplate.opsForStream().createGroup(listener.getStreamKey(), listener.getGroup());
        } catch (Exception ignore) {
        }
        listener.setRedisMQTemplate(redisMQTemplate);
        Consumer consumer = Consumer.from(listener.getGroup(), consumerName);
        // Continue from the last record delivered to this group
        StreamOffset<String> streamOffset = StreamOffset.create(listener.getStreamKey(), ReadOffset.lastConsumed());
        StreamMessageListenerContainer.StreamReadRequestBuilder<String> builder = StreamMessageListenerContainer.StreamReadRequest
                .builder(streamOffset).consumer(consumer)
                .autoAcknowledge(false) // the listener acknowledges manually (see 6.2)
                .cancelOnError(throwable -> false); // keep the subscription alive on errors
        container.register(builder.build(), listener);
    });
    return container;
}
```
7. Best practices
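7.1 Message naming conventions
- Message class names end with `Message`
- A message class name should clearly describe what the message means
- Examples: `OrderStatusChangeMessage`, `RedisWebSocketMessage`, `DemoChannelMessage`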
7.2 Message data design
```java
import com.fanyi.cloud.framework.mq.redis.core.stream.AbstractRedisStreamMessage;
import lombok.Data;

@Data
public class OrderStatusChangeMessage extends AbstractRedisStreamMessage {

    private Long orderId;
    private String orderNo;
    private Integer oldStatus;
    private Integer newStatus;
    private Long userId;
    private Long timestamp;

    // Fills the message from the order; call it before the order's status is overwritten,
    // because oldStatus is read from order.getStatus()
    public OrderStatusChangeMessage setOrder(OrderDO order, Integer newStatus) {
        this.orderId = order.getId();
        this.orderNo = order.getOrderNo();
        this.oldStatus = order.getStatus();
        this.newStatus = newStatus;
        this.userId = order.getUserId();
        this.timestamp = System.currentTimeMillis();
        return this;
    }
}
```
7.3 Sending messages
```java
import com.fanyi.cloud.framework.mq.redis.core.RedisMQTemplate;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class OrderMessageProducer {

    private final RedisMQTemplate redisMQTemplate;

    public OrderMessageProducer(RedisMQTemplate redisMQTemplate) {
        this.redisMQTemplate = redisMQTemplate;
    }

    public void sendOrderStatusChange(OrderDO order, Integer newStatus) {
        try {
            OrderStatusChangeMessage message = new OrderStatusChangeMessage()
                    .setOrder(order, newStatus);
            redisMQTemplate.send(message);
            log.info("[sendOrderStatusChange][Order status change message sent, order no: {}, new status: {}]",
                    order.getOrderNo(), newStatus);
        } catch (Exception ex) {
            // A send failure is logged and does not propagate to the caller
            log.error("[sendOrderStatusChange][Failed to send order status change message, order no: {}]",
                    order.getOrderNo(), ex);
        }
    }
}
```
7.4 Listening for messages
```java
import com.fanyi.cloud.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class OrderStatusChangeListener extends AbstractRedisStreamMessageListener<OrderStatusChangeMessage> {

    @Override
    public void onMessage(OrderStatusChangeMessage message) {
        log.info("[onMessage][Received order status change message, order no: {}, new status: {}]",
                message.getOrderNo(), message.getNewStatus());
        try {
            processOrderStatusChange(message);
            log.info("[onMessage][Order status change message processed, order no: {}]",
                    message.getOrderNo());
        } catch (Exception ex) {
            // Swallowing the exception lets the record be acknowledged (see 6.2);
            // rethrow instead if the message should stay pending for redelivery
            log.error("[onMessage][Failed to process order status change message, order no: {}]",
                    message.getOrderNo(), ex);
        }
    }

    private void processOrderStatusChange(OrderStatusChangeMessage message) {
        // Business logic goes here
    }
}
```
7.5 Message idempotency
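```java
import com.fanyi.cloud.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RequiredArgsConstructor
public class OrderStatusChangeListener extends AbstractRedisStreamMessageListener<OrderStatusChangeMessage> {

    private final RedissonClient redissonClient;
    // OrderMapper and OrderDO are the project's own persistence types (imports omitted)
    private final OrderMapper orderMapper;

    @Override
    public void onMessage(OrderStatusChangeMessage message) {
        // One lock per order, so concurrent deliveries for the same order do not interleave
        String lockKey = "order:status:change:" + message.getOrderId();
        RLock lock = redissonClient.getLock(lockKey);
        try {
            // tryLock() does not wait: if another consumer holds the lock, this delivery is skipped
            if (lock.tryLock()) {
                processOrderStatusChange(message);
            }
        } finally {
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }

    private void processOrderStatusChange(OrderStatusChangeMessage message) {
        OrderDO order = orderMapper.selectById(message.getOrderId());
        // Idempotency check: the order already has the target status, so there is nothing to do
        if (order.getStatus().equals(message.getNewStatus())) {
            log.info("[processOrderStatusChange][Order status already updated, skipping, order no: {}]",
                    message.getOrderNo());
            return;
        }
        // Otherwise apply the status change here
    }
}
```
7.6 Error handling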
```java
import com.fanyi.cloud.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

// BusinessException is the project's business exception type (import omitted)
@Slf4j
@Component
public class OrderStatusChangeListener extends AbstractRedisStreamMessageListener<OrderStatusChangeMessage> {

    @Override
    public void onMessage(OrderStatusChangeMessage message) {
        try {
            processOrderStatusChange(message);
        } catch (BusinessException ex) {
            // A business error will not succeed on retry: log it and let the record be acknowledged
            log.warn("[onMessage][Business exception, order no: {}, error: {}]",
                    message.getOrderNo(), ex.getMessage());
        } catch (Exception ex) {
            log.error("[onMessage][System exception, order no: {}]",
                    message.getOrderNo(), ex);
            // Rethrow so the record is not acknowledged and stays pending for redelivery (see 6.1 and 6.2)
            throw ex;
        }
    }

    private void processOrderStatusChange(OrderStatusChangeMessage message) {
        // Business logic goes here
    }
}
```
7.7 Logging
```java
import com.fanyi.cloud.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class OrderStatusChangeListener extends AbstractRedisStreamMessageListener<OrderStatusChangeMessage> {

    @Override
    public void onMessage(OrderStatusChangeMessage message) {
        long startTime = System.currentTimeMillis();
        log.info("[onMessage][Start processing order status change message, order no: {}, new status: {}]",
                message.getOrderNo(), message.getNewStatus());
        try {
            processOrderStatusChange(message);
            long endTime = System.currentTimeMillis();
            log.info("[onMessage][Order status change message processed, order no: {}, elapsed: {}ms]",
                    message.getOrderNo(), endTime - startTime);
        } catch (Exception ex) {
            // Log the elapsed time on failure too; the exception is swallowed here,
            // so the record is still acknowledged (see 6.2)
            long endTime = System.currentTimeMillis();
            log.error("[onMessage][Failed to process order status change message, order no: {}, elapsed: {}ms]",
                    message.getOrderNo(), endTime - startTime, ex);
        }
    }

    private void processOrderStatusChange(OrderStatusChangeMessage message) {
        // Business logic goes here
    }
}
```
8. Common issues
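8.1 Messages are not consumed
Problem: the listener does not consume messages after they are sent.
Solutions:
- Check that the listener class is annotated with `@Component`
- Check that the listener is within the Spring component-scan path
- Check that the Redis connection is healthy
- Check that the message type matches the listener's generic type
8.2 Messages are consumed more than once
Problem: a message is consumed repeatedly.
Solutions:
- Use a distributed lock to prevent concurrent duplicate processing
- Make message handling idempotent
- Check the consumer group configuration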
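One common way to make handling idempotent is to remember which message IDs have already been processed. The sketch below does this with an in-memory set purely for illustration: `IdempotentHandler` is a hypothetical class, and in a multi-node deployment the set would have to live in a shared store instead (for example a Redis key written with `SET ... NX` and an expiry, or a unique constraint in the database).

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Runs a handler at most once per message ID; illustrative, single-JVM only. */
public class IdempotentHandler {

    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();
    private final Consumer<String> delegate;

    public IdempotentHandler(Consumer<String> delegate) {
        this.delegate = delegate;
    }

    /** Returns true if the payload was processed, false if the ID was seen before. */
    public boolean handle(String messageId, String payload) {
        // add() is atomic: only the first caller for a given ID gets true
        if (!processedIds.add(messageId)) {
            return false;
        }
        try {
            delegate.accept(payload);
            return true;
        } catch (RuntimeException ex) {
            // Forget the ID so that a later redelivery can retry
            processedIds.remove(messageId);
            throw ex;
        }
    }
}
```

A listener could wrap its business logic in such a handler, using a field like the messageId of DemoStreamMessage as the key, so that a redelivered copy of an already processed message becomes a no-op.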
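8.3 Messages are lost
Problem: a message is lost after being sent.
Solutions:
- Use Redis Stream mode, which persists messages
- Check the Redis memory configuration (for example the maxmemory limit and eviction policy)
- Check the message acknowledgment mechanism
8.4 Failed messages are not retried
Problem: a message is not retried after consumption fails.
Solutions:
- Check that the `RedisPendingMessageResendJob` scheduled job is running
- Check the message timeout setting (EXPIRE_TIME in section 6.1, 5 minutes)
- Check the error logs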
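When checking the timeout, it helps to keep in mind which pending messages the job in section 6.1 actually picks up: only those whose time since last delivery has reached EXPIRE_TIME. The selection rule can be sketched in isolation as follows; `PendingResendSketch` is a hypothetical helper, and the map of idle times stands in for the data that the job reads from the pending list.

```java
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Selects pending message IDs that are due for redelivery; illustrative only. */
public class PendingResendSketch {

    /** Same threshold as EXPIRE_TIME in section 6.1: five minutes. */
    static final Duration EXPIRE_TIME = Duration.ofMinutes(5);

    /** Returns the IDs whose time since last delivery has reached the threshold. */
    public static List<String> selectForResend(Map<String, Duration> idleTimeById) {
        return idleTimeById.entrySet().stream()
                .filter(entry -> entry.getValue().compareTo(EXPIRE_TIME) >= 0)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Duration> idle = new LinkedHashMap<>();
        idle.put("1-0", Duration.ofSeconds(10));    // delivered recently: still being processed
        idle.put("2-0", Duration.ofSeconds(300));   // exactly at the threshold: redelivered
        idle.put("3-0", Duration.ofMinutes(15));    // long overdue: redelivered
        System.out.println(selectForResend(idle));  // prints [2-0, 3-0]
    }
}
```

A message that failed less than five minutes ago is therefore expected to remain pending for a while, and this alone does not indicate that the job is broken.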
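8.5 Performance problems
Problem: messages are consumed slowly.
Solutions:
- Add more consumers
- Optimize the consumption logic to shorten processing time
- Increase the batch size (batchSize in section 6.3)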
9. Monitoring and debugging
9.1 Monitoring message sending
```java
import com.fanyi.cloud.framework.mq.redis.core.RedisMQTemplate;
import com.fanyi.cloud.framework.mq.redis.core.stream.AbstractRedisStreamMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class MonitoredMessageProducer {

    private final RedisMQTemplate redisMQTemplate;

    public MonitoredMessageProducer(RedisMQTemplate redisMQTemplate) {
        this.redisMQTemplate = redisMQTemplate;
    }

    // Typed to the Stream base class, for which send is shown in section 4.2.2;
    // add an AbstractRedisChannelMessage overload in the same way for Pub/Sub messages
    public void sendMessage(AbstractRedisStreamMessage message) {
        long startTime = System.currentTimeMillis();
        log.info("[sendMessage][Start sending message, message type: {}]", message.getClass().getSimpleName());
        try {
            redisMQTemplate.send(message);
            long endTime = System.currentTimeMillis();
            log.info("[sendMessage][Message sent, message type: {}, elapsed: {}ms]",
                    message.getClass().getSimpleName(), endTime - startTime);
        } catch (Exception ex) {
            long endTime = System.currentTimeMillis();
            log.error("[sendMessage][Failed to send message, message type: {}, elapsed: {}ms]",
                    message.getClass().getSimpleName(), endTime - startTime, ex);
        }
    }
}
```
9.2 Monitoring message consumption
```java
import com.fanyi.cloud.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class MonitoredMessageListener extends AbstractRedisStreamMessageListener<DemoStreamMessage> {

    @Override
    public void onMessage(DemoStreamMessage message) {
        long startTime = System.currentTimeMillis();
        log.info("[onMessage][Start processing message, ID: {}, type: {}]",
                message.getMessageId(), message.getClass().getSimpleName());
        try {
            processMessage(message);
            long endTime = System.currentTimeMillis();
            log.info("[onMessage][Message processed, ID: {}, elapsed: {}ms]",
                    message.getMessageId(), endTime - startTime);
        } catch (Exception ex) {
            // The exception is swallowed here, so the record is still acknowledged (see 6.2)
            long endTime = System.currentTimeMillis();
            log.error("[onMessage][Failed to process message, ID: {}, elapsed: {}ms]",
                    message.getMessageId(), endTime - startTime, ex);
        }
    }

    private void processMessage(DemoStreamMessage message) {
        // Business logic goes here
    }
}
```
10. Notes
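- Redis version: Redis Stream requires Redis 5.0+
- Message persistence: Pub/Sub messages are not persisted; Stream messages are
- Message acknowledgment: Stream mode supports acknowledgment; Pub/Sub mode does not
- Consumption mode: Pub/Sub suits broadcast consumption; Stream suits clustering consumption
- Idempotency: make sure a message can safely be consumed more than once
- Distributed locks: in a distributed environment, use a distributed lock to prevent duplicate consumption
- Error handling: handle exceptions raised during message consumption properly
- Logging: log message sending and consumption to make troubleshooting easier
- Performance: for large data volumes, use batch processing to improve throughput
- Monitoring and alerting: set up monitoring and alerts for message sending and consumption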
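11. Comparison with other message queues
| Feature | Redis message queue | In-memory message queue | RabbitMQ | RocketMQ | Kafka |
|---|---|---|---|---|---|
| Deployment scope | Single node / cluster | Single-node application | Distributed | Distributed | Distributed |
| Execution | Asynchronous | Synchronous | Asynchronous | Asynchronous | Asynchronous |
| Complexity | Medium | Low | High | High | High |
| Performance | High | High | Medium | High | High |
| Reliability | Medium | Low | High | High | High |
| Persistence | Yes (Stream) | No | Yes | Yes | Yes |
| Message acknowledgment | Yes (Stream) | No | Yes | Yes | Yes |
| Message retry | Yes | No | Yes | Yes | Yes |
| Broadcast consumption | Yes (Pub/Sub) | Yes | Yes | Yes | Yes |
| Clustering consumption | Yes (Stream) | No | Yes | Yes | Yes |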
