SHAKA.DWE.RabbitMQEF 1.1.0

dotnet add package SHAKA.DWE.RabbitMQEF --version 1.1.0
                    
NuGet\Install-Package SHAKA.DWE.RabbitMQEF -Version 1.1.0
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="SHAKA.DWE.RabbitMQEF" Version="1.1.0" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="SHAKA.DWE.RabbitMQEF" Version="1.1.0" />
                    
Directory.Packages.props
<PackageReference Include="SHAKA.DWE.RabbitMQEF" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add SHAKA.DWE.RabbitMQEF --version 1.1.0
                    
#r "nuget: SHAKA.DWE.RabbitMQEF, 1.1.0"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package SHAKA.DWE.RabbitMQEF@1.1.0
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=SHAKA.DWE.RabbitMQEF&version=1.1.0
                    
Install as a Cake Addin
#tool nuget:?package=SHAKA.DWE.RabbitMQEF&version=1.1.0
                    
Install as a Cake Tool

SHAKA.DWE.RabbitMQEF Implementation

This library provides concrete implementations of the SHAKA.DWE infrastructure abstractions using SHAKA.MessageBus.RabbitMQ and SHAKA.MessageBus.IntegrationEventLog.EF.

Overview

SHAKA.DWE.RabbitMQEF is a concrete implementation of the SHAKA.DWE Distributed Workflow Engine that uses:

  • RabbitMQ for messaging (via SHAKA.MessageBus.RabbitMQ)
  • Entity Framework Core for persistence( via SHAKA.MessageBus.IntegrationEventLog.EF)

Messagiging

Here we try to implement messaging abstractions provided by SHAKA.DWE.Abstractions using SHAKA.MessageBus.IntegrationEventLog.EF and SHAKA.MessageBus.RabbitMQ.

  • RabbitMQWorkflowEventBus: is the implementation of the IWorkflowEventBus which defines a Publish method, here we use SHAKA.MessageBus.IntegrationEventLog.EF libraries component IIntegrationEventLogService to write the given IWorkflowEvent into the Outbox table.
  • WorkflowEvent: is IWorkflowEvent which allows us to use it for the DWE operations and it is also IntegrationEvent Which allows us to use it with SHAKA.MessageBus.IntegrationEventLog.EF library.
  • WorkflowEventHandlerAdapter: is an adapter for IWorkflowEventHandler in order to make it work with IIntegrationEventHandler, this class allows us to to execute WorkflowEventHandler.Handle method when there will be a requst to execute IIntegrationEventHandler Handle method, this is how we register them in DI:
public static IEventBusBuilder AddWorkflowEventHandler<TEvent, THandler>(this IEventBusBuilder eventBusBuilder)
        where TEvent : IntegrationEvent, IWorkflowEvent
        where THandler : class, IWorkflowEventHandler<TEvent>
{
    eventBusBuilder.Services.AddScoped<IWorkflowEventHandler<TEvent>, THandler>();
    eventBusBuilder.AddSubscription<TEvent, WorkflowEventHandlerAdapter<TEvent>>();

    return eventBusBuilder;
}

As you can see in DI we essentially say that whenever there will be a request for IIntegrationEventHander provide WorkflowEventHandlerAdapter which internally uses IWorkflowEventHandler to handle the event, that's why we also register eventBusBuilder.Services.AddScoped<IWorkflowEventHandler<TEvent>, THandler>() to be able to construct WorkflowEventHandlerAdapter. Main idea is that because we use SHAKA.MessageBus.IntegrationEventLog.EF we need to make sure that IIntegrationEventHaqndler will be provided for that library propely which'll still allow us to use IWorkflowEventHandler.

  • DynamicWorkflowEventHandler: is an event handler that is used to create a dynamic handlers for the ResponseEvents, since the response events only need to receive the event and continue the workflow either with failure or sucess scenario we can avoid asking the user to implement this handler and just use this generic handler for all the response events. This is how we configure it in DI:
public WorkflowStepConfiguration WithPossibleResponse<TEvent>(ResponseEventType responseEventType)
        where TEvent : WorkflowEvent
    {
        _responseEventRegistry.Register(typeof(TEvent).Name, responseEventType);
        _serviceCollection.AddScoped<IWorkflowEventHandler<TEvent>, DynamicWorkflowEventHandler<TEvent>>();
        _serviceCollection.AddKeyedTransient<IIntegrationEventHandler, WorkflowEventHandlerAdapter<TEvent>>(typeof(TEvent));
        _serviceCollection.Configure(delegate (EventBusSubscriptionInfo o)
        {
            o.EventTypes[typeof(TEvent).FullName!] = typeof(TEvent);
        });

        return this;
    }
  • WorkflowStepConfiguration: This class defines some methods methods that allow us to configure the workflow step with RetryPolicy, CompensationPolicy and PossibleResponses.
  • WorkflowBuilder: Is a builder class for the WorkflowDefinition, allowing us to add the steps and initiator compensations in the workflow.

Persistence

For the persistence we use the EF core so we define the entity classes for some of the SHAKA.DWE models: WorkflowDataEntity, WorkflowInstanceEntity, WorkflowStepHistoryEntity. This methods define the mapping methods for these entities.

  • EFWorkflowRepository: Implements IWorkflowRepository using EF Core and it uses the entity classes to workf with db but it returns SHAKA.DWE models as a response.

