首页 > 编程知识 正文

rabbitmq中文手册,rabbitmq使用教程

时间:2023-05-03 09:26:48 阅读:32382 作者:31

原文转载:http://blog.csdn.net/whycold/article/details/41119807

推荐文章:SpringBoot+RabbitMq的使用

所有一、简介MQ都称为消息队列,消息队列(MQ )是一种APP应用程序到APP应用程序的通信方法。 APP应用程序读取和写入队列中的消息(用于APP应用程序的数据)并进行通信,而无需使用专用连接进行链接。 消息传递是指通过在消息中发送数据在程序之间进行通信,而不是相互直接调用进行通信。 通常,直接调用用于远程过程调用等技术。 队列是指APP应用程序通过队列进行通信。 队列的使用消除了发送和接收APP应用程序以及并发的请求。 其中成熟的MQ产品有IBM WEBSPHERE MQ等.

二、使用场景项目中,我们提取了一个无需立即返回且耗时的操作进行异步处理。 这种异步处理大大节省了服务器的请求响应时间,提高了系统吞吐量。

三、相关名称介绍1、连接工厂、连接、通道连接工厂、连接和通道由RabbitMQ对外提供的Channel

Connection是RabbitMQ的套接字链接,封装了部分与套接字协议相关的逻辑。

连接工厂是连接的制造工厂。

Channel是我们与RabbitMQ交往的最重要的接口之一,我们的大部分业务操作都在Channel这个接口中进行。 例如,Queue定义、Exchange定义、Queue和Exchange绑定以及消息发布。

2、队列是RabbitMQ的内部对象,用于存储消息。 如下图所示。

所有RabbitMQ消息只能保存在队列中。 生产者(下图中的p )可以生成信息并最终发送到队列,消费者(下图中的c )可以从队列获取信息并消费。

多个消费者可以订阅同一Queue。 在这种情况下,队列中的消息将平均分配给多个消费者进行处理,而不是所有消费者接收并处理所有消息。

3、消息知识在实际APP应用中,消费者收到来自队列的消息,但可能未完成就停机(或发生其他事故),在这种情况下消息可能会丢失。 为了避免这种情况,我们可以要求消费者在消费完信息后将回复发送给RabbitMQ。 RabbitMQ收到“消息确认”,然后从队列中删除该消息。 如果rabit MQ没有收到回复,并且检测到消费者的rabit MQ连接断开,则rabit MQ将该信息发送给其他消费者(如果有多个消费者)进行处理。 这里不存在timeout概念,即使一个消费者处理消息的时间变长,只要RabbitMQ连接没有断开,该消息也不会发送给其他消费者。

这里会出现另一个问题。 如果我们的开发人员在处理业务逻辑后忘记将回复发送到RabbitMQ,则存储在严重错误——queue中的消息会增加。 消费者在重启后,会反复消耗这些消息,并反复运行业务逻辑…

另外,pub message中没有ack。

4、如果希望即使MessagedurabilityRabbitMQ服务重新启动,也不丢失消息,请将Queue和message都设置为“可持续”,大多数情况下,为rabbitMQ消息但是,仍然不能解决低概率丢失事件的发生。 例如,RabbitMQ服务器收到生产者的消息,但在消息没有持续时间之前,RabbitMQ服务器已关闭。 如果还需要管理这种低概率事件,请使用事务。 这里只是RabbitMQ的简单介绍,所以这里不讨论与RabbitMQ相关的事情。

5、Prefetch count前面说过,如果多个消费者同时订阅同一Queue的消息,Queue的消息就会平均到多个消费者身上。 此时,如果每条短信的处理时间不同,一些消费者会一直很忙,其他消费者可能很快就完成手头的工作,陷入空闲状态。 通过设置prefetchCount,可以限制队列向每个消费者发送的消息数。 例如,如果设置prefetchCount=1,则Queue会向每个消费者发送一条信息。消费者处理完此信息后,Queue会再次向该消费者发送信息。

6、Exchange在上一节看到生产者向Queue发送消息,但实际上在RabbitMQ上永远不会发生这种事。 实际上,生产者将消息发送到Exchange (交换机,下图中的x ),Exchange将消息路由(或销毁)到一个或多个队列。

Exchange使用什么逻辑将消息路由到队列? 这是下面的8、Binding

中介绍。

RabbitMQ中的Exchange有四种类型,不同的类型有着不同的路由策略,这将在下面的10、Exchange Types中介绍。

