前言
RabbitMQ是使用Erlang语言开发的消息中间件,符合高级消息队列协议(AMQP )。
与Kafka等消息队列相比,RabbitMQ的最大优势是高可靠性:
提供确认(ACK )和重传机制,确保消息消耗完成,消费者不会意外丢失消息
提供消息持久化机制,确保broker崩溃时不会丢失消息
以集群模式运行,确保高可用性
由于高可靠性和一致性,RabbitMQ可以应对订单处理、秒杀等要求一致性的业务场景。
RabbitMQ的概念和机制
RabbitMQ的概念模型:
Broker:消息中间件实例可以是在单个节点或多节点群集上运行的逻辑实体
消息(消息) :消息由两部分组成:消息标头和消息主体。 标头包括标准标头(如路由密钥和优先级)和其他定义RabbitMQ消息行为的自定义标头。 消息主体是字节流,包含消息的内容。
连接:客户端和Broker之间的TCP连接
通道:通道是为TCP连接建立的逻辑(虚拟)连接。 通过多个Channel复用同一TCP连接,避免了建立TCP连接的巨大开销。 RabbitMQ正式要求每个线程使用独立的通道,禁止多个线程共享通道。
生产者:向其发送消息的客户端线程
处理“消费者”:消息的客户端线程
交换机(Exchange ) :交换机负责将消息传递到相应的队列
队列:接收并存储交换机发送的消息,直到消费者正常消费为止。 结构遵循先进的FIFO。
绑定:向交换机的路由表中注册队列
可以在虚拟主机(Vhost ) :每个中介下构建多个Vhost,每个Vhost可以构建独立的Exchange、Queue、绑定和权限系统。 同一Broker下的vhost共享连接、通道和用户系统。 这意味着可以以相同的用户id使用相同的Channel访问不同的vhost。
交换机(Exchange )
生产者发送的消息首先发送到交换机(Exchange ),交换机根据自己的类型和消息的路由密钥等信息将消息传递到绑定的消息队列。
RabbitMQ的4个标准交换机:
如果direct:消息的路由密钥与队列的绑定密钥完全相同,则direct类型的交换机将消息传递到队列。
多个队列可以使用同一绑定密钥绑定到同一direct交换机。 direct交换机将消息传递到绑定密钥与消息路由密钥相同的所有队列
在topic:中,队列绑定密钥可以包含通配符*和#。 主题交换机将消息传递到绑定密钥和路由密钥匹配的队列。
通配符与关键字匹配。 例如,news.cn.a的关键字是news、cn和a。 也就是说,关键字按.分割
#通配符匹配0个或多个关键字,news.#.a匹配news.a、news.cn.a、xqdz等
*通配符与关键字匹配,news.*.a与news.cn.a匹配,与news.a、xqdz不匹配
fanout: fanout交换机不执行任何匹配,而是将消息传递到所有绑定的队列
header: header交换机是基于标头分发的,目前很少使用
您可以使用RabbitMQ插件机制来使用第三方交换机,也可以使用自行开发的交换机。 例如,实现延迟配送的延迟消息交换。
消息标头的delivery-mode可以设置为持久性或transient。 在处理持久化消息时,Exchange和Queue将消息写入磁盘,以便在RabbitMQ崩溃时不会丢失它们,然后执行以下操作:
消费者客户端通常使用的channel.basicConsume使用推送模式传递消息。 也就是说,如果有新消息,Broker会主动通过channel向客户端发送消息。 客户端还可以使用channel.basicGet从中介中提取消息。
ACK机制
RabbitMQ提供了一种确认机制,可以确保正确处理和丢失消息。
送货确认书有三种:
已成功处理ACK:消息
NACK:消息处理异常,需要重新发送
REJECT:邮件无效,将销毁该邮件
RabbitMQ的Queue可以设置为no_ack=true
, 则消息被投递后即删除不等待回执。channel.basicConsume 可以指定auto_ack模式,若auto_ack=true当客户端收到完整消息后即会自动发出ACK回执,否则必须显式的发出回执。
Java 代码示例
首先安装并启动RabbitMQ实例, Mac用户可以使用 Homebrew 进行安装:
brew install rabbitmq
启动服务:
brew services start rabbitmq
或者使用官方docker镜像:
docker run -d --hostname my-rabbit --name some-rabbit rabbitmq:3-management
RabbitMQ官网提供了Ubuntu、RPM以及Windows等多种平台安装方式。
RabbitMQ默认TCP端口为5672, Web控制台默认端口15672。
在Maven中添加依赖:
com.rabbitmq
amqp-client
5.5.1
编写生产者:
package rabbit;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @author finley
*/
public class RabbitProducer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setHost("localhost");
try (Connection conn = factory.newConnection();
Channel channel = conn.createChannel()) {
String exchangeName = "test-exchange";
channel.exchangeDeclare(exchangeName, "direct", true);
String routingKey = "hello";
byte[] msg = "hello world".getBytes();
AMQP.BasicProperties.Builder propsBuilder = new AMQP.BasicProperties.Builder();
propsBuilder.deliveryMode(2); // persistent
propsBuilder.priority(0); // normal
propsBuilder.contentType("text/plain");
channel.basicPublish(exchangeName, routingKey, propsBuilder.build(), msg);
}
}
}
编写消费者:
package rabbit;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.*;
/**
* @author finley
*/
public class RabbitConsumer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setHost("localhost");
try (Connection conn = factory.newConnection();
Channel channel = conn.createChannel()) {
String exchangeName = "test-exchange";
channel.exchangeDeclare(exchangeName, "direct", true);
String queueName = channel.queueDeclare().getQueue();
String bindingKey = "hello";
channel.queueBind(queueName, exchangeName, bindingKey);
while(true) {
channel.basicConsume(queueName, false, "", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
String bodyStr = new String(body, "UTF-8");
System.out.println("routingKey: " + routingKey + ", contentType: " + contentType + ", body: " + bodyStr);
long deliveryTag = envelope.getDeliveryTag();
channel.basicAck(deliveryTag, false);
}
});
}
}
}
}
RabbitMQ 的消息为字节, 可以将 Java 对象序列化后作为消息体发送。
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对脚本之家的支持。