ServiceCollectionExtensions_ defines an extensions method to configure previously mentioned services in DI.

  • AddRabbitMQEFWorkflowEventBus: Registers the event bus, publisher, workflow engine.
  • AddWorkflowEventHandler: Registers the workflow event handler adapter and the actual workflow event handler.
  • AddEFWorkflowPersistence: Registers the EF workflow repository.
  • ConfigureDistributedWorkflowEntities: this method must be called in the client libraries DbContexts OnModelCreating method in order to configure the entities related to Distributed Workflow Engine.

Problems That Arise During Horizontal Scaling of the Orchestrator

1. In‑Memory Circuit Breaker State Divergence
  • Risk: Each node keeps its own breaker state; resumes on different nodes may bypass open circuits.
  • Fix:
    • Persist breaker state in a shared store (DB/Redis) keyed by dependency/step.
    • Expose ICircuitStateStore (Get/Set/CompareExchange, OpenUntil, success/failure counters).
    • Base "open until" on server/DB time; add TTL and fencing token.
2. ResumeWorkflow Races and Duplicate Resumes
  • Risk: Redeliveries or handler retries call ResumeWorkflow concurrently on different nodes.
  • Fix:
    • Add optimistic concurrency (rowversion/timestamp) to WorkflowInstanceEntity.
    • Predicate updates: WHERE Id = @id AND State = 'Suspended' AND CurrentStepIndex = @idx.
    • Validate against ExpectedEventTypes; ignore unexpected events.
    • Ensure resume is idempotent: if not suspended or step advanced, no-op.
3. Duplicate Compensations
  • Risk: Concurrent failure paths lead to compensation twice.
  • Fix:
    • Execute compensation only after an atomic state transition to Compensating guarded by concurrency token.
    • Make compensations idempotent (idempotency keys per step + instance).
    • Record Compensation_{Step}_Done flag and check before executing.
4. Outbox Publisher Contention
  • Risk: Multiple publishers read the same rows.
  • Fix:
    • Guarded claim: UPDATE ... SET State = InProgress WHERE Id=@id AND State IN (NotPublished, Failed) and check rows affected.
    • Prefer atomic "claim N" pattern (UPDATE TOP(@n)... OUTPUT inserted.*) to avoid two‑phase read/claim.
    • Keep idempotent publish on the consumer side using EventId.
5. Auto Response Handlers on Every Node
  • Risk: Misconfigured topology (multiple queues) causes duplicate deliveries.
  • Fix:
    • Ensure one shared queue per response event type (competing consumers).
    • Keep manual handlers only for non‑workflow domain events; response handlers are auto‑registered once via a singleton registry.
6. Retry/Backoff Timing Skew
  • Risk: Different clocks produce different retry due times.
  • Fix:
    • Compute backoff due times with DB/server time (e.g., GETUTCDATE()).
    • Persist NextRetryUtc; only resume when now >= NextRetryUtc with predicate updates.
7. Step Execution Duplication
  • Risk: Two nodes execute the same step due to races.
  • Fix:
    • Transition instance to Running with a predicate (state/index) before action.
    • Record StepHistory with a unique constraint (InstanceId, StepIndex) to reject duplicates.
    • Make step side‑effects idempotent where possible.
8. Event Handler Idempotency
  • Risk: Broker redeliveries reprocess response events.
  • Fix:
    • Store processed EventIds (per handler) with TTL; ignore duplicates.
    • Alternatively, ensure ResumeWorkflow is idempotent (see #2).
9. Leader‑Only Orchestration Work (Future Timers/Scans)
  • Risk: If you add due‑step scans, cleanups, or reminder jobs, each node will run them.
  • Fix:
    • Add leader election (DB lease, Redis lock, or K8s Lease):
      • Lease row: OwnerId, LeaseId, ExpiresAt, Version; renew heartbeats; use fencing token on writes.
      • Only leader runs scheduled scans/cleanup; followers handle events.
10. Circuit Breaker Backoff Across Nodes
  • Risk: A breaker opens on one node but another resumes too early.
  • Fix:
    • Store OpenedAt/OpenUntil in shared store; check IsExecutionAllowed against shared state.
    • Record success/failure to the same store; use monotonic version to serialize transitions.
11. Clock Skew and TTLs
  • Risk: Expiry calculations differ per node.
  • Fix:
    • Always use DB time for lease/expiry and retry computations.
    • Avoid relying on local system time for orchestration decisions.
12. Observability Gaps
  • Risk: Hard to diagnose split‑brain/duplication.
  • Fix:
    • Emit metrics: resumes attempted/succeeded, rejected resumes, duplicate compensations blocked, breaker opens, outbox claims/publishes.
    • Structured logs with InstanceId, StepIndex, FencingToken/Version.
13. Queue Topology Drift
  • Risk: Switching from direct single queue to per‑instance queues introduces duplication.
  • Fix:
    • Document and enforce exchange/queue bindings in code/config.
    • Add a health check to verify expected bindings at startup.
Product Compatible and additional computed target framework versions.
.NET net10.0 is compatible.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
1.1.0 142 9/12/2025
1.0.1 139 9/12/2025
1.0.0 178 9/11/2025