首页 > 编程知识 正文

rabbitmq工作原理,mqtt和rabbitmq的区别

时间:2023-05-06 09:07:58 阅读:22264 作者:1105

MQ概述

消息队列技术是一种在分布式APP应用之间交换信息的技术。

消息队列可以驻留在内存或磁盘中,并存储消息,直到将其加载到APP应用程序中。

消息队列允许APP应用程序独立执行。 不需要知道彼此的位置,也不需要在继续执行之前等待接收程序接收此消息。

M问的主要作用是接受和转发消息。 想想生活中的一个场景。 当tldfj把信投入信箱时,邮递员最终会把信寄给收件人。 我们可以把MQ比作邮局和邮递员。

MQ和邮局的主要区别是不处理消息。 但是,接收数据,保存消息数据,传输消息。

RabbitMQ术语

消息只能存储在“队列”中。 消息在rabbitMQ和APP应用程序之间传播,但队列位于rabbitMQ内部。

一个队列没有任何限制,可以保存想要保存的消息量。 那本质上是无限的缓冲区。

多个生产者可以将消息传递到同一队列,并且多个消费者可以尝试从同一消息队列接收数据。

一个队列如下所示。 (上面是其队列名称)。

注意:

生产者、消费者和中间件不需要在一台机器上,在实际的APP应用中大部分也不一样。 可以用图表示RabbitMQ的结构:

使用RabbitMQ解决多线程写入文件问题

分析

以多线程方式写入并生成消息的是程序(生产者p ),而消耗消息的也是消息,其模型应该如下

创建控制台APP应用程序并浏览nuget软件包rabbitMQ.client(4.1.3)

生产者

首先,创建通过套接字连接与服务器连接的连接。 需要传输目标服务器的IP、用户名、密码等。

然后创建通道。 这几乎是所有的事情。

必须声明队列才能发送消息。 然后,可以在队列中发布消息。

执行一次BasicPublish方法以推送消息。

class program { staticvoidmain (string [ ] args ) ) new thread (write ).Start ); newthread(write ).Start ); newthread(write ).Start ); }公共静态语音写入() var factory=newconnectionfactory ) { HostName='localhost ',UserName='eric ', passwor IC ' using (var connection=factory.create connection () using ) varchannel=connection.create model ) Chanel for(intI=0; i 8000; I({stringmessage=I.tostring ); var body=encoding.utf8.getbytes (message ); channel.basic publish (exchange : ',routingKey: 'writeLog ',basicProperties: null,body: body ); console.writeline (' program sent {0} ',消息); } }消费者

当队列中有消息时,消费者必须能够随时从队列中检索消息,因此必须继续运行以接收消息。

正如我们需要打篮球传球,提前确定要传递的队友的位置一样,生产者要发送消息,就必须提前知道正确的消费消息程序列是哪一列。 所以,在运行生产者程序之前,必须启动消费者程序。

由此可见,宣布排队应该在消费者计划中完成。

接下来,创建两个客户端控制台程序

类程序{ publicstaticvoidmain () var factory=newconnectionfactory ) ) { HostName='localhost ',UserName='eric

}; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "writeLog", durable: false, exclusive: false, autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); ExcuateWriteFile(message); Console.WriteLine(" Receiver Received {0}", message); }; channel.BasicConsume(queue: "writeLog", noAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } public static void ExcuateWriteFile(string i) { using (FileStream fs = new FileStream(@"d:\test.txt", FileMode.Append)) { using (StreamWriter sw = new StreamWriter(fs, Encoding.Unicode)) { sw.Write(i); } } } }

执行程序
先执行 消费者程序,让它一直保持监听。

错误解决
执行时VS报错:

“RabbitMQ.Client.Exceptions.BrokerUnreachableException”类型的未经处理的异常在 RabbitMQ.Client.dll 中发生 其他信息: None of the specified endpoints were reachable。

进入查看详细的内部异常:

innerEception:{“The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=530, text=“NOT_ALLOWED - access to vhost ‘/’ refused for user ‘eric’”, classId=10, methodId=40, cause=”}

此时,我们打开在http://localhost:15672/#/users 可以看到eric 下 的Can access virtual hosts 为 NoAccess

解决方法 cd到rabbitmqsbin目录,控制台输入

rabbitmqctl set_permissions -p / userName "." "." ".*"


再次执行时,可以看到:


再次运行生产端跟消费端

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。