7、routing key

生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则,而这个routing key需要与Exchange Type及binding key联合使用才能最终生效。

在Exchange Type与binding key固定的情况下(在正常使用时一般这些内容都是固定配置好的),我们的生产者就可以在发送消息给Exchange时,通过指定routing key来决定消息流向哪里。RabbitMQ为routing key设定的长度限制为255 bytes。

8、Binding

RabbitMQ中通过Binding将Exchange与Queue关联起来,这样RabbitMQ就知道如何正确地将消息路由到指定的Queue了。

9、Binding key

在绑定(Binding)Exchange与Queue的同时,一般会指定一个binding key;消费者将消息发送给Exchange时,一般会指定一个routing key;当binding key与routing key相匹配时,消息将会被路由到对应的Queue中。这个将在Exchange Types章节会列举实际的例子加以说明。

在绑定多个Queue到同一个Exchange的时候,这些Binding允许使用相同的binding key。
binding key 并不是在所有情况下都生效,它依赖于Exchange Type,比如fanout类型的Exchange就会无视binding key,而是将消息路由到所有绑定到该Exchange的Queue。

10、Exchange Types

RabbitMQ常用的Exchange Type有fanout、direct、topic、headers这四种(AMQP规范里还提到两种Exchange Type,分别为system与自定义,这里不予以描述),下面分别进行介绍。

fanout

fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。

上图中,生产者(P)发送到Exchange(X)的所有消息都会路由到图中的两个Queue,并最终被两个消费者(C1与C2)消费。

direct

direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中。

以上图的配置为例,我们以routingKey=”error”发送消息到Exchange,则消息会路由到Queue1(amqp.gen-S9b…,这是由RabbitMQ自动生成的Queue名称)和Queue2(amqp.gen-Agl…);如果我们以routingKey=”info”或routingKey=”warning”来发送消息,则消息只会路由到Queue2。如果我们以其他routingKey发送消息,则消息不会路由到这两个Queue中。

topic

前面讲到direct类型的Exchange路由规则是完全匹配binding key与routing key,但这种严格的匹配方式在很多情况下不能满足实际业务需求。topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,但这里的匹配规则有些不同,它约定:

routing key为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”

binding key与routing key一样也是句点号“. ”分隔的字符串。

binding key中可以存在两种特殊字符“*”与“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)。

以上图中的配置为例,routingKey=”quick.orange.rabbit”的消息会同时路由到Q1与Q2,routingKey=”lazy.orange.fox”的消息会路由到Q1与Q2,routingKey=”lazy.brown.fox”的消息会路由到Q2,routingKey=”lazy.pink.rabbit”的消息会路由到Q2(只会投递给Q2一次,虽然这个routingKey与Q2的两个bindingKey都匹配);routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit”的消息将会被丢弃,因为它们没有匹配任何bindingKey。

headers

headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。

在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。

该类型的Exchange没有用到过(不过也应该很有用武之地),所以不做介绍。

11、RPC

MQ本身是基于异步的消息处理,前面的示例中所有的生产者(P)将消息发送到RabbitMQ后不会知道消费者(C)处理成功或者失败(甚至连有没有消费者来处理这条消息都不知道)。

但实际的应用场景中,我们很可能需要一些同步处理,需要同步等待服务端将我的消息处理完成后再进行下一步处理。这相当于RPC(Remote Procedure Call,远程过程调用)。在RabbitMQ中也支持RPC。

RabbitMQ中实现RPC的机制是:

客户端发送请求(消息)时,在消息的属性(MessageProperties,在AMQP协议中定义了14中properties,这些属性会随着消息一起发送)中设置两个值replyTo(一个Queue名称,用于告诉服务器处理完成后将通知我的消息发送到这个Queue中)和correlationId(此次请求的标识号,服务器处理完成后需要将此属性返还,客户端将根据这个id了解哪条请求被成功执行了或执行失败);

服务器端收到消息并处理;

服务器端处理完消息后,将生成一条应答消息到replyTo指定的Queue,同时带上correlationId属性;

客户端之前已订阅replyTo指定的Queue,从中收到服务器的应答消息后,根据其中的correlationId属性分析哪条请求被执行了,根据执行结果进行后续业务处理。

四、总结

本文介绍了RabbitMQ中个人认为最重要的概念,充分利用RabbitMQ提供的这些功能就可以处理我们绝大部分的异步业务了。

推荐demo:SpringBoot+RabbitMq的使用

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。