首页 > 编程知识 正文

rabbitmq原理详解,rabbitmq监听器原理

时间:2023-05-06 01:02:58 阅读:38956 作者:691

一、rabbitmq架构

RabbitMQ是一个受欢迎的开源消息队列系统,是高级消息服务队列协议(amqp )标准的实现,由以高性能、强健性和可伸缩性而闻名的Erlang语言开发,在此rabbitmq的简单体系结构如下:

上图简要说明了rabbitmq的体系结构。 从图中可以看到几个关键字。 vhost、exchange、路由密钥、队列等。 这些概念将在后面叙述。

让我们来看看rabbitmq的过程模型。

看这张图,你可能很熟悉,是的。 事件驱动模型(或反应堆模型)。 这是一种高性能的无阻塞io线程模型,但在Erlang中被称为进程模型。

tcp_acceptor进程接收客户端连接并创建rabbit_reader、rabbit_writer和rabbit_channel进程。

rabbit_reader接收客户端连接并分析AMQP帧; rabbit_writer将数据返回给客户端;

rabbit_channel解析AMQP方法,将消息路由到相应的队列进程。

rabbit_amqqueue_process是一个队列过程,是在启动RabbitMQ (恢复双类型队列)或创建队列时创建的。

rabbit_msg_store是负责消息持久化的过程。

整个系统中有相当于tcp_accepter进程、rabbit_msg_store进程和队列数量的rabbit_amqqueue_process进程,每个客户端连接一个rabbit

1.AMQP帧组件

AMQP帧由五个不同的组件组成。

帧类型以通道编号字节为单位的帧大小帧有效载荷payload结束字节标志(ASCII值206 )

2 .帧类型

AMQP规范定义了五种类型的框架:协议头框架、方法框架、内容框架、消息主体框架和心跳框架。 每个帧类型都有明确的目的,有些帧比其他帧使用频率高很多:

协议头框架用于连接到rabbitmq,并使用一次。 方法框架承载从rabbitmq或rabbitmq接收的rpc请求,或者在内容标头中包含消息的大小和属性。 消息主体框架直接在客户端和rabbitmq之间传递包含消息内容的心跳帧,作为验证连接两端是否可用和正常工作的机制。 3 .将消息组织成帧

使用方法框架、内容头框架和消息主体框架组成完整的rabbitmq消息。 方法的第一个框架具有命令和执行命令所需的参数(如交换机和路由密钥)、内容框架中消息的基本属性以及消息的大小。 也就是说,消息主体帧具有我们真正需要发送的消息的内容。

4 .方法框架结构

5 .内容标题的帧结构

内容标头包含以下具体属性:

内容类型:消息主体的消息代码,例如应用程序/JSON

扩展:消息过期

复制到:响应消息的队列名称

内容编码:消息压缩的代码,例如gzip

消息id :消息的编号

correlation-id :链接id

deliver-mode:已告知rabbitmq是将消息写入磁盘还是写入内存

用户- id :发送消息的用户。 发送消息时请不要设置此值)

时间表:消息送达的时间

headers:定义了可用于实现rabbitmq路由的属性。 例如,如果exchange类型为headers,则使用

6 .消息主体帧结构

7 .一些概念:

Broker :简单来说,就是消息队列中的服务器实体

Exchange一个消息交换机,它指定消息将根据哪个规则路由到哪个队列

队列)消息队列的载体。 每条消息都放入一个或多个队列中,其中队列类型分为临时队列、持久队列和排他队列

绑定:绑定。 起到根据路由规则绑定exchange和queue的作用

路由密钥:路由关键字,exchange根据该关键字进行消息的发送

vhost :虚拟主机,一个broker可以有多个vhost,用于不同用户的权限分离

producer :消息生产者是传递消息的程序

consumer :消息消费者是接收消息的程序

