FluentWorkflow.RabbitMQ
2.1.1
dotnet add package FluentWorkflow.RabbitMQ --version 2.1.1
NuGet\Install-Package FluentWorkflow.RabbitMQ -Version 2.1.1
<PackageReference Include="FluentWorkflow.RabbitMQ" Version="2.1.1" />
<PackageVersion Include="FluentWorkflow.RabbitMQ" Version="2.1.1" />
<PackageReference Include="FluentWorkflow.RabbitMQ" />
paket add FluentWorkflow.RabbitMQ --version 2.1.1
#r "nuget: FluentWorkflow.RabbitMQ, 2.1.1"
#:package FluentWorkflow.RabbitMQ@2.1.1
#addin nuget:?package=FluentWorkflow.RabbitMQ&version=2.1.1
#tool nuget:?package=FluentWorkflow.RabbitMQ&version=2.1.1
FluentWorkflow
A message driven distributed asynchronous workflow framework. 消息驱动的分布式异步工作流程处理框架。
1. Intro
基于消息驱动的分布式异步工作流程处理框架,使用 SourceGenerator 简化开发中的重复工作。
使用场景
- 典型的消息驱动处理流程
在典型的消息驱动处理流程中,阶段的开始消息与结束消息、各个消息的触发都需要手动定义,这些多数属于重复工作,FluentWorkflow是为了减少这些重复劳动而诞生的
BreakChanges
2.0完全不兼容之前的代码消息、处理器、相关命名空间等,都需要按新的命名规则调整
- 已在包
FluentWorkflow.RabbitMQ启用RabbitMQ.Client的7.*版本支持,6.*版本支持使用包FluentWorkflow.RabbitMQ.Legacy
2. Features
- 基础代码自动生成,开发时只需要关注业务;
- 跨实例、跨服务工作流程驱动;
- 灵活的子工作流程等待/工作流程嵌套;
- 灵活的拓展性(
partial/继承); Diagnostic支持;- 目标框架
net8.0+; - *针对多个消息类型的Qos;
NOTE:
- 更新包时应当
尽可能的全链路更新,避免导致的未知问题; WorkflowContext核心为字符串字典其属性在赋值时进行序列化存放,对象后续的修改不会反应到上下文中;2.0已调整相关逻辑,引用类型对象的修改将会正常反馈到后续流程中
Workflow中重写各个阶段的触发事件方法时,方法内不能往外抛出异常,会导致该阶段消息重新进入队列,再次执行;- 默认分发器
FluentWorkflow.RabbitMQ依赖交换机和队列进行消息分发,当存在多套环境需要隔离时,确保交换机和队列都不相同,否则将会出现消息重复消费; - 默认分发器
FluentWorkflow.RabbitMQ在绑定信息(交换机、队列)变更时不能完全自动调整,需要人工修正,如手动移除队列错误的交换机绑定和RoutingKey绑定,否则将会出现消息重复消费; - 框架暂时没有保证消息可靠性,即在
消息队列中间件异常的情况下可能会出现流程中断、重复消费等情况;
3. 开始使用
3.1 引用 FluentWorkflow.Core 包
<ItemGroup>
<PackageReference Include="FluentWorkflow.Core" Version="2.1.1" />
</ItemGroup>
3.2 定义工作流程
3.2.1 定义一个 工作流程声明
public partial class SampleWorkflowDeclaration : IWorkflowDeclaration
{
}
- 声明类型为
partial; - 继承接口
IWorkflowDeclaration;
此时代码生成器会自动为其继承基类,手动实现基类并定义工作流程
public partial class SampleWorkflowDeclaration : IWorkflowDeclaration
{
internal override void DeclareContext(IWorkflowContextDeclarator declarator)
{
declarator.Property<int>("Count");
}
internal override void DeclareWorkflow(IWorkflowDeclarator declarator)
{
declarator.Name("Sample") //声明工作流程名称
.Begin() //开始定义流程
.Then("SampleStage1") //声明阶段 SampleStage1
.Then("SampleStage2") //声明阶段 SampleStage2
.Then("SampleStage3") //声明阶段 SampleStage3
.Completion(); //完成声明
}
}
到此一个 工作流程声明 就定义完成了,该工作流程名为Sample,包含三个阶段 SampleStage1 → SampleStage2 → SampleStage3,上下文中包含一个名为Count的int类型属性
工作流程声明在DeclareWorkflow方法中使用参数declarator定义,必须链式调用:- 使用
Name("{WorkflowName}")定义名称 - 使用
Begin()开始定义流程 - 使用
Then("{StageName}")声明每个阶段,声明顺序即为阶段顺序,阶段名称必须满足C#标识符命名规则和约定 - 使用
Completion()结束定义
- 使用
工作流程上下文声明在DeclareContext方法中使用参数declarator定义,必须链式调用:- 使用
Property<T>("{PropertyName}", "{Comment}")定义上下文的属性及其类型与备注 - 其本质与生成工作代码时手动声明
partial类进行属性添加相同,此逻辑更方便分发
- 使用
3.2.2 生成 工作流程 的工作代码
- 使用
GenerateWorkflowCodesAttribute声明要生成的工作流程
[assembly: GenerateWorkflowCodes<SampleWorkflowDeclaration>]
- 在目标
工作流程声明的相同命名空间下声明{WorkflowName}Workflow的partial类
[assembly: GenerateWorkflowCodes<SampleWorkflowDeclaration>]
namespace SampleWorkflowNamespace; //应当与目标声明相同,即与 SampleWorkflowDeclaration 的命名空间相同
public partial class SampleWorkflow
{
public SampleWorkflow(SampleWorkflowContext context, IServiceProvider serviceProvider) : base(context, serviceProvider)
{
}
}
- 代码生成器已为工作流程生成了必要的工作代码:
- 工作流程上下文
SampleWorkflowContext(模板:{WorkflowName}Context) - 工作流程消息
- 每个阶段的开始消息 (模板:
Stage{StageName}Message) - 每个阶段的完成消息 (模板:
Stage{StageName}CompletedMessage) - 流程所需的其它消息
- 每个阶段的开始消息 (模板:
- 阶段处理器基类
StageSampleStage1HandlerBase、StageSampleStage2HandlerBase、StageSampleStage3HandlerBase(模板:Stage{StageName}HandlerBase) - *其它相关支撑类型
- 所有代码生成在
SampleWorkflowDeclaration的命名空间及其子命名空间下
- 工作流程上下文
3.3 实现阶段处理器
继承对应的阶段处理器基类,并实现各个阶段处理逻辑
// SampleStage2 与 SampleStage3 同理
public class StageSampleStage1Handler : StageSampleStage1HandlerBase
{
public StageSampleStage1Handler(IServiceProvider serviceProvider) : base(serviceProvider)
{
}
protected override Task ProcessAsync(ProcessContext processContext, StageSampleStage1Message stageMessage, CancellationToken cancellationToken)
{
//TODO 阶段业务逻辑
return Task.CompletedTask;
}
}
3.4 配置服务
配置控制服务
services.AddFluentWorkflow()
.AddSampleWorkflow(configuration => //添加工作流程
{
configuration.AddScheduler() //添加工作流程调度器
.AddResultObserver(); //添加结果观察器
})
.UseInMemoryWorkflowMessageDispatcher(); //配置使用的消息分发器,这里使用基于内存的分发器来示范
配置阶段处理服务
services.AddFluentWorkflow()
.AddSampleWorkflow(configuration => //添加工作流程
{
configuration.StageSampleStage1Handler<StageSampleStage1Handler>(); //添加对应阶段的处理器, SampleStage2 与 SampleStage3 同理
})
.<>()
.UseInMemoryWorkflowMessageDispatcher(); //配置使用的消息分发器,这里使用基于内存的分发器来示范
FluentWorkflow正常工作的必要条件:
- 流程中的所有
服务使用同一套消息分发器; - 有且仅配置了一个(单个服务,可多实例)工作流程调度器 -
WorkflowScheduler; - 所有阶段的阶段处理器 -
StageHandler,各个阶段的阶段处理器有且仅有一个(单个服务,可多实例); - *需要等待
子工作流程时必须配置子工作流程结果观察器 -ResultObserver; - *需要单次等待多个
子工作流程时,必须使用支持等待多个子工作流程的IWorkflowAwaitProcessor; (默认实现了基于redis的多流程等待处理器,配置时使用UseRedisWorkflowAwaitProcessor方法以启用)
3.5 启动工作流程
//从DI容器中获取工作流程构建器
var workflowBuilder = ServiceProvider.GetRequiredService<IWorkflowBuilder<SampleWorkflow>>();
//创建工作流程上下文
var context = new SampleWorkflowContext();
//构建工作流程
var workflow = workflowBuilder.Build(context);
//启动工作流程,框架会自动触发各个阶段处理器后完成
await workflow.StartAsync(default);
4. 注意事项
- 启动工作流程的服务可以不是配置工作流程调度器 -
WorkflowScheduler的服务,但需要接入消息分发器并在配置时使用Add****Workflow()添加对应的工作流程构造器; - 源代码生成器生成的绝大部分类型都是
partial的,可以声明partial类进行拓展,不可使用partial类拓展的功能基本上都可以继承后重写,在配置服务时替换默认实现即可; - 定义的
Workflow类会添加生命周期各个阶段的触发事件方法,可以继承后重写其逻辑以在各个阶段执行相关的逻辑(注意每次触发可能不在同一个服务实例中。重写后应当捕获并处理所有异常,不要抛出); WorkflowContext核心为字符串/对象混合字典,对其修改理论上只对后续可见并在整个执行周期可用,可以将执行参数、结果、中间值等存放其中;- 消息的分发、重试等逻辑由具体使用的消息分发器
IWorkflowMessageDispatcher控制(默认提供了基于CAP、Abp以及基础的FluentWorkflow.RabbitMQ可选); - 默认情况下
StageHandler出现异常则认为工作流程失败,不会将异常抛给上层IWorkflowMessageDispatcher(消息分发的重试不会触发),可以重写StageHandler的OnException方法来将异常向上抛出; - 更改既有工作流程时,如果
修改/删除了既有的阶段定义,会导致还在处理过程中工作流程无法正常运行(但添加不会影响);
5. 其它
5.1 生成拓展功能代码
部分功能为源码接入的方式,默认不生成,在项目中指定需要的功能后自动生成
<PropertyGroup>
<FluentWorkflowGeneratorAdditional>AbpFoundation,CAPFoundation,AbpMessageDispatcher,CAPMessageDispatcher,RedisAwaitProcessor</FluentWorkflowGeneratorAdditional>
</PropertyGroup>
| 名称 | 功能 |
|---|---|
| AbpFoundation | Abp的基础功能支持 |
| CAPFoundation | CAP的基础功能支持 |
| AbpMessageDispatcher | Abp的消息分发器 |
| CAPMessageDispatcher | CAP的消息分发器 |
| RedisAwaitProcessor | 基于StackExchange.Redis的子流程等待处理器 |
- 生成的可能冲突的类型会放到命名空间
FluentWorkflow.GenericExtension.{工作流程命名空间}下,如配置拓展方法等;
5.2 使用默认分发器 FluentWorkflow.RabbitMQ
引用 FluentWorkflow.RabbitMQ 包
<ItemGroup>
<PackageReference Include="FluentWorkflow.RabbitMQ" Version="2.1.1" />
</ItemGroup>
配置
services.AddFluentWorkflow()
.UseRabbitMQMessageDispatcher(options =>
{
//配置RabbitMQ
});
*控制指定消息的消费速率
配置单个消息的消费速率,其它消息不受限
services.Configure<RabbitMQOptions>(options =>
{
//配置阶段Stage1的消息 - StageSampleStage1Message 的消费速率,即当前服务实例同时只会有一个阶段Stage1在处理
options.MessageGroup("Group1", builder =>
{
builder.Add<StageSampleStage1Message>()
.WithQosChannelInitialization(1);
});
});
*消息确认超时
RabbitMQ消息的消费ack超时时间默认为30分钟,进行长时间处理时可能会出现意外情况,可参照 acknowledgement-timeout 进行调整
- 框架已默认尝试设置队列参数
x-consumer-timeout为 1 小时(如果RabbitMQ版本支持的话); - 可使用
RabbitMQOptions.QueueArgumentsSetup对队列的x-consumer-timeout参数进行调整;
*阶段消息重试
默认情况下,当抛出的异常继承接口 IBusyConsumer 时,流程不会立即失败,而是将消息重新返回消息队列
5.3 子工作流程等待
在阶段处理器中实现子工作流程等待逻辑
internal class StageSampleStage1Handler : StageSampleStage1HandlerBase
{
public StageSampleStage1Handler(IServiceProvider serviceProvider) : base(serviceProvider)
{
}
protected async override Task ProcessAsync(ProcessContext processContext, StageSampleStage1Message stageMessage, CancellationToken cancellationToken)
{
//构建子工作流程
var workflowBuilder = ServiceProvider.GetRequiredService<IWorkflowBuilder<SampleWorkflow>>();
var workflow = workflowBuilder.Build(new SampleWorkflowContext());
//Other workflow setting
//将未启动的子工作流程传递给当前阶段处理上下文,并命名为 - 'taskName'
processContext.AwaitChildWorkflow("taskName", workflow);
// Other logic
//当前阶段将等待子工作流程处理完成后再完成
}
protected override async Task OnAwaitFinishedAsync(SampleWorkflowContext context, IReadOnlyDictionary<string, IWorkflowContext?> childWorkflowContexts, CancellationToken cancellationToken)
{
//从等待的子工作流程上下文字典中取出 - 'taskName'
var workflowContext = (SampleWorkflowContext)childWorkflowContexts["taskName"];
//处理子工作流程结果,如将 workflowContext 内的结果赋值给 context,以便在当前工作流程的后续阶段中使用等
await base.OnAwaitFinishedAsync(context, childWorkflowContexts, cancellationToken);
//当前阶段将完成
}
}
5.4 启用Diagnostic支持
services.AddFluentWorkFlow().EnableDiagnostic();
5.5 使用IWorkflowDebugRunner进行调试运行
- 在DI中添加支持
services.AddFluentWorkflow().AddDebugRunner();
- 使用
IWorkflowDebugRunner进行消息的执行
var transmissionModelRawData = """
//MessageJson
"""u8.ToArray();
var debugRunner = ServiceProvider.GetRequiredService<IWorkflowDebugRunner>();
debugRunner.RunAsync(transmissionModelRawData);
6 流程的中止、挂起与恢复
6.1 中止流程
在 WorkFlow 的 On{StageName}Async 和 On{StageName}CompletedAsync 中不执行参数委托 fireMessage,则后续流程不再执行
6.2 流程挂起
在 WorkFlow 的 On{StageName}Async 和 On{StageName}CompletedAsync 中不执行参数委托 fireMessage,中止流程,在此基础上调用 SerializeContext 方法将上下文序列化后存放
// 存放 contextData 以用于流程恢复
var contextData = SerializeContext(message.Context);
6.3 流程恢复
调用具体 WorkFlow 的静态方法 ResumeAsync 使用挂起的流程数据进行恢复执行
// contextData 为序列化的上下文数据
await XXXXWorkflow.ResumeAsync(contextData, serviceProvider, cancellationToken)
注意:
恢复流程将会再次调用序列化上下文时的方法,需要注意,小心再次被挂起
更多信息参见源码内的测试代码
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 is compatible. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net8.0
- FluentWorkflow.Core (>= 2.1.1)
- Microsoft.Extensions.Hosting.Abstractions (>= 8.0.0)
- RabbitMQ.Client (>= 7.1.2)
-
net9.0
- FluentWorkflow.Core (>= 2.1.1)
- Microsoft.Extensions.Hosting.Abstractions (>= 8.0.0)
- RabbitMQ.Client (>= 7.1.2)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated |
|---|---|---|
| 2.1.1 | 15,930 | 9/10/2025 |
| 2.1.0 | 15,526 | 6/21/2025 |
| 2.1.0-pre-002 | 1,508 | 5/12/2025 |
| 2.1.0-pre-001 | 108 | 5/10/2025 |
| 2.0.2 | 390 | 4/22/2025 |
| 2.0.1 | 258 | 4/3/2025 |
| 2.0.0 | 340 | 3/4/2025 |
| 2.0.0-pre-013 | 208 | 2/7/2025 |
| 2.0.0-pre-012 | 130 | 2/7/2025 |
| 2.0.0-pre-011 | 166 | 2/4/2025 |
| 2.0.0-pre-010 | 138 | 2/4/2025 |
| 2.0.0-pre-008 | 121 | 2/4/2025 |
| 2.0.0-pre-007 | 149 | 2/4/2025 |
| 2.0.0-pre-006 | 145 | 2/4/2025 |
| 2.0.0-pre-005 | 125 | 2/3/2025 |
| 2.0.0-pre-004 | 124 | 2/3/2025 |
| 2.0.0-pre-003 | 126 | 2/3/2025 |
| 2.0.0-pre-002 | 134 | 2/3/2025 |
| 1.4.3 | 500 | 12/17/2024 |
| 1.4.2 | 147 | 12/16/2024 |
| 1.4.1 | 369 | 12/5/2024 |
| 1.4.0 | 163 | 12/4/2024 |
| 1.3.5 | 159 | 12/2/2024 |
| 1.3.4 | 304 | 11/23/2024 |
| 1.3.3 | 147 | 11/22/2024 |
| 1.3.2 | 138 | 11/17/2024 |
| 1.3.1 | 130 | 11/16/2024 |
| 1.3.0 | 146 | 11/12/2024 |
| 1.2.0 | 201 | 7/18/2024 |
| 1.1.6 | 276 | 7/1/2024 |
| 1.1.5 | 177 | 6/16/2024 |
| 1.1.4 | 159 | 6/13/2024 |
| 1.1.3 | 171 | 6/2/2024 |
| 1.1.2 | 736 | 5/24/2024 |
| 1.1.1 | 141 | 5/24/2024 |
| 1.1.0 | 161 | 5/19/2024 |
| 1.0.0 | 169 | 3/22/2024 |
| 1.0.0-preview-007 | 128 | 2/26/2024 |
| 1.0.0-preview-006 | 266 | 11/15/2023 |
| 1.0.0-preview-005 | 172 | 10/23/2023 |
| 1.0.0-preview-004 | 366 | 10/17/2023 |
| 1.0.0-preview-003 | 306 | 7/2/2023 |
| 1.0.0-preview-002 | 221 | 6/13/2023 |