首页 > 编程知识 正文

rabbitmq死信队列,rabbitmq队列持久化和消息持久化

时间:2023-05-04 23:13:00 阅读:195677 作者:1227

什么是死信队列(DLX)

死信队列又被称为延迟队列、延时队列,也是RabbitMq队列中的一种,利用DLX,当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange, 这个Exchange就是DLX。

死信队列能在任何的队列上被指定,实际上就是设置某个队列的属性,标识这个队列是否需要死信队列的存在。当这个队列中有死信时,RabbitMQ就会 自动的将这个消息重新发布到设置的Exchange.上去,进而被路由到另一个队列。

我们来模拟一个场景,比如12306抢票,当用户抢到火车票时,12306官方会提示用户“请在30分钟内付款”,由于种种原因,用户迟迟没有付款,过了30分钟后仍然没有支付,系统自动取消该笔订单。

对于这种业务,在我们的实际生活中是非常常见的,在以前传统的处理方式可以是采用一个定时器,定时去获取没有付款的订单,并判断用户的下单时间距离当前的时间是否已经超过30分钟,如果是,表示用户在30分钟内内有付款,系统将自动失效该笔订单并回收该车票。但是这种方式存在非常大的隐患,我们知道,春运抢票是一个大数据量、高并发请求的场景,再用户抢到票到支付这段时间内,如果定时器频繁的从数据库中获取“未付款”状态的订单,其数据量之大难以想象,并且大批用户在30分钟内迟迟不付款,从数据库中获取的数据量将一直在增长,当到达一定程度时,将给数据库服务器和应用服务器带来巨大的压力。

而消息中间件rabbitmq的引入,将大大改善上面的情况,其流程如下图:

死信队列代替了原来定时器的逻辑,私信队列/延迟队列可以实现特定的消息或业务数据等待一定的时间TTL后,再被消费者监听消费处理。

优势

1、占用系统资源少
不需要在伦旭数据库获取数据,减少DB层面资源的消耗
2、人为干预少
只需要搭建好死信队列的消息模型就可以不需要在去干预了
3、自动消费处理
当制定的延迟时间一到,消息就自动被路由到实际的队列进行处理

结构介绍

与普通的队列相比,死信队列同样具有消息、交换机、路由和队列等转悠级名次,只不过在死信队列里增加了另外三个成员:
DLX(Dead Letter Exchange):死信交换机,就是交换机的一种类型,知识属于特殊的类型。
DLK(Dead Letter Routing-key):死信路由,同样也是一种特殊的路由,主要是跟DLX组合在一起组成死信队列。
TTL(Time To Live):指进入死信队列中的消息可以存活的时间。

其中DLX跟DLK是必须的成员,而TTL则是可选、非必须的。



流程:
消息到达第一个中转站,即死信队列,由基本交换机和基本路由绑定,并对应到指定的死信队列,因而消息将进入第一个暂存区,即死信队列中,而死信队列不同于一般的普通队列,它由三大部分组成,当消息进入死信队列时,TTL便开始进入倒计时,当存活时间一到,消息将进入第二个中转站,即真正的消息模型中的死信交换机。由于死信交换机和死信路由绑定,并对应到指定的真正的 队列,因而此时消息将不做停留,而是直接被路由到第二个暂存 区,即真正的队列中,最终该消息被真正的队列对应的消费者监听。至此,消息才完成满城的旅行。

死信结构的出现

1、消息被拒绝并且不再重新投递,即requeue参数的取值为false
即虽然消息消费失败,但是又不想让消息重新回到队列中,利用死信队列可以对消息进行一个处理。
2、消息超过了指定的存活时间TTL
3、队列达到最大长度了

实例

死信队列的应用,首先我们需要设置死信队列的exchange和queue,然后进行绑定。然后在我们正常声明普通的交换机、队列、绑定后,在队列上加上一个参数即可:

arguments.put("x-dead-letter-exchange","dlx.exchange");

这样,当消息过期、requeue、队列在达到最大长度是,消息就可以直接路由到死信队列。

生产者

public class Producer {public static void main(String[] args) throws Exception {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.237.139");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String exchange = "test_dlx_exchange";String routingKey = "dlx.save";String msg = "Hello RabbitMQ DLX Message";for(int i =0; i<1; i ++){AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(2).contentEncoding("UTF-8").expiration("10000") //设置消息过期时间.build();channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());}}}

消费者

public class Consumer {public static void main(String[] args) throws Exception {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.237.139");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//要进行死信队列中交换机、队列的声明和绑定:channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);channel.queueDeclare("dlx.queue", true, false, false, null);channel.queueBind("dlx.queue", "dlx.exchange", "#");// 这就是一个普通的交换机 和 队列 以及路由名String exchangeName = "test_dlx_exchange";String routingKey = "dlx.#";String queueName = "test_dlx_queue";//普通交换机声明channel.exchangeDeclare(exchangeName, "topic", true, false, null);Map<String, Object> agruments = new HashMap<String, Object>();agruments.put("x-dead-letter-exchange", "dlx.exchange");//普通队列声明,但是需要绑定agruments属性channel.queueDeclare(queueName, true, false, false, agruments);//普通交换机与队列的绑定channel.queueBind(queueName, exchangeName, routingKey);channel.basicConsume(queueName, true, new MyConsumer(channel));}}

自定义消费者

public class MyConsumer extends DefaultConsumer {public MyConsumer(Channel channel) {super(channel);}@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.err.println("-----------consume message----------");System.err.println("consumerTag: " + consumerTag);System.err.println("envelope: " + envelope);System.err.println("properties: " + properties);System.err.println("body: " + new String(body));}}

我们先启动消费者进程,保证rabbitmq服务器中存在普通队列和死信队列:

关闭消费者队列,启动生产者队列,我们将看到服务器中的普通队列上存在一条信息

由于我们将普通队列中的消息设置为了10秒过期,所以10秒之后,我们发现普通队列上已经没有了消息,而死信队列上多出了一条消息。证明死信队列中消息获取成功。

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。