首页 > 编程知识 正文

rabbitmq延迟队列最大时间,rabbitmq 延时消息

时间:2023-05-05 20:20:43 阅读:195674 作者:1765

过期时间TTL

TTL,Time to Live的简称,即过期时间。RabbitMQ 可以对消息和队列设置TTL。

设置消息的TTL:

目前有两种方法可以设置消息的TTL。

第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。第二种方法是对消息本身进行单独设置,每条消息的TTL可以不同。

如果两种方法一起使用,则消息的TTL以两者之间较小的那个数值为准。
消息在队列中的生存时间一旦超过设置的TTL值时,就会变成“死信”(DeadMessage),消费者将无法再收到该消息(这点不是绝对的)。

通过队列属性设置消息TTL的方法是在channel.queueDeclare 方法中加入x-message-ttl参数实现的,这个参数的单位是毫秒。

//设置TTLMap<String, Object> argss = new HashMap<>();argss.put("x-message-ttl", 6000);channel.queueDeclare(QUEUE_NAME, true, false, false, argss);

同时也可以通过Pllicy的方式来设置TTL:

rabbitmqctl set_policy TTL ".*" '{"message-ttl"":60000}' --apply-to queues

如果不设置TTL,则表示此消息不会过期;
如果将TTL设置为0,则表示除非此时可以直接将消息投递到消费者,否则该消息会被立即丢弃,这个特性可以部分替代RabbitMQ 3.0版本之前的immediate 参数,之所以部分代替,是因为immediate参数在投递失败时会用Basic. Return将消息返回(这个功能可以用死信队列来实现)

针对每条消息设置TTL的方法是在channel.basicPublish方法中加入expiration的属性参数,单位为毫秒:

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();builder.deliveryMode(2);//持久化消息builder.expiration("60000");//设置TTL=60000msAMQP.BasicProperties properties = builder.build();channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, false, properties, "ttlTest".getBytes());

对于第一种设置队列TTL属性的方法,一旦消息过期,就会从队列中抹去,而在第二种方法中,即使消息过期,也不会马上从队列中抹去,因为每条消息是否过期是在即将投递到消费者之前判定的。

为什么这两种方法处理的方式不一样?因为第一种方法里,队列中已过期的消息肯定在队列头部,RabbitMQ只要定期从队头开始扫描是否有过期的消息即可。而第二种方法里,每条消息的过期时间不同,如果要删除所有过期消息势必要扫描整个队列,所以不如等到此消息即将被消费时再判定是否过期,如果过期再进行删除即可。

设置队列的TTL:
通过channel. queueDeclare方法中的x-expires参数可以控制队列被自动删除前处于未使用状态的时间。未使用的意思是队列上没有任何的消费者,队列也没有被重新声明,并且在过期时间段内也未调用过Basic.Get命令。

设置队列里的TTL可以应用于类似RPC方式的回复队列,在RPC中,许多队列会被创建出来,但是却是未被使用的。
RabbitMQ会确保在过期时间到达后将队列删除,但是不保障删除的动作有多及时。在RabbitMQ重启后,持久化的队列的过期时间会被重新计算。

用于表示过期时间的x-expires参数以毫秒为单位,并且服从和x-message-ttl一样的约束条件,不过不能设置为0。比如该参数设置为1000,则表示该队列如果在1秒钟之内未使用则会被删除。

//设置队列TTLMap<String, Object> arg = new HashMap<>();arg.put("x-expires", 1800000);channel.queueDeclare("myqueue", false, false, false, arg); 死信队列

DLX,全称为Dead-Letter-Exchange,可以称之为死信交换器,也有人称之为死信邮箱。当消息在一个队列中变成死信( dead message)之后,它能被重新被发送到另一个交换器中,这个交换器就是DLX,绑定DLX的队列就称之为死信队列。

消息变成死信一般是由于以下几种情况:

消息被拒绝(Basic. Reject/Basic.Nack),并且设置requeue参数为false;消息过期;队列达到最大长度。

DLX也是一个正常的交换器,和一般的交换器没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。当这个队列中存在死信时,RabbitMQ就会自动地将这个消息重新发布到设置的DLX上去,进而被路由到另一个队列,即死信队列。可以监听这个队列中的消息以进行相应的处理,这个特性与将消息的TTL设置为0配合使用可以弥补immediate参数的功能。

通过在channel. queueDeclare方法中设置x-dead- letter-exchange参数来为这个队列添加DLX

//设置死信队列channel.exchangeDeclare("dlx_exchange", "direct");//创建DLXMap<String, Object> arg = new HashMap<>();arg.put("x-dead-letter-exchange", "dlx_exchange");//为队列添加myqueue添加DLXchannel.queueDeclare("myqueue", false, false, false, arg);

也可以为这个DLX指定路邮键,如果没有特殊指定,则使用原队列的路由键:

arg.put("x-dead-letter-routing-key", "dlx-routing-key");

也可以通过Policy的方式设置:

rabbitmqctl set_policy DLX ".*" '{"dead-letter-exchange":"dlx_exchange"}' --apply-to queues

