C#中Queue与RabbitMQ的消息队列应用(附源码)

来自:熊泽-学习中的苦与乐

一、定义


MQ是MessageQueue,消息队列的简称(是流行的开源消息队列系统,利用erlang语言开发)。


MQ是一种应用程序对应用程序的通信方法。


应用程序通过读写入队和出队的消息来通信,无需专用连接来链接它们。


消息传递是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,一般应用于远程过程调用的技术。


排队指的是应用程序通过队列来通信。应用队列避免接收和发送数据的同时进行。


二、特点


MQ是消费者-生产者模型的代表。一端往消息队列中写入消息,另一端可以读取或者订阅队列中的消息。


MQ遵循的是AMQP协议(高级消息队列协议:使得遵从该规范的客户端应用和消息中间件服务器的全功能互操作成为可能)的具体实现和产品。


三、应用


在使用MQ时,我们不需要实时的返回信息。获取信息和返回信息进行异步处理。


例如:在项目中,我们需要从汽车系统中利用CAN总线实时的获取汽车的相关信息,但是没有必要给汽车返回信息。


如,获取汽车的轮胎气压,但是我们不需要给汽车一个返回的信息或结果。


C#项目要利用RabbitMQ来获取实时数据的话,需要先安装客户端的库文件:RabbitMQ.Client.dll,下面有提到。


备用下载路径:


链接:https://pan.baidu.com/s/1zcQmPnBF7WcD8sqV4W54pw 

提取码:6962 


写在前面:这个就需要安装RabbitMQ服务、下载Erlang环境并安装、引入RabbitMQ.client.dll动态库。下面有官网可以下载相应的内容。


我这个使用Windows 64位的,我这里整理安装程序在百度网盘,官网打开Erlang很慢的去百度网盘下载


链接:https://pan.baidu.com/s/1zcQmPnBF7WcD8sqV4W54pw 

提取码:6962 


四、安装


需要安装RabbitMQ服务:


官网下载地址:http://www.rabbitmq.com/download.html。




下载完成后一直点击下一步即可。


如果没有Erlang环境会弹出下面的提示:



下载Erlang环境并安装,安装时一直点下一步即可


地址:http://www.erlang.org/downloads


如果打开网页慢或者打不开的,去下载我整理也行,不过我的是windows 64的。


链接:https://pan.baidu.com/s/1zcQmPnBF7WcD8sqV4W54pw 

提取码:6962 


安装完成后我们需要配置环境变量,如下:


点击【计算机】右键,属性,高级系统设置,高级,环境变量,


新建一个系统变量。



输入


变量名:ERLANG_HOME,


变量值:C:\Program Files\erl9.3


变量值是你刚刚安装Erlang的路径



 然后在找到环境变量里面的Path,点击编辑,在变量值的最后面加上  ;%ERLANG_HOME%\bin;,记得有分号(英文分号)


 


安装成功后会在服务中看到该服务。



然后安装RabbitMQ,也是一直点击下一步即可, 


到这里后就准备工作做完了,接下来我们就编写代码。


有的童鞋不知道为什么需要安装RabbitMQ服务和Erlang环境,我这里简单普及一下,详细的请百度一下。


RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。


RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。


所有主要的编程语言均有与代理接口通讯的客户端库。


 五、代码实例


为了讲解效果更佳,我们新建两个控制台应用程序MessageQueueClient(生产者)和MessageQueueServer(消费者),


不要急着建立,看下面的代码依次建立。


生产者 


新建控制台应用程序MessageQueueClient,引用动态文件库RabbitMQ.Client.dll,可以去百度下载一个,上面的网盘路径里面有。


入队代码编写:


using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace MessageQueueClient
{
   class Program
   {
       static void Main(string[] args)
       
{
           //生产者
           ConnectionFactory factory = new ConnectionFactory();
           factory.HostName = "127.0.0.1";
           //默认端口
           factory.Port = 5672;
           using (IConnection conn = factory.CreateConnection())
           {
               using (IModel channel = conn.CreateModel())
               {

                   //在MQ上定义一个持久化队列,如果名称相同不会重复创建
                   channel.QueueDeclare("MyRabbitMQ", true, false, false, null);
                   while (true)
                   {
                       string message = string.Format("{0}", Console.ReadLine());  //Console.ReadLine()为控制台输入的内容,我们可以用其他方式获取
                       byte[] buffer = Encoding.UTF8.GetBytes(message);
                       IBasicProperties properties = channel.CreateBasicProperties();
                       properties.DeliveryMode = 2;
                       channel.BasicPublish("", "MyRabbitMQ", properties, buffer);  //入队
                       Console.WriteLine("入队成功:" + message);
                   }
               }
           }
       }
   }
}


控制台入队操作,控制台这一步可以结合实际代码需求进行入队。



这里就入队成功了,接下来我们出队,也就是读取数据,这里和readis有点像,我们之前安装的RabbitMQ服务就是在这里用到了。


生产者


新建控制台应用程序MessageQueueServer,引用动态文件库RabbitMQ.Client.dll,可以去百度下载一个,上面的网盘路径里面有。


出队代码编写:


using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace MessageQueueServer
{
   class Program
   {
       static void Main(string[] args)
       
{
           //消费者
           ConnectionFactory factory = new ConnectionFactory();
           factory.HostName = "127.0.0.1";
           //默认端口
           factory.Port = 5672;
           using (IConnection conn = factory.CreateConnection())
           {
               using (IModel channel = conn.CreateModel())
               {
                   //在MQ上定义一个持久化队列,如果名称相同不会重复创建
                   channel.QueueDeclare("MyRabbitMQ", true, false, false, null);

                   //输入1,那如果接收一个消息,但是没有应答,则客户端不会收到下一个消息
                   channel.BasicQos(0, 1, false);

                   Console.WriteLine("Listening...");

                   //在队列上定义一个消费者
                   QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
                   //消费队列,并设置应答模式为程序主动应答
                   channel.BasicConsume("MyRabbitMQ", false, consumer);

                   while (true)
                   {
                       //阻塞函数,获取队列中的消息
                       BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                       byte[] bytes = ea.Body;
                       string str = Encoding.UTF8.GetString(bytes);

                       Console.WriteLine("读取队列消息:" + str.ToString());
                       //回复确认
                       channel.BasicAck(ea.DeliveryTag, false);
                   }
               }
           }
       }
   }
}


运行代码,读取队列里面的内容,遵循先入先出原则。



这样队列的数据就读取到了。


六、总结


这是一个简单的消息队列的应用,写的比较粗浅,具体需要结合实际应用项目编写。

推荐↓↓↓
DotNet程序员
上一篇:Vue+axios+WebAPI+NPOI导出Excel文件 下一篇:.NET Core 3 中的性能提升