Indiko.Blocks.EventBus.RabbitMQ 2.1.1

dotnet add package Indiko.Blocks.EventBus.RabbitMQ --version 2.1.1
                    
NuGet\Install-Package Indiko.Blocks.EventBus.RabbitMQ -Version 2.1.1
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="Indiko.Blocks.EventBus.RabbitMQ" Version="2.1.1" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="Indiko.Blocks.EventBus.RabbitMQ" Version="2.1.1" />
                    
Directory.Packages.props
<PackageReference Include="Indiko.Blocks.EventBus.RabbitMQ" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add Indiko.Blocks.EventBus.RabbitMQ --version 2.1.1
                    
#r "nuget: Indiko.Blocks.EventBus.RabbitMQ, 2.1.1"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package Indiko.Blocks.EventBus.RabbitMQ@2.1.1
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=Indiko.Blocks.EventBus.RabbitMQ&version=2.1.1
                    
Install as a Cake Addin
#tool nuget:?package=Indiko.Blocks.EventBus.RabbitMQ&version=2.1.1
                    
Install as a Cake Tool

Indiko.Blocks.EventBus.RabbitMQ

RabbitMQ-based distributed event bus implementation for microservices and scalable event-driven architectures.

Overview

This package provides a production-ready RabbitMQ implementation of the event bus abstractions, enabling reliable message delivery across distributed systems with features like persistence, acknowledgments, and automatic reconnection.

Features

  • RabbitMQ Integration: Full RabbitMQ messaging support via EasyNetQ
  • Distributed Events: Publish-subscribe across multiple services
  • Durable Messages: Persistent message storage
  • Automatic Reconnection: Handles connection failures gracefully
  • Queue Management: Automatic queue creation and routing
  • Message Acknowledgment: Reliable message delivery
  • Dead Letter Queues: Failed message handling
  • Multiple Consumers: Scale horizontally with competing consumers
  • Auto-Discovery: Automatic handler registration from DI container

Installation

dotnet add package Indiko.Blocks.EventBus.RabbitMQ

Prerequisites

  • RabbitMQ server (3.8+)
  • Management plugin enabled (optional, for monitoring)

Quick Start

Install RabbitMQ

Docker
docker run -d --name rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  rabbitmq:3-management
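Local Installation

See the official RabbitMQ installation guide for your platform: https://www.rabbitmq.com/docs/download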

Configure Services

using Indiko.Blocks.EventBus.RabbitMQ;

/// <summary>
/// Application startup that adds the RabbitMQ event bus and the event handlers to DI.
/// </summary>
public class Startup : WebStartup
{
    /// <summary>
    /// Runs the base registration, then configures the RabbitMQ event bus from the
    /// "RabbitMQ" configuration section and registers the scoped event handlers.
    /// </summary>
    public override void ConfigureServices(IServiceCollection services)
    {
        base.ConfigureServices(services);

        // All connection settings live under the "RabbitMQ" section.
        var rabbitMq = Configuration.GetSection("RabbitMQ");

        services.AddRabbitMQEventBus(bus =>
        {
            bus.Host = rabbitMq["Host"];
            bus.Port = rabbitMq.GetValue<int>("Port");
            bus.Username = rabbitMq["Username"];
            bus.Password = rabbitMq["Password"];
            bus.VirtualHost = rabbitMq["VirtualHost"];
            bus.ReConnectOnConnectionLost = true;
        });

        // One handler registration per event type.
        services.AddScoped<IEventHandler<UserCreatedEvent>, SendWelcomeEmailHandler>();
        services.AddScoped<IEventHandler<OrderPlacedEvent>, ProcessOrderHandler>();
    }
}

Configuration (appsettings.json)

{
  "RabbitMQ": {
    "Host": "localhost",
    "Port": 5672,
    "Username": "guest",
    "Password": "guest",
    "VirtualHost": "/",
    "PrefetchCount": 10,
    "Timeout": 30,
    "ReConnectOnConnectionLost": true,
    "Product": "MyApplication",
    "Platform": ".NET 10"
  }
}

Define Events

using Indiko.Blocks.EventBus.Abstractions.Interfaces;

/// <summary>
/// Event carrying a user's ID, email address, first and last name, and creation timestamp.
/// In this README it is published by <c>UserService.CreateUserAsync</c> after the user is stored.
/// </summary>
public class UserCreatedEvent : IEvent
{
    /// <summary>ID of the user.</summary>
    public Guid UserId { get; set; }
    /// <summary>Email address of the user. Not initialized here, so null unless the publisher sets it.</summary>
    public string Email { get; set; }
    /// <summary>First name of the user. Not initialized here, so null unless the publisher sets it.</summary>
    public string FirstName { get; set; }
    /// <summary>Last name of the user. Not initialized here, so null unless the publisher sets it.</summary>
    public string LastName { get; set; }
    /// <summary>Creation timestamp (the sample publisher passes a <c>DateTime.UtcNow</c> value).</summary>
    public DateTime CreatedAt { get; set; }
}

/// <summary>
/// Event carrying an order's ID, the ordering user's ID, the order items,
/// the total amount and the order date.
/// </summary>
public class OrderPlacedEvent : IEvent
{
    /// <summary>ID of the order.</summary>
    public Guid OrderId { get; set; }
    /// <summary>ID of the user the order belongs to.</summary>
    public Guid UserId { get; set; }
    /// <summary>Items of the order. Not initialized here, so null unless the publisher sets it.</summary>
    public List<OrderItem> Items { get; set; }
    /// <summary>Total amount of the order (currency/units not specified here).</summary>
    public decimal TotalAmount { get; set; }
    /// <summary>Date of the order. NOTE(review): time zone is not specified here; confirm UTC.</summary>
    public DateTime OrderDate { get; set; }
}

Implement Handlers

Service A - User Service

/// <summary>
/// Creates users and publishes a <see cref="UserCreatedEvent"/> for each one.
/// </summary>
public class UserService
{
    private readonly IEventBus _eventBus;
    private readonly IUserRepository _userRepository;

    /// <summary>
    /// Receives the event bus and the user repository from DI.
    /// Without this constructor the readonly fields would never be assigned.
    /// </summary>
    public UserService(IEventBus eventBus, IUserRepository userRepository)
    {
        _eventBus = eventBus;
        _userRepository = userRepository;
    }

    /// <summary>
    /// Stores a new user built from <paramref name="dto"/> and then publishes a
    /// <see cref="UserCreatedEvent"/> with the same data.
    /// </summary>
    /// <param name="dto">Source of the email, first name and last name.</param>
    /// <returns>The stored user.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="dto"/> is null.</exception>
    public async Task<User> CreateUserAsync(CreateUserDto dto)
    {
        ArgumentNullException.ThrowIfNull(dto);

        var user = new User
        {
            Id = Guid.NewGuid(),
            Email = dto.Email,
            FirstName = dto.FirstName,
            LastName = dto.LastName,
            CreatedAt = DateTime.UtcNow
        };

        // Persist first so the event is only published for a stored user.
        await _userRepository.AddAsync(user);

        // Publish event to RabbitMQ
        await _eventBus.PublishAsync(new UserCreatedEvent
        {
            UserId = user.Id,
            Email = user.Email,
            FirstName = user.FirstName,
            LastName = user.LastName,
            CreatedAt = user.CreatedAt
        });

        return user;
    }
}

Service B - Email Service

/// <summary>
/// Handles <see cref="UserCreatedEvent"/> by sending a welcome email.
/// </summary>
public class SendWelcomeEmailHandler : IEventHandler<UserCreatedEvent>
{
    private readonly IEmailService _emailService;
    private readonly ILogger<SendWelcomeEmailHandler> _logger;

    public SendWelcomeEmailHandler(IEmailService emailService, ILogger<SendWelcomeEmailHandler> logger)
    {
        _emailService = emailService;
        _logger = logger;
    }

    /// <summary>
    /// Sends the welcome email for the user described by <paramref name="event"/>.
    /// Any exception is logged and rethrown.
    /// </summary>
    /// <param name="event">The event with the recipient's email and first name.</param>
    /// <param name="cancellationToken">Forwarded to the email service.</param>
    public async Task HandleAsync(UserCreatedEvent @event, CancellationToken cancellationToken = default)
    {
        // `event` is a C# keyword, so the parameter must be written as `@event` everywhere,
        // including inside log arguments.
        _logger.LogInformation("Sending welcome email to {Email}", @event.Email);

        try
        {
            await _emailService.SendWelcomeEmailAsync(
                @event.Email,
                @event.FirstName,
                cancellationToken
            );

            _logger.LogInformation("Welcome email sent to {Email}", @event.Email);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to send welcome email to {Email}", @event.Email);
            throw; // RabbitMQ will retry or move to dead letter queue
        }
    }
}

Service C - Analytics Service

/// <summary>
/// Handles <see cref="UserCreatedEvent"/> by tracking a "UserRegistration" analytics event.
/// </summary>
public class TrackUserRegistrationHandler : IEventHandler<UserCreatedEvent>
{
    private readonly IAnalyticsService _analyticsService;

    /// <summary>
    /// Receives the analytics service from DI.
    /// Without this constructor the readonly field would never be assigned.
    /// </summary>
    public TrackUserRegistrationHandler(IAnalyticsService analyticsService)
    {
        _analyticsService = analyticsService;
    }

    /// <summary>
    /// Tracks the registration with the user's ID, creation timestamp and email domain.
    /// </summary>
    /// <param name="event">The user-created event.</param>
    /// <param name="cancellationToken">Not used by this handler.</param>
    public async Task HandleAsync(UserCreatedEvent @event, CancellationToken cancellationToken = default)
    {
        await _analyticsService.TrackEventAsync(new AnalyticsEvent
        {
            EventType = "UserRegistration",
            UserId = @event.UserId,
            Timestamp = @event.CreatedAt,
            Properties = new Dictionary<string, object>
            {
                { "email_domain", GetEmailDomain(@event.Email) }
            }
        });
    }

    /// <summary>
    /// Returns the text after the last '@', or an empty string when the address is
    /// null, empty or has no '@'. Indexing Split('@')[1] would throw for such input.
    /// </summary>
    private static string GetEmailDomain(string email)
    {
        if (string.IsNullOrEmpty(email))
        {
            return string.Empty;
        }

        var atIndex = email.LastIndexOf('@');
        return atIndex >= 0 ? email[(atIndex + 1)..] : string.Empty;
    }
}

How It Works

Automatic Handler Registration

The RabbitMQ event bus automatically discovers and registers all handlers from the DI container:

/// <summary>
/// Finds every loaded type that implements <c>IEventHandler&lt;TEvent&gt;</c>, resolves it
/// from <paramref name="serviceProvider"/> and registers each instance that resolves.
/// </summary>
/// <param name="serviceProvider">Provider used to resolve handler instances.</param>
/// <exception cref="ArgumentNullException"><paramref name="serviceProvider"/> is null.</exception>
public void AddAllHandlersFromServiceProvider(IServiceProvider serviceProvider)
{
    ArgumentNullException.ThrowIfNull(serviceProvider);

    // Scans all assemblies for IEventHandler<TEvent> implementations
    var eventHandlerTypes = AppDomain.CurrentDomain.GetAssemblies()
        .SelectMany(GetLoadableTypes)
        .Where(t => t.GetInterfaces()
            .Any(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IEventHandler<>)))
        .ToList();

    // Registers each handler with RabbitMQ
    foreach (var handlerType in eventHandlerTypes)
    {
        // NOTE(review): this resolves by the handler's own type. Handlers registered only
        // under their interface (e.g. AddScoped<IEventHandler<T>, THandler>()) yield null
        // here and are skipped. Confirm how handlers are expected to be registered.
        var handler = serviceProvider.GetService(handlerType);
        if (handler is not null)
        {
            RegisterEventHandler(handler);
        }
    }
}

/// <summary>
/// Returns the types of <paramref name="assembly"/> that could be loaded, so that one
/// assembly with unloadable types does not abort the whole scan.
/// </summary>
private static IEnumerable<Type> GetLoadableTypes(System.Reflection.Assembly assembly)
{
    try
    {
        return assembly.GetTypes();
    }
    catch (System.Reflection.ReflectionTypeLoadException ex)
    {
        // ex.Types holds null for each type that failed to load; keep the rest.
        return ex.Types.OfType<Type>();
    }
}

Queue Naming Convention

Each event type gets its own queue:

UserCreatedEvent → UserCreatedEvent_Queue
OrderPlacedEvent → OrderPlacedEvent_Queue
PaymentProcessedEvent → PaymentProcessedEvent_Queue

Message Flow

Publisher (Service A)
    ↓
[RabbitMQ Exchange]
    ↓
[UserCreatedEvent_Queue]
    ↓
Consumers (Services B, C, D...)

Connection Management

Auto-Reconnection

/// <summary>
/// Handles the bus "disconnected" event: logs the reason and, when
/// <c>ReConnectOnConnectionLost</c> is enabled, unregisters all event handlers and
/// registers them again from the service provider.
/// </summary>
private void Advanced_Disconnected(object sender, DisconnectedEventArgs e)
{
    // Message template instead of string interpolation, so the reason is logged as a
    // structured value.
    _logger.LogWarning("Disconnected from RabbitMQ: {reason}", e.Reason);

    if (!_options.ReConnectOnConnectionLost)
    {
        return;
    }

    _logger.LogInformation("Reconnecting to RabbitMQ...");
    UnRegisterAllEventHandlers();
    AddAllHandlersFromServiceProvider(_serviceProvider);
    _logger.LogInformation("Reconnected to RabbitMQ");
}

Connection Events

// Subscribe to the connection events exposed by the bus's Advanced API.
// Advanced_Disconnected is shown in the Auto-Reconnection section; the other two
// handlers are not shown in this README.
_bus.Advanced.Connected += Advanced_Connected;
_bus.Advanced.Disconnected += Advanced_Disconnected;
_bus.Advanced.MessageReturned += Advanced_MessageReturned;

Advanced Configurations

Multiple RabbitMQ Instances

{
  "RabbitMQ": {
    "Host": "rabbitmq-cluster.example.com",
    "Port": 5672,
    "Username": "app-user",
    "Password": "secure-password",
    "VirtualHost": "/production",
    "Timeout": 60,
    "PrefetchCount": 20
  }
}

Connection String Builder

// Build the connection string from the configured options.
var connectionString = RabbitConnectionStringBuilder
    .Init(options)
    .Build();

// Produces: host=localhost:5672;virtualHost=/;username=guest;password=guest
// (example output for the sample appsettings.json values; the password is included
// in plain text, so avoid writing this string to logs)

Manual Handler Registration

/// <summary>
/// Registers the <c>UserCreatedEvent</c> handler with the event bus by hand.
/// </summary>
public void Configure(IApplicationBuilder app, IEventBus eventBus, IServiceProvider services)
{
    // Resolve the handler from DI and pass it straight to the bus.
    eventBus.RegisterEventHandler(
        services.GetRequiredService<IEventHandler<UserCreatedEvent>>());
}

Error Handling and Retries

Automatic Retries

A handler that throws triggers a retry (or dead-lettering, depending on configuration); a handler that returns normally acknowledges the message:

/// <summary>
/// Handles <see cref="OrderPlacedEvent"/> and treats transient and permanent failures
/// differently: transient errors are rethrown, permanent errors are logged and swallowed.
/// </summary>
public class ResilientEventHandler : IEventHandler<OrderPlacedEvent>
{
    private readonly ILogger<ResilientEventHandler> _logger;

    /// <summary>Receives the logger from DI; the handler logs in both catch blocks.</summary>
    public ResilientEventHandler(ILogger<ResilientEventHandler> logger)
    {
        _logger = logger;
    }

    /// <summary>
    /// Processes the order. Rethrows <c>TransientException</c>; logs and swallows
    /// <c>PermanentException</c>.
    /// </summary>
    /// <param name="event">The order-placed event.</param>
    /// <param name="cancellationToken">Not used by this handler.</param>
    public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken cancellationToken = default)
    {
        try
        {
            // ProcessOrderAsync is not shown in this README.
            await ProcessOrderAsync(@event);
        }
        catch (TransientException ex)
        {
            // Throw to trigger RabbitMQ retry
            _logger.LogWarning(ex, "Transient error, will retry: {Message}", ex.Message);
            throw;
        }
        catch (PermanentException ex)
        {
            // Log and return (don't throw) to acknowledge message.
            // Passing the exception keeps its stack trace in the log.
            _logger.LogError(ex, "Permanent error, message will be dropped: {Message}", ex.Message);
            // Message is acknowledged and won't be retried
        }
    }
}

Dead Letter Queue

Configure dead letter queues for failed messages:

services.AddRabbitMQEventBus(options =>
{
    options.Host = "localhost";
    // Names of the dead-letter exchange and queue.
    // NOTE(review): whether the library declares these or expects them to exist
    // already is not visible here; confirm.
    options.DeadLetterExchange = "dlx";
    options.DeadLetterQueue = "failed_messages";
});

Scaling

Horizontal Scaling

Run multiple instances of your service - RabbitMQ distributes messages:

Service Instance 1 ─┐
Service Instance 2 ─┼── [UserCreatedEvent_Queue] ←─ RabbitMQ
Service Instance 3 ─┘

Each message is delivered to only ONE instance (competing consumers).

Message Prefetch

Control how many messages each consumer processes at once:

{
  "RabbitMQ": {
    "PrefetchCount": 10
  }
}

Monitoring

RabbitMQ Management UI

Access at: http://localhost:15672

  • Default credentials: guest/guest
  • View queues, exchanges, message rates
  • Monitor consumer connections

Application Logging

// Each call uses a message template with named placeholders; the values are passed
// as separate arguments rather than interpolated into the string.
_logger.LogInformation("Connected to RabbitMQ at {host}:{port}", options.Host, options.Port);
_logger.LogDebug("Registered event handler {handler} for {event}", handlerType, eventType);
_logger.LogWarning("Disconnected from RabbitMQ: {reason}", disconnectReason);

Best Practices

  1. Idempotent Handlers: Design handlers to be idempotent (handle duplicates)
  2. Event Versioning: Plan for event schema evolution
  3. Correlation IDs: Include correlation IDs for tracing
  4. Small Events: Keep event payloads small
  5. Error Handling: Distinguish transient vs permanent errors
  6. Connection Pooling: Reuse connections
  7. Monitoring: Set up alerts for queue depths

Deployment

Docker Compose

# Example stack: one RabbitMQ broker plus two services that connect to it.
version: '3.8'
services:
  rabbitmq:
    image: rabbitmq:3-management
    ports:
      # 5672 = AMQP, 15672 = management UI (see the Monitoring section)
      - "5672:5672"
      - "15672:15672"
    environment:
      # Example credentials only; replace them outside local development.
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: password
    volumes:
      # Named volume so broker data is kept when the container is recreated.
      - rabbitmq-data:/var/lib/rabbitmq

  user-service:
    build: ./UserService
    environment:
      # Double underscore maps to the "RabbitMQ:<key>" configuration keys.
      RabbitMQ__Host: rabbitmq
      RabbitMQ__Username: admin
      RabbitMQ__Password: password
    # Controls start order only; it does not wait until RabbitMQ accepts connections.
    depends_on:
      - rabbitmq

  email-service:
    build: ./EmailService
    environment:
      # Double underscore maps to the "RabbitMQ:<key>" configuration keys.
      RabbitMQ__Host: rabbitmq
      RabbitMQ__Username: admin
      RabbitMQ__Password: password
    # Controls start order only; it does not wait until RabbitMQ accepts connections.
    depends_on:
      - rabbitmq

volumes:
  rabbitmq-data:

Kubernetes

# Deployment running three replicas of user-service, with RabbitMQ credentials
# taken from the "rabbitmq-secret" Secret.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: user-service
spec:
  replicas: 3
  # apps/v1 requires a selector, and it must match the pod template labels below;
  # without both, the API server rejects the manifest.
  selector:
    matchLabels:
      app: user-service
  template:
    metadata:
      labels:
        app: user-service
    spec:
      containers:
      - name: user-service
        image: user-service:latest
        env:
        # Double underscore maps to the "RabbitMQ:<key>" configuration keys.
        - name: RabbitMQ__Host
          value: "rabbitmq-service"
        - name: RabbitMQ__Username
          valueFrom:
            secretKeyRef:
              name: rabbitmq-secret
              key: username
        - name: RabbitMQ__Password
          valueFrom:
            secretKeyRef:
              name: rabbitmq-secret
              key: password

Migration from InMemory

Simply change the registration:

// From: the in-memory event bus registration
services.AddInMemoryEventBus();

// To: the RabbitMQ event bus registration
// NOTE(review): this passes Configuration directly, while the Quick Start uses the
// options-lambda overload; confirm that an overload taking the configuration exists.
services.AddRabbitMQEventBus(Configuration);

All events and handlers work without modification!

Target Framework

  • .NET 10

Dependencies

  • Indiko.Blocks.EventBus.Abstractions
  • EasyNetQ (8.0+)
  • RabbitMQ.Client (6.0+)

License

See LICENSE file in the repository root.

Related Packages

  • Indiko.Blocks.EventBus.Abstractions - Core event bus abstractions
  • Indiko.Blocks.EventBus.InMemory - In-memory event bus for development
  • Indiko.Blocks.Mediation.Abstractions - CQRS and mediator pattern

Resources

Product Compatible and additional computed target framework versions.
.NET net10.0 is compatible.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
2.1.1 35 12/2/2025
2.1.0 39 12/2/2025
2.0.0 300 9/17/2025
1.7.23 172 9/8/2025
1.7.22 169 9/8/2025
1.7.21 172 8/14/2025
1.7.20 189 6/23/2025
1.7.19 187 6/3/2025
1.7.18 181 5/29/2025
1.7.17 180 5/26/2025
1.7.15 133 4/12/2025
1.7.14 147 4/11/2025
1.7.13 146 3/29/2025
1.7.12 166 3/28/2025
1.7.11 167 3/28/2025
1.7.10 181 3/28/2025
1.7.9 165 3/28/2025
1.7.8 160 3/28/2025
1.7.5 197 3/17/2025
1.7.4 190 3/16/2025
1.7.3 166 3/16/2025
1.7.2 198 3/16/2025
1.7.1 213 3/11/2025
1.6.8 218 3/11/2025
1.6.7 266 3/4/2025
1.6.6 140 2/26/2025
1.6.5 167 2/20/2025
1.6.4 166 2/20/2025
1.6.3 150 2/5/2025
1.6.2 167 1/24/2025
1.6.1 165 1/24/2025
1.6.0 128 1/16/2025
1.5.2 127 1/16/2025
1.5.1 180 11/3/2024
1.5.0 146 10/26/2024
1.3.2 165 10/24/2024
1.3.0 162 10/10/2024
1.2.5 183 10/9/2024
1.2.4 164 10/8/2024
1.2.1 150 10/3/2024
1.2.0 145 9/29/2024
1.1.1 156 9/23/2024
1.1.0 186 9/18/2024
1.0.33 192 9/15/2024
1.0.28 156 8/28/2024
1.0.27 161 8/24/2024
1.0.26 149 7/7/2024
1.0.25 154 7/6/2024
1.0.24 149 6/25/2024
1.0.23 157 6/1/2024
1.0.22 179 5/14/2024
1.0.21 141 5/14/2024
1.0.20 169 4/8/2024
1.0.19 160 4/3/2024
1.0.18 152 3/23/2024
1.0.17 184 3/19/2024
1.0.16 189 3/19/2024
1.0.15 155 3/11/2024
1.0.14 166 3/10/2024
1.0.13 161 3/6/2024
1.0.12 184 3/1/2024
1.0.11 195 3/1/2024
1.0.10 179 3/1/2024
1.0.9 171 3/1/2024
1.0.8 154 2/19/2024
1.0.7 163 2/17/2024
1.0.6 162 2/17/2024
1.0.5 167 2/17/2024
1.0.4 170 2/7/2024
1.0.3 149 2/6/2024
1.0.1 174 2/6/2024
1.0.0 217 1/9/2024
1.0.0-preview99 159 12/22/2023
1.0.0-preview98 140 12/21/2023
1.0.0-preview97 125 12/21/2023
1.0.0-preview96 145 12/20/2023
1.0.0-preview95 158 12/20/2023
1.0.0-preview94 135 12/18/2023
1.0.0-preview93 274 12/13/2023
1.0.0-preview92 139 12/13/2023
1.0.0-preview91 206 12/12/2023
1.0.0-preview90 131 12/11/2023
1.0.0-preview89 128 12/11/2023
1.0.0-preview88 231 12/6/2023
1.0.0-preview87 178 12/6/2023
1.0.0-preview86 164 12/6/2023
1.0.0-preview85 165 12/6/2023
1.0.0-preview84 152 12/5/2023
1.0.0-preview83 198 12/5/2023
1.0.0-preview82 169 12/5/2023
1.0.0-preview81 145 12/4/2023
1.0.0-preview80 144 12/1/2023
1.0.0-preview77 141 12/1/2023
1.0.0-preview76 155 12/1/2023
1.0.0-preview75 136 12/1/2023
1.0.0-preview74 173 11/26/2023
1.0.0-preview73 163 11/7/2023
1.0.0-preview72 142 11/6/2023
1.0.0-preview71 158 11/3/2023
1.0.0-preview70 143 11/2/2023
1.0.0-preview69 141 11/2/2023
1.0.0-preview68 152 11/2/2023
1.0.0-preview67 137 11/2/2023
1.0.0-preview66 141 11/2/2023
1.0.0-preview65 150 11/2/2023
1.0.0-preview64 173 11/2/2023
1.0.0-preview63 139 11/2/2023
1.0.0-preview62 131 11/1/2023
1.0.0-preview61 144 11/1/2023
1.0.0-preview60 139 11/1/2023
1.0.0-preview59 169 11/1/2023
1.0.0-preview58 155 10/31/2023
1.0.0-preview57 140 10/31/2023
1.0.0-preview56 159 10/31/2023
1.0.0-preview55 142 10/31/2023
1.0.0-preview54 134 10/31/2023
1.0.0-preview53 129 10/31/2023
1.0.0-preview52 136 10/31/2023
1.0.0-preview51 137 10/31/2023
1.0.0-preview50 145 10/31/2023
1.0.0-preview48 141 10/31/2023
1.0.0-preview46 126 10/31/2023
1.0.0-preview45 149 10/31/2023
1.0.0-preview44 140 10/31/2023
1.0.0-preview43 162 10/31/2023
1.0.0-preview42 169 10/30/2023
1.0.0-preview41 152 10/30/2023
1.0.0-preview40 149 10/27/2023
1.0.0-preview39 172 10/27/2023
1.0.0-preview38 143 10/27/2023
1.0.0-preview37 162 10/27/2023
1.0.0-preview36 124 10/27/2023
1.0.0-preview35 140 10/27/2023
1.0.0-preview34 136 10/27/2023
1.0.0-preview33 158 10/26/2023
1.0.0-preview32 160 10/26/2023
1.0.0-preview31 154 10/26/2023
1.0.0-preview30 153 10/26/2023
1.0.0-preview29 144 10/26/2023
1.0.0-preview28 154 10/26/2023
1.0.0-preview27 145 10/26/2023
1.0.0-preview26 170 10/25/2023
1.0.0-preview25 178 10/23/2023
1.0.0-preview24 151 10/23/2023
1.0.0-preview23 143 10/23/2023
1.0.0-preview22 149 10/23/2023
1.0.0-preview21 145 10/23/2023
1.0.0-preview20 152 10/20/2023
1.0.0-preview19 168 10/19/2023
1.0.0-preview18 147 10/18/2023
1.0.0-preview16 152 10/11/2023
1.0.0-preview14 178 10/10/2023
1.0.0-preview13 153 10/10/2023
1.0.0-preview12 137 10/9/2023
1.0.0-preview101 147 1/5/2024