Lycoris.RabbitMQ.Extensions
6.0.7-rc-1.0
This is a prerelease version of Lycoris.RabbitMQ.Extensions.
There is a newer version of this package available.
See the version list below for details.
See the version list below for details.
dotnet add package Lycoris.RabbitMQ.Extensions --version 6.0.7-rc-1.0
NuGet\Install-Package Lycoris.RabbitMQ.Extensions -Version 6.0.7-rc-1.0
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="Lycoris.RabbitMQ.Extensions" Version="6.0.7-rc-1.0" />
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="Lycoris.RabbitMQ.Extensions" Version="6.0.7-rc-1.0" />
<PackageReference Include="Lycoris.RabbitMQ.Extensions" />
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add Lycoris.RabbitMQ.Extensions --version 6.0.7-rc-1.0
The NuGet Team does not provide support for this client. Please contact its maintainers for support.
#r "nuget: Lycoris.RabbitMQ.Extensions, 6.0.7-rc-1.0"
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package Lycoris.RabbitMQ.Extensions@6.0.7-rc-1.0
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=Lycoris.RabbitMQ.Extensions&version=6.0.7-rc-1.0&prerelease
#tool nuget:?package=Lycoris.RabbitMQ.Extensions&version=6.0.7-rc-1.0&prerelease
The NuGet Team does not provide support for this client. Please contact its maintainers for support.
扩展支持延迟队列(rabbitmq_delayed_message_exchange
插件)
插件仓库地址:rabbitmq_delayed_message_exchange
安装方式
// .net cli
dotnet add package Lycoris.RabbitMQ.Extensions
// package manager
Install-Package Lycoris.RabbitMQ.Extensions
一、注册扩展
基础连接信息注册
var mqBuilder = builder.Services.AddRabbitMQExtensions(opt =>
{
// ip地址
opt.Hosts = new string[] { "yout rabbitmq service ip" };
// 端口号 不设置默认:5672
opt.Port = 5672;
// 账号
opt.UserName = "your user name";
// 密码
opt.Password = "your password";
// 虚拟机 不设置默认为:/
opt.VirtualHost = "/";
// 是否持久化 不设置默认:true
opt.Durable = true;
// 是否自动删除 不设置默认:false
opt.AutoDelete = true;
// 禁止消费者自动启动监听
// 禁用后,需要在你需要使用消费者服务前,主动调用 IRabbitConsumerFactory.ManualStartListenAsync() 启用已添加的消费者服务监听
// 详见 四、手动处理消费者服务
opt.DisableRabbitConsumerHostedListen = true;
});
生产者注册
// 使用生产者工厂模式注册
mqBuilder.AddRabbitProducer("DefaultProducer", opt =>
{
// 保留发布者数 默认:5
opt.InitializeCount = 5;
// 交换机名称
opt.Exchange = "exchange.your.exchangename";
// 交换机类型
opt.Type = RabbitExchangeType.Direct;
// 路由队列
opt.RouteQueues = new RouteQueue[]
{
new RouteQueue()
{
Route = "route.your.routename",
Queue = "queue.your.queuename"
}
};
});
// 使用生产者服务模式注册
// 单实现模式
mqBuilder.AddRabbitProducer<RabbitProducerService>(opt =>
{
// 保留发布者数 默认:5
opt.InitializeCount = 5;
// 交换机名称
opt.Exchange = "exchange.your.exchangename";
// 交换机类型
opt.Type = RabbitExchangeType.Direct;
// 路由队列
opt.RouteQueues = new RouteQueue[]
{
new RouteQueue()
{
Route = "route.your.routename",
Queue = "queue.your.queuename"
},
new RouteQueue()
{
Route = "route.your.routename2",
Queue = "queue.your.queuename2"
}
};
});
// 接口、实现类模式
mqBuilder.AddRabbitProducer<IRabbitProducerService, RabbitProducerService>(opt =>
{
// 保留发布者数 默认:5
opt.InitializeCount = 5;
// 交换机名称
opt.Exchange = "exchange.your.exchangename";
// 交换机类型 延迟队列
opt.Type = RabbitExchangeType.Delayed;
// 延迟秒数
opt.DelayTime = 5;
// 路由队列
opt.RouteQueues = new RouteQueue[]
{
new RouteQueue()
{
Route = "route.your.routename",
Queue = "queue.your.queuename"
},
new RouteQueue()
{
Route = "route.your.routename2",
Queue = "queue.your.queuename2"
}
};
});
使用生产这服务模式注册时,不管是接口实现类还是单实现类均需要继承 BaseRabbitProducerService
基类,并实现构造函数
服务生命周期固定为Scoped
BaseRabbitProducerService
基类包含两个属性:
RabbitProducerFactory
:生产者创建工厂Producer
:当前注册的生产者实例
示例
public class RabbitProducerService : BaseRabbitProducerService, IRabbitProducerService
{
public RabbitProducerService(IRabbitProducerFactory rabbitProducerFactory) : base(rabbitProducerFactory)
{
}
}
消费者注册
mqBuilder.AddRabbitConsumer(opt =>
{
// 是否自动提交 默认:false
opt.AutoAck = false;
// 每次发送消息条数 默认:2
opt.FetchCount = 2;
// 交换机类型
opt.Type = RabbitExchangeType.Direct;
// 路由队列
opt.RouteQueues = new RouteQueue[]
{
new RouteQueue()
{
Route = "route.your.routename",
Queue = "queue.your.queuename"
},
new RouteQueue()
{
Route = "route.your.routename2",
Queue = "queue.your.queuename2"
}
};
// 消费者创建详见第二部分 创建消费者
// 添加消费者监听
// 普通模式
opt.AddListener<TestConsumer>("queue.your.queuename");
// 订阅模式、路由模式、Topic模式、延迟队列模式
// exchange.your.exchangename 为对应的生产者交换机
opt.AddListener<TestConsumer1>("exchange.your.exchangename", "queue.your.queuename");
});
二、生产者使用示例
使用生产者工厂模式发送示例
public class Demo
{
private readonly IRabbitProducerFactory _factory;
public class Demo(IRabbitProducerFactory factory)
{
_factory = factory;
_consumerFactory = consumerFactory;
}
public void PublishTest()
{
// 获取生产者
var producer = factory.Create("DefaultProducer");
// 发送消息
producer.Publish("route.your.routename", "this is push TestConsumer");
}
}
使用生产者服务模式发送示例
public class RabbitProducerService : BaseRabbitProducerService, IRabbitProducerService
{
public RabbitProducerService(IRabbitProducerFactory rabbitProducerFactory) : base(rabbitProducerFactory)
{
}
/// <summary>
///
/// </summary>
public void Test()
{
// 直接使用生产者进行发送
this.Producer.Publish("route.your.routename", "this is push TestConsumer");
this.Producer.Publish("route.your.routename2", "this is push TestConsumer2");
}
}
三、消费者使用示例
消费者有两种创建方式
1. 继承扩展封装好的基类:BaseRabbitConsumerListener
,做了基础的异常捕捉。
基类包含属性:
Context
:当前消费者接收到的上下文实体Exchange
:当前消费者交换机名称Route
:当前消费者路由ResubmitTimeSpan
:重新发布时间间隔(单位:毫秒,默认1000毫秒)Task<ReceivedHandler> HandleExceptionAsync(Exception exception)
:全局异常拦截,没有重写的情况下,默认扩展返回的是回滚MQ消息
public class TestConsumer : BaseRabbitConsumerListener
{
/// <summary>
///
/// </summary>
/// <param name="body"></param>
/// <returns></returns>
protected override Task<ReceivedHandler> ReceivedAsync(string body)
{
Console.WriteLine($"TestConsumer ==> {body}");
return Task.FromResult(ReceivedHandler.Commit);
}
// 不重写,默认返回的是回滚MQ消息
protected override Task<ReceivedHandler> HandleExceptionAsync(Exception exception)
{
// 处理你未捕获到的异常
}
}
2. 使用 IRabbitConsumerListener
接口自己实现
public abstract class TestConsumer : IRabbitConsumerListener
{
/// <summary>
/// 消费消息
/// </summary>
/// <param name="recieveResult"></param>
/// <returns></returns>
public async Task ConsumeAsync(RecieveResult recieveResult)
{
// 提交
recieveResult.Commit();
// 回滚
//recieveResult.RollBack();
// 重新发布
//recieveResult.RollBack(true);
}
}
四、手动处理消费者服务
禁用后,需要在你需要使用消费者服务前,主动调用 IRabbitConsumerFactory.ManualStartListenAsync() 启用已添加的消费者服务监听
public class ApplicationStartService : IHostedService
{
private readonly IRabbitConsumerFactory _consumerFactory;
public ApplicationStartService(IRabbitConsumerFactory consumerFactory)
{
_consumerFactory = consumerFactory;
}
/// <summary>
///
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public Task StartAsync(CancellationToken cancellationToken)
{
// do something
// 做一些基础操作,比如数据库迁移、中间件热机、其他前置处理
// 启动消费者监听
await _consumerFactory.ManualStartListenAsync();
}
/// <summary>
///
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public Task StopAsync(CancellationToken cancellationToken)
{
// do something
await _consumerFactory.ManualStopListenAsync(cancellationToken);
}
}
Product | Versions Compatible and additional computed target framework versions. |
---|---|
.NET | net5.0 was computed. net5.0-windows was computed. net6.0 was computed. net6.0-android was computed. net6.0-ios was computed. net6.0-maccatalyst was computed. net6.0-macos was computed. net6.0-tvos was computed. net6.0-windows was computed. net7.0 was computed. net7.0-android was computed. net7.0-ios was computed. net7.0-maccatalyst was computed. net7.0-macos was computed. net7.0-tvos was computed. net7.0-windows was computed. net8.0 was computed. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 was computed. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
.NET Core | netcoreapp2.0 was computed. netcoreapp2.1 was computed. netcoreapp2.2 was computed. netcoreapp3.0 was computed. netcoreapp3.1 was computed. |
.NET Standard | netstandard2.0 is compatible. netstandard2.1 was computed. |
.NET Framework | net461 was computed. net462 was computed. net463 was computed. net47 was computed. net471 was computed. net472 was computed. net48 was computed. net481 was computed. |
MonoAndroid | monoandroid was computed. |
MonoMac | monomac was computed. |
MonoTouch | monotouch was computed. |
Tizen | tizen40 was computed. tizen60 was computed. |
Xamarin.iOS | xamarinios was computed. |
Xamarin.Mac | xamarinmac was computed. |
Xamarin.TVOS | xamarintvos was computed. |
Xamarin.WatchOS | xamarinwatchos was computed. |
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.
-
.NETStandard 2.0
- Microsoft.Extensions.Hosting.Abstractions (>= 6.0.0)
- Microsoft.Extensions.Logging.Abstractions (>= 6.0.0)
- Newtonsoft.Json (>= 13.0.1)
- RabbitMQ.Client (>= 6.5.0)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.