如何有效解决 Kafka 消息处理中的丢失、重复及顺序性问题?

2026-05-19 19:081阅读0评论SEO资源
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计1508个文字,预计阅读时间需要7分钟。

如何有效解决 Kafka 消息处理中的丢失、重复及顺序性问题?

关于Kafka消息丢失、重复消费和顺序消费的问题,这些问题是我们在使用MQ时常常不得不面对的问题。以下结合我实际业务场景来与你分享一些解决方案:

1. 消息丢失:可以通过以下方式解决: - 确保生产者和消费者都配置了足够的`acks`设置,例如设置为all,以确保所有副本都收到消息后才会认为消息成功写入。 - 开启Kafka的副本机制,提高消息的持久性。

2. 消息重复消费: - 消费者端可以使用`offsets committed`机制,确保即使在重启后也能从上次提交的位置开始消费。 - 也可以在消费者端实现幂等逻辑,即即使消费了相同的消息多次,也不会影响业务。

3. 消息顺序消费: - 可以通过将消息路由到同一个分区来实现顺序消费,确保顺序性。 - 也可以通过设置Kafka的`max.inflight.records`参数来减少并发消费的记录数,从而保证消息的顺序。

希望这些建议能对您有所帮助。

关于 Kafka 消息丢失、重复消费和顺序消费的问题

消息丢失,消息重复消费,消息顺序消费等问题是我们使用 MQ 时不得不考虑的一个问题,下面我结合实际的业务来和你分享一下解决方案。

消息丢失问题

比如我们使用 Kakfa 时,以下场景都会发生消息丢失:

  • producer -> broker (生产者生产消息)
  • broker -> broker (集群环境,broker 同步给其他 broker)
  • broker -> consumer (消费者消费消息)

解决方案也很简单,设置 acks(消息确认机制)retries(重试机制)factor(设置 partition 数量)...

一般来说,最常见的消息丢失场景就是:consumer 消费消息

要保证 consumer 消费消息时不丢失消息,必须使用手动提交 ack

我们业务是这样实现的:

如何有效解决 Kafka 消息处理中的丢失、重复及顺序性问题?

  1. Kafka 拉取消息(一次批量拉取 100条)
  2. 为每条消息分配一个 msgId(递增)
  3. msgId 存入内存队列(sortSet)
  4. 使用 Map 存储 msgIdmsg (包含 offset)的映射关系
  5. 当业务处理完消息后,获取当前消息的 msgId,然后从 sortSet 中删除该 msgId(表示该消息已经处理过了)
  6. ack 时,如果当前 msgId <= sortSet(msgId 在 sortSet 中是从小到大排列) ,就提交当前 offset
  7. 就算 consumer 在处理消息时挂了,下次重启时就会从 sortSet 队首的消息开始拉取,实现至少处理一次语义。
  8. 步骤 7 存在一个问题:当消息处理完后,还没从 sortSet 中删除该 msgId,系统就挂了,当系统重启时,又会重新处理一次刚刚已处理过的消息,这就引出消息重复消费的问题了。
消息重复消费

