首页 > 编程知识 正文

kafka修改偏移量offset,kafka如何重置偏移量

时间:2023-05-05 20:07:44 阅读:16788 作者:3238

手动提交偏移量的原因最近我读了很多文章,为了保证消息的安全消费,为了避免消息丢失和消息的重复读取,建议向消费者客户端手动提交偏移量。 具体为以下:

1 .设置为自动提交时,kafka消费者阅读消息后,加入消费者端处理业务错误,但偏移量提交到kafka服务器端时,该消息将无法处理。 这在MQ中相当于消息的丢失。

2 .如果设置为自动提交,则默认情况下Meg每5秒提交一次偏移。 如果在3秒钟内发生分区重新平衡,则不会提交偏移,并且其他消费者获得此分区时会发生消息重复消耗。

听着,确实有这样的问题,所以最好设定为手动提交偏移量。

但是仔细想想,我觉得上面的说法有问题。 我还是浅学者,可能还没有充分理解其中的道理,下面写下我的想法。 另外,请在舒适的时间告诉我。

个人观点1 .对于上述第一种情况,业务在处理消息时报告错误,但偏移量已经提交,无法读取该数据。 这相当于信息的丢失。 这句话本身没有问题。 但是,设定为手动提交又如何呢? 我们在使用消费者连接kafka时,建立了很长的连接。 如果我们在处理其中一条消息时发生异常,可以控制不提交该偏移。 但是,这个消费者不能因为这个业务数据的处理失败就切断与kafka的连接吧。 会继续收到kafka的消息吧。 当其接收到下一消息时,处理将成功,并且我们将提供下一消息的偏移。 这样的话,不是可以弥补以前没有提交的偏移吗? 这不还是相当于信息丢失了吗?

本人认为的处理方式是将处理失败的消息的偏移量存储在数据库中,并分别读出这些偏移量的数据进行处理。 这么说来,设置手动提交和自动提交也是一样的。 所以,很难理解通过手动提交来避免消息丢失是什么原理。

2 .对于上述第二种情况,如果分区平衡,自动提交将按小时进行,不会提交,可以重新读取数据,但这句话也没有问题。 本人还专门做了实验,但是kafka确实不会在分区重新平衡发生时等待客户端提交偏移量。 如果客户端未提交旧分区的偏移量,则在发生分区重新平衡后,将没有机会提交旧分区的偏移量。

但是,手动提交偏移不知道何时会发生分区重新平衡。 如果在调用手动提交偏移的方法之前发生重新平衡,是否要提交偏移?

kafka还提供了分区重新平衡侦听器,允许消费者在侦听器中提交各自的偏移量。 所以,无论是设置为手动提交还是自动提交,只要定义分区平衡监听程序,不是就能保证分区前的偏移提交吗?

因此,综合上述说明,我个人认为通过将kafka设置为自动提交偏移,并将处理失败的偏移存储在数据库中进行单独处理,可以避免消息丢失,从而定义分区重新平衡侦听器这样理解,不知道有没有什么问题,请向大神指出。

额外的收获花了整个下午,去尝试了kafka的机制。 最终做了一件一团糟的事。 虽然装作不知道的样子,但也发现了其中的一些问题。 分享。

kafka消费者默认一次提取500条数据。 在我的实验中,一次发送2000条消息给kafka。 消息的内容是自然数的增加。

因为broker的主题只有一个分区,所以测试很简单。

在消费者方面,设定为手动提交,批量处理一次获取500件数据,处理完成后,一次提交偏移量。 在消费侧的逻辑中,每当消息中包含5时就判断为抛出异常。 抛出异常后,将不再提交偏移。 因为每个批次的500个数据中有5个,所以无法提交每个批次的偏移量。

服务开始后,没有提交每个批次中消息的最后偏移。 但是,这个消费者可以准确地按批次提取数据。 在获取kafka服务器端的这2000条数据后,实时向broker发送数据,该消费者也能实时根据偏移量正确读取数据。 但是,由于此时没有提交读取数据的偏移量,因此如果发生分区重新平衡,新的消费者将重新抽取数据。

猜想:

kafka消费者客户端自己也有偏移量吗? 此偏移将实时更新。 因此,每次提取数据时,都会从本地存储的偏移后面提取数据。 但是,由于没有向服务端提交本地偏移,所以新的消费者读取此分区时,首先会从服务端获取此分区的偏移并本地保存,从而引起数据的重复读取。

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。