1、背景导入:消息队列是什么
现代技术的实时更新,已经越来越高的实时性要求,因此对技术的要求也越来越高,但是在海量数据的传输过程中如何才能保证数据的快速传输? 从这里产生了消息队列。
消息是在两台计算机之间传输的数据单位。 消息可以非常简单,例如只包含文本字符串。 它可以更复杂,也可以包含嵌入的对象。
消息将发送到队列。 消息队列是在消息传输过程中保存消息的容器。 消息队列管理器在将消息从源中继到目标时充当中间人。 队列的主要目的是提供路由并保证消息的传递。 如果收件人在发送消息时不可用,则消息队列会保留该消息直到成功传递消息。
2、消息队列有什么好处?
1 )解耦)消除数据传输之间的依赖性
2 )冗馀)避免数据安全丢失
3 )可扩展性:提高数据传输能力
4 )灵活的峰值处理能力:保证数据接收的效率
5 )可恢复性:确保数据安全的备份机制
6 )顺序保证:保证数据的有序性
7 )缓冲:防止因外部因素导致数据无法接收而丢失
8 )异步通信:数据的生产和接收端可以异步处理
3、数据传输模式是什么?
1 )点对点模式(一对一,消费者主动提取数据,消息传来后清除消息)。
点对点模型通常是基于抽取或轮询的消息传递模型,从队列中请求
请求信息,而不是将消息推送到客户端。 该模型的特点是,即使有多个邮件收件人,发送到队列的邮件也只有一个收件人进行接收处理。
2 )发布/订阅模型(一对多,数据生产后推送给所有订阅者)。
分发模型是一种基于推送的消息传递模型。 发布模型有一些不同
的订阅者。 临时订阅者只有在主动接收主题时才会接收消息。 即使当前订阅者不可用,永久订阅者也会接收主题中的所有消息。
4、kafka是什么?
在流计算中,Kafka通常用于缓存数据,Storm通过消耗Kafka的数据进行计算。
1 ) Apache Kafka是一个开源消息系统,由Scala编写。 是由Apache软件基金会开发的开源消息系统项目。
2 ) Kafka最初由LinkedIn公司开发,2011年初成为开源。 2012年10月毕业于Apache Incubator。 这个项目的目标是提供统一、高吞吐量和低延迟的平台来处理实时数据。
3 ) Kafka是分布式消息队列。 Kafka在保存消息时根据主题进行分类,发送消息的人称为Producer,接收消息的人称为Consumer。 Kafka群集还具有多个Kafka实例,每个实例(server )都称为broker。
4 )无论是kafka群集还是consumer都依赖zookeeper群集存储元信息,以保证系统可用性。
5、kafka是用什么做的?
1 ) Producer )消息生产者是向Kafka中介发送消息的客户端
2 ) Consumer )消息消费者,向Kafka中介发送消息的客户端
3 ) Topic )可以理解为一个队列
4 ) consumergroup(CG )这是kafka实现一个主题消息的广播(发送到所有consumer )和单播(发送到任意一个consumer )的手段。 一个topic可以有多个CG。 主题消息将复制到所有CG中,但每个分区只向该CG中的一个consumer发送消息。 需要实现广播的情况下,每个consumer只要有独立的CG就可以了。 要实现单播,只要所有的consumer都在同一CG上。 在CG中,也可以在不向不同的topic发送多次消息的情况下自由地将consumer分组;
5 ) Broker ) kafka服务器是Broker。 一个集群由多个broker组成。 一个中介可以容纳多个主题;
6 )为了实现分区(Partition )可扩展性,非常大的topic可以分布在多个broker (即服务器)中,一个topic可以分成多个分区,每个分区都是有序的队列分区中的每条消息都被分配一个offset (有序)。 kafka只保证按照一个partition内的顺序向consumer发送消息,不保证一个topic的整体(多个partition之间)的顺序;
7 ) Offset:kafka的所有保存文件都是以offset.kafka命名的。 以offset为名称的好处是容易寻找。 例如,如果想找到2049的位置,只需找到2048.kafka的文件即可。 当然the first offset是00000000000.kafka。
6、kafka的构建(例如?
1 )准备三台虚拟机,分别命名为kafka01、kafka02、kafka03
2 )解压缩安装包
[ Kafka @ Kafka 01 software ] $ tar-zxvf Kafka _ 2.11-0.11.0.tgz-c/opt/module /
3)修改解压后的文件名称
[kafka@kafka01 module]$ mv kafka_2.11-0.11.0.0/ kafka
4)在/opt/module/kafka目录下创建logs文件夹
[kafka@kafka01 kafka]$ mkdir logs
5)修改配置文件
[kafka@kafka01 kafka]$ cd config/
[kafka@kafka01 config]$ vi server.properties
输入以下内容:
#broker的全局唯一编号,不能重复
broker.id=0
#删除topic功能使能
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的线成数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka运行日志存放的路径
log.dirs=/opt/module/kafka/logs
#topic在当前broker上的分区个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#配置连接Zookeeper集群地址
zookeeper.connect=kafka012:2181,kafka02:2181,kafka03:2181
6)配置环境变量
[kafka@kafka01 module]$ sudo vi /etc/profile
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/jqdjzg
[kafka@kafka01 module]$ source /etc/profile
7)分发安装包
[kafka@kafka01 module]$ xsync kafka/
注意:分发之后记得配置其他机器的环境变量
8)分别在kafka02和kafka03上修改配置文件
/opt/module/kafka/config/server.properties中的broker.id=1、broker.id=2
注:broker.id不得重复
9)启动集群
依次在kafka01、kafka02、kafka03节点上启动kafka
[kafka@kafka01 kafka]$ jqdjzg/kafka-server-start.sh config/server.properties &
[kafka@kafka02 kafka]$ jqdjzg/kafka-server-start.sh config/server.properties &
[kafka@kafka03 kafka]$ jqdjzg/kafka-server-start.sh config/server.properties &
7、kafka命令行操作?
1)查看当前服务器中的所有topic
[kafka@kafka01 kafka]$ jqdjzg/kafka-topics.sh --zookeeper kafka01:2181 --list
2)创建topic
[kafka@kafka01 kafka]$ jqdjzg/kafka-topics.sh --zookeeper kafka01:2181
--create --replication-factor 3 --partitions 1 --topic first
选项说明:
--topic 定义topic名
--replication-factor 定义副本数
--partitions 定义分区数
3)删除topic
[kafka@kafka01 kafka]$ jqdjzg/kafka-topics.sh --zookeeper kafka01:2181
--delete --topic first
需要server.properties中设置delete.topic.enable=true否则只是标记删除或者直接重启。
4)发送消息
[kafka@kafka01 kafka]$ jqdjzg/kafka-console-producer.sh
--broker-list kafka01:9092 --topic first
>hello world
>kafka kafka
5)消费消息
[kafka@hadoop103 kafka]$ jqdjzg/kafka-console-consumer.sh
--zookeeper kafka01:2181 --from-beginning --topic first
--from-beginning:会把first主题中以往所有的数据都读取出来。根据业务场景选择是否增加该配置。
6)查看某个Topic的详情
[kafka@kafka01 kafka]$ jqdjzg/kafka-topics.sh --zookeeper kafka01:2181
--describe --topic first
8、kafka工作流程分析?
写入流程:
1)producer先从zookeeper的 "/brokers/.../state"节点找到该partition的leader
2)producer将消息发送给该leader
3)leader将消息写入本地log
4)followers从leader pull消息,写入本地log后向leader发送ACK
5)leader收到所有ISR中的replication的ACK后,增加HW(high watermark,最后commit 的offset)并向producer发送ACK
存储策略:
无论消息是否被消费,kafka都会保留所有消息。有两种策略可以删除旧数据:
1)基于时间:log.retention.hours=168
2)基于大小:log.retention.bytes=1073741824
需要注意的是,因为Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除过期文件与提高 Kafka 性能无关。
消费过程:
能够让开发者自己控制offset,想从哪里读取就从哪里读取。
自行控制连接分区,对分区自定义进行负载均衡
对zookeeper的依赖性降低(如:offset不一定非要靠zk存储,自行存储offset即可,比如存在文件或者内存中)
消费者组:
消费者是以consumer group消费者组的方式工作,由一个或者多个消费者组成一个组,共同消费一个topic。每个分区在同一时间只能由group中的一个消费者读取,但是多个group可以同时消费这个partition。在图中,有一个由三个消费者组成的group,有一个消费者读取主题中的两个分区,另外两个分别读取一个分区。某个消费者读取某个分区,也可以叫做某个消费者是某个分区的拥有者。
在这种情况下,消费者可以通过水平扩展的方式同时读取大量的消息。另外,如果一个消费者失败了,那么其他的group成员会自动负载均衡读取之前失败的消费者读取的分区。
消费方式:
consumer采用pull(拉)模式从broker中读取数据。
push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。
对于Kafka而言,pull模式更合适,它可简化broker的设计,consumer可自主控制消费消息的速率,同时consumer可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。
pull模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直等待数据到达。为了避免这种情况,我们在我们的拉请求中有参数,允许消费者请求在等待数据到达的“长轮询”中进行阻塞(并且可选地等待到给定的字节数,以确保大的传输大小)。
交流qq群:1022901775,获取课件、代码,技术交流,问题反馈;
为方便学习,请关注"百数云课"官方公众号。