在.NET Core中使用RabbitMQ
前言
逛园子的时候看到一篇.NET 学习RabbitMq的文章(视频地址和文章地址放在文章底部了),写的不错,我也来实现一下。
我是把RabbitMQ放在服务器的,然后先说一下如何部署它。
注意:在使用到RabbitMQ的项目中需要安装Nuget包
dotnet add package RabbitMQ.Client
服务器部署
添加management才能有web控制台
ip地址加15672端口访问
拉取镜像:
docker pull rabbitmq:management
运行容器:
#方式一:默认guest 用户,密码也是 guest
docker run -d --hostname test-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management
#方式二:设置用户名和密码
docker run -d --hostname test-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq:management
RabbitMq工作队列模式
点对点模式
首先创建2个.NET Core控制台程序
- ZY.RabbitMq.Producer 生产者 用于发送消息
- ZY.RabbitMQ.Consumer01 消费者 用于接收消息
- 点对点模式只会有一个消费者进行消费
然后分别创建HelloProducer类和HelloConsumer类
HelloProduce类
public class HelloProducer
{
public static void HelloWorldShow()
{
var factory = new ConnectionFactory();
factory.HostName = "服务器地址或本机地址";
factory.Port = 5672;//5672是RabbitMQ默认的端口号
factory.UserName = "";
factory.Password = "";
factory.VirtualHost = "/"; //虚拟主机
// 获取TCP 长连接
using var connection = factory.CreateConnection();
// 创建通信“通道”,相当于TCP中的虚拟连接
using var channel = connection.CreateModel();
/*
* 创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列
* 第一个参数:队列名称ID
* 第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失
* 第三个参数:是否队列私有化,false则代表所有的消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用
* 第四个:是否自动删除,false代表连接停掉后不自动删除这个队列
* 其他额外参数为null
*/
channel.QueueDeclare("Hello", true, false, false, null);
Console.ForegroundColor = ConsoleColor.Red;
string message = "hello CodeMan 666";
var body = Encoding.UTF8.GetBytes(message);
/*
* exchange:交换机,暂时用不到,在进行发布订阅时才会用到
* 路由key
* 额外的设置属性
* 最后一个参数是要传递的消息字节数组
*/
channel.BasicPublish("", "Hello", null, body);
Console.WriteLine($"producer消息:{message}已发送");
}
}
HelloConsumer类
public class HelloConsumer
{
public static void HelloWorldShow()
{
var factory = new ConnectionFactory();
factory.HostName = "地址同上";
factory.Port = 5672;//5672是RabbitMQ默认的端口号
factory.UserName = "guest";
factory.Password = "guest";
factory.VirtualHost = "/";
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
/*
* 创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列
* 第一个参数:队列名称ID
* 第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失
* 第三个参数:是否队列私有化,false则代表所有的消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用
* 第四个:是否自动删除,false代表连接停掉后不自动删除这个队列
* 其他额外参数为null
*/
//RabbitConstant.QUEUE_HELLO_WORLD 对应的生产者一样名称 "helloworld.queue"
channel.QueueDeclare("Hello", true, false, false, null);
Console.ForegroundColor = ConsoleColor.Cyan;
EventingBasicConsumer consumers = new EventingBasicConsumer(channel);
// 触发事件
consumers.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
// false只是确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息
channel.BasicAck(ea.DeliveryTag, false);
Console.WriteLine($"Consumer01接收消息:{message}");
};
/*
* 从MQ服务器中获取数据
* 创建一个消息消费者
* 第一个参数:队列名
* 第二个参数:是否自动确认收到消息,false代表手动确认消息,这是MQ推荐的做法
* 第三个参数:要传入的IBasicConsumer接口
*
*/
//RabbitConstant.QUEUE_HELLO_WORLD == helloworld.queue
channel.BasicConsume("Hello", false, consumers);
Console.WriteLine("Press [Enter] to exit");
Console.Read();
}
}
主程序
然后分别在各自的Main方法中调用函数
HelloConsumer.HelloWorldShow();
HelloProducer.HelloWorldShow();
效果展示
先运行 消费者 然后 运行 生产者
WorkQueues工作队列
简述
在Work Queues模式中,消息被均匀地分发给多个消费者。每个消息只会被一个消费者处理,确保每个任务只被一个消费者执行。这种分发方式称为轮询(Round-robin)分发。
工作队列模式的特点包括:
- 并发处理:多个消费者可以并行地处理消息,提高了系统的处理能力。
- 负载均衡:消息会被均匀地分发给多个消费者,避免某个消费者被过载。
- 消息持久化:消息可以持久化存储在队列中,确保即使在消费者宕机后,消息也不会丢失。
- 确认机制:消费者可以向消息队列发送确认消息,告知消息已被处理完成,确保消息的可靠处理。
工作队列模式适用于任务较重的场景,例如处理大量的计算任务、耗时的IO操作等。它可以提高系统的并发处理能力和可靠性。
总结:Work Queues模式是一种消息队列的模式,它实现了任务的并发处理和负载均衡,适用于处理任务较重的场景。通过使用RabbitMQ的Work Queues模式,你可以提高系统的吞吐量和响应速度。
与点对点模式代码几乎一样,首先,需要将连接RabbitMq的代码封装一下。ZY.RabbitMq.Common类库,然后创建一个RabbitUtils工具类,代码如下:
public class RabbitUtils
{
public static ConnectionFactory GetConnection()
{
var factory = new ConnectionFactory
{
HostName = "ip地址",
Port = 5672, //5672是RabbitMQ默认的端口号
UserName = "",
Password = "",
VirtualHost = "/" //虚拟主机,可以在管理端更改
};
return factory;
}
}
完成工具类后,让Producer 和 Consumer项目引用该类库。
然后创建一个Sms类,代码如下
public class Sms
{
public Sms(string passenger, string phone, string msg)
{
Passenger = passenger;
Phone = phone;
Msg = msg;
}
public string Passenger { get; set; }
public string Phone { get; set; }
public string Msg { get; set; }
}
Producer
在之前的Producer项目中添加SmsSender类,并创建Sender静态方法用于模拟发送消息,代码如下:
public class SmsSender
{
public static void Sender()
{
using var connection = RabbitUtils.GetConnection().CreateConnection();
using var channel =connection.CreateModel();
channel.QueueDeclare("ticket", true, false, false, null);
for (int i = 0; i < 1000; i++)
{
Sms sms = new Sms("乘客" + i, "1390101" + i, "您的车票已预订成功!");
string jsonSms = JsonConvert.SerializeObject(sms);
var body = Encoding.UTF8.GetBytes(jsonSms);
channel.BasicPublish("", "ticket", null, body);
Console.WriteLine($"正在发送内容{jsonSms}");
}
Console.WriteLine("发送数据成功");
}
}
Consumer
在消费者中创建SmsReceive类和Sender静态方法用于接收消息,代码如下:
public class SmsReceive
{
public static void Sender()
{
var connection = RabbitUtils.GetConnection().CreateConnection();
var channel = connection.CreateModel();
channel.QueueDeclare("ticket", true, false, false, null);
// 如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者
// basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的
channel.BasicQos(0, 1, false);//处理完一个取一个
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
//Thread.Sleep(30);
Console.WriteLine($"SmsSender-发送短信成功:{message}");
channel.BasicAck(ea.DeliveryTag, false);
};
channel.BasicConsume("ticket", false, consumer);
Console.WriteLine("Press [Enter] to exit");
Console.Read();
}
}
不用与之前的是现在我们需要新增一个消费者用于测试,创建ZY.RabbitMQ.Consumer02 控制台程序,并且和上面一样创建类和方法。代码和Consumer01是一样的,但是延时Thread.Sleep(60)
主程序
依旧在各自的Main方法中调用
//消费者调用
SmsSender.Sender();
//生产者调用
SmsReceive.Sender();
没有延时的效果展示
有延时的演示效果
可以明显的看到因为消费者01的处理效率高,所以他处理的消息比消费者02更多。
让消费能力强的消费更多
- 消费者1比消费者2的效率要快,一次任务的耗时较短,(延时30毫秒)
- 消费者2大量时间处于空闲状态,消费者1一直忙碌,(延时60毫秒)
通过设置channel.BasicAck(ea.DeliveryTag, false)
,来让处理能力强的去消费更多。MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的。
发布订阅模式
简述
Publish/Subscribe Pattern
在订阅模式中,多了一个Exchange角色。
Exchange:交换机,一方面接收生产者发送的消息,另一方面,知道如何处理消息,例如递交给某个特别的队列、递交给所以队列、或是将消息丢弃。如何操作,取决于Exchang的类型。
Exchange常见类型:
- Fanout:广播,将消息给所有绑定到交换机的队列
- Direct:定向,把消息交给符合指定routing key的队列
- Topic:通配符,把消息交给符合routing pattern(路由模式)的队列
下面是发布订阅模式的基本工作流程:
- 创建一个交换机(Exchange)。
- 启动多个消费者,每个消费者都创建一个队列(Queue)并将其绑定到交换机上。
- 启动消息发布者,将消息发送到交换机上。
- 每个消费者从自己的队列中获取消息,并进行处理。
Exchange只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
下方代码为Fanout广播类型的交换机
生产者
在Producer中创建WeatherFanout类和Weather静态方法,发送简单的天气信息。
public class WeatherFanout
{
public static void Weather()
{
using (var connection = RabbitUtils.GetConnection().CreateConnection())
{
using (var channel = connection.CreateModel())
{
string message = "20度";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(RabbitConstant.EXCHANGE_WEATHER,"",null,body);
Console.WriteLine("天气信息发送成功!");
}
}
}
}
可以看到上述的代码没有声明队列,队列交给消费者去实现,后续需要将队列和交换机进行绑定
消费者01
创建WeatherFanout类和Weather静态方法
public class WeatherFanout
{
public static void Weather()
{
using (var connection = RabbitUtils.GetConnection().CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER, ExchangeType.Fanout);
// 声明队列信息
channel.QueueDeclare(RabbitConstant.QUEUE_BAIDU, true, false, false, null);
/*
* queueBind 用于将队列与交换机绑定
* 参数1:队列名
* 参数2:交换机名
* 参数3:路由Key(暂时用不到)
*/
channel.QueueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER, "");
channel.BasicQos(0, 1, false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += ((model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine($"百度收到的气象信息:{message}");
channel.BasicAck(ea.DeliveryTag, false);
});
channel.BasicConsume(RabbitConstant.QUEUE_BAIDU, false, consumer);
Console.WriteLine("Press [Enter] to exit");
Console.Read();
}
}
}
}
消费者02
public class RabbitConstant
{
public const string QUEUE_HELLO_WORLD = "helloworld.queue";
public const string QUEUE_SMS = "sms.queue";
public const string EXCHANGE_WEATHER = "weather.exchange";
public const string **QUEUE_BAIDU** = "baidu.queue";
public const string **QUEUE_SINA** = "sina.queue";
public const string EXCHANGE_WEATHER_ROUTING = "weather.routing,exchange";
public const string EXCHANGE_WEATHER_TOPIC = "weather.topic.exchange";
}
修改队列名为新浪的即可
// 声明队列信息
channel.QueueDeclare(RabbitConstant.QUEUE_SINA, true, false, false, null);
channel.QueueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER, "");
channel.BasicConsume(RabbitConstant.QUEUE_SINA, false, consumer);
效果展示
可以看到百度和新浪都收到消息了,原因是因为他们绑定了同一个交换机RabbitConstant.EXCHANGE_WEATHER
Routing路由模式
简述
RabbitMQ的路由模式(Routing)是一种消息传递模式,它允许发送者将消息发送到特定的队列,而不是发送到所有队列。这种模式使用了交换机(Exchange)来实现消息的路由和分发。
在路由模式中,有三个关键组件:
- 生产者(Producer):负责发送消息到交换机,并指定路由键。
- 交换机(Exchange):接收生产者发送的消息,并根据路由键将消息路由到一个或多个与之绑定的队列。
- 队列(Queue):接收交换机发送的消息,并进行消费。
在路由模式中,交换机的类型通常为
direct
,它根据路由键的完全匹配将消息路由到与之绑定的队列。每个队列可以绑定多个路由键。
- 队列与交换机绑定,不能是任意绑定,而是要指定一个RoutingKey
- 消息的发送方向Exchange发送消息时,必须指定消息的RoutingKey
- Exchange不再把消息交给每一个绑定的队列,而是根据消息的RoutingKey进行判断,只有队列的RoutingKey与消息的RoutingKey完全一致,才会接收消息。
生产者
在Producer项目中新增WeatherDirect类和Weather静态方法,用于发送消息到交换机中
public class WeatherDirect
{
public static void Weather()
{
Dictionary<string, string> area = new Dictionary<string, string>();
area.Add("china.hunan.changsha.20210525", "中国湖南长沙20210525天气数据");
area.Add("china.hubei.wuhan.20210525", "中国湖北武汉20210525天气数据");
area.Add("china.hubei.xiangyang.20210525", "中国湖北襄阳20210525天气数据");
area.Add("us.cal.lsj.20210525", "美国加州洛杉矶20210525天气数据");
using (var connection = RabbitUtils.GetConnection().CreateConnection())
{
using (var channel = connection.CreateModel())
{
foreach (var item in area)
{
channel.BasicPublish(RabbitConstant.EXCHANGE_WEATHER_ROUTING, item.Key,
null, Encoding.UTF8.GetBytes(item.Value));
}
Console.WriteLine("气象信息发送成功!");
}
}
}
}
消费者01
消费者接收消息不同于之前的是除了指定队列名和交换机名还需指定路由key,也就是RoutingKey
消费者02
代码同上,修改绑定部分的代码就行了,绑定为RabbitConstant.QUEUE_SINA队列,交换机一样,routingkey换成area中剩余的key就行了
效果展示
可以看到百度和新浪收到的消息是根据routingkey路由去获取的消息
Topics通配符模式
简述
Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定RoutingKey的时候使用通配符。
RoutingKey一般都是由一个或多个单词组成,多个单词之间以“.”分隔,例如:item.insert
通配符规则:
*
:匹配一个单词,可以是任意字符串。#
:匹配零个或多个单词,可以是任意字符串。
生产者
public class WeatherTopic
{
public static void Weather()
{
Dictionary<string, string> area = new Dictionary<string, string>();
area.Add("china.hunan.changsha.20210525", "中国湖南长沙20210525天气数据");
area.Add("china.hubei.wuhan.20210525", "中国湖北武汉20210525天气数据");
area.Add("china.hubei.xiangyang.20210525", "中国湖北襄阳20210525天气数据");
area.Add("us.cal.lsj.20210525", "美国加州洛杉矶20210525天气数据");
using (var connection = RabbitUtils.GetConnection().CreateConnection())
{
using (var channel = connection.CreateModel())
{
foreach (var item in area)
{
channel.BasicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC, item.Key,
null, Encoding.UTF8.GetBytes(item.Value));
}
Console.WriteLine("气象信息发送成功!");
}
}
}
}
消费者01
代码与路由模式相差不大,只需修改路由key即可
channel.QueueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "china.#");
匹配路由key为china.开头的所有信息,匹配如下
area.Add("china.hunan.changsha.20210525", "中国湖南长沙20210525天气数据");
area.Add("china.hubei.wuhan.20210525", "中国湖北武汉20210525天气数据");
area.Add("china.hubei.xiangyang.20210525", "中国湖北襄阳20210525天气数据");
消费者02
channel.QueueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "china.hubei.*.20210525");
匹配路由key为china.hubei.任意字符串.20210525的信息,匹配如下
area.Add("china.hubei.wuhan.20210525", "中国湖北武汉20210525天气数据");
area.Add("china.hubei.xiangyang.20210525", "中国湖北襄阳20210525天气数据");=
效果展示
简单总结
可以看到上面所讲的几种工作模式,很多地方代码重复了,是因为上述代码只是用于学习测试,实际开发中,我们需要将RabiitMq封装起来使用,具体实现看下方 ↓
doNetCore中使用
这里使用的是发布订阅模式,简单的封装了一下连接RabiitMq
为项目安装Nuget包
dotnet add package RabbitMQ.Client
创建Asp.Net Core WebApi项目,在appsettings.json中配置连接信息
"RabbitMQ": {
"Hostname": "localhost",
"Port": "端口",
"Username": "用户名",
"Password": "密码"
}
创建一个连接类
public class RabbitMQSettings
{
public string Hostname { get; set; }
public string Port { get; set; }
public string Username { get; set; }
public string Password { get; set; }
}
然后在program.cs配置文件中配置如下代码
builder.Services.Configure<RabbitMQSettings>(builder.Configuration.GetSection("RabbitMQ"));
然后创建RabbitMQ的封装类,用于处理与RabbitMQ的连接、通道、队列等操作
public class RabbitMQConnectionFactory :IDisposable
{
private readonly RabbitMQSettings _settings;
private IConnection _connection;
public RabbitMQConnectionFactory (IOptions<RabbitMQSettings> settings)
{
_settings = settings.Value;
}
public IModel CreateChannel()
{
if (_connection == null || _connection.IsOpen == false)
{
var factory = new ConnectionFactory()
{
HostName = _settings.Hostname,
UserName = _settings.Username,
Password = _settings.Password
};
_connection = factory.CreateConnection();
}
return _connection.CreateModel();
}
public void Dispose()
{
if (_connection != null)
{
if (_connection.IsOpen)
{
_connection.Close();
}
_connection.Dispose();
}
}
}
创建一个简单的发送消息的服务
public class MessageService
{
private readonly RabbitMQConnectionFactory _connectionFactory;
public MessageService(RabbitMQConnectionFactory connectionFactory)
{
_connectionFactory = connectionFactory;
}
public void SendMessage(string message)
{
using (var channel = _connectionFactory.CreateChannel())
{
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "Test",
routingKey:"",
basicProperties: null,
body: body);
}
}
}
然后添加一个控制器用于测试发送消息
[Route("api/[controller]")]
[ApiController]
public class MessageController : ControllerBase
{
private readonly MessageService _messageService;
public MessageController(MessageService messageService)
{
_messageService = messageService;
}
[HttpPost]
public IActionResult Post([FromBody] string message)
{
_messageService.SendMessage(message);
return Ok();
}
}
在配置文件中注入服务
builder.Services.Configure<RabbitMQSettings>(builder.Configuration.GetSection("RabbitMQ"));
builder.Services.AddSingleton<RabbitMQConnectionFactory >();
builder.Services.AddTransient<MessageService>();
最后创建一个控制台程序用于测试消息接收
var factory = new ConnectionFactory()
{
HostName = "localhost",
Port = 端口,
UserName = "用户名",
Password = "密码"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare("Test", ExchangeType.Fanout);
channel.QueueDeclare(queue: "my_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
channel.QueueBind("my_queue", "Test", "");
channel.BasicQos(0, 1, false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine("接收到消息 {0}", message);
channel.BasicAck(ea.DeliveryTag, false);
};
channel.BasicConsume(queue: "my_queue",
autoAck: false,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
效果展示
总结
- 通过设置
channel.BasicAck(ea.DeliveryTag, false)
,来让处理能力强的去消费更多 - channel.BasicQos(0, 1, false); //处理完一个取一个
- channel.BasicAck(ea.DeliveryTag, false); 确认消息的处理结果,并告知RabbitMQ可以从队列中删除该消息
下一篇文章将更新RabbitMQ的延时队列和死信队列
- 延时队列(Delay Queue): 延时队列用于延迟消息的投递,即消息在发送后会在队列中等待一段时间,然后再被消费者接收和处理。延时队列通常用于实现一些定时任务、延迟任务或者消息重试机制。
- 死信队列(Dead Letter Queue): 死信队列用于处理无法被正常消费的消息,即那些无法被消费者成功处理的消息。当消息满足一定的条件时,例如消息过期、被拒绝、队列长度超过限制等,这些消息会被投递到死信队列中,以便进一步处理或分析。
参考链接
- centos-docker安装rabbitmq https://blog.csdn.net/qq_40408317/article/details/105638053
- .NET6使用RabbitMQ学习 https://www.cnblogs.com/fantasy-ke/p/17555153.html
- RabbitMQ从零到高可用集群 https://www.bilibili.com/video/BV1GU4y1w7Yq/?share_source=copy_web&vd_source=fce337a51d11a67781404c67ec0b5084