channel (每个消息通道、客户端连接都可以创建多个channel。 每个channel都表示会话任务

二、关于AMQP协议

1 .启动会话

2 .交换机声明

3 .声明队列

将队列绑定到exchange

5 .发送消息-使用事务机制

事务支持是AMQP协议的重要特性。 假设生产者向服务器发送持久化消息。 使用no_ack模式时,即使服务器崩溃且未持久化,生产者也不会知道消息已丢失。 如果这是

时使用事务,即通过txSelect()开启一个事务,然后发送消息给服务器,然后通过txCommit()提交该事务,即可以保证,如果txCommit()提交了,则该消息一定会持久化,如果txCommit()还未提交即服务器崩溃,则该消息不会服务器就收。当然Rabbit MQ也提供了txRollback()命令用于回滚某一个事务。但是使用事务,会导致性能下降,它使得生产者发布消息后必须等到消息真正持久化后服务端响应了才结束本次连接,所以需要在实际应用中平衡性能与安全的问题。

6.发送消息-非事务方式

使用事务固然可以保证只有提交的事务,才会被服务器执行。但是这样同时也将客户端与消息服务器同步起来,这背离了消息队列解耦的本质。Rabbit MQ提供了一个更加轻量级的机制来保证生产者可以感知服务器消息是否已被路由到正确的队列中——Confirm。如果设置channel为confirm状态,则通过该channel发送的消息都会被分配一个唯一的ID,然后一旦该消息被正确的路由到匹配的队列中后,服务器会返回给生产者一个Confirm,该Confirm包含该消息的ID,这样生产者就会知道该消息已被正确分发。对于持久化消息,只有该消息被持久化后,才会返回Confirm。

Confirm机制的最大优点在于异步,生产者在发送消息以后,即可继续执行其他任务(也就是异步监听服务端的ack即可)。而服务器返回Confirm后,会触发生产者的回调函数,生产者在回调函数中处理Confirm信息。如果消息服务器发生异常,导致该消息丢失,会返回给生产者一个nack,表示消息已经丢失,这样生产者就可以通过重发消息,保证消息不丢失。Confirm机制在性能上要比事务优越很多。

但是Confirm机制,无法进行回滚,就是一旦服务器崩溃,生产者无法得到Confirm信息,生产者其实本身也不知道该消息吃否已经被持久化,只有继续重发来保证消息不丢失,但是如果原先已经持久化的消息,并不会被回滚,这样队列中就会存在两条相同的消息,系统需要支持去重。

7.消费消息

五、使用delivery-mode平衡速度和安全
delivery-mode有两个值:1表示非持久化,2表示持久化消息

1.发送消息到纯内存队列中,delivery-mode = 1

特点:非持久化的消息在服务宕机的时候会丢失数据,但是由于不需要磁盘io,尽可能地降低消息投递的延迟性,性能较高。

2.发布消息到支持磁盘存储的队列,delivery-mode = 2

特点:持久化的消息安全性较高,尽管服务宕机,数据也不会丢失,但是在投递消息的过程中需要发生磁盘io,性能相对纯内存投递的方式低,但是尽管是产生了磁盘io,由于日志的记录方式是直接追加到消息日志文件的末尾,属于顺序io,没有随机io,所以性能还是可以接受的。

大概原理:
所有队列中的消息都以append的方式写到一个文件中,当这个文件的大小超过指定的限制大小后,关闭这个文件再创建一个新的文件供消息的写入。文件名(*.rdq)从0开始然后依次累加。当某个消息被删除时,并不立即从文件中删除相关信息,而是做一些记录,当垃圾数据达到一定比例时,启动垃圾回收处理,将逻辑相邻的文件中的数据合并到一个文件中。

消息的读写及删除:
rabbitmq在启动时会创建msg_store_persistent,msg_store_transient两个进程,一个用于持久消息的存储,一个用于内存不够时,将存储在内存中的非持久化数据转存到磁盘中。所有队列的消息的写入和删除最终都由这两个进程负责处理,而消息的读取则可能是队列本身直接打开文件进行读取,也可能是发送请求由msg_store_persisteng/msg_store_transient进程进行处理。

在进行消息的存储时,rabbitmq会在ets表中记录消息在文件中的映射,以及文件的相关信息。消息读取时,根据消息ID找到该消息所存储的文件,在文件中的偏移量,然后打开文件进行读取。消息的删除只是从ets表删除指定消息的相关信息,同时更新消息对应存储的文件的相关信息(更新文件有效数据大小)。
六、消息路由模式
1.fanout模式
fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。

上图中,生产者发送到Exchange的所有消息都会路由到图中的两个Queue,并最终被两个消费者(C1与C2)消费。
2.direct模式
direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中。如图,生产者发送消息的routing key=key1的时候,只有绑定了key1的queue才能收到信息

3.topic模式
topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,但这里的匹配规则有些不同,它约定:
routing key为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“image.new.profile”.
binding key与routing key一样也是句点号“. ”分隔的字符串
binding key中可以存在两种特殊字符“”与“#”,用于做模糊匹配,其中“”用于匹配下一个据点前的所有字符,“#”用于匹配所有字符,包括句点(可以是零个)

如图,生产者以routing key为image.new.profile发布消息,这key可以被image.*.profile以及image.#匹配到,所有这两个队列都可以收到消息。由此可见,topic的路由方式更加灵活。
3.headers模式
headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。
在绑定Queue与Exchange时指定一组键值对以及x-match参数,x-match参数是字符串类型,可以设置为any或者all。如果设置为any,意思就是只要匹配到了headers表中的任何一对键值即可,all则代表需要全部匹配。

七、rabbitmq流量控制

RabbitMQ可以对内存和磁盘使用量设置阈值,当达到阈值后,生产者将被阻塞(block),直到对应项恢复正常。除了这两个阈值,RabbitMQ在正常情况下还用流控(Flow Control)机制来确保稳定性。
Erlang进程之间并不共享内存(binaries类型除外),而是通过消息传递来通信,每个进程都有自己的进程邮箱。Erlang默认没有对进程邮箱大小设限制,所以当有大量消息持续发往某个进程时,会导致该进程邮箱过大,最终内存溢出并崩溃。
在RabbitMQ中,如果生产者持续高速发送,而消费者消费速度较低时,如果没有流控,很快就会使内部进程邮箱大小达到内存阈值,阻塞生产者(得益于block机制,并不会崩溃)。然后RabbitMQ会进行page操作,将内存中的数据持久化到磁盘中。
为了解决该问题,RabbitMQ使用了一种基于信用证的流控机制。消息处理进程有一个信用组{InitialCredit,MoreCreditAfter},默认值为{200, 50}。消息发送者进程A向接收者进程B发消息,每发一条消息,Credit数量减1,直到为0,A被block住;对于接收者B,每接收MoreCreditAfter条消息,会向A发送一条消息,给予A MoreCreditAfter个Credit,当A的Credit>0时,A可以继续向B发送消息。

八、 RabbitMQ 多层消息队列

RabbitMQ完全实现了AMQP协议,类似于一个邮箱服务。Exchange负责根据ExchangeType和RoutingKey将消息投递到对应的消息队列中,消息队列负责在消费者获取消息前暂存消息。在RabbitMQ中,MessageQueue主要由两部分组成,一个为AMQQueue,主要负责实现AMQP协议的逻辑功能。另外一个是用来存储消息的BackingQueue。
为了高效处理入队和出队的消息、避免不必要的磁盘IO,BackingQueue进程为消息设计了4种状态和5个内部队列。
(1) 4种状态包括:

alpha,消息的内容和索引都在内存中;beta,消息的内容在磁盘,索引在内存;gamma,消息的内容在磁盘,索引在磁盘和内存中都有;delta,消息的内容和索引都在磁盘。

对于持久化消息,RabbitMQ先将消息的内容和索引保存在磁盘中,然后才处于上面的某种状态(即只可能处于alpha、gamma、delta三种状态之一)。
(2) 5个内部队列

包括:q1、q2、delta、q3、q4。q1和q4队列中只有alpha状态的消息;q2和q3包含beta和gamma状态的消息;delta队列是消息按序存盘后的一种逻辑队列,只有delta状态的消息。所以delta队列并不在内存中,其他4个队列则是由erlang queue模块实现。

消息从q1入队,q4出队,在内部队列中传递的过程一般是经q1顺序到q4。实际执行并非必然如此:开始时所有队列都为空,消息直接进入q4(没有消息堆积时);内存紧张时将q4队尾部分消息转入q3,进而再由q3转入delta,此时新来的消息将存入q1(有消息堆积时)。

当内存紧张时触发paging,paging将大量alpha状态的消息转换为beta和gamma;如果内存依然紧张,继续将beta和gamma状态转换为delta状态。Paging是一个持续过程,涉及到大量消息的多种状态转换,所以Paging的开销较大,严重影响系统性能。

九、高可用队列(HA)
在生产环境下,一般都不会允许rabbitmq这种消息中间件单点,以免单点故障导致服务不可用,那么rabbitmq同样可以集群部署来保证服务的可用性,在rabbitmq集群中,我们可以定义HA队列,可以在web管理平台设置,也可以通过AMQP接口设置,当我们定义某个HA队列的时候,会在集群的各个节点上都建立该队列,发布消息的时候,直接发送至master服务,当master服务受到消息后,把消息同步至各个从节点,假如开启事务的情况下,是需要在消息被同步到各个节点之后才算完成事务,所以会带来一定的性能损耗,所以还是回到之前说的,性能和安全直接,需要根据实际业务的需要找到平衡点。

当master服务宕机之后,其中一个slaver节点会升级为master,消息不会丢失(因为已经完成了事务的消息都会在各个节点有备份)
ha-队列可以跨越集群的每台服务,或者仅使用其中一批独立节点。如果是全部节点都为副本的时候,将x-ha-policy参数设置为all,否则设置为nodes,然后在设置另一个参数:x-ha-nodes,该参数指定ha队列所在的节点列表。思考下,rabbitmq的集群节点是不是越多越好?

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。