最近,在一个项目中遇到了kafka的current-offset丢失的问题。
问题是,服务程序暂停kafka的一个topic消费后,超过一天后,承运人发现此topic消费组的current-offset丢失,在服务程序继续消费此topic时
故障诊断结果表明,这是由kafka的配置项目offsets.retention.minutes引起的。 offsets.retention.minutes设置服务器保存消费者提交的offset的时间。 如果超过此时间没有提交offset,也就是说current-offset没有更新,服务器将删除当前保存的current-ofset。 如果current-offset被删除,消费者在启动后将无法从kafka获取offset,只能从最新(earliest )或最旧(oldest )的位置消费,具体取决于配置。
但是,我们在前期的开发测试时,也有过好几天只消费不提交offset的情况,但是kafka的offset没有删除。
后来我发现,这是因为在线服务使用的kafka版本,我们在自检中使用的kafka版本是2.21,而在线的kafka版本是0.11。
不同版本的kafka版本、kafka服务器端配置项offsets.retention.minutes或offset的过期时间取决于具体实现。
kafka版本2.21的测试结果显示,在消费者存在的时间内,无论多长时间不提交offset,kafka都不会清除offset。 服务器保存的group的offset在消费者停止后,在设定的offsets.retention.minutes时间后被删除。 从2.0.0版开始,offsets.retention.minutes的默认值为7天。
kafka 0.11版的测试结果显示,如果消费者在存在期间,超过了offsets.retention.minutes的时间,消费者没有提交offset,Kafka服务器就会删除该消费者的offset。
解决方法:
将当前正在消耗处理的数据的offset保存到数据库中,定时从数据库中同步offset。 设置offset提交的超时时间,如果在此时间内未提交offset,则自动再次提交上次提交的offset。 更改offsets.retention.minutes的大小,通常更改为7天即可。