Skip to content

消息队列(内存)

1. 概述

内存消息队列是梵医云系统中基于 Spring 框架的 ApplicationEvent 和 ApplicationListener 机制实现的轻量级消息队列。它适用于单机应用场景,提供同步的事件发布和订阅功能。

内存消息队列具有以下特点:

  • 轻量级:基于 Spring 原生机制,无需额外依赖
  • 同步执行:事件发布后立即执行监听器
  • 简单易用:使用 Spring 注解即可实现
  • 单机适用:适用于单机应用,不适用于分布式环境

2. 技术架构

2.1 核心组件

组件说明位置
ApplicationEventSpring 事件基类Spring Framework
ApplicationListenerSpring 事件监听器接口Spring Framework
ApplicationEventPublisher事件发布器Spring Framework
@EventListener事件监听器注解Spring Framework

2.2 工作原理

事件发布者 -> ApplicationEventPublisher -> ApplicationEvent -> ApplicationListener -> 事件处理逻辑
  1. 事件发布者通过 ApplicationEventPublisher 发布事件
  2. Spring 容器将事件广播给所有注册的监听器
  3. 监听器接收到事件并执行相应的处理逻辑

3. 使用方式

3.1 定义事件

继承 ApplicationEvent 类定义自定义事件:

java
import org.springframework.context.ApplicationEvent;
import lombok.Data;

@Data
public class CustomEvent extends ApplicationEvent {

    private String eventId;
    private String eventType;
    private Object data;

    public CustomEvent(Object source) {
        super(source);
    }

    public CustomEvent(Object source, String eventId, String eventType, Object data) {
        super(source);
        this.eventId = eventId;
        this.eventType = eventType;
        this.data = data;
    }
}

3.2 发布事件

使用 ApplicationEventPublisher 发布事件:

java
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;

@Service
public class EventPublisherService {

    private final ApplicationEventPublisher publisher;

    public EventPublisherService(ApplicationEventPublisher publisher) {
        this.publisher = publisher;
    }

    public void publishCustomEvent(String eventId, String eventType, Object data) {
        CustomEvent event = new CustomEvent(this, eventId, eventType, data);
        publisher.publishEvent(event);
    }
}

3.3 监听事件(方式一:实现 ApplicationListener 接口)

java
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

@Component
public class CustomEventListener implements ApplicationListener<CustomEvent> {

    @Override
    public void onApplicationEvent(CustomEvent event) {
        System.out.println("接收到事件: " + event.getEventType());
        System.out.println("事件数据: " + event.getData());
    }
}

3.4 监听事件(方式二:使用 @EventListener 注解)

java
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

@Component
public class CustomEventListener {

    @EventListener
    public void handleCustomEvent(CustomEvent event) {
        System.out.println("接收到事件: " + event.getEventType());
        System.out.println("事件数据: " + event.getData());
    }
}

3.5 条件监听

使用 condition 参数实现条件监听:

java
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

@Component
public class CustomEventListener {

    @EventListener(condition = "#event.eventType == 'ORDER_CREATED'")
    public void handleOrderCreatedEvent(CustomEvent event) {
        System.out.println("处理订单创建事件: " + event.getData());
    }

    @EventListener(condition = "#event.eventType == 'ORDER_CANCELLED'")
    public void handleOrderCancelledEvent(CustomEvent event) {
        System.out.println("处理订单取消事件: " + event.getData());
    }
}

3.6 异步监听

使用 @Async 注解实现异步监听:

java
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

@Component
public class CustomEventListener {

    @Async
    @EventListener
    public void handleCustomEvent(CustomEvent event) {
        System.out.println("异步处理事件: " + event.getEventType());
    }
}

3.7 监听器优先级

使用 @Order 注解设置监听器优先级:

java
import org.springframework.context.annotation.Order;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

@Component
public class CustomEventListener {

    @Order(1)
    @EventListener
    public void handleFirst(CustomEvent event) {
        System.out.println("第一个监听器");
    }

    @Order(2)
    @EventListener
    public void handleSecond(CustomEvent event) {
        System.out.println("第二个监听器");
    }
}

4. 业务场景示例

4.1 流程实例状态变化事件

