首页 > 编程知识 正文

mqtt和rabbitmq的区别(为什么要用rabbitmq)

时间:2023-05-05 03:48:02 阅读:92223 作者:116

RabbitMQ的Java客户端统一使用com.rabbitmq.client作为顶级软件包名称。 其中,最核心的类别主要是连接工厂、连接、通道、消费者、默认消费者、基本属性。 另外,在这篇文章中,除了教授RabbitMQ客户端的基本玩法之外,还有些你可能不知道的骚扰。

使用

连接RabbitMQ

RabbitMQ的第一步当然是连接到RabbitMQ。 这里不涉及如何构建RabbitMQ环境。 本文假设已经有RabbitMQ环境。 连接到rabbitmq的代码如下。

连接工厂=新连接工厂(;

factory.set用户名(' root );

factory.set密码(' root 123 );

factory.set虚拟主机('/';

factory.sethost(127.0.0.1 );

factory.setport(5672;

连接=factory.new连接(; 另外,使用默认的vhost,即/时。 可以在中省略名为factory.setvirtualhost('/' )的行中的代码。 那么,这里有个有趣的问题。 创建RabbitMQ连接的最短代码是什么? 答案只需要两行代码。 为什么会这样呢? 由于创建连接的这些字段具有默认值,因此用户名和密码的默认值为guest/guest,主机和端口的默认值为localhost和5672。 connectionfactory.Java源代码有一个以DEFAULT_开头的常量,这是默认值)。 但是,请注意,默认帐户guest只能连接到本地RabbitMQ环境:

连接工厂=新连接工厂(;

连接=factory.new连接(; 创建RabbitMQ连接的另一种方法是使用URI,如以下代码所示:

连接工厂=新连接工厂(;

factory.seturi (amqp ://用户名:密码@主机名称:端口/虚拟主机);

连接=factory.new连接(; 每次成功创建连接时,RabbitMQ服务器端都会有相应的日志。

2020-05-1017336051336058.380 [信息]0. 7312.0 acceptingamqpconnection0. 7312.0 ([ :1 ] 336061390-[ 336000 ]

2020-05-1017336051336058.509信息0.7312.0连接0.7312.0 (:1 ) 336061390-336033601。 360用户' guest ' authenticatedandgrantedaccesstovhost '/'

被动申明

被动声明队列,如以下代码所示: 只检查队列是否存在,如果存在,则不执行任何操作,并返回与队列是否已成功创建相同的响应消息。 如果队列不存在,则抛出通道级异常。 因此,被动声明一般用于一次性临时信道声明的地方:

queue.declareokresponse=通道. queuedeclarepassive (' queue-name ' );

//returnsthenumberofmessagesinreadystateinthequeue

response.getMessageCount (;

//returnsthenumberofconsumersthequeuehas

response.getConsumerCount (; 如果队列不存在,则抛出的异常消息如下:

caused by : com.rabbit MQ .客户端. shutdownsignalexception :通道错误; 协议方法: #方法通道. close (复制代码=404,复制文本=not _ found-no queue ' none ' invh

线程共用一个Channel实例,否则会出现一些莫名其妙的错误。

消费者(Push模式)

消费者消费消息一般通过channel.basicConsume方法,这个方法有很多重载参数,不过我们常用的方法是下面这两个。官方更加推荐第一个带有consumerTag的方法,并且每个不同的消费者实例要有不同的consumerTag。强烈不建议一个连接上有相同的consumerTag,否则可能会导致automatic connection recovery的问题,参考(https://www.rabbitmq.com/api-guide.html#connection-recovery):

ConnectionFactory factory = new ConnectionFactory();factory.setUsername("root");factory.setPassword("root123");factory.setVirtualHost("/");factory.setHost("127.0.0.1");factory.setPort(5672);Connection conn = factory.newConnection();

consumerTag是消费者唯一标识符。如果是使用了不带consumerTag参数的方法,那么RabbitMQ会自动生成一个唯一Tag,这样的Tag没有业务参考意义。如下图所示:

如果我们自定义了consumerTag的值,那么,一看某个队列的消费者信息,就知道这些消费者来自哪里、是干嘛的,非常让人容易理解。如下图所示:

需要说明的是,这种Push模式,如果生产者产生的消息量超过消费者能承受的量,就会撑爆消费者。不过,RabbitMQ考虑到了这一点,可以通过方法channel.basicQos(1000)进行限流。basic.qos是针对Channel进行设置的,也就是说只有在channel建立之后才能发送basic.qos命令。在rabbitmq的实现中,每个channel都对应会有一个rabbit_limiter进程,当收到basic.qos命令后,在rabbit_limiter进程中记录信令中prefetch_count的值,同时记录的还有该channel未ack的消息个数,从而保证未ack的消息数量不超过prefetch_count的值(如果prefetch_count设置为0,表示没有任何限制)。

消费者(Pull模式)

Push模式对应的消费端方法是basicConsumer(),而Pull模式对应的消费端方法是basicGet()。每次获取一条消息,不能批量。这种方法效率非常低下,因为不知道队列中是否有消息,所以必须反复询问,即使大部分请求没有结果的情况下,这种方法非常不推荐使用。代码如下:

ConnectionFactory factory = new ConnectionFactory();Connection conn = factory.newConnection();

高级连接方式

消费者线程默认被一个ExecutorService自动分配,当连接被关闭的时候,默认的ExecutorService会调用shutdown()。但是,如果创建连接的时候使用了用户自定义ExecutorService,必须手动调用shutdown()方法,否则,线程池中的线程可能会阻止JVM终止,除非kill -9。使用自定义线程池代码如下所示:

Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'queue-none' in vhost '/', class-id=50, method-id=10)

需要说明的是,这个特性只有懵懂的心锁明确知道在消费者callback碰到处理瓶颈的情况下才考虑使用,如果没有消费者callback,或者非常少量,那么默认的线程池完全足矣。

使用地址集合

在通过factory构造Connection的时候,允许配置多个地址集合,代码如下所示。它会首先尝试连接host1:post1,如果连接失败,会再尝试连接host2:post2,而且整个过程对用户无感知,只要有一个地址是可用的,就不会抛出任何异常:

Address[] addr = new Address[]{ new Address(host1, port1), new Address(host2, port2)}; Connection conn = factory.newConnection(addr);

支持NIO

RabbitMQ的Java客户端从4.0开始支持Java NIO。NIO的目的不是为了比BIO更快,它只是为了方便用户更轻易的控制资源,比如线程等。默认的BIO模式下,每一个Connection连接都会用一个线程从网络Socket中读取数据。而在NIO模式下,我们是可以控制与网络Socket交互的线程数。

如果你的Java进程中使用了几十甚至上百个Connection,那么可以尝试使用NIO模式,因为它相比默认的BIO模式,可以节省很多的线程资源。并且在线程数设置合理的情况下,性能不会有任何衰减。开启NIO模式非常简单,如下所示:

ConnectionFactory factory = new ConnectionFactory(); // 默认nio模式线程数为1 factory.useNio(); 另一种使用NIO的方式: factory.setNioParams(new NioParams().setNbIoThreads(4));

网络故障自动恢复

在RabbitMQ服务器和Java客户端之间的网络故障是很常见的现象,RabbitMQ的Java客户端是支持自动恢复的,并且4.0以后该特性是默认开启的,证据在ConnectionFactory的源码中:

private boolean automaticRecovery = true; private boolean topologyRecovery = true;

topology是什么意思?中文是拓扑的意思。在这里是指交换机、队列、绑定关系、消费者等。

我们也可以在new出来ConnectionFactory的时候,显示设置开启or关闭。如果恢复失败,RabbitMQ会固定时间间隔以后进行重试,默认为5秒钟(DEFAULT_NETWORK_RECOVERY_INTERVAL)。可以通过方法setNetworkRecoveryInterval()指定间隔时间。如果构造Connection时用的是地址集合,那么地址会被随机打乱,然后一个接一个进行重试:

ConnectionFactory factory = new ConnectionFactory(); factory.setAutomaticRecoveryEnabled(true); factory.setNetworkRecoveryInterval(10000);

那么故障恢复在什么时候触发呢?主要是如下这些情况,只要任意一个条件发生都会触发:

Connection上抛出IO异常、或者其他一些其他非预期的异常;scoket读取超时;失去心跳;

如果是应用启动过程中初始化连接碰到RabbitMQ节点故障,这种情况下自动连接恢复是不会介入的。因为这种情况下,很可能RabbitMQ有一些故障或者问题,开发人员有责任排查问题原因。另外,如果显示调用connection.close()方法后,恢复机制也不会介入。

心跳机制

创建ConnectionFactory时,设置一个大于0的值就是开启心跳机制。如果设置等于0的值,就是关闭心跳机制:

ConnectionFactory cf = new ConnectionFactory(); // set the heartbeat timeout to 60 seconds cf.setRequestedHeartbeat(60);

需要说明的是,如果设置心跳超时值太低的话,可能会由于一些原因比如瞬时网络故障等导致误报。这里给出一些经验数据:值低于5秒的话,很可能造成误报。值低于1秒的话,基本上都是误报。值在5~20秒之间对大部分环境来说,都是一个比较理想的值。如果是一个很大的值,例如1800秒,这时候心跳信息传送的少了,几乎没有实际的影响,就相当于关闭了心跳机制。

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。