Message Queue (Kafka)
1. Overview
Kafka is the distributed message queue middleware used in the 梵医云 system. It is based on Apache Kafka and offers high throughput, high availability, and low latency. The system integrates Kafka through spring-kafka.
The Kafka message queue has the following characteristics:
- High throughput: up to millions of messages per second on a single node
- High availability: supports cluster deployment with automatic failover
- Low latency: millisecond-level message latency
- Reliable messaging: supports message persistence, retries, and replay
- Distributed: designed for distributed environments
- Multiple consumption modes: supports clustered consumption and broadcast consumption
2. Technical Architecture
2.1 Kafka Architecture
```
Producer -> Kafka Broker -> Topic -> Partition 0 -> Consumer Group 1 -> Consumer 1
                                  -> Partition 1 -> Consumer Group 1 -> Consumer 2
                                  -> Partition 2 -> Consumer Group 2 -> Consumer 3
```
Core components:
- Producer: produces and sends messages
- Broker: a Kafka server, responsible for storing and forwarding messages
- Topic: a message topic, i.e. a category of messages
- Partition: a physical partition of a Topic, used to increase parallelism
- Consumer Group: a group of consumers that share the consumption workload
- Consumer: consumes messages
- Offset: the position of a message within a partition
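The role of offsets can be made concrete with a small in-memory sketch of a single partition and its per-group committed offsets. This is purely illustrative: the class and method names are invented for this sketch and are not Kafka APIs, and a real broker persists both the log and the offsets durably.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of one partition's log plus per-group committed offsets (illustrative only).
class PartitionModel {
    private final List<String> log = new ArrayList<>();             // the partition's append-only log
    private final Map<String, Integer> committed = new HashMap<>(); // groupId -> next offset to read

    // Append a message; its offset is simply its position in the log
    int append(String message) {
        log.add(message);
        return log.size() - 1;
    }

    // Poll the next message for a consumer group and advance that group's offset
    String poll(String groupId) {
        int offset = committed.getOrDefault(groupId, 0);
        if (offset >= log.size()) {
            return null; // the group has caught up with the log
        }
        committed.put(groupId, offset + 1);
        return log.get(offset);
    }

    int committedOffset(String groupId) {
        return committed.getOrDefault(groupId, 0);
    }
}
```

Note how each group tracks its own offset independently, which is what makes both clustered and broadcast consumption possible on the same log.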
2.2 Messaging Models
2.2.1 Clustered Consumption (Clustering)
```
Topic -> Consumer Group -> Consumer 1
                        -> Consumer 2
                        -> Consumer 3
```
Characteristics:
- Consumers in the same consumer group share the consumption workload
- Each message is consumed by exactly one consumer in the group
- Suitable for load-balancing scenarios
2.2.2 Broadcast Consumption (Broadcasting)
```
Topic -> Consumer Group 1 -> Consumer 1
      -> Consumer Group 2 -> Consumer 2
      -> Consumer Group 3 -> Consumer 3
```
Characteristics:
- Every consumer group receives every message
- Suitable for broadcast-notification scenarios
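The two models above can be contrasted with a toy dispatcher. This is purely illustrative: real assignment happens per partition via the group coordinator, and the round-robin policy used here is an assumption made for the sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy dispatcher contrasting clustered vs broadcast delivery (illustrative only).
class ConsumptionModes {

    // Clustered: each message goes to exactly one consumer of the group (round-robin here)
    static Map<String, List<String>> clustered(List<String> messages, List<String> consumers) {
        Map<String, List<String>> delivered = new HashMap<>();
        for (String c : consumers) delivered.put(c, new ArrayList<>());
        for (int i = 0; i < messages.size(); i++) {
            delivered.get(consumers.get(i % consumers.size())).add(messages.get(i));
        }
        return delivered;
    }

    // Broadcast: every consumer group receives every message
    static Map<String, List<String>> broadcast(List<String> messages, List<String> groups) {
        Map<String, List<String>> delivered = new HashMap<>();
        for (String g : groups) delivered.put(g, new ArrayList<>(messages));
        return delivered;
    }
}
```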
2.3 Core Components
| Component | Description | Location |
|---|---|---|
| KafkaTemplate | Template class for Kafka operations | spring-kafka |
| @KafkaListener | Annotation for Kafka message listeners | spring-kafka |
| ProducerInterceptor | Producer interceptor interface | Kafka |
| TenantKafkaProducerInterceptor | Multi-tenant producer interceptor | fanyi-spring-boot-starter-biz-tenant |
| TenantKafkaEnvironmentPostProcessor | Multi-tenant Kafka environment post-processor | fanyi-spring-boot-starter-biz-tenant |
3. Configuration
3.1 Maven Dependency
Add the dependency to pom.xml:
```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
```
3.2 Kafka Configuration
Configure Kafka in application.yaml:
```yaml
spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Note: the examples below send POJOs directly, which requires a JSON-capable
      # serializer such as org.springframework.kafka.support.serializer.JsonSerializer.
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: all
      retries: 3
      properties:
        interceptor.classes: com.fanyi.cloud.framework.tenant.core.mq.kafka.TenantKafkaProducerInterceptor
    consumer:
      group-id: fanyi-consumer-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Likewise, deserializing POJOs requires a JSON-capable deserializer.
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
      enable-auto-commit: false
    listener:
      ack-mode: manual_immediate
      concurrency: 5
```
3.3 WebSocket Kafka Configuration
Configure the WebSocket Kafka sender in application.yaml:
```yaml
fanyi:
  websocket:
    sender-type: kafka
    sender-kafka:
      topic: websocket-topic
      consumer-group: websocket-consumer-group
```
3.4 Configuration Parameters
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| spring.kafka.bootstrap-servers | String | Yes | - | Kafka server addresses, comma-separated |
| spring.kafka.producer.key-serializer | String | Yes | - | Key serializer |
| spring.kafka.producer.value-serializer | String | Yes | - | Value serializer |
| spring.kafka.producer.acks | String | No | all (1 in clients before Kafka 3.0) | Acknowledgment mode (0/1/all) |
| spring.kafka.producer.retries | Integer | No | 2147483647 | Number of retries on send failure |
| spring.kafka.consumer.group-id | String | Yes | - | Consumer group ID |
| spring.kafka.consumer.key-deserializer | String | Yes | - | Key deserializer |
| spring.kafka.consumer.value-deserializer | String | Yes | - | Value deserializer |
| spring.kafka.consumer.auto-offset-reset | String | No | latest | Offset reset policy (earliest/latest/none) |
| spring.kafka.consumer.enable-auto-commit | Boolean | No | true | Whether to commit offsets automatically |
| spring.kafka.listener.ack-mode | String | No | batch | Listener acknowledgment mode |
| spring.kafka.listener.concurrency | Integer | No | 1 | Number of concurrent consumers |
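For reference, the spring.kafka.* keys in the table above correspond to native Kafka client property names; the correspondence can be sketched as a plain map (illustrative only, not the actual Spring Boot KafkaProperties implementation):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative mapping from spring.kafka.* configuration keys to the native
// Kafka client property names they ultimately configure.
class SpringKafkaPropertyMapping {
    static Map<String, String> producerMapping() {
        Map<String, String> m = new LinkedHashMap<>();
        m.put("spring.kafka.bootstrap-servers", "bootstrap.servers");
        m.put("spring.kafka.producer.key-serializer", "key.serializer");
        m.put("spring.kafka.producer.value-serializer", "value.serializer");
        m.put("spring.kafka.producer.acks", "acks");
        m.put("spring.kafka.producer.retries", "retries");
        return m;
    }

    static Map<String, String> consumerMapping() {
        Map<String, String> m = new LinkedHashMap<>();
        m.put("spring.kafka.consumer.group-id", "group.id");
        m.put("spring.kafka.consumer.key-deserializer", "key.deserializer");
        m.put("spring.kafka.consumer.value-deserializer", "value.deserializer");
        m.put("spring.kafka.consumer.auto-offset-reset", "auto.offset.reset");
        m.put("spring.kafka.consumer.enable-auto-commit", "enable.auto.commit");
        return m;
    }
}
```

Anything placed under `spring.kafka.producer.properties` or `spring.kafka.consumer.properties` (such as `interceptor.classes` above) is passed through to the client verbatim.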
4. Usage
4.1 Defining a Message
Define the message entity class:
```java
import lombok.Data;

@Data
public class DemoMessage {
    private Long id;
    private String messageType;
    private String content;
    private Long userId;
    private Long timestamp;
}
```
4.2 Sending Messages
Send messages with KafkaTemplate:
```java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class DemoMessageProducer {

    private final KafkaTemplate<Object, Object> kafkaTemplate;
    private final String topic = "demo-topic";

    public DemoMessageProducer(KafkaTemplate<Object, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendDemoMessage(String content) {
        DemoMessage message = new DemoMessage();
        message.setMessageType("DEMO_MESSAGE");
        message.setContent(content);
        message.setUserId(1L);
        message.setTimestamp(System.currentTimeMillis());
        kafkaTemplate.send(topic, message);
    }
}
```
4.3 Listening for Messages
Listen for messages with the @KafkaListener annotation:
```java
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class DemoMessageConsumer {

    @KafkaListener(
        topics = "demo-topic",
        groupId = "demo-consumer-group"
    )
    public void onMessage(@Payload DemoMessage message,
                          @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                          @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                          @Header(KafkaHeaders.OFFSET) long offset,
                          Acknowledgment ack) {
        log.info("[onMessage][Received message. Topic: {}, Partition: {}, Offset: {}, type: {}, content: {}]",
            topic, partition, offset, message.getMessageType(), message.getContent());
        // Manual acknowledgment is needed because listener.ack-mode is manual_immediate
        // in the configuration above; without it the offset is never committed.
        ack.acknowledge();
    }
}
```
4.4 Broadcast Messaging
Implement broadcasting by giving each consumer instance its own consumer group:
```java
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class DemoBroadcastMessageConsumer {

    @KafkaListener(
        topics = "demo-topic",
        // Append a UUID suffix to the group ID so each started consumer instance
        // belongs to a different group, which achieves broadcast consumption.
        groupId = "demo-consumer-group" + "-" + "#{T(java.util.UUID).randomUUID()}"
    )
    public void onMessage(@Payload DemoMessage message,
                          @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                          @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                          @Header(KafkaHeaders.OFFSET) long offset) {
        log.info("[onMessage][Received message. Topic: {}, Partition: {}, Offset: {}, type: {}, content: {}]",
            topic, partition, offset, message.getMessageType(), message.getContent());
    }
}
```
5. Business Scenario Examples
5.1 WebSocket Message Broadcasting
5.1.1 WebSocket Message Definition
```java
import lombok.Data;
import lombok.experimental.Accessors;

// chain = true is required because KafkaWebSocketMessageSender below
// builds the message with chained setter calls.
@Data
@Accessors(chain = true)
public class KafkaWebSocketMessage {
    private String sessionId;
    private Integer userType;
    private Long userId;
    private String messageType;
    private String messageContent;
}
```
5.1.2 WebSocket Message Sender
```java
import com.fanyi.cloud.framework.websocket.core.sender.AbstractWebSocketMessageSender;
import com.fanyi.cloud.framework.websocket.core.session.WebSocketSessionManager;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;

import java.util.concurrent.ExecutionException;

@Slf4j
public class KafkaWebSocketMessageSender extends AbstractWebSocketMessageSender {

    private final KafkaTemplate<Object, Object> kafkaTemplate;
    private final String topic;

    public KafkaWebSocketMessageSender(WebSocketSessionManager sessionManager,
                                       KafkaTemplate<Object, Object> kafkaTemplate,
                                       String topic) {
        super(sessionManager);
        this.kafkaTemplate = kafkaTemplate;
        this.topic = topic;
    }

    @Override
    public void send(Integer userType, Long userId, String messageType, String messageContent) {
        sendKafkaMessage(null, userId, userType, messageType, messageContent);
    }

    @Override
    public void send(Integer userType, String messageType, String messageContent) {
        sendKafkaMessage(null, null, userType, messageType, messageContent);
    }

    @Override
    public void send(String sessionId, String messageType, String messageContent) {
        sendKafkaMessage(sessionId, null, null, messageType, messageContent);
    }

    private void sendKafkaMessage(String sessionId, Long userId, Integer userType,
                                  String messageType, String messageContent) {
        KafkaWebSocketMessage mqMessage = new KafkaWebSocketMessage()
            .setSessionId(sessionId)
            .setUserId(userId)
            .setUserType(userType)
            .setMessageType(messageType)
            .setMessageContent(messageContent);
        try {
            // Block until the send completes so failures are surfaced immediately
            kafkaTemplate.send(topic, mqMessage).get();
        } catch (InterruptedException | ExecutionException e) {
            log.error("[sendKafkaMessage][Failed to send message ({}) to Kafka]", mqMessage, e);
        }
    }
}
```
5.1.3 WebSocket Message Consumer
```java
import lombok.RequiredArgsConstructor;
import org.springframework.kafka.annotation.KafkaListener;

@RequiredArgsConstructor
public class KafkaWebSocketMessageConsumer {

    private final KafkaWebSocketMessageSender kafkaWebSocketMessageSender;

    @KafkaListener(
        topics = "${fanyi.websocket.sender-kafka.topic}",
        // Append a UUID suffix to the group ID so each started consumer instance
        // belongs to a different group, which achieves broadcast consumption.
        groupId = "${fanyi.websocket.sender-kafka.consumer-group}" + "-" + "#{T(java.util.UUID).randomUUID()}")
    public void onMessage(KafkaWebSocketMessage message) {
        kafkaWebSocketMessageSender.send(message.getSessionId(),
            message.getUserType(), message.getUserId(),
            message.getMessageType(), message.getMessageContent());
    }
}
```
5.2 Order Status Change Notification
5.2.1 Order Status Message Definition
```java
import lombok.Data;

@Data
public class OrderStatusChangeMessage {
    private Long orderId;
    private String orderNo;
    private Integer oldStatus;
    private Integer newStatus;
    private Long userId;
    private Long timestamp;
}
```
5.2.2 Sending Order Status Messages
```java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class OrderService {

    private final KafkaTemplate<Object, Object> kafkaTemplate;
    private final OrderMapper orderMapper;
    private final String topic = "order-status-change-topic";

    public OrderService(KafkaTemplate<Object, Object> kafkaTemplate, OrderMapper orderMapper) {
        this.kafkaTemplate = kafkaTemplate;
        this.orderMapper = orderMapper;
    }

    @Transactional
    public void updateOrderStatus(Long orderId, Integer newStatus) {
        OrderDO order = orderMapper.selectById(orderId);
        Integer oldStatus = order.getStatus();
        order.setStatus(newStatus);
        orderMapper.updateById(order);

        OrderStatusChangeMessage message = new OrderStatusChangeMessage();
        message.setOrderId(orderId);
        message.setOrderNo(order.getOrderNo());
        message.setOldStatus(oldStatus);
        message.setNewStatus(newStatus);
        message.setUserId(order.getUserId());
        message.setTimestamp(System.currentTimeMillis());
        kafkaTemplate.send(topic, message);
    }
}
```
5.2.3 Listening for Order Status Messages
```java
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class OrderStatusChangeListener {

    @KafkaListener(
        topics = "order-status-change-topic",
        groupId = "order-status-consumer-group"
    )
    public void onMessage(@Payload OrderStatusChangeMessage message,
                          @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                          @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                          @Header(KafkaHeaders.OFFSET) long offset) {
        log.info("[onMessage][Order status changed. Order no: {}, old status: {}, new status: {}]",
            message.getOrderNo(), message.getOldStatus(), message.getNewStatus());
        switch (message.getNewStatus()) {
            case 10: // paid
                handleOrderPaid(message);
                break;
            case 20: // shipped
                handleOrderShipped(message);
                break;
            case 30: // completed
                handleOrderCompleted(message);
                break;
        }
    }

    private void handleOrderPaid(OrderStatusChangeMessage message) {
        log.info("[handleOrderPaid][Order paid. Order no: {}]", message.getOrderNo());
    }

    private void handleOrderShipped(OrderStatusChangeMessage message) {
        log.info("[handleOrderShipped][Order shipped. Order no: {}]", message.getOrderNo());
    }

    private void handleOrderCompleted(OrderStatusChangeMessage message) {
        log.info("[handleOrderCompleted][Order completed. Order no: {}]", message.getOrderNo());
    }
}
```
6. Multi-Tenancy Support
6.1 Producer Interceptor
Implement the ProducerInterceptor interface to attach tenant information when sending messages:
```java
import com.fanyi.cloud.framework.tenant.core.context.TenantContextHolder;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

public class TenantKafkaProducerInterceptor implements ProducerInterceptor<Object, Object> {

    @Override
    public ProducerRecord<Object, Object> onSend(ProducerRecord<Object, Object> record) {
        Long tenantId = TenantContextHolder.getTenantId();
        if (tenantId != null) {
            // Propagate the tenant ID as a record header
            record.headers().add("tenant-id", tenantId.toString().getBytes());
        }
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}
```
6.2 Environment Post-Processor
Implement the EnvironmentPostProcessor interface to register the multi-tenant interceptor when Kafka is initialized:
```java
import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.env.EnvironmentPostProcessor;
import org.springframework.core.env.ConfigurableEnvironment;

@Slf4j
public class TenantKafkaEnvironmentPostProcessor implements EnvironmentPostProcessor {

    private static final String PROPERTY_KEY_INTERCEPTOR_CLASSES = "spring.kafka.producer.properties.interceptor.classes";

    @Override
    public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) {
        try {
            // Append the tenant interceptor to any interceptors already configured
            String value = environment.getProperty(PROPERTY_KEY_INTERCEPTOR_CLASSES);
            if (StrUtil.isEmpty(value)) {
                value = TenantKafkaProducerInterceptor.class.getName();
            } else {
                value += "," + TenantKafkaProducerInterceptor.class.getName();
            }
            environment.getSystemProperties().put(PROPERTY_KEY_INTERCEPTOR_CLASSES, value);
        } catch (NoClassDefFoundError ignore) {
            // spring-kafka is not on the classpath; skip registration
        }
    }
}
```
7. Best Practices
7.1 Message Naming Conventions
- Message class names end with Message
- Message class names should clearly describe the message's meaning
- Examples: OrderStatusChangeMessage, KafkaWebSocketMessage, DemoMessage
7.2 Topic Naming Conventions
- Topic names use lowercase letters and hyphens
- Topic names should clearly describe the message's purpose
- Examples: order-status-change-topic, websocket-topic, demo-topic
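The convention can also be checked mechanically, for instance in a code review script or a unit test. A minimal sketch (the TopicNameValidator class is hypothetical, not part of the system):

```java
import java.util.regex.Pattern;

// Hypothetical validator for the topic naming convention above:
// hyphen-separated words of lowercase letters and digits.
class TopicNameValidator {
    private static final Pattern TOPIC_NAME = Pattern.compile("[a-z0-9]+(-[a-z0-9]+)*");

    static boolean isValid(String topic) {
        return topic != null && TOPIC_NAME.matcher(topic).matches();
    }
}
```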
7.3 Message Data Design
```java
import lombok.Data;

@Data
public class OrderStatusChangeMessage {
    private Long orderId;
    private String orderNo;
    private Integer oldStatus;
    private Integer newStatus;
    private Long userId;
    private Long timestamp;

    // Populate the message from the order entity in one call; returns this for chaining
    public OrderStatusChangeMessage setOrder(OrderDO order, Integer newStatus) {
        this.orderId = order.getId();
        this.orderNo = order.getOrderNo();
        this.oldStatus = order.getStatus();
        this.newStatus = newStatus;
        this.userId = order.getUserId();
        this.timestamp = System.currentTimeMillis();
        return this;
    }
}
```
7.4 Message Sending
```java
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class OrderMessageProducer {

    private final KafkaTemplate<Object, Object> kafkaTemplate;
    private final String topic = "order-status-change-topic";

    public OrderMessageProducer(KafkaTemplate<Object, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendOrderStatusChange(OrderDO order, Integer newStatus) {
        try {
            OrderStatusChangeMessage message = new OrderStatusChangeMessage()
                .setOrder(order, newStatus);
            // Block until the send completes so failures are logged here
            kafkaTemplate.send(topic, message).get();
            log.info("[sendOrderStatusChange][Order status change message sent. Order no: {}, new status: {}]",
                order.getOrderNo(), newStatus);
        } catch (Exception ex) {
            log.error("[sendOrderStatusChange][Failed to send order status change message. Order no: {}]",
                order.getOrderNo(), ex);
        }
    }
}
```
7.5 Message Listening
```java
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class OrderStatusChangeListener {

    @KafkaListener(
        topics = "order-status-change-topic",
        groupId = "order-status-consumer-group"
    )
    public void onMessage(@Payload OrderStatusChangeMessage message,
                          @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                          @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                          @Header(KafkaHeaders.OFFSET) long offset) {
        log.info("[onMessage][Received order status change message. Order no: {}, new status: {}]",
            message.getOrderNo(), message.getNewStatus());
        try {
            processOrderStatusChange(message);
            log.info("[onMessage][Order status change message processed. Order no: {}]",
                message.getOrderNo());
        } catch (Exception ex) {
            log.error("[onMessage][Failed to process order status change message. Order no: {}]",
                message.getOrderNo(), ex);
        }
    }

    private void processOrderStatusChange(OrderStatusChangeMessage message) {
        // ... business processing
    }
}
```
7.6 Message Idempotency
```java
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Slf4j
@RequiredArgsConstructor
@Component
public class OrderStatusChangeListener {

    private final RedissonClient redissonClient;
    private final OrderMapper orderMapper;

    @KafkaListener(
        topics = "order-status-change-topic",
        groupId = "order-status-consumer-group"
    )
    public void onMessage(@Payload OrderStatusChangeMessage message,
                          @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                          @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                          @Header(KafkaHeaders.OFFSET) long offset) {
        // Use a distributed lock to prevent concurrent duplicate processing
        String lockKey = "order:status:change:" + message.getOrderId();
        RLock lock = redissonClient.getLock(lockKey);
        try {
            if (lock.tryLock()) {
                processOrderStatusChange(message);
            }
        } finally {
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }

    private void processOrderStatusChange(OrderStatusChangeMessage message) {
        // Idempotency check: skip if the order is already in the target status
        OrderDO order = orderMapper.selectById(message.getOrderId());
        if (order.getStatus().equals(message.getNewStatus())) {
            log.info("[processOrderStatusChange][Order status already updated, skipping. Order no: {}]",
                message.getOrderNo());
            return;
        }
        // ... business processing
    }
}
```
7.7 Error Handling
```java
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class OrderStatusChangeListener {

    @KafkaListener(
        topics = "order-status-change-topic",
        groupId = "order-status-consumer-group"
    )
    public void onMessage(@Payload OrderStatusChangeMessage message,
                          @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                          @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                          @Header(KafkaHeaders.OFFSET) long offset) {
        try {
            processOrderStatusChange(message);
        } catch (BusinessException ex) { // BusinessException: the system's business exception type
            // Business exceptions are expected; log a warning without the stack trace
            log.warn("[onMessage][Business exception. Order no: {}, error: {}]",
                message.getOrderNo(), ex.getMessage());
        } catch (Exception ex) {
            // System exceptions are unexpected; log an error with the stack trace
            log.error("[onMessage][System exception. Order no: {}]",
                message.getOrderNo(), ex);
        }
    }

    private void processOrderStatusChange(OrderStatusChangeMessage message) {
        // ... business processing
    }
}
```
7.8 Logging
```java
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class OrderStatusChangeListener {

    @KafkaListener(
        topics = "order-status-change-topic",
        groupId = "order-status-consumer-group"
    )
    public void onMessage(@Payload OrderStatusChangeMessage message,
                          @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                          @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                          @Header(KafkaHeaders.OFFSET) long offset) {
        long startTime = System.currentTimeMillis();
        log.info("[onMessage][Processing order status change message. Order no: {}, new status: {}]",
            message.getOrderNo(), message.getNewStatus());
        try {
            processOrderStatusChange(message);
            long endTime = System.currentTimeMillis();
            log.info("[onMessage][Order status change message processed. Order no: {}, took: {}ms]",
                message.getOrderNo(), endTime - startTime);
        } catch (Exception ex) {
            long endTime = System.currentTimeMillis();
            log.error("[onMessage][Failed to process order status change message. Order no: {}, took: {}ms]",
                message.getOrderNo(), endTime - startTime, ex);
        }
    }

    private void processOrderStatusChange(OrderStatusChangeMessage message) {
        // ... business processing
    }
}
```
8. FAQ
8.1 Messages Are Not Consumed
Problem: the listener does not consume messages after they are sent.
Solutions:
- Check that the listener class is annotated with @Component
- Check that the listener is on Spring's component-scan path
- Check that the Kafka connection is working
- Check that the Topic and Consumer Group configuration is correct
- Check the offset configuration
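As a first network-level step when checking the Kafka connection, broker reachability can be probed with a plain TCP connect. This is a hypothetical helper: a successful connect only proves the port is open, not that the broker is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Minimal broker reachability probe (network-level check only).
class BrokerProbe {
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}
```

For example, `BrokerProbe.isReachable("127.0.0.1", 9092, 1000)` quickly distinguishes a network problem from an application-level misconfiguration.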
8.2 Duplicate Message Consumption
Problem: messages are consumed more than once.
Solutions:
- Use a distributed lock to prevent duplicate processing
- Make message handling idempotent
- Check the consumer group configuration
- Check the offset commit configuration
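Besides distributed locks, idempotency can be keyed on record coordinates (topic, partition, offset), which uniquely identify a Kafka record. A minimal in-memory sketch (illustrative; a production version would keep the keys in Redis or a database with an expiry, alongside the status check shown in section 7.6):

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative in-memory dedupe keyed by record coordinates.
class ProcessedRecordTracker {
    private final Set<String> processed = new HashSet<>();

    // Returns true the first time a record is seen, false for duplicates
    boolean markProcessed(String topic, int partition, long offset) {
        return processed.add(topic + "-" + partition + "@" + offset);
    }
}
```

A listener would call `markProcessed(topic, partition, offset)` first and skip the business logic when it returns false.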
8.3 Message Loss
Problem: messages are lost after being sent.
Solutions:
- Check the message persistence configuration
- Check whether the send actually succeeded
- Check the Topic configuration
- Check the offset commit configuration
8.4 Message Consumption Failures
Problem: failed messages are not retried.
Solutions:
- Check the message retry configuration
- Check the error logs
- Check the consumer group configuration
- Check the offset commit configuration
8.5 Performance Issues
Problem: messages are consumed too slowly.
Solutions:
- Increase the number of consumers
- Optimize the consumption logic to reduce processing time
- Increase the number of partitions
- Check Kafka performance
9. Monitoring and Debugging
9.1 Message Send Monitoring
```java
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class MonitoredMessageProducer {

    private final KafkaTemplate<Object, Object> kafkaTemplate;
    private final String topic = "demo-topic";

    public MonitoredMessageProducer(KafkaTemplate<Object, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(DemoMessage message) {
        long startTime = System.currentTimeMillis();
        log.info("[sendMessage][Sending message. Topic: {}, type: {}]",
            topic, message.getMessageType());
        try {
            kafkaTemplate.send(topic, message).get();
            long endTime = System.currentTimeMillis();
            log.info("[sendMessage][Message sent. Topic: {}, type: {}, took: {}ms]",
                topic, message.getMessageType(), endTime - startTime);
        } catch (Exception ex) {
            long endTime = System.currentTimeMillis();
            log.error("[sendMessage][Message send failed. Topic: {}, type: {}, took: {}ms]",
                topic, message.getMessageType(), endTime - startTime, ex);
        }
    }
}
```
9.2 Message Consumption Monitoring
```java
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class MonitoredMessageConsumer {

    @KafkaListener(
        topics = "demo-topic",
        groupId = "demo-consumer-group"
    )
    public void onMessage(@Payload DemoMessage message,
                          @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                          @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                          @Header(KafkaHeaders.OFFSET) long offset) {
        long startTime = System.currentTimeMillis();
        log.info("[onMessage][Processing message. Topic: {}, Partition: {}, Offset: {}, type: {}]",
            topic, partition, offset, message.getMessageType());
        try {
            processMessage(message);
            long endTime = System.currentTimeMillis();
            log.info("[onMessage][Message processed. Topic: {}, Partition: {}, Offset: {}, type: {}, took: {}ms]",
                topic, partition, offset, message.getMessageType(), endTime - startTime);
        } catch (Exception ex) {
            long endTime = System.currentTimeMillis();
            log.error("[onMessage][Message processing failed. Topic: {}, Partition: {}, Offset: {}, type: {}, took: {}ms]",
                topic, partition, offset, message.getMessageType(), endTime - startTime, ex);
        }
    }

    private void processMessage(DemoMessage message) {
        // ... business processing
    }
}
```
10. Notes
- Kafka configuration: make sure the Kafka address and port are configured correctly
- Topic naming: use lowercase letters and hyphens
- Consumer Group: consumers in the same group share the consumption workload
- Partition count: choose a number of partitions appropriate for the workload
- Message persistence: make sure Topics are configured for persistence
- Offset commits: choose an offset commit strategy appropriate for the business
- Message idempotency: make sure messages can safely be consumed more than once
- Distributed locks: use distributed locks in distributed environments to prevent duplicate processing
- Error handling: handle exceptions raised during message consumption properly
- Logging: record message sending and consumption to ease troubleshooting
- Performance tuning: for high volumes, increase the number of consumers and partitions
- Monitoring and alerting: configure alerts for message sending and consumption
- Multi-tenancy: use the producer interceptor to propagate tenant information
- Message retries: configure retry counts and intervals appropriately
11. Comparison with Other Message Queues
| Feature | Kafka | Redis MQ | In-memory MQ | RabbitMQ | RocketMQ |
|---|---|---|---|---|---|
| Use case | Distributed | Single node/cluster | Single-node apps | Distributed | Distributed |
| Execution | Asynchronous | Asynchronous | Synchronous | Asynchronous | Asynchronous |
| Complexity | High | Medium | Low | Medium | High |
| Performance | High | High | High | Medium | High |
| Reliability | High | Medium | Low | High | High |
| Persistence | Supported | Via Stream | Not supported | Supported | Supported |
| Message acknowledgment | Supported | Via Stream | Not supported | Supported | Supported |
| Message retry | Supported | Supported | Not supported | Supported | Supported |
| Broadcast consumption | Supported | Via Pub/Sub | Supported | Supported | Supported |
| Clustered consumption | Supported | Via Stream | Not supported | Supported | Supported |
| Message replay | Supported | Supported | Not supported | Supported | Supported |
| Throughput | ~1M TPS | ~10K TPS | ~10K TPS | ~10K TPS | ~100K TPS |
| Partitioning | Supported | Not supported | Not supported | Not supported | Supported |