首页 > 编程知识 正文

kafka是什么,mapreduce工作原理

时间:2023-05-05 21:02:14 阅读:33265 作者:2529

1、背景导入:消息队列是什么

现代技术的实时更新,已经越来越高的实时性要求,因此对技术的要求也越来越高,但是在海量数据的传输过程中如何才能保证数据的快速传输? 从这里产生了消息队列。

消息是在两台计算机之间传输的数据单位。 消息可以非常简单,例如只包含文本字符串。 它可以更复杂,也可以包含嵌入的对象。

消息将发送到队列。 消息队列是在消息传输过程中保存消息的容器。 消息队列管理器在将消息从源中继到目标时充当中间人。 队列的主要目的是提供路由并保证消息的传递。 如果收件人在发送消息时不可用,则消息队列会保留该消息直到成功传递消息。

2、消息队列有什么好处?

1 )解耦)消除数据传输之间的依赖性

2 )冗馀)避免数据安全丢失

3 )可扩展性:提高数据传输能力

4 )灵活的峰值处理能力:保证数据接收的效率

5 )可恢复性:确保数据安全的备份机制

6 )顺序保证:保证数据的有序性

7 )缓冲:防止因外部因素导致数据无法接收而丢失

8 )异步通信:数据的生产和接收端可以异步处理

3、数据传输模式是什么?

1 )点对点模式(一对一,消费者主动提取数据,消息传来后清除消息)。

点对点模型通常是基于抽取或轮询的消息传递模型,从队列中请求

请求信息,而不是将消息推送到客户端。 该模型的特点是,即使有多个邮件收件人,发送到队列的邮件也只有一个收件人进行接收处理。

2 )发布/订阅模型(一对多,数据生产后推送给所有订阅者)。

分发模型是一种基于推送的消息传递模型。 发布模型有一些不同

的订阅者。 临时订阅者只有在主动接收主题时才会接收消息。 即使当前订阅者不可用,永久订阅者也会接收主题中的所有消息。

4、kafka是什么?

在流计算中,Kafka通常用于缓存数据,Storm通过消耗Kafka的数据进行计算。

1 ) Apache Kafka是一个开源消息系统,由Scala编写。 是由Apache软件基金会开发的开源消息系统项目。

2 ) Kafka最初由LinkedIn公司开发,2011年初成为开源。 2012年10月毕业于Apache Incubator。 这个项目的目标是提供统一、高吞吐量和低延迟的平台来处理实时数据。

3 ) Kafka是分布式消息队列。 Kafka在保存消息时根据主题进行分类,发送消息的人称为Producer,接收消息的人称为Consumer。 Kafka群集还具有多个Kafka实例,每个实例(server )都称为broker。

4 )无论是kafka群集还是consumer都依赖zookeeper群集存储元信息,以保证系统可用性。

5、kafka是用什么做的?

1 ) Producer )消息生产者是向Kafka中介发送消息的客户端

2 ) Consumer )消息消费者,向Kafka中介发送消息的客户端

3 ) Topic )可以理解为一个队列

4 ) consumergroup(CG )这是kafka实现一个主题消息的广播(发送到所有consumer )和单播(发送到任意一个consumer )的手段。 一个topic可以有多个CG。 主题消息将复制到所有CG中,但每个分区只向该CG中的一个consumer发送消息。 需要实现广播的情况下,每个consumer只要有独立的CG就可以了。 要实现单播,只要所有的consumer都在同一CG上。 在CG中,也可以在不向不同的topic发送多次消息的情况下自由地将consumer分组;

5 ) Broker ) kafka服务器是Broker。 一个集群由多个broker组成。 一个中介可以容纳多个主题;

6 )为了实现分区(Partition )可扩展性,非常大的topic可以分布在多个broker (即服务器)中,一个topic可以分成多个分区,每个分区都是有序的队列分区中的每条消息都被分配一个offset (有序)。 kafka只保证按照一个partition内的顺序向consumer发送消息,不保证一个topic的整体(多个partition之间)的顺序;

7 ) Offset:kafka的所有保存文件都是以offset.kafka命名的。 以offset为名称的好处是容易寻找。 例如,如果想找到2049的位置,只需找到2048.kafka的文件即可。 当然the first offset是00000000000.kafka。

6、kafka的构建(例如?

1 )准备三台虚拟机,分别命名为kafka01、kafka02、kafka03

2 )解压缩安装包

[ Kafka @ Kafka 01 software ] $ tar-zxvf Kafka _ 2.11-0.11.0.tgz-c/opt/module /

3)修改解压后的文件名称

[kafka@kafka01 module]$ mv kafka_2.11-0.11.0.0/ kafka

4)在/opt/module/kafka目录下创建logs文件夹

[kafka@kafka01 kafka]$ mkdir logs

5)修改配置文件

[kafka@kafka01 kafka]$ cd config/

[kafka@kafka01 config]$ vi server.properties

输入以下内容:

#broker的全局唯一编号,不能重复

broker.id=0

#删除topic功能使能

delete.topic.enable=true

#处理网络请求的线程数量

num.network.threads=3

#用来处理磁盘IO的线成数量

num.io.threads=8

