消息进入死信队列的条件
1.消息到达超时时间
2.队列长度到达限制
3.消息拒绝签收,不把消息放入原队列中
1.RabbitMQ配置类,队列消息过期时间设置5秒
@Configurationpublic class DirectRabbitConfig { //死信队列 @Bean("deadLetterQueue") public Queue deadLetterQueue() { return new Queue("TDL_QUEUE"); } //死信交换机 @Bean("deadLetterExchange") public DirectExchange deadLetterExchange() { return ExchangeBuilder.directExchange("TDL_EXCHANGE").durable(true).build(); } //绑定死信队列和死信交换机 @Bean public Binding deadLetterBinding(Queue deadLetterQueue,DirectExchange deadLetterExchange) { return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with("TDL_KEY"); } //队列 @Bean public Queue directQueue() { Map<String, Object> map = new HashMap<>(); map.put("x-message-ttl", 5000); // 队列中的消息未被消费则5秒后过期 map.put("x-dead-letter-exchange", "TDL_EXCHANGE");// x-dead-letter-routing-key 声明 死信队列抛出异常重定向队列的routingKey(TKEY_R) map.put("x-dead-letter-routing-key", "TDL_KEY"); return new Queue("directQueue", true, false, false, map); } //Direct交换机 @Bean DirectExchange directExchange() { return new DirectExchange("directExchange"); } //绑定 将队列和交换机绑定 @Bean Binding bindingDirect(Queue directQueue, DirectExchange directExchange) { return BindingBuilder.bind(directQueue).to(directExchange).with("routingkey001"); }}2.发送消息
@Controllerpublic class SendController { @Autowired RabbitTemplate rabbitTemplate; @RequestMapping("sendMessage") @ResponseBody public String sendMessage(String message){ rabbitTemplate.convertAndSend("directExchange", "routingkey001", message); return "ok"; }}这里消息没有接收,5秒后超时,消息进入死信队列
1.修改RabbitMQ配置类,队列长度设置为5
@Configurationpublic class DirectRabbitConfig { //死信队列 @Bean("deadLetterQueue") public Queue deadLetterQueue() { return new Queue("TDL_QUEUE"); } //死信交换机 @Bean("deadLetterExchange") public DirectExchange deadLetterExchange() { return ExchangeBuilder.directExchange("TDL_EXCHANGE").durable(true).build(); } //绑定死信队列和死信交换机 @Bean public Binding deadLetterBinding(Queue deadLetterQueue,DirectExchange deadLetterExchange) { return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with("TDL_KEY"); } //队列 @Bean public Queue directQueue() { Map<String, Object> map = new HashMap<>(); map.put("x-max-length", 5); //最多存入5个消息 map.put("x-dead-letter-exchange", "TDL_EXCHANGE");// x-dead-letter-routing-key 声明 死信队列抛出异常重定向队列的routingKey(TKEY_R) map.put("x-dead-letter-routing-key", "TDL_KEY"); return new Queue("directQueue", true, false, false, map); } //Direct交换机 @Bean DirectExchange directExchange() { return new DirectExchange("directExchange"); } //绑定 将队列和交换机绑定 @Bean Binding bindingDirect(Queue directQueue, DirectExchange directExchange) { return BindingBuilder.bind(directQueue).to(directExchange).with("routingkey001"); }}这里队列中消息数量超过限制后,多余的消息进入死信队列中
发送端代码
1.RabbitMQ配置类
@Configurationpublic class DirectRabbitConfig { //死信队列 @Bean("deadLetterQueue") public Queue deadLetterQueue() { return new Queue("TDL_QUEUE"); } //死信交换机 @Bean("deadLetterExchange") public DirectExchange deadLetterExchange() { return ExchangeBuilder.directExchange("TDL_EXCHANGE").durable(true).build(); } //绑定死信队列和死信交换机 @Bean public Binding deadLetterBinding(Queue deadLetterQueue,DirectExchange deadLetterExchange) { return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with("TDL_KEY"); } //队列 @Bean public Queue directQueue() { Map<String, Object> map = new HashMap<>(); map.put("x-dead-letter-exchange", "TDL_EXCHANGE");// x-dead-letter-routing-key 声明 死信队列抛出异常重定向队列的routingKey(TKEY_R) map.put("x-dead-letter-routing-key", "TDL_KEY"); return new Queue("directQueue", true, false, false, map); } //Direct交换机 @Bean DirectExchange directExchange() { return new DirectExchange("directExchange"); } //绑定 将队列和交换机绑定 @Bean Binding bindingDirect(Queue directQueue, DirectExchange directExchange) { return BindingBuilder.bind(directQueue).to(directExchange).with("routingkey001"); }}2.发送消息
@Controllerpublic class SendController { @Autowired RabbitTemplate rabbitTemplate; @RequestMapping("sendMessage") @ResponseBody public String sendMessage(String message){ Message message1 = new Message(message.getBytes()); rabbitTemplate.convertAndSend("directExchange", "routingkey001", message1); return "ok"; }}接收端代码
1.配置文件,启动手动签收
2.RabbitMQ配置类
@Configurationpublic class DirectRabbitConfig { //死信队列 @Bean("deadLetterQueue") public Queue deadLetterQueue() { return new Queue("TDL_QUEUE"); } //死信交换机 @Bean("deadLetterExchange") public DirectExchange deadLetterExchange() { return ExchangeBuilder.directExchange("TDL_EXCHANGE").durable(true).build(); } //绑定死信队列和死信交换机 @Bean public Binding deadLetterBinding(Queue deadLetterQueue,DirectExchange deadLetterExchange) { return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with("TDL_KEY"); } //队列 @Bean public Queue directQueue() { Map<String, Object> map = new HashMap<>(); map.put("x-dead-letter-exchange", "TDL_EXCHANGE");// x-dead-letter-routing-key 声明 死信队列抛出异常重定向队列的routingKey(TKEY_R) map.put("x-dead-letter-routing-key", "TDL_KEY"); return new Queue("directQueue", true, false, false, map); } //Direct交换机 @Bean DirectExchange directExchange() { return new DirectExchange("directExchange"); } //绑定 将队列和交换机绑定 @Bean Binding bindingDirect(Queue directQueue, DirectExchange directExchange) { return BindingBuilder.bind(directQueue).to(directExchange).with("routingkey001"); }}3.接收消息
@Componentpublic class ReceiveMessage { @RabbitListener(queues = "directQueue")//监听的队列名称 directQueue public void process(Message message,Channel channel) throws IOException { //消息编号 long deliveryTag = message.getMessageProperties().getDeliveryTag(); try{ System.out.println("directReceiver消费者收到消息 : " + message); int a=10/0;//制造异常 channel.basicAck(deliveryTag,true);//手动签收 }catch (Exception e){ //参数3:true将消息放回原来队列中,false不把消息放入原队列中 channel.basicNack(deliveryTag,true,false); } }}消费消息异常后,没有签收消息,将消息不放回原有队列,这时消息被放入到私信队列中