Kafka消息队列中,如何设置Consumer消费者配置?

2026-06-10 06:122阅读0评论SEO问题
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计1085个文字,预计阅读时间需要5分钟。

Kafka消息队列中,如何设置Consumer消费者配置?

1、为什么会有消费者群?单个消费者消费能力有限,消费能力可能随着生产消息速度的加快而受限。通过指定消费者群,并不断向消费者群中增加消费者,可以增强消费处理能力。

1、为什么会有消费者组?

单个消费者消费能力有限,消费能力可能跟不上生产消息的速度。通过指定消费者组,并不断往消费者组里增加消费者,增强消费处理能力。当然,若消费能力过强,而生产消费速度太慢,可以减少消费者组中消费者数量,避免消费者空转。综上消费者组能够实现消费能力的动态伸缩。

由上图可知以下两点:

第一:topic中的一个partition只能由同一个消费者组中的一个消费者消费

由此可见,消费者组增加或减少消费者之后,消费者

第二:一个topic能够被多个消费者组消费,且消费者组之间互不干扰。


2、什么是再均衡(Rebalance)

再均衡(Rebalance):消费者组增加或减少消费者之后,主题中的分区会重新分配给不同的消费者。

Rebalance能够调整topic中分区与消费者的对应关系,当某个消费者宕机或者新增加消费者中,Rebalance能够动态调整分区与消费者的对应关系,给消费者组提供高可用性与伸缩性。

Rebalance带了高可用性与伸缩性的优点,那缺点是什么?

JVM中有个stop the world的概念,就是在进行垃圾回收时,所有线程会停止工作。Rebalance与stop the world类似,即再平衡期间,所有的消费者都会停止消费。

3、消费者如何确定要消费哪条消息?


1、 什么是偏移量offset?

偏移量offset:表示消息在partition中的偏移量,记录它当前消费到了分区的哪个位置上,也是代表该消息的唯一序号。

2、偏移量保存在哪里?

偏移量存储在​​​​_consumer_offset​​​​这个topic中,由消费者将偏移量以消息的方式发送到​​​_consumer_offset​​​中。那么当发生再平衡(Rebalance)时,新增加的消费者能够知道应该消费分区里的哪条消息。

2、如何提交偏移量?

自动提交

将​​enable.auto.commit​​被设置为true,那么消费者每隔5s会自动把从 poll() 方法轮询到的最大偏移量提交上去。

这样会存在一个问题:消息重复消费。

比如,此时是8点整,刚提交完最大偏移量,5秒后要重新提交一次偏移量,但消费者在消费3秒后就宕机了。当Rebalance后,新的消费者会获取8点整提交的偏移量。所以宕机的消费者消费的3秒数据,新的消费者也会重新消费。

同步提交

同步提交(​​​commitSync()​​):需要将​​​​enable.auto.commit​​​被设置为false,然后调用​​​commitSync()​​。它会直到偏移量被成功提交才会返回。如果提交过程中出现异常,该方法会将异常信息抛出。

Kafka消息队列中,如何设置Consumer消费者配置?

同步的问题:需要等待偏移量被成功提交后才可以返回,而且失败后会进行重试,可能因为一些网络原因而阻塞进程,影响整个应用的TPS。

​异步提交

异步提交(​​commitAsync()​​)​:也需要将​​​​​​enable.auto.commit​​​​​​被设置为false,然后调用​​​​​​commitAsync()​​。

异步的问题:异步提交不会重试,可能出现offset不是最新值,而发生消息重复消费。

同步和异步组合提交

一般情况下,执行异步的过程中,偶尔失败了,但下次再提交基本都会成功。但如果是在关闭消费者或再均衡前的最后一次提交,就要确保提交成功。

​需要将​​​commitAsync()​​​和​​​​​commitSync()​​​​进行组合使用。对异步的​​commitAsync()​​进行try catch捕获,一旦发生异常再执行同步的​​​​​​commitSync()​​​​。