示例:创建一个队列,为其设置TTL和DLX:

channel.exchangeDeclare("exchange.dlx", "direct", true);channel.exchangeDeclare("exchange.normal", "fanout", true);Map<String, Object> arg = new HashMap<>();arg.put("x-message-ttl", 10000);arg.put("x-dead-letter-exchange", "exchange.dlx");arg.put("x-dead-letter-routing-key", "routingkey");channel.queueDeclare("queue.normal", true, false, false, arg);channel.queueBind("queue.normal", "exchange.normal", "");channel.queueDeclare("queue.dlx", true, false, false, null);channel.queueBind("queue.dlx", "exchange.dlx", "routingkey");channel.basicPublish("exchange.normal", "rk", MessageProperties.PERSISTENT_TEXT_PLAIN, "dlx".getBytes());

这里创建了两个交换器exchange.normal 和exchange.dlx, 分别绑定两个队列queue.normal和queue.dlx.

由Web管理页面可以看出,两个队列都被标记了“D”,这个是durable的缩写,即设置了队列持久化。queue.normal 这个队列还配置了TTL、DLX和DLK,其中DLX指的是x-dead- letter-routing-key这个属性。

生产者首先发送一条携带路由键为“rk”的消息,然后经过交换器exchange.normal顺利地存储到队列queue.normal中。由于队列queue.normal设置了过期时间为10s,在这10s 内没有消费者消费这条消息,那么判定这条消息为过期。由于设置了DLX,过期之时,消息被丢给交换器exchange.dlx中,这时找到与exchange.dlx 匹配的队列queue.dlx, 最后消息被存储在queue.dlx这个死信队列中。


对于RabbitMQ来说,DLX是一个非常有用的特性。它可以处理异常情况下,消息不能够被消费者正确消费(消费者调用了Basic.Nack或者Basic. Reject)而被置入死信队列中的情况,后续分析程序可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况,进而可以改善和优化系统。DLX配合TTL使用还可以实现延迟队列的功能。

延迟队列

延迟队列存储的对象是对应的延迟消息,所谓“延迟消息”是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。

延迟队列的使用场景有很多,比如:

在订单系统中, 一个用户下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单将进行异常处理,这时就可以使用延迟队列来处理这些订单了。用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作。这时候就可以将用户指令发送到延迟队列,当指令设定的时间到了再将指令推送到智能设备。

在AMQP协议中,或者RabbitMQ本身没有直接支持延迟队列的功能,但是可以通过前面所介绍的DLX和TTL模拟出延迟队列的功能。

在上节的图示中,不仅展示的是死信队列的用法,也是延迟队列的用法,对于queue.dlx这个死信队列来说,同样可以看作延迟队列。假设一个应用中需要将每条消息都设置为10秒的延迟,生产者通过exchange.normal这个交换器将发送的消息存储在queue.normal这个队列中。消费者订阅的并非是queue.normal这个队列,而是queue.dlx这个队列。当消息从queue.normal这个队列中过期之后被存入queue.dlx这个队列中,消费者就恰巧消费到了延迟10秒的这条消息。

在真实应用中,对于延迟队列可以根据延迟时间的长短分为多个等级,-般分为5秒、10秒、30秒、1分钟、5分钟、10分钟、30分钟、1小时这几个维度,当然也可以再细化一下。

为了简化说明,这里只设置了5秒、10秒、30秒、1分钟这四个等级。根据应用需求的不同,生产者在发送消息的时候通过设置不同的路由键,以此将消息发送到与交换器绑定的不同的队列中。这里队列分别设置了过期时间为5秒、10秒、30秒、1分钟,同时也分别配置了DLX和相应的死信队列。当相应的消息过期时,就会转存到相应的死信队列(即延迟队列)中,这样消费者根据业务自身的情况,分别选择不同延迟等级的延迟队列进行消费。

优先队列

优先级队列,顾名思义,具有高优先级的队列具有高的优先权,优先级高的消息具备优先被消费的特权。

可以通过设置队列的x-max-priority参数来实现:

//设置一个队列的最大优先级Map<String, Object> arg = new HashMap<>();arg.put("x-max-priority", 10);channel.queueDeclare("queue.priority", true, false, false, arg);

通过Web管理页面可以看到Pri标志:

上面的代码演示的是如何配置一个队列的最大优先级。在此之后,需要在发送时在消息中设置消息当前的优先级。

//设置消息优先级//设置消息优先级AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();builder.priority(5);AMQP.BasicProperties properties = builder.build();channel.basicPublish("exchange_priority", "rk_priority", properties, "messages".getBytes());

上面的代码中设置消息的优先级为5。默认最低为0,最高为队列设置的最大优先级。优先级高的消息可以被优先消费,这个也是有前提的:如果在消费者的消费速度大于生产者的速度且Broker中没有消息堆积的情况下,对发送的消息设置优先级也就没有什么实际意义。因为生产者刚发送完一条消 息就被消费者消费了,那么就相当于Broker中至多只有一条消息,对于单条消息来说优先级是没有什么意义的。

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。