MQ概述
消息队列技术是一种在分布式APP应用之间交换信息的技术。
消息队列可以驻留在内存或磁盘中,并存储消息,直到将其加载到APP应用程序中。
消息队列允许APP应用程序独立执行。 不需要知道彼此的位置,也不需要在继续执行之前等待接收程序接收此消息。
M问的主要作用是接受和转发消息。 想想生活中的一个场景。 当tldfj把信投入信箱时,邮递员最终会把信寄给收件人。 我们可以把MQ比作邮局和邮递员。
MQ和邮局的主要区别是不处理消息。 但是,接收数据,保存消息数据,传输消息。
RabbitMQ术语
消息只能存储在“队列”中。 消息在rabbitMQ和APP应用程序之间传播,但队列位于rabbitMQ内部。
一个队列没有任何限制,可以保存想要保存的消息量。 那本质上是无限的缓冲区。
多个生产者可以将消息传递到同一队列,并且多个消费者可以尝试从同一消息队列接收数据。
一个队列如下所示。 (上面是其队列名称)。
注意:
生产者、消费者和中间件不需要在一台机器上,在实际的APP应用中大部分也不一样。 可以用图表示RabbitMQ的结构:
使用RabbitMQ解决多线程写入文件问题
分析
以多线程方式写入并生成消息的是程序(生产者p ),而消耗消息的也是消息,其模型应该如下
创建控制台APP应用程序并浏览nuget软件包rabbitMQ.client(4.1.3)
生产者
首先,创建通过套接字连接与服务器连接的连接。 需要传输目标服务器的IP、用户名、密码等。
然后创建通道。 这几乎是所有的事情。
必须声明队列才能发送消息。 然后,可以在队列中发布消息。
执行一次BasicPublish方法以推送消息。
class program { staticvoidmain (string [ ] args ) ) new thread (write ).Start ); newthread(write ).Start ); newthread(write ).Start ); }公共静态语音写入() var factory=newconnectionfactory ) { HostName='localhost ',UserName='eric ', passwor IC ' using (var connection=factory.create connection () using ) varchannel=connection.create model ) Chanel for(intI=0; i 8000; I({stringmessage=I.tostring ); var body=encoding.utf8.getbytes (message ); channel.basic publish (exchange : ',routingKey: 'writeLog ',basicProperties: null,body: body ); console.writeline (' program sent {0} ',消息); } }消费者
当队列中有消息时,消费者必须能够随时从队列中检索消息,因此必须继续运行以接收消息。
正如我们需要打篮球传球,提前确定要传递的队友的位置一样,生产者要发送消息,就必须提前知道正确的消费消息程序列是哪一列。 所以,在运行生产者程序之前,必须启动消费者程序。
由此可见,宣布排队应该在消费者计划中完成。
接下来,创建两个客户端控制台程序
类程序{ publicstaticvoidmain () var factory=newconnectionfactory ) ) { HostName='localhost ',UserName='eric
}; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "writeLog", durable: false, exclusive: false, autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); ExcuateWriteFile(message); Console.WriteLine(" Receiver Received {0}", message); }; channel.BasicConsume(queue: "writeLog", noAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } public static void ExcuateWriteFile(string i) { using (FileStream fs = new FileStream(@"d:\test.txt", FileMode.Append)) { using (StreamWriter sw = new StreamWriter(fs, Encoding.Unicode)) { sw.Write(i); } } } }执行程序
先执行 消费者程序,让它一直保持监听。
错误解决
执行时VS报错:
“RabbitMQ.Client.Exceptions.BrokerUnreachableException”类型的未经处理的异常在 RabbitMQ.Client.dll 中发生 其他信息: None of the specified endpoints were reachable。
进入查看详细的内部异常:
innerEception:{“The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=530, text=“NOT_ALLOWED - access to vhost ‘/’ refused for user ‘eric’”, classId=10, methodId=40, cause=”}
此时,我们打开在http://localhost:15672/#/users 可以看到eric 下 的Can access virtual hosts 为 NoAccess
解决方法 cd到rabbitmqsbin目录,控制台输入
再次执行时,可以看到:
再次运行生产端跟消费端