#发送套接字的缓冲区大小

socket.send.buffer.bytes=102400

#接收套接字的缓冲区大小

socket.receive.buffer.bytes=102400

#请求套接字的缓冲区大小

socket.request.max.bytes=104857600

#kafka运行日志存放的路径

log.dirs=/opt/module/kafka/logs

#topic在当前broker上的分区个数

num.partitions=1

#用来恢复和清理data下数据的线程数量

num.recovery.threads.per.data.dir=1

#segment文件保留的最长时间,超时将被删除

log.retention.hours=168

#配置连接Zookeeper集群地址

zookeeper.connect=kafka012:2181,kafka02:2181,kafka03:2181

6)配置环境变量

[kafka@kafka01 module]$ sudo vi /etc/profile

#KAFKA_HOME

export KAFKA_HOME=/opt/module/kafka

export PATH=$PATH:$KAFKA_HOME/jqdjzg

[kafka@kafka01 module]$ source /etc/profile

7)分发安装包

[kafka@kafka01 module]$ xsync kafka/

      注意:分发之后记得配置其他机器的环境变量

8)分别在kafka02和kafka03上修改配置文件

/opt/module/kafka/config/server.properties中的broker.id=1、broker.id=2

      注:broker.id不得重复

9)启动集群

依次在kafka01、kafka02、kafka03节点上启动kafka

[kafka@kafka01 kafka]$ jqdjzg/kafka-server-start.sh config/server.properties &

[kafka@kafka02 kafka]$ jqdjzg/kafka-server-start.sh config/server.properties &

[kafka@kafka03 kafka]$ jqdjzg/kafka-server-start.sh config/server.properties &

7、kafka命令行操作?

1)查看当前服务器中的所有topic

[kafka@kafka01 kafka]$ jqdjzg/kafka-topics.sh --zookeeper kafka01:2181 --list

2)创建topic

[kafka@kafka01 kafka]$ jqdjzg/kafka-topics.sh --zookeeper kafka01:2181

--create --replication-factor 3 --partitions 1 --topic first

选项说明:

--topic 定义topic名

--replication-factor  定义副本数

--partitions  定义分区数

3)删除topic

[kafka@kafka01 kafka]$ jqdjzg/kafka-topics.sh --zookeeper kafka01:2181

--delete --topic first

需要server.properties中设置delete.topic.enable=true否则只是标记删除或者直接重启。

4)发送消息

[kafka@kafka01 kafka]$ jqdjzg/kafka-console-producer.sh

--broker-list kafka01:9092 --topic first

>hello world

>kafka  kafka

5)消费消息

[kafka@hadoop103 kafka]$ jqdjzg/kafka-console-consumer.sh

--zookeeper kafka01:2181 --from-beginning --topic first

--from-beginning:会把first主题中以往所有的数据都读取出来。根据业务场景选择是否增加该配置。

6)查看某个Topic的详情

[kafka@kafka01 kafka]$ jqdjzg/kafka-topics.sh --zookeeper kafka01:2181

--describe --topic first

8、kafka工作流程分析?

写入流程:
      1)producer先从zookeeper的 "/brokers/.../state"节点找到该partition的leader

2)producer将消息发送给该leader

3)leader将消息写入本地log

4)followers从leader pull消息,写入本地log后向leader发送ACK

5)leader收到所有ISR中的replication的ACK后,增加HW(high watermark,最后commit 的offset)并向producer发送ACK

存储策略:

无论消息是否被消费,kafka都会保留所有消息。有两种策略可以删除旧数据:

1)基于时间:log.retention.hours=168

2)基于大小:log.retention.bytes=1073741824

需要注意的是,因为Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除过期文件与提高 Kafka 性能无关。

消费过程:

能够让开发者自己控制offset,想从哪里读取就从哪里读取。

自行控制连接分区,对分区自定义进行负载均衡

对zookeeper的依赖性降低(如:offset不一定非要靠zk存储,自行存储offset即可,比如存在文件或者内存中)

消费者组:

消费者是以consumer group消费者组的方式工作,由一个或者多个消费者组成一个组,共同消费一个topic。每个分区在同一时间只能由group中的一个消费者读取,但是多个group可以同时消费这个partition。在图中,有一个由三个消费者组成的group,有一个消费者读取主题中的两个分区,另外两个分别读取一个分区。某个消费者读取某个分区,也可以叫做某个消费者是某个分区的拥有者。

在这种情况下,消费者可以通过水平扩展的方式同时读取大量的消息。另外,如果一个消费者失败了,那么其他的group成员会自动负载均衡读取之前失败的消费者读取的分区。

消费方式:

consumer采用pull(拉)模式从broker中读取数据。

push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。

对于Kafka而言,pull模式更合适,它可简化broker的设计,consumer可自主控制消费消息的速率,同时consumer可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。

pull模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直等待数据到达。为了避免这种情况,我们在我们的拉请求中有参数,允许消费者请求在等待数据到达的“长轮询”中进行阻塞(并且可选地等待到给定的字节数,以确保大的传输大小)。

交流qq群:1022901775,获取课件、代码,技术交流,问题反馈;

为方便学习,请关注"百数云课"官方公众号。

 

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。