java
import com.fanyi.cloud.module.bpm.enums.task.BpmProcessInstanceStatusEnum;
import jakarta.validation.constraints.NotNull;
import lombok.Data;
import org.springframework.context.ApplicationEvent;

@Data
public class BpmProcessInstanceStatusEvent extends ApplicationEvent {

    @NotNull(message = "流程实例的编号不能为空")
    private String id;

    @NotNull(message = "流程实例的 key 不能为空")
    private String processDefinitionKey;

    @NotNull(message = "流程实例的状态不能为空")
    private Integer status;

    private String businessKey;
    private String reason;

    public BpmProcessInstanceStatusEvent(Object source) {
        super(source);
    }
}

4.2 流程实例状态事件发布者

java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.validation.annotation.Validated;

@Validated
public class BpmProcessInstanceEventPublisher {

    private final ApplicationEventPublisher publisher;

    public BpmProcessInstanceEventPublisher(ApplicationEventPublisher publisher) {
        this.publisher = publisher;
    }

    public void sendProcessInstanceResultEvent(BpmProcessInstanceStatusEvent event) {
        publisher.publishEvent(event);
    }
}

4.3 流程实例状态事件监听器

java
import cn.hutool.core.util.StrUtil;
import org.springframework.context.ApplicationListener;

public abstract class BpmProcessInstanceStatusEventListener
        implements ApplicationListener<BpmProcessInstanceStatusEvent> {

    @Override
    public final void onApplicationEvent(BpmProcessInstanceStatusEvent event) {
        if (!StrUtil.equals(event.getProcessDefinitionKey(), getProcessDefinitionKey())) {
            return;
        }
        onEvent(event);
    }

    protected abstract String getProcessDefinitionKey();

    protected abstract void onEvent(BpmProcessInstanceStatusEvent event);
}

4.4 具体流程监听器实现

java
import org.springframework.stereotype.Component;

@Component
public class LeaveProcessInstanceStatusEventListener extends BpmProcessInstanceStatusEventListener {

    @Override
    protected String getProcessDefinitionKey() {
        return "leave";
    }

    @Override
    protected void onEvent(BpmProcessInstanceStatusEvent event) {
        System.out.println("请假流程状态变化: " + event.getStatus());
    }
}

4.5 订单创建事件

java
import lombok.Data;
import org.springframework.context.ApplicationEvent;

@Data
public class OrderCreatedEvent extends ApplicationEvent {

    private Long orderId;
    private Long userId;
    private String orderNo;
    private BigDecimal amount;

    public OrderCreatedEvent(Object source, Long orderId, Long userId, String orderNo, BigDecimal amount) {
        super(source);
        this.orderId = orderId;
        this.userId = userId;
        this.orderNo = orderNo;
        this.amount = amount;
    }
}

4.6 订单事件发布

java
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class OrderService {

    private final ApplicationEventPublisher publisher;

    public OrderService(ApplicationEventPublisher publisher) {
        this.publisher = publisher;
    }

    @Transactional
    public void createOrder(OrderCreateReqVO reqVO) {
        OrderDO order = OrderDO.builder()
                .userId(reqVO.getUserId())
                .orderNo(generateOrderNo())
                .amount(reqVO.getAmount())
                .build();
        orderMapper.insert(order);

        publisher.publishEvent(new OrderCreatedEvent(this, order.getId(), order.getUserId(), order.getOrderNo(), order.getAmount()));
    }
}

4.7 订单事件监听

java
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

@Component
public class OrderEventListener {

    @Async
    @EventListener
    public void handleOrderCreatedEvent(OrderCreatedEvent event) {
        System.out.println("处理订单创建事件: " + event.getOrderNo());
    }

    @Async
    @EventListener
    public void sendOrderNotification(OrderCreatedEvent event) {
        System.out.println("发送订单通知: " + event.getOrderNo());
    }

    @Async
    @EventListener
    public void updateOrderStatistics(OrderCreatedEvent event) {
        System.out.println("更新订单统计: " + event.getOrderNo());
    }
}

5. 高级特性

5.1 事务事件监听

使用 @TransactionalEventListener 实现事务事件监听:

java
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;

