RabbitMQ.Extensions
一、安装方式
// .net cli
dotnet add package Lycoris.RabbitMQ.Extensions
// package manager
Install-Package Lycoris.RabbitMQ.Extensions
二、注册扩展
2.1 基础连接信息
var mqBuilder = builder.Services.AddRabbitMQExtensions(opt =>
{
// ip地址
opt.Hosts = new string[] { "yout rabbitmq service ip" };
// 端口号 不设置默认:5672
opt.Port = 5672;
// 账号
opt.UserName = "your user name";
// 密码
opt.Password = "your password";
// 虚拟机 不设置默认为:/
opt.VirtualHost = "/";
// 是否持久化 不设置默认:true
opt.Durable = true;
// 是否自动删除 不设置默认:false
opt.AutoDelete = true;
// 禁止消费者自动启动监听
// 禁用后,需要在你需要使用消费者服务前,主动调用 IRabbitConsumerFactory.ManualStartListenAsync() 启用已添加的消费者服务监听
// 详见 四、手动处理消费者服务
opt.DisableRabbitConsumerHostedListen = true;
});
2.2 生产者注册
// 使用生产者工厂模式注册
mqBuilder.AddRabbitProducer("DefaultProducer", opt =>
{
// 保留发布者数 默认:5
opt.InitializeCount = 5;
// 交换机名称
opt.Exchange = "exchange.your.exchangename";
// 交换机类型
opt.Type = RabbitExchangeType.Direct;
// 路由队列
opt.RouteQueues = new RouteQueue[]
{
new RouteQueue()
{
Route = "route.your.routename",
Queue = "queue.your.queuename"
}
};
});
// 使用生产者服务模式注册
// 单实现模式
mqBuilder.AddRabbitProducer<RabbitProducerService>(opt =>
{
// 保留发布者数 默认:5
opt.InitializeCount = 5;
// 交换机名称
opt.Exchange = "exchange.your.exchangename";
// 交换机类型
opt.Type = RabbitExchangeType.Direct;
// 路由队列
opt.RouteQueues = new RouteQueue[]
{
new RouteQueue()
{
Route = "route.your.routename",
Queue = "queue.your.queuename"
},
new RouteQueue()
{
Route = "route.your.routename2",
Queue = "queue.your.queuename2"
}
};
});
// 接口、实现类模式
mqBuilder.AddRabbitProducer<IRabbitProducerService, RabbitProducerService>(opt =>
{
// 保留发布者数 默认:5
opt.InitializeCount = 5;
// 交换机名称
opt.Exchange = "exchange.your.exchangename";
// 交换机类型 延迟队列
opt.Type = RabbitExchangeType.Delayed;
// 延迟秒数
opt.DelayTime = 5;
// 路由队列
opt.RouteQueues = new RouteQueue[]
{
new RouteQueue()
{
Route = "route.your.routename",
Queue = "queue.your.queuename"
},
new RouteQueue()
{
Route = "route.your.routename2",
Queue = "queue.your.queuename2"
}
};
});
使用生产这服务模式注册时,不管是接口实现类还是单实现类均需要继承 BaseRabbitProducerService
基类,并实现构造函数 服务生命周期固定为Scoped
BaseRabbitProducerService
基类包含两个属性:
RabbitProducerFactory
:生产者创建工厂Producer
:当前注册的生产者实例
示例:
public class RabbitProducerService : BaseRabbitProducerService, IRabbitProducerService
{
public RabbitProducerService(IRabbitProducerFactory rabbitProducerFactory) : base(rabbitProducerFactory)
{
}
}
2.3 消费者注册
mqBuilder.AddRabbitConsumer(opt =>
{
// 是否自动提交 默认:false
opt.AutoAck = false;
// 每次发送消息条数 默认:2
opt.FetchCount = 2;
// 交换机类型
opt.Type = RabbitExchangeType.Direct;
// 路由队列
opt.RouteQueues = new RouteQueue[]
{
new RouteQueue()
{
Route = "route.your.routename",
Queue = "queue.your.queuename"
},
new RouteQueue()
{
Route = "route.your.routename2",
Queue = "queue.your.queuename2"
}
};
// 消费者创建详见第二部分 创建消费者
// 添加消费者监听
// 普通模式
opt.AddListener<TestConsumer>("queue.your.queuename");
// 订阅模式、路由模式、Topic模式、延迟队列模式
// exchange.your.exchangename 为对应的生产者交换机
opt.AddListener<TestConsumer1>("exchange.your.exchangename", "queue.your.queuename");
});
三、生产者使用示例
3.1 生产者工厂模式
public class Demo
{
private readonly IRabbitProducerFactory _factory;
public class Demo(IRabbitProducerFactory factory)
{
_factory = factory;
_consumerFactory = consumerFactory;
}
public void PublishTest()
{
// 获取生产者
var producer = factory.Create("DefaultProducer");
// 发送消息
producer.Publish("route.your.routename", "this is push TestConsumer");
}
}
3.2 生产者服务模式
public class RabbitProducerService : BaseRabbitProducerService, IRabbitProducerService
{
public RabbitProducerService(IRabbitProducerFactory rabbitProducerFactory) : base(rabbitProducerFactory)
{
}
/// <summary>
///
/// </summary>
public void Test()
{
// 直接使用生产者进行发送
this.Producer.Publish("route.your.routename", "this is push TestConsumer");
this.Producer.Publish("route.your.routename2", "this is push TestConsumer2");
}
}
四、消费者使用示例
4.1. 扩展类实现
继承扩展封装好的基类:BaseRabbitConsumerListener
,做了基础的异常捕捉
基类包含属性:
Context
:当前消费者接收到的上下文实体Exchange
:当前消费者交换机名称Route
:当前消费者路由ResubmitTimeSpan
:重新发布时间间隔(单位:毫秒,默认1000毫秒)Task<ReceivedHandler> HandleExceptionAsync(Exception exception)
:全局异常拦截,没有重写的情况下,默认扩展返回的是回滚MQ消息
public class TestConsumer : BaseRabbitConsumerListener
{
/// <summary>
///
/// </summary>
/// <param name="body"></param>
/// <returns></returns>
protected override Task<ReceivedHandler> ReceivedAsync(string body)
{
Console.WriteLine($"TestConsumer ==> {body}");
return Task.FromResult(ReceivedHandler.Commit);
}
// 不重写,默认返回的是回滚MQ消息
protected override Task<ReceivedHandler> HandleExceptionAsync(Exception exception)
{
// 处理你未捕获到的异常
}
}
4.2 接口实现
使用 IRabbitConsumerListener
接口实现自己实现
public abstract class TestConsumer : IRabbitConsumerListener
{
/// <summary>
/// 消费消息
/// </summary>
/// <param name="recieveResult"></param>
/// <returns></returns>
public async Task ConsumeAsync(RecieveResult recieveResult)
{
// 提交
recieveResult.Commit();
// 回滚
//recieveResult.RollBack();
// 重新发布
//recieveResult.RollBack(true);
}
}
五、手动处理消费者服务
禁用后,需要在你需要使用消费者服务前,主动调用 IRabbitConsumerFactory.ManualStartListenAsync() 启用已添加的消费者服务监听
public class ApplicationStartService : IHostedService
{
private readonly IRabbitConsumerFactory _consumerFactory;
public ApplicationStartService(IRabbitConsumerFactory consumerFactory)
{
_consumerFactory = consumerFactory;
}
/// <summary>
///
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public Task StartAsync(CancellationToken cancellationToken)
{
// do something
// 做一些基础操作,比如数据库迁移、中间件热机、其他前置处理
// 启动消费者监听
await _consumerFactory.ManualStartListenAsync();
}
/// <summary>
///
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public Task StopAsync(CancellationToken cancellationToken)
{
// do something
await _consumerFactory.ManualStopListenAsync(cancellationToken);
}
}
六、注意事项
6.1 延迟队列注意点
自建MQ服务的话,延迟队列实现需要MQ安装好对应的插件
- 感谢你赐予我前进的力量
本网站的原创文章部分资源内容可能来源于网络,仅供大家学习与参考,如有侵权,请联系博主邮箱:zzyo.yj@outlook.com 进行删除处理
本站一切资源不代表本站立场,并不代表本站赞同其观点和对其真实性负责。
本站一律禁止以任何方式发布或转载任何违法的相关信息,访客发现请向博主举报
声明:版权所有,违者必究 | 如未注明,均为原创 | 本网站采用CC BY-NC-SA 4.0 协议进行授权
转载:转载请注明原文链接 - Lycoris