HelloCoder HelloCoder
首页
《Java小白求职之路》
《小白学Java》
计算机毕设
  • 一些免费计算机资源
  • 脚手架工具
  • 《从0到1学习Java多线程》
  • 《从0到1搭建服务器》
  • 《可观测和监控》
  • 《k8s学习心得》
随笔
关于作者
首页
《Java小白求职之路》
《小白学Java》
计算机毕设
  • 一些免费计算机资源
  • 脚手架工具
  • 《从0到1学习Java多线程》
  • 《从0到1搭建服务器》
  • 《可观测和监控》
  • 《k8s学习心得》
随笔
关于作者
  • 《LearnJavaToFindAJob》

    • 导读

    • 【初级】6~12k档

    • 【中级】12k-26k档

      • JVM进阶

      • Java进阶

      • MySQL

      • 中间件

        • Redis

          • Redis为什么删除数据后,内存占用依然很高?
          • Redis为什么要使用单线程,新版本为什么引入多线程?
          • Redis为什么要把字符串设计成SDS?
          • Redis主从、哨兵、集群的区别
          • Redis主从同步的原理
          • Redis之缓存击穿、穿透、雪崩
          • Redis分布式事务锁的原理
          • Redis的使用规范有哪些?
          • Redis的删除策略和内存淘汰机制
          • Redis的持久化机制,RDB和AOF
          • Redis的监控指标有哪些?
          • 一致性hash算法
        • Docker面试题
        • Dubbo面试题
        • Kafka如何避免消失丢失
        • Netty面试题
        • Nginx面试题
        • Tomcat面试题
        • Zookeeper面试题
        • elasticsearch面试题
        • k8s面试题
        • 消息队列面试题
      • 算法

      • 高阶

    • 【高级】26k+档

    • 大厂面试题

    • 求职建议

    • 面经

  • LearnJavaToFindAJob
  • 【中级】12k-26k档
  • 中间件
#Kafka
HaC
2026-06-23
目录

Kafka如何避免消失丢失

# 一、消息丢失的几个典型场景

我们先定位消息可能丢在哪,再来谈如何解决。

阶段 丢失场景 根因
生产者 → Broker 发送成功但实际未落盘 acks=0 或 acks=1,Leader 收到但未同步给 Follower 就挂了
Broker 端 消息落盘后丢失 磁盘损坏、日志段删除策略误删、副本未同步
Broker → 消费者 消费者收到但未处理完就提交 Offset enable.auto.commit=true,消费逻辑异常但 Offset 已提交
消费者端 处理完但 Offset 提交失败 业务成功但 commitSync 抛出异常,重启后重复消费(注:这属于重复消费而非丢失,但常被误认为丢失)

我们最需要关注的是 **“生产者丢失”**和 “消费者丢失”,这两者占生产事故的 90% 以上。

# 二、如何设计一个防消息丢失的项目?

# 1. 生产端:保证消息“一定到达”

核心要点:等 Broker 确认落盘再返回成功,并且要有重试机制。

@Configuration
public class KafkaProducerConfig {
    
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        Map<String, Object> props = new HashMap<>();
        
        // ✅ 要求所有 ISR 副本确认写入
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        
        // ✅ 设置重试次数(网络抖动时自动重发)
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        
        // ✅ 幂等生产者(避免网络重试导致消息重复)
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        
        // ✅ 设置最大的消息大小(防止大消息被 Broker 拒绝)
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1048576);
        
        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
    }
}

发送时的兜底策略:

@Service
public class OrderService {
    
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    public void createOrder(Order order) {
        // 1. 先存本地业务表
        orderDao.insert(order);
        
        // 2. 再存一条“待发送消息”表(本地消息表)
        MessageRecord record = new MessageRecord();
        record.setOrderId(order.getId());
        record.setTopic("order-topic");
        record.setPayload(JSON.toJSONString(order));
        record.setStatus("PENDING");  // 待发送
        messageRecordDao.insert(record);
        
        // 3. 发送消息
        try {
            kafkaTemplate.send("order-topic", order.getId(), JSON.toJSONString(order))
                .addCallback(
                    result -> {
                        // 发送成功 → 更新本地消息状态为 SENT
                        messageRecordDao.updateStatus(record.getId(), "SENT");
                    },
                    ex -> {
                        // 发送失败 → 记录失败次数,定时任务补偿重试
                        messageRecordDao.incrementRetryCount(record.getId());
                        log.error("消息发送失败", ex);
                    }
                );
        } catch (Exception e) {
            // 如果 send() 本身抛出异常,也记录失败,等定时任务补偿
            messageRecordDao.incrementRetryCount(record.getId());
            log.error("消息发送异常", e);
        }
    }
}

兜底的定时补偿任务:

@Component
public class MessageRetryScheduler {
    