@Component
public class OrderEventListener {

    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void handleOrderCreatedEvent(OrderCreatedEvent event) {
        System.out.println("事务提交后处理订单创建事件: " + event.getOrderNo());
    }

    @TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK)
    public void handleOrderRollbackEvent(OrderCreatedEvent event) {
        System.out.println("事务回滚后处理订单事件: " + event.getOrderNo());
    }
}

5.2 泛型事件监听

java
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

@Component
public class GenericEventListener {

    @EventListener
    public void handleEvent(ApplicationEvent event) {
        System.out.println("接收到所有事件: " + event.getClass().getSimpleName());
    }
}

5.3 事件传播

java
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;

@Service
public class EventPropagationService {

    private final ApplicationEventPublisher publisher;

    public EventPropagationService(ApplicationEventPublisher publisher) {
        this.publisher = publisher;
    }

    public void publishEventWithPropagation(CustomEvent event) {
        publisher.publishEvent(event);
    }
}

5.4 事件异常处理

java
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

@Component
public class ErrorHandlingEventListener {

    @EventListener
    public void handleEvent(CustomEvent event) {
        try {
        } catch (Exception ex) {
            System.out.println("处理事件异常: " + ex.getMessage());
        }
    }
}

6. 最佳实践

6.1 事件命名规范

  • 事件类名以 Event 结尾
  • 事件类名应该能够清晰描述事件的含义
  • 示例:OrderCreatedEventBpmProcessInstanceStatusEventUserLoginEvent

6.2 事件数据设计

java
import lombok.Data;
import org.springframework.context.ApplicationEvent;

@Data
public class OrderCreatedEvent extends ApplicationEvent {

    private Long orderId;
    private String orderNo;
    private Long userId;
    private BigDecimal amount;
    private LocalDateTime createTime;

    public OrderCreatedEvent(Object source, OrderDO order) {
        super(source);
        this.orderId = order.getId();
        this.orderNo = order.getOrderNo();
        this.userId = order.getUserId();
        this.amount = order.getAmount();
        this.createTime = order.getCreateTime();
    }
}

6.3 事件监听器设计

java
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

@Component
public class OrderEventListener {

    @Async
    @EventListener
    public void handleOrderCreatedEvent(OrderCreatedEvent event) {
        try {
            processOrderCreated(event);
        } catch (Exception ex) {
            log.error("[handleOrderCreatedEvent][处理订单创建事件异常]", ex);
        }
    }

    private void processOrderCreated(OrderCreatedEvent event) {
        log.info("[processOrderCreated][处理订单创建事件,订单号: {}]", event.getOrderNo());
    }
}

6.4 事务边界控制

java
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class OrderService {

    @Transactional
    public void createOrder(OrderCreateReqVO reqVO) {
        OrderDO order = OrderDO.builder()
                .userId(reqVO.getUserId())
                .orderNo(generateOrderNo())
                .amount(reqVO.getAmount())
                .build();
        orderMapper.insert(order);

        publisher.publishEvent(new OrderCreatedEvent(this, order));
    }
}

6.5 异步处理

java
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

@Component
public class AsyncEventListener {

    @Async("taskExecutor")
    @EventListener
    public void handleEventAsync(CustomEvent event) {
        System.out.println("异步处理事件: " + event.getEventType());
    }
}

6.6 错误处理

java
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

@Component
public class ErrorHandlingEventListener {

    @EventListener
    public void handleEvent(CustomEvent event) {
        try {
            processEvent(event);
        } catch (Exception ex) {
            log.error("[handleEvent][处理事件异常,事件类型: {}]", event.getEventType(), ex);
        }
    }

    private void processEvent(CustomEvent event) {
    }
}

6.7 日志记录

java
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

@Component
public class LoggingEventListener {

    @EventListener
    public void handleEvent(CustomEvent event) {
        log.info("[handleEvent][接收到事件,事件类型: {},事件ID: {}]", event.getEventType(), event.getEventId());
        try {
            processEvent(event);
            log.info("[handleEvent][事件处理完成,事件类型: {}]", event.getEventType());
        } catch (Exception ex) {
            log.error("[handleEvent][事件处理失败,事件类型: {}]", event.getEventType(), ex);
        }
    }

