为什么kafka性能非常高1 .使用零拷贝技术请参考linux操作系统内核附带的文章
2 .依次读写
kafka属于日志存储系统,实际上可以直接将消息添加到segment之后,并将大日志文件拆分为n个或更多个小的不同segment文件,从而提高访问时的效率设计原理。 segment逐步设计稀疏索引算法。 ((搜索) ) ) ) ) ) ) )。
不会为每个msg创建索引。 3 .分区模型体系结构
提高吞吐量动态消费者扩大和缩小消息顺序一致性的问题消息将在同一分区落地,最终被一个消费者消费。 kafka建议在同一组中,每个分区对应一个消费者。 分区为10时,为10个消费者。 Kafka消息key :请告诉MQ服务器端应该将该消息存储在其分区中。 因此,类似地,可以实现一种批量传递模型,其中key被分成相同的部分被同一消费者消费,且按顺序被消费4 .生产者传递消息
Kafka生产者首先将消息传递到Kafka缓冲区,以批处理格式压缩消息,然后传递到Kafka服务器端分区存储消息,减少带宽占用。 Kafka采用分段的segment文件方式,写入segment log文件。 此过程使用顺序读写和零拷贝技术减少与内核的切换次数。 消费者根据offset指定从中介获取消息,该消息不会立即从kafka中删除,而是采用日程删除策略。 5 .压缩消息
生产者首先将消息传递到缓冲器(Map集合key=topic名称value list集合)。
n条本地缓存消息) (压缩消息以在传递消息之前将其发送到kafka服务器端)----目的是
降低服务带宽的资源可以单独写入发送线程定时,也可以实时批量获取缓冲消息并将其发送到kafka服务器端。 高可用性kafka采用复制副本机制算法,可与其他broker同步存储一部分数据,以便在当前broker故障时,其他broker可以快速恢复复制副本数据。
复制概念实际上是在分区级别定义的,每个分区配置都有多个副本。 副本本质上是一个只能添加和写入消息的提交日志,根据kafka复制机制的定义,同一分区下的所有副本都存储相同的消息序列,这些副本分散存储在不同的Broker中
基本特性1 .高吞吐量、低延迟: kakfa的最大特点是消息收发非常快,kafka每秒可以处理十万条消息,其最低延迟只有几毫秒;
2 .高可伸缩性:每个主题(主题)包括多个分区,并且主题中的分区可以分布在不同的主机上;
3 .持久性、可靠性: Kafka可以允许数据持久化存储,消息持久化在磁盘上,支持数据备份以防止数据丢失。 Kafka的基础数据存储基于Zookeeper存储,Zookeeper知道该数据可以持续存储。
4 .容错性:群集中的节点失败,一个节点宕机,允许Kafka群集正常工作;
6 .高并发:支持数千个客户端同时读写。
Kafka如何在n个不同的segment文件中搜索指定的offset消息的每个分区? segment file的默认大小为500MB,在达到此大小的容量后开始创建新的segment file。 这样,1.7成的ConcurrentHashMap将一个较大的ConcurrentHashMap集合划分为多个较小的HashTable,每个segment有两个缓冲区:index索引和. log数据文件
保存. log消息文件
保存. index消息的索引,并定位. log文件中的消息
使用. timeIndex、时间索引文件和时间戳进行索引
那么segment file命名规则是怎样的呢
长度为20个字符,不能用0填充。 每个名字都从0开始。 以下segment file文件的名称是上一个segment file中最后一条消息的索引值。 index文件以key-value格式存储。 key表示在. log中按顺序消耗的offset值,value表示消息物理消息的存储位置。 但是, index并不记录所有消息,而是每隔几条消息记录一次,这样就不会占用太多内存。 即使消息不在索引记录中,也可以使用基于稀疏索引的二分钟查询来执行查询,从而大大缩小了范围。 这将提高消息查询速度
如何查看Kafka日志和index文件
Kafka-run-class.bat Kafka.tools.dumplogsegments---files 000000000000000.log
Kafka-run-class.bat Kafka.tools.dumplogsegments---files 0000000000000000.index
ACK参数0:Producer不等待中介的ACK。 这提供了最低的延迟。 Broker在收到数据后还未写入磁盘时返回,并且在Broker发生故障时可能会丢失数据。
1 :产品程序等待中介确认
,Partition 的 Leader 落盘成功后返回 ACK,如果在 Follower 同步成功之前 Leader 故障,那么将会丢失数据。-1(all):Producer 等待 Broker 的 ACK,Partition 的 Leader 和 Follower 全部落盘成功后才返回 ACK。如果生产者投递消息到Leader 节点之后,已经将数据同步给每个Follower 节点完成之后,但是没有及时的响应ack给生产者的情况下,有可能会造成生产者会重发。
推荐使用参数1
ISR的机制实现原理ISR分区副本数据集合,如果该副本数据能够与ISR能够建立连接的情况下,就会存放在该集合中,如果某个Follower 节点宕机之后,该副本数据就会从该ISR集合中剔除。如果Leader 节点宕机之后,就会从该ISR列表中选举一个新的Leader 节点。
副本选举实现原理当Leader 副本宕机之后,会从ISR同步副本列表中剔除,取剩下ISR列表中第一个为Leader 副本,显然有可能还有些副本数据没有及时同步完成,当选择为Leader副本之后有可能数据会丢失。
数据恢复机制Leo:该副本数据最后一个offset的位置,最大offset值。
Hw:高水位线 消费者能够消费最大的offset值
HW也就是所有副本中共同数据的最大offset值
如果follower2节点宕机的情况下,从isr集合列表中剔除,突然follower2又启动起来了,如何同步leader节点数据?
截取高于follower2 hw高水位线后面所有的offset消息,删除掉,在从该hw高水位位置开始同步leader节点数据。
如果leader副本节点数据宕机了,从isr集合列表中剔除,此时follower1副本会成功选举为leader副本,之后原来的leader
副本节点有启动起来了,在这时候做为follower副本同步新的leader副本数据会早成消息丢失
1.主题管理(创建、删除、增加分区)
指控制器帮助我们完成对 Kafka 主题的创建、删除以及分区增加的操作。换句话说,当我们执行kafka-topics 脚本时,大部分的后台工作都是控制器来完成的
2.分区重分配
分区重分配主要是指,kafka-reassign-partitions 脚本,提供的对已有主题分区进行细粒度的分配功能。这部分功能也是控制器实现的。
3.集群成员管理
自动检测新增 Broker、Broker 主动关闭及被动宕机。这种自动检测是依赖于前面提到的 Watch 功能和 ZooKeeper 临时节点组合实现的。比如,控制器组件会利用Watch 机制检查 ZooKeeper 的 /brokers/ids 节点下的子节点数量变更。
当有新 Broker 启动后,它会在 /brokers 下创建专属的 znode 节点。一旦创建完毕,ZooKeeper 会通过 Watch 机制将消息通知推送给控制器,这样,控制器就能自动地感知到这个变化,进而开启后续的新增 Broker 作业。
侦测 Broker 存活性则是依赖于刚刚提到的另一个机制:临时节点。每个 Broker 启动后,会在 /brokers/ids 下创建一个临时 znode。当 Broker 宕机或主动关闭后,该 Broker 与 ZooKeeper 的会话结束,这个 znode 会被自动删除。同理,ZooKeeper 的 Watch 机制将这一变更推送给控制器,这样控制器就能知道有 Broker 关闭或宕机了,从而进行“善后”。
4.数据服务
控制器的最后一大类工作,就是向其他 Broker 提供数据服务,控制器上保存了最全的集群元数据信息,其他所有 Broker 会定期接收控制器发来的元数据更新请求,从而更新其内存中的缓存数据。
选举原理1.zookeeper底层核心:临时节点 当如果该会话连接关闭的时候,
该临时节点就会从zookeeper上被自动的删除,发送通知给其他订阅该
临时节点的客户端。临时节点特征:保证唯一不允许重复的,最终只会有一个Broker创建成功。有没有发现这和zookeeper实现分布式锁非常相似的。
2.多个不同的Broker创建相同的临时节点,谁能够创建成功 谁就是为Broker控制器(先到先得)创建临时节点失败的Broker,订阅该临时节点,如果当临时节点再宕机之后(即新的控制器角色),会发送事件通知给订阅了该节点的Broker重新开始竞争控制器角色。
消费者消费记录存储位置
在kafkalog 日志文件 可以查看到 __consumer_offsets 默认的情况下分为50个文件夹,因为在以前的老版本 消费者消费的offset信息都记录在zookeeper上不适合于频繁的读写操作,所以新版本的kafka中已经将kafka消费的offset信息保存在 Kafka 内部Topic中。
存储的格式:
Key:group.id+topic+分区号,而 value 就是 offset 的值。
考虑到一个 kafka 生产环境中可能有很多consumer 和 consumer group,如果这些 consumer 同时提交offset ,则必将加重 __consumer_offsets 的写入负载,因此 kafka 默认为该 topic 创建了50个分区,并且对每个 group.id做哈希求模运算Math.abs(groupID.hashCode()) % numPartitions,从而将负载分散到不同的 __consumer_offsets 分区上。
生产者默认的情况下在本地有一个缓冲区,默认的容量是为32MB,
一个batch存放n多条不同的消息,默认该batch容量16kb。
关于缓冲区的配置:
batch.size—抓取批量size 一次推送给kafka服务端消息数量,1.当我们的batch.size满了之后,自动会将该消息投递到mq中,那么如果消息过少呢,会不会不推送消息到服务端?可以加上该配置linger.ms 不管batch.size 是否容量有满,都会每隔该一定时间将该消息拉取给mq服务器端存放。
buffer-memory----缓存区容量 , 生产者投递消息先存放在本地缓冲区中,将消息组装成n多个不同的Batch,在通过send线程将缓冲区的数据批量的形式发送给kafka服务器端存放,生产者本地内存缓冲区如果设置太小了,在高并发情况下有可能会发生内存溢出,导致生产者无法继续写入消息到缓冲区卡死,过大又会浪费内存,应该根据压力测试情况,合理设置内存缓冲区大小
num.replica.fetchers 拉取线程数 配置多可以提高follower的I/O并发度,单位时间内leader持有更多请求,相应负载会增大,需要根据机器硬件资源做权衡
replica.fetch.min.bytes=1 拉取最小字节数 默认配置为1字节,否则读取消息不及时
replica.fetch.max.bytes= 5 * 1024 * 1024 拉取最大字节数 默认为1MB,这个值太小,5MB为宜,根据业务情况调整
replica.fetch.wait.max.ms follow 最大等待时间
日志保留策略配置生产者投递消息到kafka的mq中,消费者获取到消息之后不会立即被删除,
会有一个日志保留策略。
1).每当producer写入10000条消息时,刷数据到磁盘 配置为:log.flush.interval.messages=10000
2.)每间隔1秒钟时间,刷数据到磁盘。log.flush.interval.ms=1000
参考蚂蚁课堂