
Message Queue (Kafka)

1. Overview

Kafka is the distributed message-queue middleware used in the 梵医云 system. It is based on Apache Kafka and offers high throughput, high availability, and low latency. The system integrates Kafka through spring-kafka.

The Kafka message queue has the following characteristics:

  • High throughput: up to millions of TPS on a single machine
  • High availability: supports cluster deployment with automatic failover
  • Low latency: millisecond-level message latency
  • Reliable messaging: supports message persistence, retries, and replay
  • Distributed support: suited to distributed environments
  • Multiple consumption modes: supports clustering and broadcasting consumption

2. Architecture

2.1 Kafka Architecture

Producer -> Kafka Broker -> Topic -> Partition 0 -> Consumer Group 1 -> Consumer 1
                                  -> Partition 1 -> Consumer Group 1 -> Consumer 2
                                  -> Partition 2 -> Consumer Group 2 -> Consumer 3

Core Components

  • Producer: message producer; responsible for sending messages
  • Broker: Kafka server; responsible for storing and forwarding messages
  • Topic: message topic; the logical category a message belongs to
  • Partition: a physical partition of a Topic, used to increase parallelism
  • Consumer Group: consumers in the same group share the consumption workload
  • Consumer: message consumer; responsible for consuming messages
  • Offset: the position of a message within a partition
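The Partition and Offset concepts above can be illustrated with a small sketch: the partition a keyed record lands in is a pure function of its key, which is why all records with the same key stay ordered within one partition. Kafka's real default partitioner hashes the serialized key with murmur2; `String.hashCode()` below is only an illustrative stand-in, not the actual algorithm.

```java
// Simplified sketch of keyed-record partition assignment. Kafka's default
// partitioner uses murmur2 over the serialized key bytes; String.hashCode()
// here is a stand-in to illustrate the idea that the same key always maps to
// the same partition (preserving per-key ordering).
public class PartitionSketch {

    // Maps a record key to a partition index in [0, numPartitions).
    public static int partitionFor(String key, int numPartitions) {
        if (key == null) {
            // Keyless records are spread round-robin / sticky by the real
            // partitioner; 0 is just a placeholder here.
            return 0;
        }
        // Mask off the sign bit so the modulo result is non-negative.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}
```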

2.2 Message Models

2.2.1 Clustering Consumption

Topic -> Consumer Group -> Consumer 1
                        -> Consumer 2
                        -> Consumer 3

Characteristics:

  • Consumers in the same group share the consumption workload
  • Each message is consumed by exactly one consumer in the group
  • Suited to load-balancing scenarios

2.2.2 Broadcasting Consumption

Topic -> Consumer Group 1 -> Consumer 1
      -> Consumer Group 2 -> Consumer 2
      -> Consumer Group 3 -> Consumer 3

Characteristics:

  • Every consumer group receives each message
  • Suited to broadcast-notification scenarios

2.3 Core Components

| Component | Description | Location |
| --- | --- | --- |
| KafkaTemplate | Kafka operations template class | spring-kafka |
| @KafkaListener | Kafka message-listener annotation | spring-kafka |
| ProducerInterceptor | Producer interceptor interface | Kafka |
| TenantKafkaProducerInterceptor | Multi-tenant producer interceptor | fanyi-spring-boot-starter-biz-tenant |
| TenantKafkaEnvironmentPostProcessor | Multi-tenant Kafka environment post-processor | fanyi-spring-boot-starter-biz-tenant |

3. Configuration

3.1 Maven Dependency

Add the dependency to pom.xml:

xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

3.2 Kafka Configuration

Configure Kafka in application.yaml:

yaml
spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    producer:
      # StringSerializer suits plain-string payloads; the POJO message examples
      # in this document require org.springframework.kafka.support.serializer.JsonSerializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: all
      retries: 3
      properties:
        interceptor.classes: com.fanyi.cloud.framework.tenant.core.mq.kafka.TenantKafkaProducerInterceptor
    consumer:
      group-id: fanyi-consumer-group
      # likewise, consuming the POJO messages below requires JsonDeserializer
      # (with spring.json.trusted.packages configured)
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
      enable-auto-commit: false
    listener:
      ack-mode: manual_immediate
      concurrency: 5
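With `enable-auto-commit: false` and `ack-mode: manual_immediate` as configured above, each listener method must commit the offset explicitly through an `Acknowledgment` parameter; otherwise the record is redelivered. A minimal self-contained sketch of that contract (the local `Acknowledgment` interface stands in for spring-kafka's `org.springframework.kafka.support.Acknowledgment`, which a real `@KafkaListener` method would receive as an extra parameter):

```java
// Sketch of a listener body under manual acknowledgment: acknowledge() is
// called only after processing succeeds, so a failed record stays uncommitted
// and is redelivered. Acknowledgment below is a local stand-in for
// org.springframework.kafka.support.Acknowledgment to keep the sketch runnable.
public class ManualAckSketch {

    public interface Acknowledgment {
        void acknowledge();
    }

    // Returns true if the message was processed and acknowledged.
    public static boolean onMessage(String payload, Acknowledgment ack) {
        try {
            process(payload);
            ack.acknowledge(); // commit the offset only after success
            return true;
        } catch (RuntimeException ex) {
            // Do not acknowledge: the broker will redeliver the record.
            return false;
        }
    }

    private static void process(String payload) {
        if (payload == null) {
            throw new IllegalArgumentException("empty payload");
        }
    }
}
```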

3.3 WebSocket Kafka Configuration

Configure the WebSocket Kafka sender in application.yaml:

yaml
fanyi:
  websocket:
    sender-type: kafka
    sender-kafka:
      topic: websocket-topic
      consumer-group: websocket-consumer-group

3.4 Configuration Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| spring.kafka.bootstrap-servers | String | - | Kafka server addresses; separate multiple addresses with commas |
| spring.kafka.producer.key-serializer | String | - | Key serializer |
| spring.kafka.producer.value-serializer | String | - | Value serializer |
| spring.kafka.producer.acks | String | all (Kafka ≥ 3.0; 1 in older clients) | Acknowledgment mode (0/1/all) |
| spring.kafka.producer.retries | Integer | 2147483647 (Kafka ≥ 2.1) | Retry count on send failure |
| spring.kafka.consumer.group-id | String | - | Consumer group ID |
| spring.kafka.consumer.key-deserializer | String | - | Key deserializer |
| spring.kafka.consumer.value-deserializer | String | - | Value deserializer |
| spring.kafka.consumer.auto-offset-reset | String | latest | Offset reset policy (earliest/latest/none) |
| spring.kafka.consumer.enable-auto-commit | Boolean | true | Whether to auto-commit offsets |
| spring.kafka.listener.ack-mode | String | batch | Listener acknowledgment mode |
| spring.kafka.listener.concurrency | Integer | 1 | Number of concurrent consumers per listener |

4. Usage

4.1 Defining Messages

Define the message entity class:

java
import lombok.Data;

@Data
public class DemoMessage {

    private Long id;
    private String messageType;
    private String content;
    private Long userId;
    private Long timestamp;
}

4.2 Sending Messages

Send messages with KafkaTemplate:

java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class DemoMessageProducer {

    private final KafkaTemplate<Object, Object> kafkaTemplate;
    private final String topic = "demo-topic";

    public DemoMessageProducer(KafkaTemplate<Object, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendDemoMessage(String content) {
        DemoMessage message = new DemoMessage();
        message.setMessageType("DEMO_MESSAGE");
        message.setContent(content);
        message.setUserId(1L);
        message.setTimestamp(System.currentTimeMillis());
        kafkaTemplate.send(topic, message);
    }
}
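`kafkaTemplate.send(...)` above is asynchronous, and ignoring its returned future silently drops send failures. In spring-kafka 3.x the template returns a `CompletableFuture<SendResult<K, V>>` (2.x returns a `ListenableFuture`). The handling pattern can be sketched with a plain `CompletableFuture`, where `sendAsync` is a stand-in for the template call, not a real API:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Sketch of asynchronous send-result handling. sendAsync stands in for
// KafkaTemplate.send(topic, message); whenComplete is where production code
// would log the RecordMetadata on success, or log the error and trigger
// retry/alerting on failure.
public class SendCallbackSketch {

    // Stand-in for kafkaTemplate.send(topic, message).
    static CompletableFuture<String> sendAsync(String topic, String message, boolean fail) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (fail) {
            future.completeExceptionally(new RuntimeException("broker unavailable"));
        } else {
            future.complete(topic + "-0@42"); // stand-in for RecordMetadata
        }
        return future;
    }

    public static String sendWithCallback(String topic, String message, boolean fail) {
        AtomicReference<String> outcome = new AtomicReference<>();
        sendAsync(topic, message, fail).whenComplete((metadata, ex) -> {
            if (ex != null) {
                outcome.set("FAILED: " + ex.getMessage()); // log.error(...) in real code
            } else {
                outcome.set("OK: " + metadata);            // log.info(...) in real code
            }
        });
        return outcome.get(); // already completed in this synchronous sketch
    }
}
```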

4.3 Listening for Messages

Listen for messages with the @KafkaListener annotation:

java
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class DemoMessageConsumer {

    @KafkaListener(
            topics = "demo-topic",
            groupId = "demo-consumer-group"
    )
    public void onMessage(@Payload DemoMessage message,
                          @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                          @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                          @Header(KafkaHeaders.OFFSET) long offset) {
        log.info("[onMessage][Received message, topic: {}, partition: {}, offset: {}, type: {}, content: {}]",
                topic, partition, offset, message.getMessageType(), message.getContent());
    }
}

4.4 Broadcast Messages

Implement broadcasting by giving each consumer instance its own consumer group:

java
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class DemoMessageBroadcastConsumer {

    @KafkaListener(
            topics = "demo-topic",
            // Append a UUID suffix to the group ID so every started consumer
            // instance gets its own group, which achieves broadcast consumption
            groupId = "demo-consumer-group" + "-" + "#{T(java.util.UUID).randomUUID()}"
    )
    public void onMessage(@Payload DemoMessage message,
                          @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                          @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                          @Header(KafkaHeaders.OFFSET) long offset) {
        log.info("[onMessage][Received message, topic: {}, partition: {}, offset: {}, type: {}, content: {}]",
                topic, partition, offset, message.getMessageType(), message.getContent());
    }
}

5. Business Scenario Examples

5.1 WebSocket Message Broadcasting

5.1.1 WebSocket Message Definition

java
import lombok.Data;
import lombok.experimental.Accessors;

@Data
@Accessors(chain = true) // chained setters are required by the sender below
public class KafkaWebSocketMessage {

    private String sessionId;
    private Integer userType;
    private Long userId;
    private String messageType;
    private String messageContent;
}

5.1.2 WebSocket Message Sender

java
import com.fanyi.cloud.framework.websocket.core.sender.AbstractWebSocketMessageSender;
import com.fanyi.cloud.framework.websocket.core.sender.WebSocketMessageSender;
import com.fanyi.cloud.framework.websocket.core.session.WebSocketSessionManager;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;

import java.util.concurrent.ExecutionException;

@Slf4j
public class KafkaWebSocketMessageSender extends AbstractWebSocketMessageSender {

    private final KafkaTemplate<Object, Object> kafkaTemplate;
    private final String topic;

    public KafkaWebSocketMessageSender(WebSocketSessionManager sessionManager,
                                       KafkaTemplate<Object, Object> kafkaTemplate,
                                       String topic) {
        super(sessionManager);
        this.kafkaTemplate = kafkaTemplate;
        this.topic = topic;
    }

    @Override
    public void send(Integer userType, Long userId, String messageType, String messageContent) {
        sendKafkaMessage(null, userId, userType, messageType, messageContent);
    }

    @Override
    public void send(Integer userType, String messageType, String messageContent) {
        sendKafkaMessage(null, null, userType, messageType, messageContent);
    }

    @Override
    public void send(String sessionId, String messageType, String messageContent) {
        sendKafkaMessage(sessionId, null, null, messageType, messageContent);
    }

    private void sendKafkaMessage(String sessionId, Long userId, Integer userType,
                                  String messageType, String messageContent) {
        KafkaWebSocketMessage mqMessage = new KafkaWebSocketMessage()
                .setSessionId(sessionId)
                .setUserId(userId)
                .setUserType(userType)
                .setMessageType(messageType)
                .setMessageContent(messageContent);
        try {
            kafkaTemplate.send(topic, mqMessage).get();
        } catch (InterruptedException | ExecutionException e) {
            log.error("[sendKafkaMessage][Failed to send message ({}) to Kafka]", mqMessage, e);
        }
    }
}

5.1.3 WebSocket Message Consumer

java
import lombok.RequiredArgsConstructor;
import org.springframework.kafka.annotation.KafkaListener;

@RequiredArgsConstructor
public class KafkaWebSocketMessageConsumer {

    private final KafkaWebSocketMessageSender kafkaWebSocketMessageSender;

    @KafkaListener(
            topics = "${fanyi.websocket.sender-kafka.topic}",
            // Append a UUID suffix to the group ID so every started consumer instance gets its own group, which achieves broadcast consumption
            groupId = "${fanyi.websocket.sender-kafka.consumer-group}" + "-" + "#{T(java.util.UUID).randomUUID()}")
    public void onMessage(KafkaWebSocketMessage message) {
        kafkaWebSocketMessageSender.send(message.getSessionId(),
                message.getUserType(), message.getUserId(),
                message.getMessageType(), message.getMessageContent());
    }
}

5.2 Order Status Change Notifications

5.2.1 Order Status Message Definition

java
import lombok.Data;

@Data
public class OrderStatusChangeMessage {

    private Long orderId;
    private String orderNo;
    private Integer oldStatus;
    private Integer newStatus;
    private Long userId;
    private Long timestamp;
}

5.2.2 Sending Order Status Messages

java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class OrderService {

    private final KafkaTemplate<Object, Object> kafkaTemplate;
    private final OrderMapper orderMapper; // mapper used below; was missing from the original listing
    private final String topic = "order-status-change-topic";

    public OrderService(KafkaTemplate<Object, Object> kafkaTemplate, OrderMapper orderMapper) {
        this.kafkaTemplate = kafkaTemplate;
        this.orderMapper = orderMapper;
    }

    @Transactional
    public void updateOrderStatus(Long orderId, Integer newStatus) {
        OrderDO order = orderMapper.selectById(orderId);
        Integer oldStatus = order.getStatus();
        order.setStatus(newStatus);
        orderMapper.updateById(order);

        OrderStatusChangeMessage message = new OrderStatusChangeMessage();
        message.setOrderId(orderId);
        message.setOrderNo(order.getOrderNo());
        message.setOldStatus(oldStatus);
        message.setNewStatus(newStatus);
        message.setUserId(order.getUserId());
        message.setTimestamp(System.currentTimeMillis());
        kafkaTemplate.send(topic, message);
    }
}
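Note that `updateOrderStatus` publishes to Kafka while the database transaction is still open: if the transaction later rolls back, the message has already escaped. A common fix is to defer the send until after commit (in Spring, via `TransactionSynchronizationManager.registerSynchronization` or `@TransactionalEventListener(phase = AFTER_COMMIT)`). The timing problem can be sketched with a minimal transaction stand-in; everything here is illustrative, not the Spring API:

```java
import java.util.ArrayList;
import java.util.List;

// Minimal stand-in for a transaction that collects after-commit callbacks.
// It illustrates why a Kafka send issued inside @Transactional should be
// deferred: on rollback, a deferred send simply never happens, whereas an
// immediate send has already escaped. In Spring, this deferral is what
// TransactionSynchronizationManager / @TransactionalEventListener(AFTER_COMMIT) provide.
public class AfterCommitSketch {

    public static final List<String> sentMessages = new ArrayList<>();
    private static final List<Runnable> afterCommit = new ArrayList<>();

    public static void registerAfterCommit(Runnable callback) {
        afterCommit.add(callback);
    }

    public static void runTransaction(boolean commit) {
        sentMessages.clear();
        afterCommit.clear();
        // business logic: update the order, then schedule the send for after commit
        registerAfterCommit(() -> sentMessages.add("order-status-changed"));
        if (commit) {
            afterCommit.forEach(Runnable::run); // commit succeeded: send now
        }
        // on rollback, the callbacks are dropped and nothing is sent
    }
}
```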

5.2.3 Listening for Order Status Messages

java
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class OrderStatusChangeListener {

    @KafkaListener(
            topics = "order-status-change-topic",
            groupId = "order-status-consumer-group"
    )
    public void onMessage(@Payload OrderStatusChangeMessage message,
                          @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                          @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                          @Header(KafkaHeaders.OFFSET) long offset) {
        log.info("[onMessage][Order status changed, orderNo: {}, old status: {}, new status: {}]",
                message.getOrderNo(), message.getOldStatus(), message.getNewStatus());
        
        switch (message.getNewStatus()) {
            case 10:
                handleOrderPaid(message);
                break;
            case 20:
                handleOrderShipped(message);
                break;
            case 30:
                handleOrderCompleted(message);
                break;
        }
    }

    private void handleOrderPaid(OrderStatusChangeMessage message) {
        log.info("[handleOrderPaid][Order paid, orderNo: {}]", message.getOrderNo());
    }

    private void handleOrderShipped(OrderStatusChangeMessage message) {
        log.info("[handleOrderShipped][Order shipped, orderNo: {}]", message.getOrderNo());
    }

    private void handleOrderCompleted(OrderStatusChangeMessage message) {
        log.info("[handleOrderCompleted][Order completed, orderNo: {}]", message.getOrderNo());
    }
}

6. Multi-Tenancy Support

6.1 Producer Interceptor

Implement the ProducerInterceptor interface to attach tenant information when sending messages:

java
import com.fanyi.cloud.framework.tenant.core.context.TenantContextHolder;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

public class TenantKafkaProducerInterceptor implements ProducerInterceptor<Object, Object> {

    @Override
    public ProducerRecord<Object, Object> onSend(ProducerRecord<Object, Object> record) {
        Long tenantId = TenantContextHolder.getTenantId();
        if (tenantId != null) {
            // ProducerRecord exposes its (mutable) headers directly, so no reflection is needed
            record.headers().add("tenant-id", tenantId.toString().getBytes());
        }
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}

On the consumer side, the tenant ID can be read back from the record headers (for example via a `@Header("tenant-id") byte[]` parameter or a record interceptor) and restored into TenantContextHolder before processing.

6.2 Environment Post-Processor

Implement the EnvironmentPostProcessor interface to register the multi-tenant interceptor during Kafka initialization:

java
import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.env.EnvironmentPostProcessor;
import org.springframework.core.env.ConfigurableEnvironment;

@Slf4j
public class TenantKafkaEnvironmentPostProcessor implements EnvironmentPostProcessor {

    private static final String PROPERTY_KEY_INTERCEPTOR_CLASSES = "spring.kafka.producer.properties.interceptor.classes";

    @Override
    public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) {
        try {
            String value = environment.getProperty(PROPERTY_KEY_INTERCEPTOR_CLASSES);
            if (StrUtil.isEmpty(value)) {
                value = TenantKafkaProducerInterceptor.class.getName();
            } else {
                value += "," + TenantKafkaProducerInterceptor.class.getName();
            }
            environment.getSystemProperties().put(PROPERTY_KEY_INTERCEPTOR_CLASSES, value);
        } catch (NoClassDefFoundError ignore) {
        }
    }
}
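An EnvironmentPostProcessor only takes effect if it is registered with Spring Boot. Assuming the conventional mechanism (this registration line is inferred from the standard Spring Boot convention and the interceptor's package shown above, not quoted from the source), the starter would declare it in META-INF/spring.factories:

```properties
org.springframework.boot.env.EnvironmentPostProcessor=\
  com.fanyi.cloud.framework.tenant.core.mq.kafka.TenantKafkaEnvironmentPostProcessor
```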

7. Best Practices

7.1 Message Naming Conventions

  • Message class names end with Message
  • The class name should clearly describe what the message means
  • Examples: OrderStatusChangeMessage, KafkaWebSocketMessage, DemoMessage

7.2 Topic Naming Conventions

  • Use lowercase letters and hyphens
  • The topic name should clearly describe what the topic is for
  • Examples: order-status-change-topic, websocket-topic, demo-topic
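The convention above can be checked mechanically. A small validator sketch (the regex encodes this document's lowercase-and-hyphen house style, which is deliberately stricter than what Kafka itself allows: Kafka permits `[a-zA-Z0-9._-]` up to 249 characters):

```java
// Validates topic names against this document's convention: lowercase words
// separated by single hyphens, e.g. order-status-change-topic. Kafka itself
// is more permissive ([a-zA-Z0-9._-], max 249 chars); this enforces the
// stricter house style on top of the length limit.
public class TopicNameValidator {

    private static final String PATTERN = "[a-z0-9]+(-[a-z0-9]+)*";

    public static boolean isValid(String topic) {
        return topic != null && topic.length() <= 249 && topic.matches(PATTERN);
    }
}
```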

7.3 Message Data Design

java
import lombok.Data;

@Data
public class OrderStatusChangeMessage {

    private Long orderId;
    private String orderNo;
    private Integer oldStatus;
    private Integer newStatus;
    private Long userId;
    private Long timestamp;

    public OrderStatusChangeMessage setOrder(OrderDO order, Integer newStatus) {
        this.orderId = order.getId();
        this.orderNo = order.getOrderNo();
        this.oldStatus = order.getStatus();
        this.newStatus = newStatus;
        this.userId = order.getUserId();
        this.timestamp = System.currentTimeMillis();
        return this;
    }
}

7.4 Sending Messages

java
import org.springframework.kafka.core.KafkaTemplate;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class OrderMessageProducer {

    private final KafkaTemplate<Object, Object> kafkaTemplate;
    private final String topic = "order-status-change-topic";

    public OrderMessageProducer(KafkaTemplate<Object, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendOrderStatusChange(OrderDO order, Integer newStatus) {
        try {
            OrderStatusChangeMessage message = new OrderStatusChangeMessage()
                    .setOrder(order, newStatus);
            kafkaTemplate.send(topic, message).get();
            log.info("[sendOrderStatusChange][Order status change message sent, orderNo: {}, new status: {}]",
                    order.getOrderNo(), newStatus);
        } catch (Exception ex) {
            log.error("[sendOrderStatusChange][Failed to send order status change message, orderNo: {}]",
                    order.getOrderNo(), ex);
        }
    }
}

7.5 Listening for Messages

java
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class OrderStatusChangeListener {

    @KafkaListener(
            topics = "order-status-change-topic",
            groupId = "order-status-consumer-group"
    )
    public void onMessage(@Payload OrderStatusChangeMessage message,
                          @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                          @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                          @Header(KafkaHeaders.OFFSET) long offset) {
        log.info("[onMessage][Received order status change message, orderNo: {}, new status: {}]",
                message.getOrderNo(), message.getNewStatus());
        try {
            processOrderStatusChange(message);
            log.info("[onMessage][Order status change message processed, orderNo: {}]",
                    message.getOrderNo());
        } catch (Exception ex) {
            log.error("[onMessage][Failed to process order status change message, orderNo: {}]",
                    message.getOrderNo(), ex);
        }
    }

    private void processOrderStatusChange(OrderStatusChangeMessage message) {
    }
}

7.6 Message Idempotency

java
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RequiredArgsConstructor
public class OrderStatusChangeListener {

    private final RedissonClient redissonClient;
    private final OrderMapper orderMapper;

    @KafkaListener(
            topics = "order-status-change-topic",
            groupId = "order-status-consumer-group"
    )
    public void onMessage(@Payload OrderStatusChangeMessage message,
                          @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                          @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                          @Header(KafkaHeaders.OFFSET) long offset) {
        String lockKey = "order:status:change:" + message.getOrderId();
        RLock lock = redissonClient.getLock(lockKey);
        try {
            if (lock.tryLock()) {
                processOrderStatusChange(message);
            }
        } finally {
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }

    private void processOrderStatusChange(OrderStatusChangeMessage message) {
        OrderDO order = orderMapper.selectById(message.getOrderId());
        if (order.getStatus().equals(message.getNewStatus())) {
            log.info("[processOrderStatusChange][Order status already updated, skipping, orderNo: {}]",
                    message.getOrderNo());
            return;
        }
    }
}

7.7 Error Handling

java
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class OrderStatusChangeListener {

    @KafkaListener(
            topics = "order-status-change-topic",
            groupId = "order-status-consumer-group"
    )
    public void onMessage(@Payload OrderStatusChangeMessage message,
                          @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                          @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                          @Header(KafkaHeaders.OFFSET) long offset) {
        try {
            processOrderStatusChange(message);
        } catch (BusinessException ex) {
            log.warn("[onMessage][Business exception, orderNo: {}, error: {}]",
                    message.getOrderNo(), ex.getMessage());
        } catch (Exception ex) {
            log.error("[onMessage][System exception, orderNo: {}]",
                    message.getOrderNo(), ex);
        }
    }

    private void processOrderStatusChange(OrderStatusChangeMessage message) {
    }
}

7.8 Logging

java
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class OrderStatusChangeListener {

    @KafkaListener(
            topics = "order-status-change-topic",
            groupId = "order-status-consumer-group"
    )
    public void onMessage(@Payload OrderStatusChangeMessage message,
                          @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                          @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                          @Header(KafkaHeaders.OFFSET) long offset) {
        long startTime = System.currentTimeMillis();
        log.info("[onMessage][Processing order status change message, orderNo: {}, new status: {}]",
                message.getOrderNo(), message.getNewStatus());
        try {
            processOrderStatusChange(message);
            long endTime = System.currentTimeMillis();
            log.info("[onMessage][Order status change message processed, orderNo: {}, took: {}ms]",
                    message.getOrderNo(), endTime - startTime);
        } catch (Exception ex) {
            long endTime = System.currentTimeMillis();
            log.error("[onMessage][Failed to process order status change message, orderNo: {}, took: {}ms]",
                    message.getOrderNo(), endTime - startTime, ex);
        }
    }

    private void processOrderStatusChange(OrderStatusChangeMessage message) {
    }
}

8. FAQ

8.1 Messages Are Not Consumed

Problem: the listener does not consume messages after they are sent.

Solutions:

  1. Check that the listener class is annotated with @Component
  2. Check that the listener is on the Spring component-scan path
  3. Check that the Kafka connection is healthy
  4. Check the Topic and consumer-group configuration
  5. Check the offset configuration

8.2 Duplicate Consumption

Problem: messages are consumed more than once.

Solutions:

  1. Use a distributed lock to prevent duplicate processing
  2. Make message handling idempotent
  3. Check the consumer-group configuration
  4. Check the offset-commit configuration
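Idempotency (item 2) can also be enforced without a distributed lock by recording which message keys have already been handled. A minimal in-memory sketch of that idea (a single-instance illustration only; in production the "processed" marker should live in Redis or a database unique constraint so it is shared across consumer instances and survives restarts):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Minimal idempotent-consumer sketch: each message key is processed at most
// once. The in-memory set only protects a single JVM; production code would
// use Redis (SET NX) or a database unique constraint for the same check.
public class IdempotentConsumerSketch {

    private static final Set<String> processed = ConcurrentHashMap.newKeySet();

    // Returns true if the message was processed, false if it was a duplicate.
    public static boolean consumeOnce(String messageKey, Runnable handler) {
        if (!processed.add(messageKey)) {
            return false; // already handled: skip the duplicate delivery
        }
        handler.run();
        return true;
    }
}
```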

8.3 Message Loss

Problem: messages are lost after being sent.

Solutions:

  1. Check the message persistence configuration
  2. Verify that the send actually succeeded
  3. Check the Topic configuration
  4. Check the offset-commit configuration

8.4 Consumption Failures

Problem: failed messages are not retried.

Solutions:

  1. Check the message retry configuration
  2. Check the error logs
  3. Check the consumer-group configuration
  4. Check the offset-commit configuration

8.5 Performance Problems

Problem: messages are consumed slowly.

Solutions:

  1. Increase the number of consumers
  2. Optimize the consumption logic to reduce processing time
  3. Increase the number of partitions
  4. Check Kafka's own performance

9. Monitoring and Debugging

9.1 Monitoring Message Sending

java
import org.springframework.kafka.core.KafkaTemplate;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class MonitoredMessageProducer {

    private final KafkaTemplate<Object, Object> kafkaTemplate;
    private final String topic = "demo-topic";

    public MonitoredMessageProducer(KafkaTemplate<Object, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(DemoMessage message) {
        long startTime = System.currentTimeMillis();
        log.info("[sendMessage][Sending message, topic: {}, type: {}]",
                topic, message.getMessageType());
        try {
            kafkaTemplate.send(topic, message).get();
            long endTime = System.currentTimeMillis();
            log.info("[sendMessage][Message sent, topic: {}, type: {}, took: {}ms]",
                    topic, message.getMessageType(), endTime - startTime);
        } catch (Exception ex) {
            long endTime = System.currentTimeMillis();
            log.error("[sendMessage][Failed to send message, topic: {}, type: {}, took: {}ms]",
                    topic, message.getMessageType(), endTime - startTime, ex);
        }
    }
}

9.2 Monitoring Message Consumption

java
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class MonitoredMessageConsumer {

    @KafkaListener(
            topics = "demo-topic",
            groupId = "demo-consumer-group"
    )
    public void onMessage(@Payload DemoMessage message,
                          @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                          @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                          @Header(KafkaHeaders.OFFSET) long offset) {
        long startTime = System.currentTimeMillis();
        log.info("[onMessage][Processing message, topic: {}, partition: {}, offset: {}, type: {}]",
                topic, partition, offset, message.getMessageType());
        try {
            processMessage(message);
            long endTime = System.currentTimeMillis();
            log.info("[onMessage][Message processed, topic: {}, partition: {}, offset: {}, type: {}, took: {}ms]",
                    topic, partition, offset, message.getMessageType(), endTime - startTime);
        } catch (Exception ex) {
            long endTime = System.currentTimeMillis();
            log.error("[onMessage][Failed to process message, topic: {}, partition: {}, offset: {}, type: {}, took: {}ms]",
                    topic, partition, offset, message.getMessageType(), endTime - startTime, ex);
        }
    }

    private void processMessage(DemoMessage message) {
    }
}

10. Notes

  1. Kafka configuration: make sure the Kafka address and port are correct
  2. Topic naming: use lowercase letters and hyphens
  3. Consumer groups: consumers in the same group share the consumption workload
  4. Partition count: choose a partition count that fits the business workload
  5. Message persistence: make sure topics are configured for persistence
  6. Offset commits: choose an offset-commit strategy that fits the business scenario
  7. Idempotency: make sure messages can be consumed repeatedly without harm
  8. Distributed locks: use distributed locks in distributed deployments to prevent duplicate processing
  9. Error handling: handle exceptions raised during consumption correctly
  10. Logging: log message sends and consumption to ease troubleshooting
  11. Performance tuning: for large volumes, increase the number of consumers and partitions
  12. Monitoring and alerting: configure alerts for message sending and consumption
  13. Multi-tenancy: use the producer interceptor to propagate tenant information
  14. Retries: configure a sensible retry count and retry interval

11. Comparison with Other Message Queues

| Feature | Kafka | Redis MQ | In-memory MQ | RabbitMQ | RocketMQ |
| --- | --- | --- | --- | --- | --- |
| Scope | Distributed | Single node / cluster | Single application | Distributed | Distributed |
| Execution | Asynchronous | Asynchronous | Synchronous | Asynchronous | Asynchronous |
| Complexity | - | - | - | - | - |
| Performance | - | - | - | - | - |
| Reliability | - | - | - | - | - |
| Persistence | Supported | Via Stream | Not supported | Supported | Supported |
| Acknowledgment | Supported | Via Stream | Not supported | Supported | Supported |
| Retry | Supported | Supported | Not supported | Supported | Supported |
| Broadcast consumption | Supported | Via Pub/Sub | Supported | Supported | Supported |
| Clustering consumption | Supported | Via Stream | Not supported | Supported | Supported |
| Message replay | Supported | Supported | Not supported | Supported | Supported |
| Throughput | Million-level TPS | 10K-level TPS | 10K-level TPS | 10K-level TPS | 100K-level TPS |
| Partitioning | Supported | Not supported | Not supported | Not supported | Supported |

12. Related Documentation