
RabbitMQ in practice: RabbitMQ working modes

Date: 2023-05-06 15:46:08  Views: 32433  Author: 2618

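First, and most importantly, the overall plan. Integrating RabbitMQ with Spring Boot involves two projects, a producer and a consumer, each built in the following steps.

Producer project

1) Add the RabbitMQ starter dependency

2) Configure the RabbitMQ connection in application.yml

3) Create the RabbitMQ configuration class

4) Create the producer

Consumer project

1) Add the RabbitMQ starter dependency

2) Configure the RabbitMQ connection in application.yml

3) Create the RabbitMQ configuration class

4) Create the consumer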

Producer project: start

1) Add the RabbitMQ starter dependency

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2) Configure application.yml

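server:
  port: 8081
spring:
  application:
    name: test-rabbitmq-producer
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtualHost: /

3) Create a RabbitMQ configuration class that declares the exchange, the queues, and the bindings between them.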

This example configures a topic exchange.

package com.example.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @ClassName RabbitmqConfig
 * @Description Declares the topic exchange, the two queues, and their bindings
 * @Author kkdbl
 * @Date 2019/12/17 12:35
 * @Version 1.0
 */
@Configuration
public class RabbitmqConfig {

    public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    public static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform";
    public static final String ROUTINGKEY_EMAIL = "inform.#.email.#";
    public static final String ROUTINGKEY_SMS = "inform.#.sms.#";

    // Declare the exchange
    @Bean(EXCHANGE_TOPICS_INFORM)
    public Exchange EXCHANGE_TOPICS_INFORM() {
        // durable(true): the exchange is persistent and survives a broker restart
        return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
    }

    // Declare the QUEUE_INFORM_EMAIL queue
    @Bean(QUEUE_INFORM_EMAIL)
    public Queue QUEUE_INFORM_EMAIL() {
        return new Queue(QUEUE_INFORM_EMAIL);
    }

    // Declare the QUEUE_INFORM_SMS queue
    @Bean(QUEUE_INFORM_SMS)
    public Queue QUEUE_INFORM_SMS() {
        return new Queue(QUEUE_INFORM_SMS);
    }

    // Bind the email queue to the exchange with the routing key ROUTINGKEY_EMAIL
    @Bean
    public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue,
                                              @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
    }

    // Bind the SMS queue to the exchange with the routing key ROUTINGKEY_SMS
    @Bean
    public Binding BINDING_ROUTINGKEY_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,
                                          @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
    }
}
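To see which routing keys the two binding patterns inform.#.email.# and inform.#.sms.# accept, the following sketch re-implements topic matching in plain Java. In a topic exchange, words are separated by dots, * stands for exactly one word, and # stands for zero or more words. The class TopicMatcher and its methods are hypothetical names made up for this illustration; this is only a small model of the rule, not RabbitMQ's own matching code.

```java
/** Minimal model of topic-exchange matching; an illustration, not RabbitMQ's implementation. */
class TopicMatcher {

    /** Returns true if the routing key satisfies the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.", -1);
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.", -1);
        return matchFrom(p, 0, k, 0);
    }

    /** Matches pattern words from index i against key words from index j. */
    private static boolean matchFrom(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: the key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words, so try every possible continuation point
            for (int next = j; next <= k.length; next++) {
                if (matchFrom(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // the pattern still needs a word but the key has none left
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return matchFrom(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        String emailPattern = "inform.#.email.#";
        String smsPattern = "inform.#.sms.#";
        String[] keys = {"inform.email", "inform.sms", "inform.sms.email", "inform"};
        for (String key : keys) {
            System.out.println(key + " -> email=" + matches(emailPattern, key)
                    + ", sms=" + matches(smsPattern, key));
        }
    }
}
```

Under this rule the key inform.email is accepted only by the email pattern, inform.sms only by the SMS pattern, and a key such as inform.sms.email by both, so a message sent with that key would be copied into both queues.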

4) Create the producer

Use RabbitTemplate to send a message
 

package com.example;

import com.example.config.RabbitmqConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

// JUnit 5: @SpringBootTest already registers the Spring extension, so no @RunWith is needed
@SpringBootTest
class ProdcerTopicsSpringbootApplicationTests {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    public void Producer_topics_springbootTest() {
        // Send a message with rabbitTemplate
        String message = "send email message to user";
        /*
         * Arguments:
         * 1. exchange name
         * 2. routing key
         * 3. message body
         */
        rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM, "inform.email", message);
    }
}

End of the producer project

 

Consumer project: start

1) Add the starter dependency

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2) Configure application.yml

server:
  port: 8082
spring:
  application:
    name: test-rabbitmq-consumer
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtualHost: /

3) Create a RabbitMQ configuration class

package com.example.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @ClassName RabbitmqConfig
 * @Description Declares the topic exchange, the two queues, and their bindings
 * @Author kkdbl
 * @Date 2019/12/17 12:35
 * @Version 1.0
 */
@Configuration
public class RabbitmqConfig {

    public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    public static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform";
    public static final String ROUTINGKEY_EMAIL = "inform.#.email.#";
    public static final String ROUTINGKEY_SMS = "inform.#.sms.#";

    // Declare the exchange
    @Bean(EXCHANGE_TOPICS_INFORM)
    public Exchange EXCHANGE_TOPICS_INFORM() {
        // durable(true): the exchange is persistent and survives a broker restart
        return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
    }

    // Declare the QUEUE_INFORM_EMAIL queue
    @Bean(QUEUE_INFORM_EMAIL)
    public Queue QUEUE_INFORM_EMAIL() {
        return new Queue(QUEUE_INFORM_EMAIL);
    }

    // Declare the QUEUE_INFORM_SMS queue
    @Bean(QUEUE_INFORM_SMS)
    public Queue QUEUE_INFORM_SMS() {
        return new Queue(QUEUE_INFORM_SMS);
    }

    // Bind the email queue to the exchange with the routing key ROUTINGKEY_EMAIL
    @Bean
    public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue,
                                              @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
    }

    // Bind the SMS queue to the exchange with the routing key ROUTINGKEY_SMS
    @Bean
    public Binding BINDING_ROUTINGKEY_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,
                                          @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
    }
}

4) Listen on the queues and receive messages

package com.example.controller;

import com.example.config.RabbitmqConfig;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @ClassName ReceiveHandler
 * @Description Receives messages from the email and SMS queues
 * @Author kkdbl
 * @Date 2019/12/17 13:02
 * @Version 1.0
 */
@Component
public class ReceiveHandler {

    // Listen on the email queue
    @RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_EMAIL})
    public void receive_email(Object msg, Message message, Channel channel) {
        System.out.println("QUEUE_INFORM_EMAIL msg" + msg);
    }

    // Listen on the SMS queue
    @RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_SMS})
    public void receive_sms(Object msg, Message message, Channel channel) {
        System.out.println("QUEUE_INFORM_SMS msg" + msg);
    }
}
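Conceptually, a listener method is invoked by a background thread that waits on its queue, which is why the consumer only has to be running for messages to show up. The sketch below models that idea with a plain in-memory BlockingQueue. ListenerSketch, listen, deliver and the STOP sentinel are hypothetical names invented for this illustration; Spring AMQP's real listener container (connections, acknowledgements, concurrency) is considerably more involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

/** Conceptual model of a queue listener; an analogy, not how Spring AMQP is implemented. */
class ListenerSketch {

    // Hypothetical sentinel value that tells the worker thread to stop listening
    private static final String STOP = "__stop__";

    /** Starts a thread that passes each queued message to the handler until STOP arrives. */
    static Thread listen(BlockingQueue<String> queue, Consumer<String> handler) {
        Thread worker = new Thread(() -> {
            try {
                // take() blocks until a message is available
                for (String msg = queue.take(); !STOP.equals(msg); msg = queue.take()) {
                    handler.accept(msg);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop listening when interrupted
            }
        });
        worker.start();
        return worker;
    }

    /** Sends the messages through a queue and returns what the handler received, in order. */
    static List<String> deliver(List<String> messages) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> received = new ArrayList<>();
        Thread worker = listen(queue, received::add);
        try {
            for (String m : messages) {
                queue.put(m);
            }
            queue.put(STOP);
            worker.join(); // after join, reading 'received' from this thread is safe
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while delivering", e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(deliver(Arrays.asList("send email message to user")));
    }
}
```

The producer side never calls the handler directly; it only puts messages on the queue, which mirrors how the producer and consumer projects stay independent of each other.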

 

Start the producer:

It starts successfully.

Start the consumer and check the messages it receives.

That is the whole process of integrating RabbitMQ with Spring Boot.

 

A very detailed RabbitMQ guide: https://blog.csdn.net/huzecom/article/details/103499692

 

 
