首页 > 编程知识 正文

kafka底层原理,kafka创建topic命令

时间:2023-05-05 07:29:19 阅读:33205 作者:1339

系列文章目录第一章Kafka原理分析(一)-- Kafka主要流程概述

第二章Kafka原理分析(二-- Kafka制作主题Topic流程分析

前言Kafka是主流的大数据级分布式流媒体处理系统,涉及集群内部实现的知识点很多,每个知识点都单独作为技术话题进行讨论和参考。 本文只把这些知识点放在一起,用通俗易懂的语言表述,重点是概念、流程、机制,也便于统一参照和防止遗忘。

本章主要从源代码层面解剖Kafka的主题制定流程,但不要过于细致,主要从环节和机制入手。

另一方面,Kafka中主题创建的元数据要求Kafka中的主题主题是Kafka中的基础概念,是所有消息处理的基础。 主题是Kafka元数据的一部分,存储在Zookeeper中,因此位于创建主题的过程也就是往Zookeeper写入数据的过程

创建主题主题所需提交的信息主要有以下五项:

主题名称(必填) :

要求:不能是现有名称。 长度请控制在249以下。 最好不要使用“.”。 请只输入字母数字或下划线。 本主题中的分区数numPartitions (可选) :

如果不输入,将基于配置中的默认分区执行每个分区的副本数复制因子。 可选:

如果未输入,系统将基于正在配置的默认分区运行,副本数不能大于broker的数量分区和Brokers分配方案(可选) :

可以输入分区下标对应的brokerId列表(存在对应的副本)。 (可选)配置环境configs,如果未输入,将根据默认分配方案进行分配(如下所述)。

缺省情况下,可以找到类路径下的配置。 也可以直接指定两种配置: brokers和zookeeper。 另外,如何在Kafka中创建主题Kafka还有一些配置,您可以选择在auto.create.topics.enable=true中自动创建主题

通常,使用过程中不会自动创建,而是手动创建,易于管理,因此重点是手动创建

1 .三种创建方法主题一般需要事先手动创建,创建方法有三种。

1 )通过服务器端脚本连接到Zookeeper进行创建(不推荐) )。

sdyl/Kafka-topics.sh---- create-zookeeper localhost 33602181---- topic input _ topic2)通过服务器端脚本连接到中介程序创建

sdyl/Kafka-topics.sh---create---- bootstrap-server localhost 33609092---topic input _ topic3)在JAVA代码中

Properties props=new Properties (; props.set property (' bootstrap.servers ',' localhost:9092 ' )管理员管理员客户端=管理员. create ) ) props; finalstringinput _ topic=' input-topic '; new topic new topic=new new topic (input _ topic,Optional.empty ),Optional.empty ); finallistnewtopicnewtopics=arrays.as list (new topic ); 管理员客户端. create topics (new topics ).all ).get ); 2 .脚本kafka-topics.sh分析脚本kafka-topics.sh实际上也启动JAVA类TopicCommand

exec$(dirname$0)/Kafka-run-class.shkafka.admin.topic command ' $ @ ' topic command是用scala编写的源代码的核心core模块

主题命令根据输入的两个参数确定要使用哪个客户端。

bootstrap-server---http://www.Sina.com/(连接到Kafka控制器服务器进行主题创建操作) zookeeper----http://www.Sina.com/3 .创建时分区和副本数的自定义脚本方式: sdyl/Kafka-topics.sh---create---bootstrap-server localhost :9092-------botststrap

tions = 3;NewTopic newTopic = new NewTopic(INPUT_TOPIC,numPartitions, replicationFactor); 4. 创建时分区分配方案的定制

指定分区分配方案就是要指定某一个分区的所有副本与Kafka Broker Id之间的存储映射关系。如果制定了分区方案,则分区和副本数已定,单个的分区和副本数就不能输入了

脚本方式: sdyl/kafka-topics.sh --create --bootstrap-server localhost:9092 --replica-assignment 0:1:2,3:4:5,6:7:8 --topic input_topic

规则:

某一个分区的副本所在broker,按顺序以冒号隔开,多个分区之间以逗号隔开,

