首页 > 编程知识 正文

kafka修改分区数,kafka 参数设置

时间:2023-05-05 23:41:04 阅读:187384 作者:1718

我们Kafka中的message,既然是写在XXXXXX.log文件中,不管存了多少数据,只要超过1G大小,就又开启一个新的Log文件存message,文件内部分多个segment分段(可以配置)。

文件命名就是segment分段的起始位置命名,例如:

第一个segment分段:0000000000.log(包含message0~37)、第二个segment分段0000000038.log(包含message38~75)、第三个segment分段00000000076.log(包含message76~113)… 第N个segment分段0000000369.log (包含message N-37~N),偏移量可以想象是书签,消费者重启后,继续从那一次消费的起点开始读取,例如假如上一次消费者A的偏移量是0000000029,证明消费到了29,掉线了,或者停止服务重启后就从0000000030开始消费,于是通过二分查找,发现30的message还属于segment 1这个分段,于是进入segment 1 继续二分,直到找到message 30。

上例子: 老版本Kafka(0.9以前)都存在ZooKeeper当中,之后的新版本都存在Kafka自带的一个偏移量主题当中: 查看当前Kafka的所有topics:./kafka-topics.sh --list --zookeeper localhost:2181

可以看到确实有个偏移量的主题,文件的位置如果不做数据日志与系统日志分离,默认在Kafka内的logs,我的环境重新配置在了/kafka-logs文件夹中:

0~49,都是用来存消费者偏移量的主题文件。 我们自定义主题也会在这里出现: ./kafka-topics.sh --create --zookeeper localhost:2181 --topic jojo --partitions 2 --replication-factor 1


其中0和1就是分区号,为了更好的理解,进入我们自定义主题中查看:

就像上面所说的,为了防止00000000000.log文件过大导致数据定位速度慢,Kafka对partition分区基础上进行了segment分片,每个分片会对应.index和.log文件,这两个文件会以segment分段出来的第一条偏移量offset命名,例如第一个segment文件是从0000000000~0000000013568,那么第一个segment文件的命名就是:0000000000.log,第二个segment分段文件的范围是:0000000013568~000000004568,那么第二个segment分段文件的命名是:0000000013568.log。 而XXXXXXX.index的索引文件,存储的就是对应XXXXXX.log里面的偏移量位置。读取的时候,只需要读取到对应的起始位置,然后在起始位置加上偏移量,就可以顺利拿到对应位置的数据。 例如起始位置是2000,偏移量固定为1000,那么通过2000+1000 * N即可顺利拿到对应位置的N条数据。我们打开自己的数据文件看看:

简单总结,就是index存储文件,log存储数据 偏移量的大小都一样,也就是数据之间的间隔一样,所以可以快速定位(借用大神的一张图)

问题一:每次都会读取和更新偏移量吗?

不会的,因为那样子效率太低,offset会在Kafka进程的内存中保存一份,而不是每次去读取ZK或者__consumer-topic里面的,然后每次读取,定时拉取都会更改内存中的offset,然后定时再写回ZK或者Kafka里面,但是当忽然中断,这一段时间消费的offer set新的偏移量记录没来得及更新zk或者Kafka中的,下次消费还是会从旧的偏移量开始消费,也就是说,新的丢了,就拿旧的开始读。也会造成重复消费问题。

问题二:OffSet的更新时机?

通过配置enable.auto.commit属性可以设置是否自动提交offset
通过配置auto.commit.interval.ms:设置自动提交offset的时间间隔,例如3秒提交一次,还是5秒提交一次
通过代码:

consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(){......}})

具体代码可以百度+CV,这样手动提交的方式很麻烦,目前项目中没用。

问题三:偏移量的坑?

Kafka重复消费,就是消费端偏移量的锅!不管自动提交手动提交,以上方式都有问题,例如:
(1)消费者开启自动提交则是拿到数据后就自动提交最新消费的偏移量,但是不一定是消费逻辑成功消费。如果执行到一半断掉了,消费失败,就会造成数据丢失(生产者已经发出消息,消费者也修改了偏移量,但是消费失败,数据核对不上)
(2)如果是定时提交偏移量记录,例如五秒更新一次,当四秒的时候消费服务重启了,这个时候还是从旧的偏移量开始消费,造成了重复消费。
(3)手动提交就是关闭自动提交偏移量,消费成功后再提交更新,可以一定程度上防止脏数据和重复消费的问题(如果消费失败,那么久不更新嘛),但是非常麻烦,而且手动提交也会有问题。例如代码执行到异步提交那一块忽然中断,就没办法提交最新的偏移量。

具体哪种方式,都要根据具体业务需求来决定,如果是类似订单信息,扣减库存这种场景,一定一定要确保数据不超买超卖,就可以选择手动提交,如果是访问量统计,点击率这种可以允许一定程度的重复消费,那么自动提交消费偏移量也能满足。 问题四:为什么还有人使用Mysql来存储偏移量?

借助Mysql的事务来确保数据绝对的安全(更新偏移量时如果出现问题,做数据回滚的逻辑,这样能保证不会出现脏数据,下次消费直接从Mysql拿,又能确保不重复消费),但是不停查询和更新mysql效率非常低下,Mysql做主从复制高可用/分库分表成本也高。

问题五:如果关闭了自动提交也不手动提交,会发生什么?

很简单,每次消费者重启都会从头开始消费数据。跟我们创建一个消费端监听一个主题的时候指定:./kafka-console-consumer.sh --topic XXX–zookeeper localhost:2181 --from-beginning 效果一样,每次都是从头开始消费。

问题六:多消费组的偏移量怎么区分?

意思就是例如有topic 1,2,3 ;每个topic下面又有 consumerGroup A 、B、C、D…怎么区分不同topic下的每个消费组的偏移量。其实是类似Key-Value形式。Value就是消费偏移量这个无可非议。
Key 是:消费组consumerGroup+主题topic+分区partitions, 老版本都存在zk里面 ,由于我是新版本,这里借助老版本大神的一张截图,通过ZK 的get,拿到 offsets group的bigdata主题的02分区的offset信息

新版本记录在Kafka:

加入: exclude.internal.topics=false 问题七:既然新版本是一个topic,那么如何消费该偏移量的信息呢? 如果要消费该系统偏移量主题的消息,要修改config目录下的consumer.properties配置文件

./kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server localhost:2181 --formatter "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter" --consumer.config /usr/hdp/3.1.4.0-315/kafka/config/consumer.properties --from-beginning

这样就可以监听‘消费者偏移量’主题的消费偏移量,类似,我生产我自己消费我自己的同时又生产自己。

我测试环境没数据因为没有发送和消费,看Kafka eagle就可以看到:

后续有继续深入学习的心得体会再补充~

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。