    @Scheduled(fixedDelay = 30000)  // 每30秒执行一次
    @Transactional
    public void retryPendingMessages() {
        List<MessageRecord> pendingList = messageRecordDao.findByStatusAndRetryCountLessThan(
            "PENDING", 5
        );
        
        for (MessageRecord record : pendingList) {
            try {
                kafkaTemplate.send(record.getTopic(), record.getKey(), record.getPayload())
                    .get(10, TimeUnit.SECONDS);
                record.setStatus("SENT");
                messageRecordDao.update(record);
            } catch (Exception e) {
                record.setRetryCount(record.getRetryCount() + 1);
                messageRecordDao.update(record);
                
                if (record.getRetryCount() >= 5) {
                    // 超过重试次数 → 转人工处理(打入死信队列或告警)
                    alertService.sendAlert("消息发送超限", record);
                    record.setStatus("FAILED");
                    messageRecordDao.update(record);
                }
            }
        }
    }
}

一句话总结生产端:先存本地消息表,再发 MQ,用定时任务兜底补偿。


# 2. 服务端(Broker):配置防丢参数

properties

# 保证消息不丢的关键 Broker 配置
# 1. 最小 ISR 副本数(必须至少有 2 个副本同步才算写入成功)
min.insync.replicas=2

# 2. 刷盘策略(同步刷盘,虽然性能稍差但最安全)
flush.messages=1

# 3. 日志保留策略(不要过早删除)
log.retention.hours=72
log.retention.bytes=-1

创建 Topic 时指定副本数:

bash

kafka-topics --create --topic order-topic \
  --partitions 3 --replication-factor 3 \
  --config min.insync.replicas=2

# 3. 消费端:手动提交 Offset,处理完再确认

@KafkaListener(topics = "order-topic", containerFactory = "manualListenerContainerFactory")
public void consumeOrder(ConsumerRecord<String, String> record, Acknowledgment ack) {
    try {
        // 1. 业务处理(幂等)
        Order order = JSON.parseObject(record.value(), Order.class);
        orderService.processOrder(order);
        
        // 2. ✅ 处理成功 → 手动提交 Offset
        ack.acknowledge();
        
    } catch (Exception e) {
        // 3. ❌ 处理失败 → 不提交 Offset
        // 注意:不要抛异常让 Listener 自己处理,否则会重试
        log.error("消费失败,等待下次拉取重试", e);
        
        // 4. 记录失败次数(建议写入重试表或死信队列)
        failureRecordService.save(record, e);
        
        // 5. 如果失败次数过多 → 转入死信 Topic 避免阻塞
        if (failureRecordService.countByMsgId(record.key()) > 3) {
            kafkaTemplate.send("order-topic-dlq", record.key(), record.value());
            // 手动提交 Offset 跳过这条坏消息
            ack.acknowledge();
        }
        
        // 注意:如果不提交 Offset,这条消息会被不断拉取重试
        // 直到成功或超过阈值。如果一直失败,会形成“活锁”阻塞后续消息。
    }
}

# 三、完整的防丢失架构图

graph TB
    subgraph "生产者端 3 层保障"
        A1[业务入库] --> A2[本地消息表 PENDING]
        A2 --> A3[发送Kafka]
        A3 -->|成功| A4[更新消息表 SENT]
        A3 -->|失败| A5[定时任务补偿重试]
        A5 -->|超过次数| A6[告警 + 人工介入]
    end
    
    subgraph "Broker端 2 层保障"
        B1[acks=all]
        B2[min.insync.replicas=2]
        B3[Topic 副本数=3]
    end
    
    subgraph "消费者端 3 层保障"
        C1[拉取消息] --> C2[业务处理<br>幂等设计]
        C2 -->|成功| C3[手动提交 Offset]
        C2 -->|失败| C4[记录失败 + 不提交]
        C4 -->|重试3次| C5[转入死信队列 DLQ]
        C5 --> C6[人工处理 + 告警]
    end
    
    A3 --> B1
    B1 --> C1

# 四、最终总结:防消息丢失的黄金法则

阶段 核心原则 具体做法
生产端 先存后发,补偿兜底 本地消息表 + acks=all + 重试 + 定时补偿
服务端 多重副本,同步刷盘 min.insync.replicas=2 + 副本数 3 + 同步刷盘
消费端 业务幂等,手动提交 enable.auto.commit=false + 处理完再提交 + 死信队列

# 五、关于本地消息表的思考

你可能会问:“先存本地消息表再发 MQ”会不会增加业务代码复杂度?

是的,会带来一些开发成本,但这是目前生产环境最稳妥的防丢失方案。

如果觉得维护消息表比较麻烦,可以考虑更轻量的替代方案:

  • 使用 Spring 的 @TransactionalEventListener + @Async,在事务提交后异步发送 MQ,但这只解决“事务提交后发消息”,不解决消息发送失败的补偿问题。
#Kafka
上次更新: 2026-06-23 17:08:57
最近更新
01
MySQL支持的锁有哪些
06-23
02
HTTP 是不保存状态的协议, 如何保存用户状态
06-23
03
WebSocket、短轮询、长轮询的区别
06-23
更多文章>
Theme by Vdoing | Copyright © 2020-2026 HaC
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式