如:0:1:2,3:4:5,6:7:8 代表第一个分区的副本放在0,1,2这三台broker中,第二个分区的副本放在3,4,5这三台broker中,第三个分区的副本放在6,7,8这三台broker中。

代码方式: Map<Integer, List<Integer>> replicasAssignments = new HashMap<>();replicasAssignments.put(0, Arrays.asList(0,1,2));replicasAssignments.put(1, Arrays.asList(3,4,5));replicasAssignments.put(2, Arrays.asList(6,7,8));NewTopic newTopic = new NewTopic(INPUT_TOPIC,replicasAssignments); 5. 创建时分区分配方案的系统默认策略

若不输入分区分配方案,则系统使用默认分区策略来创建分区的分配方案 由AdminUtils.assignReplicasToBrokers方法负责计算。
该方法还判断目前Broker是否有机架感知配置,若有的话,会尽量让副本分布到不同机架上。

Kafka中默认的分区分配策略为:

递增移位值 increasingShift
由于分区的数量可能超过broker的数量,为了使副本分布更均匀,引进一个递增移位值increasingShift,初始化为0。在分配某个分区的副本的broker的时候,定好第一个副本broker后,正常第二个副本会按顺序取后面的broker,即与第一个副本的间隔为1,依次类推。若此时分区的数量超过超过broker的数量,这时就又从第一个broker开始轮起,则此时increasingShift递增到1,分区的第二个副本将和第一个副本的间隔变为2,后面副本间隔不变。在一个新的分区又要到第三次轮到第一个broker时,increasingShift递增到2,分区的第二个副本将和第一个副本的间隔变为3,后面副本间隔不变。第i分区p(i)的副本分配算法 (伪代码) //若又开始一轮broker分配,increasingShift递增1 if i == brokes.length then increasingShift += 1 // 第i个分区的第一个副本放在第i个broker firstReplicaIndex = i % brokes.length //分配其他副本的brokers foreach j in replicas: //第二个副本开始的broker为第一个副本的位置+1再加increasingShift 依次类推。 otherReplicaIndex = (firstReplicaIndex + 1 + increasingShift + j) % brokes.length ; //得到第i个分区的分配方案 currentPartitionAssignment = (i -> [firstReplicaIndex ,otherReplicaIndex ... ]) 第i分区p(i)的副本分配算法的随机性增强
Kafka中为了调节不同主题的分区均匀分布,开始第一个分区计算时随机从某个broker开始,不一定要每次都从第一个开始。
increasingShift递增因子也是随机生成初始值,再递增。完整的分配流程
将所有的分配计算一遍即可完成分配 foreach i in numPartitions: assignOnePartition(i);

最终形成的分配方案结构跟定制输入的一样。

三、Kafka中主题创建的主要流程

Kafka主题的创建均是依赖于KafkaAdminClient跟Controller服务器进行RPC通信完成主题创建流程。在此之前首先需要跟任意broker服务器通信得到Kafka的metaddata数据,来获得ControllerNode的信息,然后再跟ControllerNode进行通信。

主要流程:

KafkaAdminClient启动后在后台启动一个线程,定时更新Metadata。更新Metadata时连接的Node是任意一台比较闲的broker结点。更新Metadata时由AdminMetadataManager负责向broker发起请求,该broker在KafkaApis里面直接处理该请求返回当前缓存的Metadata数据。AdminMetadataManager获得返回后更新Metadata,KafkaAdminClient顺利获得Metadata中的ControllerNode的信息,若得不到它会一直循环等待中。获得ControllerNode信息后,KafkaAdminClient可以连接上并发送createTopics请求,Controller获得请求后交给ZkAdminManager来处理,因为毕竟要跟Zookeeper打交道。ZkAdminManager首先对没有输入的参数补充默认值(如分区数和副本数),以及分配分区副本方案等。调用AdminZkClient的validateTopicCreate进行验证主题输入参数的合法性。调用AdminZkClient的createTopicWithAssignment将主题元数据写入Zookeeper。

有关元数据的结构和Zookeeper的存储结构及协调处理在下一章介绍。


总结

本文结合源码总结了Kafka创建主题的相关环节和流程。

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。