消息队列(内存)
1. 概述
内存消息队列是梵医云系统中基于 Spring 框架的 ApplicationEvent 和 ApplicationListener 机制实现的轻量级消息队列。它适用于单机应用场景,提供同步的事件发布和订阅功能。
内存消息队列具有以下特点:
- 轻量级:基于 Spring 原生机制,无需额外依赖
- 同步执行:事件发布后立即执行监听器
- 简单易用:使用 Spring 注解即可实现
- 单机适用:适用于单机应用,不适用于分布式环境
2. 技术架构
2.1 核心组件
| 组件 | 说明 | 位置 |
|---|---|---|
| ApplicationEvent | Spring 事件基类 | Spring Framework |
| ApplicationListener | Spring 事件监听器接口 | Spring Framework |
| ApplicationEventPublisher | 事件发布器 | Spring Framework |
| @EventListener | 事件监听器注解 | Spring Framework |
2.2 工作原理
事件发布者 -> ApplicationEventPublisher -> ApplicationEvent -> ApplicationListener -> 事件处理逻辑- 事件发布者通过
ApplicationEventPublisher发布事件 - Spring 容器将事件广播给所有注册的监听器
- 监听器接收到事件并执行相应的处理逻辑
3. 使用方式
3.1 定义事件
继承 ApplicationEvent 类定义自定义事件:
java
import org.springframework.context.ApplicationEvent;
import lombok.Data;
@Data
public class CustomEvent extends ApplicationEvent {
private String eventId;
private String eventType;
private Object data;
public CustomEvent(Object source) {
super(source);
}
public CustomEvent(Object source, String eventId, String eventType, Object data) {
super(source);
this.eventId = eventId;
this.eventType = eventType;
this.data = data;
}
}3.2 发布事件
使用 ApplicationEventPublisher 发布事件:
java
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;
@Service
public class EventPublisherService {
private final ApplicationEventPublisher publisher;
public EventPublisherService(ApplicationEventPublisher publisher) {
this.publisher = publisher;
}
public void publishCustomEvent(String eventId, String eventType, Object data) {
CustomEvent event = new CustomEvent(this, eventId, eventType, data);
publisher.publishEvent(event);
}
}3.3 监听事件(方式一:实现 ApplicationListener 接口)
java
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
@Component
public class CustomEventListener implements ApplicationListener<CustomEvent> {
@Override
public void onApplicationEvent(CustomEvent event) {
System.out.println("接收到事件: " + event.getEventType());
System.out.println("事件数据: " + event.getData());
}
}3.4 监听事件(方式二:使用 @EventListener 注解)
java
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
@Component
public class CustomEventListener {
@EventListener
public void handleCustomEvent(CustomEvent event) {
System.out.println("接收到事件: " + event.getEventType());
System.out.println("事件数据: " + event.getData());
}
}3.5 条件监听
使用 condition 参数实现条件监听:
java
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
@Component
public class CustomEventListener {
@EventListener(condition = "#event.eventType == 'ORDER_CREATED'")
public void handleOrderCreatedEvent(CustomEvent event) {
System.out.println("处理订单创建事件: " + event.getData());
}
@EventListener(condition = "#event.eventType == 'ORDER_CANCELLED'")
public void handleOrderCancelledEvent(CustomEvent event) {
System.out.println("处理订单取消事件: " + event.getData());
}
}3.6 异步监听
使用 @Async 注解实现异步监听:
java
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
@Component
public class CustomEventListener {
@Async
@EventListener
public void handleCustomEvent(CustomEvent event) {
System.out.println("异步处理事件: " + event.getEventType());
}
}3.7 监听器优先级
使用 @Order 注解设置监听器优先级:
java
import org.springframework.context.annotation.Order;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
@Component
public class CustomEventListener {
@Order(1)
@EventListener
public void handleFirst(CustomEvent event) {
System.out.println("第一个监听器");
}
@Order(2)
@EventListener
public void handleSecond(CustomEvent event) {
System.out.println("第二个监听器");
}
}4. 业务场景示例
4.1 流程实例状态变化事件
java
import com.fanyi.cloud.module.bpm.enums.task.BpmProcessInstanceStatusEnum;
import jakarta.validation.constraints.NotNull;
import lombok.Data;
import org.springframework.context.ApplicationEvent;
@Data
public class BpmProcessInstanceStatusEvent extends ApplicationEvent {
@NotNull(message = "流程实例的编号不能为空")
private String id;
@NotNull(message = "流程实例的 key 不能为空")
private String processDefinitionKey;
@NotNull(message = "流程实例的状态不能为空")
private Integer status;
private String businessKey;
private String reason;
public BpmProcessInstanceStatusEvent(Object source) {
super(source);
}
}4.2 流程实例状态事件发布者
java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.validation.annotation.Validated;
@Validated
public class BpmProcessInstanceEventPublisher {
private final ApplicationEventPublisher publisher;
public BpmProcessInstanceEventPublisher(ApplicationEventPublisher publisher) {
this.publisher = publisher;
}
public void sendProcessInstanceResultEvent(BpmProcessInstanceStatusEvent event) {
publisher.publishEvent(event);
}
}4.3 流程实例状态事件监听器
java
import cn.hutool.core.util.StrUtil;
import org.springframework.context.ApplicationListener;
public abstract class BpmProcessInstanceStatusEventListener
implements ApplicationListener<BpmProcessInstanceStatusEvent> {
@Override
public final void onApplicationEvent(BpmProcessInstanceStatusEvent event) {
if (!StrUtil.equals(event.getProcessDefinitionKey(), getProcessDefinitionKey())) {
return;
}
onEvent(event);
}
protected abstract String getProcessDefinitionKey();
protected abstract void onEvent(BpmProcessInstanceStatusEvent event);
}4.4 具体流程监听器实现
java
import org.springframework.stereotype.Component;
@Component
public class LeaveProcessInstanceStatusEventListener extends BpmProcessInstanceStatusEventListener {
@Override
protected String getProcessDefinitionKey() {
return "leave";
}
@Override
protected void onEvent(BpmProcessInstanceStatusEvent event) {
System.out.println("请假流程状态变化: " + event.getStatus());
}
}4.5 订单创建事件
java
import lombok.Data;
import org.springframework.context.ApplicationEvent;
@Data
public class OrderCreatedEvent extends ApplicationEvent {
private Long orderId;
private Long userId;
private String orderNo;
private BigDecimal amount;
public OrderCreatedEvent(Object source, Long orderId, Long userId, String orderNo, BigDecimal amount) {
super(source);
this.orderId = orderId;
this.userId = userId;
this.orderNo = orderNo;
this.amount = amount;
}
}4.6 订单事件发布
java
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
public class OrderService {
private final ApplicationEventPublisher publisher;
public OrderService(ApplicationEventPublisher publisher) {
this.publisher = publisher;
}
@Transactional
public void createOrder(OrderCreateReqVO reqVO) {
OrderDO order = OrderDO.builder()
.userId(reqVO.getUserId())
.orderNo(generateOrderNo())
.amount(reqVO.getAmount())
.build();
orderMapper.insert(order);
publisher.publishEvent(new OrderCreatedEvent(this, order.getId(), order.getUserId(), order.getOrderNo(), order.getAmount()));
}
}4.7 订单事件监听
java
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
@Component
public class OrderEventListener {
@Async
@EventListener
public void handleOrderCreatedEvent(OrderCreatedEvent event) {
System.out.println("处理订单创建事件: " + event.getOrderNo());
}
@Async
@EventListener
public void sendOrderNotification(OrderCreatedEvent event) {
System.out.println("发送订单通知: " + event.getOrderNo());
}
@Async
@EventListener
public void updateOrderStatistics(OrderCreatedEvent event) {
System.out.println("更新订单统计: " + event.getOrderNo());
}
}5. 高级特性
5.1 事务事件监听
使用 @TransactionalEventListener 实现事务事件监听:
java
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;
@Component
public class OrderEventListener {
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handleOrderCreatedEvent(OrderCreatedEvent event) {
System.out.println("事务提交后处理订单创建事件: " + event.getOrderNo());
}
@TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK)
public void handleOrderRollbackEvent(OrderCreatedEvent event) {
System.out.println("事务回滚后处理订单事件: " + event.getOrderNo());
}
}5.2 泛型事件监听
java
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
@Component
public class GenericEventListener {
@EventListener
public void handleEvent(ApplicationEvent event) {
System.out.println("接收到所有事件: " + event.getClass().getSimpleName());
}
}5.3 事件传播
java
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;
@Service
public class EventPropagationService {
private final ApplicationEventPublisher publisher;
public EventPropagationService(ApplicationEventPublisher publisher) {
this.publisher = publisher;
}
public void publishEventWithPropagation(CustomEvent event) {
publisher.publishEvent(event);
}
}5.4 事件异常处理
java
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
@Component
public class ErrorHandlingEventListener {
@EventListener
public void handleEvent(CustomEvent event) {
try {
} catch (Exception ex) {
System.out.println("处理事件异常: " + ex.getMessage());
}
}
}6. 最佳实践
6.1 事件命名规范
- 事件类名以
Event结尾 - 事件类名应该能够清晰描述事件的含义
- 示例:
OrderCreatedEvent、BpmProcessInstanceStatusEvent、UserLoginEvent
6.2 事件数据设计
java
import lombok.Data;
import org.springframework.context.ApplicationEvent;
@Data
public class OrderCreatedEvent extends ApplicationEvent {
private Long orderId;
private String orderNo;
private Long userId;
private BigDecimal amount;
private LocalDateTime createTime;
public OrderCreatedEvent(Object source, OrderDO order) {
super(source);
this.orderId = order.getId();
this.orderNo = order.getOrderNo();
this.userId = order.getUserId();
this.amount = order.getAmount();
this.createTime = order.getCreateTime();
}
}6.3 事件监听器设计
java
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
@Component
public class OrderEventListener {
@Async
@EventListener
public void handleOrderCreatedEvent(OrderCreatedEvent event) {
try {
processOrderCreated(event);
} catch (Exception ex) {
log.error("[handleOrderCreatedEvent][处理订单创建事件异常]", ex);
}
}
private void processOrderCreated(OrderCreatedEvent event) {
log.info("[processOrderCreated][处理订单创建事件,订单号: {}]", event.getOrderNo());
}
}6.4 事务边界控制
java
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
public class OrderService {
@Transactional
public void createOrder(OrderCreateReqVO reqVO) {
OrderDO order = OrderDO.builder()
.userId(reqVO.getUserId())
.orderNo(generateOrderNo())
.amount(reqVO.getAmount())
.build();
orderMapper.insert(order);
publisher.publishEvent(new OrderCreatedEvent(this, order));
}
}6.5 异步处理
java
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
@Component
public class AsyncEventListener {
@Async("taskExecutor")
@EventListener
public void handleEventAsync(CustomEvent event) {
System.out.println("异步处理事件: " + event.getEventType());
}
}6.6 错误处理
java
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
@Component
public class ErrorHandlingEventListener {
@EventListener
public void handleEvent(CustomEvent event) {
try {
processEvent(event);
} catch (Exception ex) {
log.error("[handleEvent][处理事件异常,事件类型: {}]", event.getEventType(), ex);
}
}
private void processEvent(CustomEvent event) {
}
}6.7 日志记录
java
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
@Component
public class LoggingEventListener {
@EventListener
public void handleEvent(CustomEvent event) {
log.info("[handleEvent][接收到事件,事件类型: {},事件ID: {}]", event.getEventType(), event.getEventId());
try {
processEvent(event);
log.info("[handleEvent][事件处理完成,事件类型: {}]", event.getEventType());
} catch (Exception ex) {
log.error("[handleEvent][事件处理失败,事件类型: {}]", event.getEventType(), ex);
}
}
private void processEvent(CustomEvent event) {
}
}7. 常见问题
7.1 事件监听器不执行
问题:事件发布后监听器不执行
解决方案:
- 检查监听器类是否添加了
@Component注解 - 检查监听器是否在 Spring 容器的扫描路径下
- 检查事件类型是否匹配
7.2 事件监听器执行顺序问题
问题:监听器执行顺序不符合预期
解决方案:
- 使用
@Order注解设置监听器优先级 - 数值越小,优先级越高
7.3 事务问题
问题:事件监听器在事务提交前执行
解决方案:
- 使用
@TransactionalEventListener注解 - 设置
phase参数为TransactionPhase.AFTER_COMMIT
7.4 异步执行问题
问题:异步监听器不执行
解决方案:
- 确保启用了异步支持
@EnableAsync - 检查线程池配置是否正确
7.5 性能问题
问题:事件监听器执行时间过长影响主流程
解决方案:
- 使用
@Async注解实现异步执行 - 优化监听器逻辑,减少执行时间
- 使用独立的线程池执行异步任务
8. 监控和调试
8.1 事件发布监控
java
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;
@Service
public class MonitoredEventPublisher {
private final ApplicationEventPublisher publisher;
public MonitoredEventPublisher(ApplicationEventPublisher publisher) {
this.publisher = publisher;
}
public void publishEvent(ApplicationEvent event) {
long startTime = System.currentTimeMillis();
log.info("[publishEvent][开始发布事件,事件类型: {}]", event.getClass().getSimpleName());
try {
publisher.publishEvent(event);
long endTime = System.currentTimeMillis();
log.info("[publishEvent][事件发布完成,事件类型: {},耗时: {}ms]", event.getClass().getSimpleName(), endTime - startTime);
} catch (Exception ex) {
long endTime = System.currentTimeMillis();
log.error("[publishEvent][事件发布失败,事件类型: {},耗时: {}ms]", event.getClass().getSimpleName(), endTime - startTime, ex);
}
}
}8.2 事件监听监控
java
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
@Component
public class MonitoredEventListener {
@EventListener
public void handleEvent(CustomEvent event) {
long startTime = System.currentTimeMillis();
log.info("[handleEvent][开始处理事件,事件类型: {},事件ID: {}]", event.getEventType(), event.getEventId());
try {
processEvent(event);
long endTime = System.currentTimeMillis();
log.info("[handleEvent][事件处理完成,事件类型: {},事件ID: {},耗时: {}ms]", event.getEventType(), event.getEventId(), endTime - startTime);
} catch (Exception ex) {
long endTime = System.currentTimeMillis();
log.error("[handleEvent][事件处理失败,事件类型: {},事件ID: {},耗时: {}ms]", event.getEventType(), event.getEventId(), endTime - startTime, ex);
}
}
private void processEvent(CustomEvent event) {
}
}9. 注意事项
- 同步执行:内存消息队列是同步执行的,事件发布后立即执行监听器
- 单机适用:适用于单机应用,不适用于分布式环境
- 异常处理:监听器中的异常不会影响其他监听器的执行
- 事务边界:注意事件发布和监听器执行的事务边界
- 性能考虑:监听器执行时间过长会影响主流程性能
- 事件不可变:事件对象应该是不可变的,避免在监听器中修改
- 监听器幂等性:确保监听器可以安全地重复执行
- 日志记录:记录事件的发布和消费情况,便于问题排查
- 异步执行:对于耗时操作,使用
@Async注解实现异步执行 - 条件监听:使用
condition参数实现条件监听,避免不必要的监听器执行
10. 与其他消息队列的对比
| 特性 | 内存消息队列 | Redis 消息队列 | RabbitMQ | RocketMQ | Kafka |
|---|---|---|---|---|---|
| 适用场景 | 单机应用 | 单机/集群 | 分布式 | 分布式 | 分布式 |
| 执行方式 | 同步 | 异步 | 异步 | 异步 | 异步 |
| 复杂度 | 低 | 中 | 高 | 高 | 高 |
| 性能 | 高 | 高 | 中 | 高 | 高 |
| 可靠性 | 低 | 中 | 高 | 高 | 高 |
| 持久化 | 不支持 | 支持 | 支持 | 支持 | 支持 |
| 消息确认 | 不支持 | 支持 | 支持 | 支持 | 支持 |
| 消息重试 | 不支持 | 支持 | 支持 | 支持 | 支持 |
