首页 > 编程知识 正文

rabbitmq队列持久化和消息持久化,死信队列和延时队列

时间:2023-05-04 09:49:41 阅读:195673 作者:3193

文章目录 1. 什么是死信队列2. 代码示例3. 使用死信队列实现延时队列


1. 什么是死信队列

        就是在队列中的消息如果没有消费者消费,那么该消息就成为一个死信。如果这个消息被重新发送到另外一个交换机(exchange)上的话, 那么后面这个交换机(exhcange)就是死信队列。

        死信队列也是一个正常的交换机exchange和队列queue,也会通过routingkey 绑定到具体的队列上。

什么样的消息会移交死信队列?

消息被拒绝,并且重回队列(requeue)的值为false, 表示不需要要重回队列 //消费端拒绝签收,并且不支持重回队列,那么该条消息就是一条死信消息channel.basicNack(envelope.getDeliveryTag(),false,false);//拒接消息channel.basicReject(deliveryTag,false); 消息过期,消息本身设置了过期时间,或者队列设置了消息过期时间x-message-ttl //消息属性:10秒没有被消费,那么就会转到死信队列上AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder() .deliveryMode(2) .expiration("10000") .build(); 队列达到最大长度:比如队列最大长度是3000 ,那么3001消息就会被送到死信队列上 //x-max-length:正常交换机的最大消息数 4个 queueArgs.put("x-max-length",4);


2. 代码示例

生产者:

public class DlxRabbitmqProducer { public static void main(String[] args) throws IOException, TimeoutException { //设置连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.159.8"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setVirtualHost("zhb"); connectionFactory.setPassword("guest"); connectionFactory.setConnectionTimeout(100000); //获取连接 Connection connection = connectionFactory.newConnection(); //获取一个channel Channel channel = connection.createChannel(); //消息十秒没有被消费,那么就会转到死信队列上 AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder() .deliveryMode(2) .expiration("10000") .build(); //声明正常的队列 String nomalExchangeName = "nomaldlx.exchange"; String routingKey = "dlx.key1"; String message = "我是测试的死信消息"; //发送100个信息 for(int i=0;i<100;i++) {channel.basicPublish(nomalExchangeName,routingKey,basicProperties,message.getBytes()); } }}

消费者

public class DlxRabbitmqConsumer { public static void main(String[] args) throws IOException, TimeoutException { //设置连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.159.8"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setVirtualHost("zhb"); connectionFactory.setPassword("guest"); connectionFactory.setConnectionTimeout(100000); //获取连接 Connection connection = connectionFactory.newConnection(); //获取一个channel Channel channel = connection.createChannel(); //正常的交换机和队列名称 String nomalExchangeName = "nomaldlx.exchange"; String exchangeType = "topic"; String nomalqueueName = "nomaldex.queue"; String routingKey = "dlx.#"; //声名正常的交换机 channel.exchangeDeclare(nomalExchangeName,exchangeType,true,false,null); //死信交换机名称 String dlxExhcangeName = "dlx.exchange"; //死信队列名称 String dlxQueueName = "dlx.queue"; //声明死信交换机 channel.exchangeDeclare(dlxExhcangeName,exchangeType,true,false,null); //声名死信队列 channel.queueDeclare(dlxQueueName,true,false,false,null); //死信交换机和死信队列绑定 channel.queueBind(dlxQueueName,dlxExhcangeName,"#"); //把死信交换机信息 封装成map参数 Map<String,Object> queueArgs = new HashMap<>(); //x-dead-letter-exchange:如果正常交换机执行不了,就往死信交换机上转 queueArgs.put("x-dead-letter-exchange",dlxExhcangeName); //x-max-length:正常交换机的最大消息数 4个 queueArgs.put("x-max-length",4); //声名正常的队列 //封装后的死信交换机 作为参数 绑定到正常的队列上 channel.queueDeclare(nomalqueueName,true,false,false,queueArgs); //正常的交换机和正常的队列(已具有死信)绑定! channel.queueBind(nomalqueueName,nomalExchangeName,routingKey); //自定义DlxConsumer消费者消费消息 channel.basicConsume(nomalqueueName,false,new DlxConsumer(channel)); }}

自定义DlxConsumer消费者

public class DlxConsumer extends DefaultConsumer { private Channel channel; public DlxConsumer(Channel channel) { super(channel); this.channel = channel; } public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException { System.out.println("接受到消息:"+new String(body)); //消费端拒绝签收,并且不支持重回队列,那么该条消息就是一条死信消息 //channel.basicNack(envelope.getDeliveryTag(),false,false); //消费端签收消息 channel.basicAck(envelope.getDeliveryTag(),false); }}


3. 使用死信队列实现延时队列

在RabbitMQ中并未提供延迟队列功能。但是可以使用:TTL+死信队列 组合实现延迟队列的效果。

        TTL 全称 Time To Live(存活时间/过期时间),当消息到达存活时间后,还没有被消费,会被自动清除,如过绑定了死信队列,已过期的消息则会进入死信队列中!

        RabbitMQ可以通过消息属性BasicProperties中的expiration对消息设置过期时间,也可以通过x-message-ttl 属性对整个队列设置过期时间,使整个队列统一过期。过期时间都是毫秒ms。如果两者都进行了设置,则以时间短的为准。

场景:订单超过10分钟未支付,则自动关闭

思路:

把订单消息设置10分钟过期,发送到正常队列Q1中正常队列Q1绑定死信队列Q2,不要设置消费者监听正常队列Q110分钟过后,Q1中的消息过期,通过绑定进入Q2死信队列中,设置消费者监听死信队列Q2,如果有进来订单消息,则说明此订单已超过10分钟未支付,则删除此订单!

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。