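First, the overall plan. Integrating RabbitMQ with Spring Boot involves two projects, a producer and a consumer. The steps for each are as follows.
Producer project
1) Add the RabbitMQ starter dependency
2) Configure the RabbitMQ connection in application.yml
3) Create the RabbitMQ configuration class
4) Create the producer
Consumer project
1) Add the RabbitMQ starter dependency
2) Configure the RabbitMQ connection in application.yml
3) Create the RabbitMQ configuration class
4) Listen on the queues and receive messages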
Producer project
1) Add the RabbitMQ starter dependency
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
2) Configure application.yml
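    server:
      port: 8081
    spring:
      application:
        name: test-rabbitmq-producer
      rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: guest
        password: guest
        virtualHost: /
3) Create the RabbitMQ configuration class, which declares the exchange and the queues and binds the queues to the exchange.
This example uses a topic exchange.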
    package com.example.config;

    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    /**
     * @ClassName RabbitmqConfig
     * @Description TODO
     * @Author kkdbl
     * @Date 2019/12/17 12:35
     * @Version 1.0
     */
    @Configuration
    public class RabbitmqConfig {
        public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
        public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
        public static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform";
        public static final String ROUTINGKEY_EMAIL = "inform.#.email.#";
        public static final String ROUTINGKEY_SMS = "inform.#.sms.#";

        // Declare the exchange
        @Bean(EXCHANGE_TOPICS_INFORM)
        public Exchange EXCHANGE_TOPICS_INFORM() {
            // durable(true): the exchange survives a broker restart
            return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
        }

        // Declare the QUEUE_INFORM_EMAIL queue
        @Bean(QUEUE_INFORM_EMAIL)
        public Queue QUEUE_INFORM_EMAIL() {
            return new Queue(QUEUE_INFORM_EMAIL);
        }

        // Declare the QUEUE_INFORM_SMS queue
        @Bean(QUEUE_INFORM_SMS)
        public Queue QUEUE_INFORM_SMS() {
            return new Queue(QUEUE_INFORM_SMS);
        }

        // Bind the QUEUE_INFORM_EMAIL queue to the exchange with ROUTINGKEY_EMAIL
        @Bean
        public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue,
                                                  @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
            return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
        }

        // Bind the QUEUE_INFORM_SMS queue to the exchange with ROUTINGKEY_SMS
        @Bean
        public Binding BINDING_ROUTINGKEY_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,
                                              @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
            return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
        }
    }
4) Create the producer
Use RabbitTemplate to send messages.
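Which queue a message ends up in depends on the routing key the producer passes to RabbitTemplate, typically through convertAndSend(exchange, routingKey, message). The sketch below is plain Java with no broker or Spring involved. TopicMatch and its matches method are hypothetical helpers written for this illustration, not part of Spring AMQP, and they only approximate the matching the broker performs for topic exchanges, where # stands for zero or more dot-separated words and * for exactly one. The sample routing keys are assumptions too; the two patterns are the ones defined in RabbitmqConfig.

```java
// Hypothetical helper for illustration only; the real matching happens inside RabbitMQ.
final class TopicMatch {

    // Returns true if routingKey matches the topic binding pattern.
    static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.split("\\.");
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return match(p, 0, k, 0);
    }

    // Compares pattern words from index i with routing key words from index j.
    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: the key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words, so try every possible split point
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // a literal word or '*' needs one more key word
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        String emailPattern = "inform.#.email.#"; // ROUTINGKEY_EMAIL
        String smsPattern = "inform.#.sms.#";     // ROUTINGKEY_SMS
        String[] sampleKeys = {"inform.email", "inform.sms", "inform.sms.email", "inform.other"};
        for (String key : sampleKeys) {
            System.out.println(key
                    + " -> email queue: " + matches(emailPattern, key)
                    + ", sms queue: " + matches(smsPattern, key));
        }
    }
}
```

Under these rules, a key such as inform.sms.email would reach both queues, inform.email would reach only queue_inform_email, and inform.other would reach neither.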
Consumer project
1) Add the starter dependency
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
2) Configure application.yml
    server:
      port: 8082
    spring:
      application:
        name: test-rabbitmq-consumer
      rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: guest
        password: guest
        virtualHost: /
3) Create the RabbitMQ configuration class
    package com.example.config;

    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    /**
     * @ClassName RabbitmqConfig
     * @Description TODO
     * @Author kkdbl
     * @Date 2019/12/17 12:35
     * @Version 1.0
     */
    @Configuration
    public class RabbitmqConfig {
        public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
        public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
        public static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform";
        public static final String ROUTINGKEY_EMAIL = "inform.#.email.#";
        public static final String ROUTINGKEY_SMS = "inform.#.sms.#";

        // Declare the exchange
        @Bean(EXCHANGE_TOPICS_INFORM)
        public Exchange EXCHANGE_TOPICS_INFORM() {
            // durable(true): the exchange survives a broker restart
            return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
        }

        // Declare the QUEUE_INFORM_EMAIL queue
        @Bean(QUEUE_INFORM_EMAIL)
        public Queue QUEUE_INFORM_EMAIL() {
            return new Queue(QUEUE_INFORM_EMAIL);
        }

        // Declare the QUEUE_INFORM_SMS queue
        @Bean(QUEUE_INFORM_SMS)
        public Queue QUEUE_INFORM_SMS() {
            return new Queue(QUEUE_INFORM_SMS);
        }

        // Bind the QUEUE_INFORM_EMAIL queue to the exchange with ROUTINGKEY_EMAIL
        @Bean
        public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue,
                                                  @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
            return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
        }

        // Bind the QUEUE_INFORM_SMS queue to the exchange with ROUTINGKEY_SMS
        @Bean
        public Binding BINDING_ROUTINGKEY_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,
                                              @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
            return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
        }
    }
4) Listen on the queues and receive messages
    package com.example.controller;

    import com.example.config.RabbitmqConfig;
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;

    /**
     * @ClassName ReceiveHandler
     * @Description TODO
     * @Author kkdbl
     * @Date 2019/12/17 13:02
     * @Version 1.0
     */
    @Component
    public class ReceiveHandler {

        // Listen on the email queue
        @RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_EMAIL})
        public void receive_email(Object msg, Message message, Channel channel) {
            System.out.println("QUEUE_INFORM_EMAIL msg: " + msg);
        }

        // Listen on the sms queue
        @RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_SMS})
        public void receive_sms(Object msg, Message message, Channel channel) {
            System.out.println("QUEUE_INFORM_SMS msg: " + msg);
        }
    }
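Start the producer:
It starts successfully.
Start the consumer and check the received messages.
That is the complete process of integrating RabbitMQ with Spring Boot.
A detailed RabbitMQ guide: https://blog.csdn.net/huzecom/article/details/103499692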