要解决消息重复消费,也就是要实现幂等(幂等就是:多次请求,但结果保持不变,举一个例子你就明白了:在 http 中,你发送同一个 get 请求,无论发送多少次,返回结果都是一样的

回到我们的业务场景上,我以处理订单消息为例:

  • 幂等Key 由我们的订单Id + 订单状态组成(一笔订单的状态只会处理一次)

  • 在处理之前,我们首先会去 Redis 查询是否存在这个 Key

    如果存在,说明我们已经处理过了,直接丢掉;

    ​ 如果不存在,说明没处理过,继续往下处理;

  • 最终的逻辑是:将处理过的数据存到DB上,再把 幂等Key 存到 Redis

显然一般场景下 Redis 是无法保证幂等的

所以Redis只是一个前置处理,最终的幂等性依赖 DB唯一Key(订单Id+订单状态)

总的来说就是:通过 Redis 做前置处理,DB 唯一索引做最终保证实现幂等性

消息顺序消费

消息的顺序性很好理解,还是以订单处理为例

订单的状态有:支付、确认收货、完成等等,而订单下还有计费、退款的消息报

理论上来说:支付的消息肯定要比退款的消息先到。

但是程序处理的过程就不一定了,所以我们处理消息顺序消费的流程如下:

  • 宽表:创建一张宽表,唯一索引是 订单Id,将订单的每个状态拆分为一个列,当消息来了,只更新对应的字段就好,消息只会存在短暂的状态不一致问题,但是最终状态是一致的
  • 消息补偿机制
  • 把相同的 userID/orderId 发送到相同的 partition(因为一个 consumer 消费一个 partition)

本文共计1508个文字,预计阅读时间需要7分钟。

如何有效解决 Kafka 消息处理中的丢失、重复及顺序性问题?

关于Kafka消息丢失、重复消费和顺序消费的问题,这些问题是我们在使用MQ时常常不得不面对的问题。以下结合我实际业务场景来与你分享一些解决方案:

1. 消息丢失:可以通过以下方式解决: - 确保生产者和消费者都配置了足够的`acks`设置,例如设置为all,以确保所有副本都收到消息后才会认为消息成功写入。 - 开启Kafka的副本机制,提高消息的持久性。

2. 消息重复消费: - 消费者端可以使用`offsets committed`机制,确保即使在重启后也能从上次提交的位置开始消费。 - 也可以在消费者端实现幂等逻辑,即即使消费了相同的消息多次,也不会影响业务。

3. 消息顺序消费: - 可以通过将消息路由到同一个分区来实现顺序消费,确保顺序性。 - 也可以通过设置Kafka的`max.inflight.records`参数来减少并发消费的记录数,从而保证消息的顺序。

希望这些建议能对您有所帮助。

关于 Kafka 消息丢失、重复消费和顺序消费的问题

消息丢失,消息重复消费,消息顺序消费等问题是我们使用 MQ 时不得不考虑的一个问题,下面我结合实际的业务来和你分享一下解决方案。

消息丢失问题

比如我们使用 Kakfa 时,以下场景都会发生消息丢失:

  • producer -> broker (生产者生产消息)
  • broker -> broker (集群环境,broker 同步给其他 broker)
  • broker -> consumer (消费者消费消息)

解决方案也很简单,设置 acks(消息确认机制)retries(重试机制)factor(设置 partition 数量)...

一般来说,最常见的消息丢失场景就是:consumer 消费消息

要保证 consumer 消费消息时不丢失消息,必须使用手动提交 ack

我们业务是这样实现的:

如何有效解决 Kafka 消息处理中的丢失、重复及顺序性问题?

  1. Kafka 拉取消息(一次批量拉取 100条)
  2. 为每条消息分配一个 msgId(递增)
  3. msgId 存入内存队列(sortSet)
  4. 使用 Map 存储 msgIdmsg (包含 offset)的映射关系
  5. 当业务处理完消息后,获取当前消息的 msgId,然后从 sortSet 中删除该 msgId(表示该消息已经处理过了)
  6. ack 时,如果当前 msgId <= sortSet(msgId 在 sortSet 中是从小到大排列) ,就提交当前 offset
  7. 就算 consumer 在处理消息时挂了,下次重启时就会从 sortSet 队首的消息开始拉取,实现至少处理一次语义。
  8. 步骤 7 存在一个问题:当消息处理完后,还没从 sortSet 中删除该 msgId,系统就挂了,当系统重启时,又会重新处理一次刚刚已处理过的消息,这就引出消息重复消费的问题了。
消息重复消费

要解决消息重复消费,也就是要实现幂等(幂等就是:多次请求,但结果保持不变,举一个例子你就明白了:在 http 中,你发送同一个 get 请求,无论发送多少次,返回结果都是一样的

回到我们的业务场景上,我以处理订单消息为例:

  • 幂等Key 由我们的订单Id + 订单状态组成(一笔订单的状态只会处理一次)

  • 在处理之前,我们首先会去 Redis 查询是否存在这个 Key

    如果存在,说明我们已经处理过了,直接丢掉;

    ​ 如果不存在,说明没处理过,继续往下处理;

  • 最终的逻辑是:将处理过的数据存到DB上,再把 幂等Key 存到 Redis

显然一般场景下 Redis 是无法保证幂等的

所以Redis只是一个前置处理,最终的幂等性依赖 DB唯一Key(订单Id+订单状态)

总的来说就是:通过 Redis 做前置处理,DB 唯一索引做最终保证实现幂等性

消息顺序消费

消息的顺序性很好理解,还是以订单处理为例

订单的状态有:支付、确认收货、完成等等,而订单下还有计费、退款的消息报

理论上来说:支付的消息肯定要比退款的消息先到。

但是程序处理的过程就不一定了,所以我们处理消息顺序消费的流程如下:

  • 宽表:创建一张宽表,唯一索引是 订单Id,将订单的每个状态拆分为一个列,当消息来了,只更新对应的字段就好,消息只会存在短暂的状态不一致问题,但是最终状态是一致的
  • 消息补偿机制
  • 把相同的 userID/orderId 发送到相同的 partition(因为一个 consumer 消费一个 partition)