首页 > 编程知识 正文

rabbitmq死信队列作用,rabbitmq死信队列原理

时间:2023-05-05 01:13:05 阅读:195663 作者:94

消息变成死信无非是以下几种情况: 消息被拒绝签收(Nack),并且不允许重回队列。TTL设定的消息有效时间过期。实际消息数大于队列最大限制数,那么超出最大限制的消息都将会是死信。 那么,出现了死信,应该如何处理呢?

其实,我们可以把这些死信放到自定义的死信队列里去。

什么叫做死信队列?

“死信队列”,顾名思义,就是存放死信的队列。其实它和普通的队列并没有太大差别,唯一的区别就是他的routingkey是"#"。也就是说:只要你路由到我这个死信队列,我都接收。

如何定义死信队列? 生产端: package com.wy.testrabbitmq.dlx;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;/** * @author wangyan@163.com * @version 1.0 * @date 2019-06-12 17:20 */public class Producer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost("/"); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_dlx_exchange"; String routingkey = "dlx.dlx"; String msg = "test dlx message"; for (int i = 0; i < 3; i++) { // deliveryMode=2 持久化,expiration 消息有效时间 AMQP.BasicProperties properties=new AMQP.BasicProperties().builder() .deliveryMode(2) .contentEncoding("utf-8") .expiration("7000") .build(); channel.basicPublish(exchangeName, routingkey, true, properties, msg.getBytes()); } }} 消费端:(主要处理在消费端) package com.wy.testrabbitmq.dlx;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.util.HashMap;import java.util.Map;/** * @author wangyan@163.com * @version 1.0 * @date 2019-06-12 17:21 */public class Consumer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost("/"); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_dlx_exchange"; String routingkey = "dlx.#"; String queueName = "test_dlx_queueName"; Map<String,Object>map =new HashMap<>(); //注意: x-dead-letter-exchange 这个key是固定这样写的,value是你自定义的。 map.put("x-dead-letter-exchange","dlx.exchange"); //声明队列 channel.exchangeDeclare(exchangeName, "topic", true, false, null); //注意:arguments需要声明在队列上,声明在交换机上是不会起作用的。 channel.queueDeclare(queueName, true, false, false, map); channel.queueBind(queueName, exchangeName, routingkey); //死信队列声明 channel.exchangeDeclare("dlx.exchange","topic",true,false,null); channel.queueDeclare("dlx.queue",true,false,false,null); //routingkey指定为#就行,表示只要路由到死信队列的都接收 channel.queueBind("dlx.queue","dlx.exchange","#"); channel.basicConsume(queueName, true, new TestConsumer(channel)); }} 测试:

如上代码中,我定义了消息的有效时间为7秒。按照逻辑,推送的消息超过7秒没有被消费,就会变成死信。下面我只启动生产端,看管控台变化。

如下,刚启动时我们可以看到我的test_dlx_queueName中有3条消息。

等待7秒后如下:

我们可以看到,由于超过了TTL的有效时间,这3条消息变成了死信,被自动转移到了我们自定义的dlx.queue当中。

注意这些地方:

//注意: x-dead-letter-exchange 这个key是固定这样写的,value是你自定义的。
Map<String,Object>map =new HashMap<>();
map.put(“x-dead-letter-exchange”,“dlx.exchange”);

//注意:arguments需要声明在队列上,声明在交换机上是不会起作用的。
channel.queueDeclare(queueName, true, false, false, map);

// routingkey指定为#就行,表示只要路由到死信队列的都接收
channel.queueBind(“dlx.queue”,“dlx.exchange”,"#");

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。