SHAKA.DWE.RabbitMQEF
1.1.0
dotnet add package SHAKA.DWE.RabbitMQEF --version 1.1.0
SHAKA.DWE.RabbitMQEF Implementation
This library provides concrete implementations of the SHAKA.DWE infrastructure abstractions using SHAKA.MessageBus.RabbitMQ and SHAKA.MessageBus.IntegrationEventLog.EF.
Overview
SHAKA.DWE.RabbitMQEF is a concrete implementation of the SHAKA.DWE Distributed Workflow Engine that uses:
- RabbitMQ for messaging (via SHAKA.MessageBus.RabbitMQ)
- Entity Framework Core for persistence (via SHAKA.MessageBus.IntegrationEventLog.EF)
Messaging
This section implements the messaging abstractions provided by SHAKA.DWE.Abstractions using SHAKA.MessageBus.IntegrationEventLog.EF and SHAKA.MessageBus.RabbitMQ.
- RabbitMQWorkflowEventBus: the implementation of IWorkflowEventBus, which defines a Publish method. It uses the IIntegrationEventLogService component of the SHAKA.MessageBus.IntegrationEventLog.EF library to write the given IWorkflowEvent into the Outbox table.
- WorkflowEvent: an IWorkflowEvent, so it can be used in DWE operations, and also an IntegrationEvent, so it can be used with the SHAKA.MessageBus.IntegrationEventLog.EF library.
- WorkflowEventHandlerAdapter: an adapter that makes IWorkflowEventHandler work with IIntegrationEventHandler. When the IIntegrationEventHandler Handle method is requested, the adapter executes the IWorkflowEventHandler Handle method. This is how both are registered in DI:
```csharp
public static IEventBusBuilder AddWorkflowEventHandler<TEvent, THandler>(this IEventBusBuilder eventBusBuilder)
    where TEvent : IntegrationEvent, IWorkflowEvent
    where THandler : class, IWorkflowEventHandler<TEvent>
{
    // The actual workflow handler, resolved by the adapter.
    eventBusBuilder.Services.AddScoped<IWorkflowEventHandler<TEvent>, THandler>();
    // The adapter is the handler the message bus subscribes for TEvent.
    eventBusBuilder.AddSubscription<TEvent, WorkflowEventHandlerAdapter<TEvent>>();
    return eventBusBuilder;
}
```
As the registration shows, whenever an IIntegrationEventHandler is requested, DI provides a WorkflowEventHandlerAdapter, which internally uses IWorkflowEventHandler to handle the event. That is why eventBusBuilder.Services.AddScoped<IWorkflowEventHandler<TEvent>, THandler>() is also registered: it is needed to construct WorkflowEventHandlerAdapter. The main idea is that, because we use SHAKA.MessageBus.IntegrationEventLog.EF, that library must be given a proper IIntegrationEventHandler, while we can still use IWorkflowEventHandler.
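The adapter idea described above can be sketched as follows. This is a minimal illustration, not the library's actual code: the interface and base-class shapes below are simplified stand-ins declared locally so the example compiles on its own, and OrderPaid and RecordingHandler are hypothetical names used only to exercise the adapter.

```csharp
using System;
using System.Threading.Tasks;

// Simplified stand-ins for the real library types (assumed shapes).
public abstract class IntegrationEvent { public Guid Id { get; } = Guid.NewGuid(); }
public interface IWorkflowEvent { }
public interface IIntegrationEventHandler { Task Handle(IntegrationEvent @event); }
public interface IWorkflowEventHandler<TEvent> where TEvent : IWorkflowEvent
{
    Task Handle(TEvent @event);
}

// The message bus only sees IIntegrationEventHandler;
// the adapter forwards the call to the workflow handler it was constructed with.
public sealed class WorkflowEventHandlerAdapter<TEvent> : IIntegrationEventHandler
    where TEvent : IntegrationEvent, IWorkflowEvent
{
    private readonly IWorkflowEventHandler<TEvent> _inner;

    public WorkflowEventHandlerAdapter(IWorkflowEventHandler<TEvent> inner) => _inner = inner;

    public Task Handle(IntegrationEvent @event) => _inner.Handle((TEvent)@event);
}

// Hypothetical event and handler, present only to demonstrate the delegation.
public sealed class OrderPaid : IntegrationEvent, IWorkflowEvent { }

public sealed class RecordingHandler : IWorkflowEventHandler<OrderPaid>
{
    public int Calls { get; private set; }

    public Task Handle(OrderPaid @event)
    {
        Calls++;
        return Task.CompletedTask;
    }
}
```

Constructing `new WorkflowEventHandlerAdapter<OrderPaid>(new RecordingHandler())` and calling `Handle` with an `OrderPaid` instance invokes the inner handler once, which is the behavior the DI registration relies on.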
- DynamicWorkflowEventHandler: a generic event handler used to create handlers for response events dynamically. Because a response event only needs to be received so that the workflow can continue with either the failure or the success scenario, the user does not have to implement this handler; the generic handler is used for all response events. This is how it is configured in DI:
```csharp
public WorkflowStepConfiguration WithPossibleResponse<TEvent>(ResponseEventType responseEventType)
    where TEvent : WorkflowEvent
{
    // Remember whether this response event means success or failure.
    _responseEventRegistry.Register(typeof(TEvent).Name, responseEventType);
    // Generic handler for the response event, wrapped by the adapter.
    _serviceCollection.AddScoped<IWorkflowEventHandler<TEvent>, DynamicWorkflowEventHandler<TEvent>>();
    _serviceCollection.AddKeyedTransient<IIntegrationEventHandler, WorkflowEventHandlerAdapter<TEvent>>(typeof(TEvent));
    // Make the event type known to the event bus subscription info.
    _serviceCollection.Configure<EventBusSubscriptionInfo>(o =>
    {
        o.EventTypes[typeof(TEvent).FullName!] = typeof(TEvent);
    });
    return this;
}
```
- WorkflowStepConfiguration: defines methods that configure a workflow step with RetryPolicy, CompensationPolicy and PossibleResponses.
- WorkflowBuilder: a builder class for the WorkflowDefinition that adds the steps and initiator compensations to the workflow.
Persistence
Persistence uses EF Core, so entity classes are defined for some of the SHAKA.DWE models: WorkflowDataEntity, WorkflowInstanceEntity, and WorkflowStepHistoryEntity. These classes also define the mapping methods for the entities.
- EFWorkflowRepository: implements IWorkflowRepository using EF Core. It uses the entity classes to work with the database but returns SHAKA.DWE models.
ServiceCollectionExtensions defines extension methods that configure the previously mentioned services in DI.
- AddRabbitMQEFWorkflowEventBus: Registers the event bus, publisher, workflow engine.
- AddWorkflowEventHandler: Registers the workflow event handler adapter and the actual workflow event handler.
- AddEFWorkflowPersistence: Registers the EF workflow repository.
- ConfigureDistributedWorkflowEntities: must be called in the OnModelCreating method of the client library's DbContext to configure the entities related to the Distributed Workflow Engine.
Problems That Arise During Horizontal Scaling of the Orchestrator
1. In‑Memory Circuit Breaker State Divergence
- Risk: Each node keeps its own breaker state; resumes on different nodes may bypass open circuits.
- Fix:
- Persist breaker state in a shared store (DB/Redis) keyed by dependency/step.
- Expose `ICircuitStateStore` (Get/Set/CompareExchange, `OpenUntil`, success/failure counters).
- Base "open until" on server/DB time; add TTL and fencing token.
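One possible shape for such a store is sketched below. Only the names `ICircuitStateStore`, Get/Set/CompareExchange and `OpenUntil` come from the list above; the `CircuitState` record, its fields, and the in-memory implementation are assumptions made for illustration. A real deployment would back the interface with the shared DB or Redis.

```csharp
using System;
using System.Collections.Concurrent;

// Assumed state shape: a version for compare-and-swap plus the counters named above.
public sealed record CircuitState(long Version, DateTimeOffset? OpenUntil, int Failures, int Successes);

public interface ICircuitStateStore
{
    CircuitState Get(string key);
    void Set(string key, CircuitState state);

    // Replaces the state only if the stored version still equals expectedVersion.
    bool CompareExchange(string key, long expectedVersion, CircuitState next);
}

// In-memory stand-in for a shared store, for illustration only.
public sealed class InMemoryCircuitStateStore : ICircuitStateStore
{
    private static readonly CircuitState Closed = new(0, null, 0, 0);
    private readonly ConcurrentDictionary<string, CircuitState> _states = new();

    public CircuitState Get(string key) => _states.GetOrAdd(key, Closed);

    public void Set(string key, CircuitState state) => _states[key] = state;

    public bool CompareExchange(string key, long expectedVersion, CircuitState next)
    {
        var current = Get(key);
        // TryUpdate succeeds only if nobody replaced 'current' in the meantime.
        return current.Version == expectedVersion && _states.TryUpdate(key, next, current);
    }
}
```

With this shape, two nodes that both try to open the same breaker from version 0 cannot both succeed: the second `CompareExchange` returns false and that node has to re-read the shared state.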
2. ResumeWorkflow Races and Duplicate Resumes
- Risk: Redeliveries or handler retries call `ResumeWorkflow` concurrently on different nodes.
- Fix:
- Add optimistic concurrency (rowversion/timestamp) to `WorkflowInstanceEntity`.
- Predicate updates: `WHERE Id = @id AND State = 'Suspended' AND CurrentStepIndex = @idx`.
- Validate against `ExpectedEventTypes`; ignore unexpected events.
- Ensure resume is idempotent: if not suspended or step advanced, no-op.
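The predicate update and the idempotent no-op can be illustrated with a small in-memory model. This is a sketch under assumptions: `WorkflowInstanceRow` and `TryResume` are hypothetical names, and the lock stands in for the atomicity that the database provides for a single conditional UPDATE.

```csharp
public enum WorkflowState { Suspended, Running }

// In-memory stand-in for one workflow instance row.
public sealed class WorkflowInstanceRow
{
    private readonly object _gate = new();

    public WorkflowState State { get; private set; } = WorkflowState.Suspended;
    public int CurrentStepIndex { get; private set; }

    // Mirrors "UPDATE ... WHERE State = 'Suspended' AND CurrentStepIndex = @idx":
    // returns true only for the caller whose predicate still matched.
    public bool TryResume(int expectedStepIndex)
    {
        lock (_gate)
        {
            if (State != WorkflowState.Suspended || CurrentStepIndex != expectedStepIndex)
            {
                return false; // already resumed or step advanced: no-op
            }

            State = WorkflowState.Running;
            return true;
        }
    }
}
```

A redelivered response event that calls `TryResume` a second time gets `false` and should simply return without executing the step again.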
3. Duplicate Compensations
- Risk: Concurrent failure paths lead to compensation twice.
- Fix:
- Execute compensation only after an atomic state transition to `Compensating` guarded by concurrency token.
- Make compensations idempotent (idempotency keys per step + instance).
- Record `Compensation_{Step}_Done` flag and check before executing.
4. Outbox Publisher Contention
- Risk: Multiple publishers read the same rows.
- Fix:
- Guarded claim: `UPDATE ... SET State = InProgress WHERE Id=@id AND State IN (NotPublished, Failed)` and check rows affected.
- Prefer atomic "claim N" pattern (`UPDATE TOP(@n)... OUTPUT inserted.*`) to avoid two‑phase read/claim.
- Keep idempotent publish on the consumer side using `EventId`.
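The guarded claim can be modeled in memory to show why checking "rows affected" is enough. In this sketch `InMemoryOutbox` and `TryClaim` are hypothetical names, and `ConcurrentDictionary.TryUpdate` plays the role of the conditional UPDATE: it changes the state only if the row is still in the expected state.

```csharp
using System;
using System.Collections.Concurrent;

public enum OutboxState { NotPublished, InProgress, Published, Failed }

// In-memory stand-in for the Outbox table, keyed by event ID.
public sealed class InMemoryOutbox
{
    private readonly ConcurrentDictionary<Guid, OutboxState> _rows = new();

    public void Add(Guid eventId) => _rows[eventId] = OutboxState.NotPublished;

    // Equivalent to "... SET State = InProgress WHERE Id = @id AND State IN (NotPublished, Failed)"
    // followed by a rows-affected check: exactly one caller gets true.
    public bool TryClaim(Guid eventId) =>
        _rows.TryUpdate(eventId, OutboxState.InProgress, OutboxState.NotPublished) ||
        _rows.TryUpdate(eventId, OutboxState.InProgress, OutboxState.Failed);
}
```

A publisher that gets `false` from `TryClaim` skips the row, because another publisher already owns it.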
5. Auto Response Handlers on Every Node
- Risk: Misconfigured topology (multiple queues) causes duplicate deliveries.
- Fix:
- Ensure one shared queue per response event type (competing consumers).
- Keep manual handlers only for non‑workflow domain events; response handlers are auto‑registered once via a singleton registry.
6. Retry/Backoff Timing Skew
- Risk: Different clocks produce different retry due times.
- Fix:
- Compute backoff due times with DB/server time (e.g., `GETUTCDATE()`).
- Persist `NextRetryUtc`; only resume when `now >= NextRetryUtc` with predicate updates.
7. Step Execution Duplication
- Risk: Two nodes execute the same step due to races.
- Fix:
- Transition instance to `Running` with a predicate (state/index) before action.
- Record `StepHistory` with a unique constraint (`InstanceId`, `StepIndex`) to reject duplicates.
- Make step side‑effects idempotent where possible.
8. Event Handler Idempotency
- Risk: Broker redeliveries reprocess response events.
- Fix:
- Store processed `EventIds` (per handler) with TTL; ignore duplicates.
- Alternatively, ensure `ResumeWorkflow` is idempotent (see #2).
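A per-handler deduplication store with a TTL could look like the sketch below. `ProcessedEventCache` and `TryMarkProcessed` are hypothetical names, and the dictionary is an in-memory stand-in for a shared table or cache. The caller passes `now` explicitly so that a shared (DB/server) clock can be used.

```csharp
using System;
using System.Collections.Concurrent;

public sealed class ProcessedEventCache
{
    // Value is the expiry time of the (handler, event) entry.
    private readonly ConcurrentDictionary<(string Handler, Guid EventId), DateTimeOffset> _seen = new();
    private readonly TimeSpan _ttl;

    public ProcessedEventCache(TimeSpan ttl) => _ttl = ttl;

    // Returns true when the event has not been processed by this handler within the TTL window.
    public bool TryMarkProcessed(string handler, Guid eventId, DateTimeOffset now)
    {
        var key = (handler, eventId);
        while (true)
        {
            if (_seen.TryAdd(key, now + _ttl)) return true;          // first time seen
            if (!_seen.TryGetValue(key, out var expires)) continue;  // entry vanished, retry
            if (expires > now) return false;                         // duplicate within TTL
            if (_seen.TryUpdate(key, now + _ttl, expires)) return true; // expired entry refreshed
        }
    }
}
```

A handler would call `TryMarkProcessed` first and skip the event when it returns `false`.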
9. Leader‑Only Orchestration Work (Future Timers/Scans)
- Risk: If you add due‑step scans, cleanups, or reminder jobs, each node will run them.
- Fix:
- Add leader election (DB lease, Redis lock, or K8s Lease):
  - Lease row: `OwnerId`, `LeaseId`, `ExpiresAt`, `Version`; renew heartbeats; use fencing token on writes.
  - Only leader runs scheduled scans/cleanup; followers handle events.
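The lease-with-fencing-token idea can be sketched with a single in-memory lease. `InMemoryLeaseStore`, `TryAcquireOrRenew` and `IsCurrent` are hypothetical names; the fields correspond loosely to the `OwnerId`, `ExpiresAt` and `Version` columns named above, and `now` is assumed to come from the shared DB/server clock rather than the node's local clock.

```csharp
using System;

// In-memory stand-in for a single lease row.
public sealed class InMemoryLeaseStore
{
    private readonly object _gate = new();
    private string _ownerId = string.Empty;
    private DateTimeOffset _expiresAt = DateTimeOffset.MinValue;
    private long _version;

    // Returns the fencing token if nodeId holds the lease after the call, otherwise null.
    public long? TryAcquireOrRenew(string nodeId, DateTimeOffset now, TimeSpan duration)
    {
        lock (_gate)
        {
            bool expired = _expiresAt <= now;
            if (!expired && _ownerId != nodeId) return null; // another node is still leader

            if (expired)
            {
                _ownerId = nodeId;
                _version++; // new leadership term, new fencing token
            }

            _expiresAt = now + duration; // heartbeat renewal keeps the same token
            return _version;
        }
    }

    // Writes carry the token they were issued; tokens from earlier terms are rejected.
    public bool IsCurrent(long fencingToken)
    {
        lock (_gate) return fencingToken == _version;
    }
}
```

Only the node that currently gets a non-null token runs scheduled scans or cleanup, and each write it makes is checked with `IsCurrent` so that a paused former leader cannot overwrite newer state.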
10. Circuit Breaker Backoff Across Nodes
- Risk: A breaker opens on one node but another resumes too early.
- Fix:
- Store `OpenedAt`/`OpenUntil` in shared store; check `IsExecutionAllowed` against shared state.
- Record success/failure to the same store; use monotonic version to serialize transitions.
11. Clock Skew and TTLs
- Risk: Expiry calculations differ per node.
- Fix:
- Always use DB time for lease/expiry and retry computations.
- Avoid relying on local system time for orchestration decisions.
12. Observability Gaps
- Risk: Hard to diagnose split‑brain/duplication.
- Fix:
- Emit metrics: resumes attempted/succeeded, rejected resumes, duplicate compensations blocked, breaker opens, outbox claims/publishes.
- Structured logs with `InstanceId`, `StepIndex`, `FencingToken`/`Version`.
13. Queue Topology Drift
- Risk: Switching from direct single queue to per‑instance queues introduces duplication.
- Fix:
- Document and enforce exchange/queue bindings in code/config.
- Add a health check to verify expected bindings at startup.
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
- net10.0
  - SHAKA.DWE (>= 1.1.0)
  - SHAKA.MessageBus.IntegrationEventLog.EF (>= 1.0.3)
  - SHAKA.MessageBus.RabbitMQ (>= 1.0.8)