    private void processEvent(CustomEvent event) {
    }
}

7. 常见问题

7.1 事件监听器不执行

问题:事件发布后监听器不执行

解决方案

  1. 检查监听器类是否添加了 @Component 注解
  2. 检查监听器是否在 Spring 容器的扫描路径下
  3. 检查事件类型是否匹配

7.2 事件监听器执行顺序问题

问题:监听器执行顺序不符合预期

解决方案

  1. 使用 @Order 注解设置监听器优先级
  2. 数值越小,优先级越高

7.3 事务问题

问题:事件监听器在事务提交前执行

解决方案

  1. 使用 @TransactionalEventListener 注解
  2. 设置 phase 参数为 TransactionPhase.AFTER_COMMIT

7.4 异步执行问题

问题:异步监听器不执行

解决方案

  1. 确保启用了异步支持 @EnableAsync
  2. 检查线程池配置是否正确

7.5 性能问题

问题:事件监听器执行时间过长影响主流程

解决方案

  1. 使用 @Async 注解实现异步执行
  2. 优化监听器逻辑,减少执行时间
  3. 使用独立的线程池执行异步任务

8. 监控和调试

8.1 事件发布监控

java
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;

@Service
public class MonitoredEventPublisher {

    private final ApplicationEventPublisher publisher;

    public MonitoredEventPublisher(ApplicationEventPublisher publisher) {
        this.publisher = publisher;
    }

    public void publishEvent(ApplicationEvent event) {
        long startTime = System.currentTimeMillis();
        log.info("[publishEvent][开始发布事件,事件类型: {}]", event.getClass().getSimpleName());
        try {
            publisher.publishEvent(event);
            long endTime = System.currentTimeMillis();
            log.info("[publishEvent][事件发布完成,事件类型: {},耗时: {}ms]", event.getClass().getSimpleName(), endTime - startTime);
        } catch (Exception ex) {
            long endTime = System.currentTimeMillis();
            log.error("[publishEvent][事件发布失败,事件类型: {},耗时: {}ms]", event.getClass().getSimpleName(), endTime - startTime, ex);
        }
    }
}

8.2 事件监听监控

java
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

@Component
public class MonitoredEventListener {

    @EventListener
    public void handleEvent(CustomEvent event) {
        long startTime = System.currentTimeMillis();
        log.info("[handleEvent][开始处理事件,事件类型: {},事件ID: {}]", event.getEventType(), event.getEventId());
        try {
            processEvent(event);
            long endTime = System.currentTimeMillis();
            log.info("[handleEvent][事件处理完成,事件类型: {},事件ID: {},耗时: {}ms]", event.getEventType(), event.getEventId(), endTime - startTime);
        } catch (Exception ex) {
            long endTime = System.currentTimeMillis();
            log.error("[handleEvent][事件处理失败,事件类型: {},事件ID: {},耗时: {}ms]", event.getEventType(), event.getEventId(), endTime - startTime, ex);
        }
    }

    private void processEvent(CustomEvent event) {
    }
}

9. 注意事项

  1. 同步执行:内存消息队列是同步执行的,事件发布后立即执行监听器
  2. 单机适用:适用于单机应用,不适用于分布式环境
  3. 异常处理:监听器中的异常不会影响其他监听器的执行
  4. 事务边界:注意事件发布和监听器执行的事务边界
  5. 性能考虑:监听器执行时间过长会影响主流程性能
  6. 事件不可变:事件对象应该是不可变的,避免在监听器中修改
  7. 监听器幂等性:确保监听器可以安全地重复执行
  8. 日志记录:记录事件的发布和消费情况,便于问题排查
  9. 异步执行:对于耗时操作,使用 @Async 注解实现异步执行
  10. 条件监听:使用 condition 参数实现条件监听,避免不必要的监听器执行

10. 与其他消息队列的对比

特性内存消息队列Redis 消息队列RabbitMQRocketMQKafka
适用场景单机应用单机/集群分布式分布式分布式
执行方式同步异步异步异步异步
复杂度
性能
可靠性
持久化不支持支持支持支持支持
消息确认不支持支持支持支持支持
消息重试不支持支持支持支持支持

11. 相关文档