本文共计1085个文字,预计阅读时间需要5分钟。

Kafka消息队列中,如何设置Consumer消费者配置?

1、为什么会有消费者群?单个消费者消费能力有限,消费能力可能随着生产消息速度的加快而受限。通过指定消费者群,并不断向消费者群中增加消费者,可以增强消费处理能力。

1、为什么会有消费者组?

单个消费者消费能力有限,消费能力可能跟不上生产消息的速度。通过指定消费者组,并不断往消费者组里增加消费者,增强消费处理能力。当然,若消费能力过强,而生产消费速度太慢,可以减少消费者组中消费者数量,避免消费者空转。综上消费者组能够实现消费能力的动态伸缩。

由上图可知以下两点:

第一:topic中的一个partition只能由同一个消费者组中的一个消费者消费

由此可见,消费者组增加或减少消费者之后,消费者

第二:一个topic能够被多个消费者组消费,且消费者组之间互不干扰。


2、什么是再均衡(Rebalance)

再均衡(Rebalance):消费者组增加或减少消费者之后,主题中的分区会重新分配给不同的消费者。

Rebalance能够调整topic中分区与消费者的对应关系,当某个消费者宕机或者新增加消费者中,Rebalance能够动态调整分区与消费者的对应关系,给消费者组提供高可用性与伸缩性。

Rebalance带了高可用性与伸缩性的优点,那缺点是什么?

JVM中有个stop the world的概念,就是在进行垃圾回收时,所有线程会停止工作。Rebalance与stop the world类似,即再平衡期间,所有的消费者都会停止消费。

3、消费者如何确定要消费哪条消息?


1、 什么是偏移量offset?

偏移量offset:表示消息在partition中的偏移量,记录它当前消费到了分区的哪个位置上,也是代表该消息的唯一序号。

2、偏移量保存在哪里?

偏移量存储在​​​​_consumer_offset​​​​这个topic中,由消费者将偏移量以消息的方式发送到​​​_consumer_offset​​​中。那么当发生再平衡(Rebalance)时,新增加的消费者能够知道应该消费分区里的哪条消息。

2、如何提交偏移量?

自动提交

将​​enable.auto.commit​​被设置为true,那么消费者每隔5s会自动把从 poll() 方法轮询到的最大偏移量提交上去。

这样会存在一个问题:消息重复消费。

比如,此时是8点整,刚提交完最大偏移量,5秒后要重新提交一次偏移量,但消费者在消费3秒后就宕机了。当Rebalance后,新的消费者会获取8点整提交的偏移量。所以宕机的消费者消费的3秒数据,新的消费者也会重新消费。

同步提交

同步提交(​​​commitSync()​​):需要将​​​​enable.auto.commit​​​被设置为false,然后调用​​​commitSync()​​。它会直到偏移量被成功提交才会返回。如果提交过程中出现异常,该方法会将异常信息抛出。

Kafka消息队列中,如何设置Consumer消费者配置?

同步的问题:需要等待偏移量被成功提交后才可以返回,而且失败后会进行重试,可能因为一些网络原因而阻塞进程,影响整个应用的TPS。

​异步提交

异步提交(​​commitAsync()​​)​:也需要将​​​​​​enable.auto.commit​​​​​​被设置为false,然后调用​​​​​​commitAsync()​​。

异步的问题:异步提交不会重试,可能出现offset不是最新值,而发生消息重复消费。

同步和异步组合提交

一般情况下,执行异步的过程中,偶尔失败了,但下次再提交基本都会成功。但如果是在关闭消费者或再均衡前的最后一次提交,就要确保提交成功。

​需要将​​​commitAsync()​​​和​​​​​commitSync()​​​​进行组合使用。对异步的​​commitAsync()​​进行try catch捕获,一旦发生异常再执行同步的​​​​​​commitSync()​​​​。