Indiko.Blocks.EventBus.RabbitMQ
2.1.1
dotnet add package Indiko.Blocks.EventBus.RabbitMQ --version 2.1.1
NuGet\Install-Package Indiko.Blocks.EventBus.RabbitMQ -Version 2.1.1
<PackageReference Include="Indiko.Blocks.EventBus.RabbitMQ" Version="2.1.1" />
<PackageVersion Include="Indiko.Blocks.EventBus.RabbitMQ" Version="2.1.1" />
<PackageReference Include="Indiko.Blocks.EventBus.RabbitMQ" />
paket add Indiko.Blocks.EventBus.RabbitMQ --version 2.1.1
#r "nuget: Indiko.Blocks.EventBus.RabbitMQ, 2.1.1"
#:package Indiko.Blocks.EventBus.RabbitMQ@2.1.1
#addin nuget:?package=Indiko.Blocks.EventBus.RabbitMQ&version=2.1.1
#tool nuget:?package=Indiko.Blocks.EventBus.RabbitMQ&version=2.1.1
Indiko.Blocks.EventBus.RabbitMQ
RabbitMQ-based distributed event bus implementation for microservices and scalable event-driven architectures.
Overview
This package provides a production-ready RabbitMQ implementation of the event bus abstractions, enabling reliable message delivery across distributed systems with features like persistence, acknowledgments, and automatic reconnection.
Features
- RabbitMQ Integration: Full RabbitMQ messaging support via EasyNetQ
- Distributed Events: Publish-subscribe across multiple services
- Durable Messages: Persistent message storage
- Automatic Reconnection: Handles connection failures gracefully
- Queue Management: Automatic queue creation and routing
- Message Acknowledgment: Reliable message delivery
- Dead Letter Queues: Failed message handling
- Multiple Consumers: Scale horizontally with competing consumers
- Auto-Discovery: Automatic handler registration from DI container
Installation
dotnet add package Indiko.Blocks.EventBus.RabbitMQ
Prerequisites
- RabbitMQ server (3.8+)
- Management plugin enabled (optional, for monitoring)
Quick Start
Install RabbitMQ
Docker
docker run -d --name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
rabbitmq:3-management
Local Installation
- Download from: https://www.rabbitmq.com/download.html
Configure Services
using Indiko.Blocks.EventBus.RabbitMQ;
/// <summary>
/// Application startup that registers the RabbitMQ event bus and the event handlers
/// with the dependency-injection container.
/// </summary>
public class Startup : WebStartup
{
    // Standard AMQP port; used when "RabbitMQ:Port" is absent from configuration.
    private const int DefaultAmqpPort = 5672;

    /// <summary>
    /// Registers the base services, then the RabbitMQ event bus (bound to the
    /// "RabbitMQ" configuration section) and the event handlers.
    /// </summary>
    /// <param name="services">The service collection to add registrations to.</param>
    public override void ConfigureServices(IServiceCollection services)
    {
        base.ConfigureServices(services);

        // Configure RabbitMQ event bus
        services.AddRabbitMQEventBus(options =>
        {
            options.Host = Configuration["RabbitMQ:Host"];
            // GetValue<int>(key) returns 0 when the key is missing, which would point the
            // client at port 0; fall back to the standard AMQP port instead.
            options.Port = Configuration.GetValue("RabbitMQ:Port", DefaultAmqpPort);
            options.Username = Configuration["RabbitMQ:Username"];
            options.Password = Configuration["RabbitMQ:Password"];
            options.VirtualHost = Configuration["RabbitMQ:VirtualHost"];
            options.ReConnectOnConnectionLost = true;
        });

        // Register event handlers
        services.AddScoped<IEventHandler<UserCreatedEvent>, SendWelcomeEmailHandler>();
        services.AddScoped<IEventHandler<OrderPlacedEvent>, ProcessOrderHandler>();
    }
}
Configuration (appsettings.json)
{
"RabbitMQ": {
"Host": "localhost",
"Port": 5672,
"Username": "guest",
"Password": "guest",
"VirtualHost": "/",
"PrefetchCount": 10,
"Timeout": 30,
"ReConnectOnConnectionLost": true,
"Product": "MyApplication",
"Platform": ".NET 10"
}
}
Define Events
using Indiko.Blocks.EventBus.Abstractions.Interfaces;
/// <summary>
/// Event published after a user has been created.
/// </summary>
public class UserCreatedEvent : IEvent
{
    /// <summary>Identifier of the created user.</summary>
    public Guid UserId { get; set; }

    // String properties default to empty rather than null so handlers can read them
    // without a null check even if the publisher left one unset.

    /// <summary>Email address of the created user.</summary>
    public string Email { get; set; } = string.Empty;

    /// <summary>First name of the created user.</summary>
    public string FirstName { get; set; } = string.Empty;

    /// <summary>Last name of the created user.</summary>
    public string LastName { get; set; } = string.Empty;

    /// <summary>Creation timestamp as supplied by the publisher.</summary>
    public DateTime CreatedAt { get; set; }
}
/// <summary>
/// Event published after an order has been placed.
/// </summary>
public class OrderPlacedEvent : IEvent
{
    /// <summary>Identifier of the placed order.</summary>
    public Guid OrderId { get; set; }

    /// <summary>Identifier of the user who placed the order.</summary>
    public Guid UserId { get; set; }

    /// <summary>
    /// Items contained in the order. Defaults to an empty list so consumers can
    /// enumerate it safely even when the publisher did not set it.
    /// </summary>
    public List<OrderItem> Items { get; set; } = new List<OrderItem>();

    /// <summary>Total amount of the order.</summary>
    public decimal TotalAmount { get; set; }

    /// <summary>Date of the order as supplied by the publisher.</summary>
    public DateTime OrderDate { get; set; }
}
Implement Handlers
Service A - User Service
/// <summary>
/// Creates users and publishes a <see cref="UserCreatedEvent"/> for each one.
/// </summary>
public class UserService
{
    private readonly IEventBus _eventBus;
    private readonly IUserRepository _userRepository;

    /// <summary>
    /// Creates the service with its dependencies. Without this constructor the
    /// readonly fields would never be assigned and every call would fail on a null reference.
    /// </summary>
    /// <param name="eventBus">Event bus used to publish the user-created event.</param>
    /// <param name="userRepository">Repository the new user is added to.</param>
    public UserService(IEventBus eventBus, IUserRepository userRepository)
    {
        _eventBus = eventBus;
        _userRepository = userRepository;
    }

    /// <summary>
    /// Builds a new user from <paramref name="dto"/>, adds it to the repository and
    /// then publishes a <see cref="UserCreatedEvent"/> carrying the same data.
    /// </summary>
    /// <param name="dto">Source of the email, first name and last name.</param>
    /// <returns>The newly created user.</returns>
    public async Task<User> CreateUserAsync(CreateUserDto dto)
    {
        var user = new User
        {
            Id = Guid.NewGuid(),
            Email = dto.Email,
            FirstName = dto.FirstName,
            LastName = dto.LastName,
            CreatedAt = DateTime.UtcNow
        };

        await _userRepository.AddAsync(user);

        // Publish event to RabbitMQ (only after the repository call has completed)
        await _eventBus.PublishAsync(new UserCreatedEvent
        {
            UserId = user.Id,
            Email = user.Email,
            FirstName = user.FirstName,
            LastName = user.LastName,
            CreatedAt = user.CreatedAt
        });

        return user;
    }
}
Service B - Email Service
/// <summary>
/// Sends a welcome email whenever a <see cref="UserCreatedEvent"/> is received.
/// </summary>
public class SendWelcomeEmailHandler : IEventHandler<UserCreatedEvent>
{
    private readonly IEmailService _emailService;
    private readonly ILogger<SendWelcomeEmailHandler> _logger;

    /// <summary>Creates the handler with its dependencies.</summary>
    /// <param name="emailService">Service used to send the welcome email.</param>
    /// <param name="logger">Logger for progress and failure messages.</param>
    public SendWelcomeEmailHandler(IEmailService emailService, ILogger<SendWelcomeEmailHandler> logger)
    {
        _emailService = emailService;
        _logger = logger;
    }

    /// <summary>
    /// Sends the welcome email for the user described by <paramref name="event"/>.
    /// Any exception from the email service is logged and rethrown.
    /// </summary>
    /// <param name="event">The user-created event.</param>
    /// <param name="cancellationToken">Token forwarded to the email service.</param>
    public async Task HandleAsync(UserCreatedEvent @event, CancellationToken cancellationToken = default)
    {
        // `event` is a reserved word, so the parameter must be written as `@event`
        // everywhere, including inside log arguments. Message templates are used
        // instead of interpolated strings so the email is logged as a structured value.
        _logger.LogInformation("Sending welcome email to {Email}", @event.Email);

        try
        {
            await _emailService.SendWelcomeEmailAsync(
                @event.Email,
                @event.FirstName,
                cancellationToken
            );

            _logger.LogInformation("Welcome email sent to {Email}", @event.Email);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to send welcome email to {Email}", @event.Email);

            // Rethrow so the event bus sees the failure.
            // NOTE(review): whether the message is retried or routed to an error /
            // dead letter queue depends on the bus's consumer error strategy — confirm.
            throw;
        }
    }
}
Service C - Analytics Service
/// <summary>
/// Records a "UserRegistration" analytics event whenever a
/// <see cref="UserCreatedEvent"/> is received.
/// </summary>
public class TrackUserRegistrationHandler : IEventHandler<UserCreatedEvent>
{
    // Reported as the email domain when the address is missing or has no domain part.
    private const string UnknownDomain = "unknown";

    private readonly IAnalyticsService _analyticsService;

    /// <summary>
    /// Creates the handler with its dependency. Without this constructor the
    /// readonly field would never be assigned.
    /// </summary>
    /// <param name="analyticsService">Service the analytics event is sent to.</param>
    public TrackUserRegistrationHandler(IAnalyticsService analyticsService)
    {
        _analyticsService = analyticsService;
    }

    /// <summary>
    /// Sends an analytics event carrying the user id, the creation timestamp and
    /// the domain part of the user's email address.
    /// </summary>
    /// <param name="event">The user-created event.</param>
    /// <param name="cancellationToken">Cancellation token (not forwarded by this handler).</param>
    public async Task HandleAsync(UserCreatedEvent @event, CancellationToken cancellationToken = default)
    {
        await _analyticsService.TrackEventAsync(new AnalyticsEvent
        {
            EventType = "UserRegistration",
            UserId = @event.UserId,
            Timestamp = @event.CreatedAt,
            Properties = new Dictionary<string, object>
            {
                { "email_domain", GetEmailDomain(@event.Email) }
            }
        });
    }

    // Returns the text after the last '@', or UnknownDomain when the address is null,
    // has no '@', or ends with '@'. A plain Split('@')[1] would throw in those cases.
    private static string GetEmailDomain(string email)
    {
        if (string.IsNullOrEmpty(email))
        {
            return UnknownDomain;
        }

        var atIndex = email.LastIndexOf('@');
        if (atIndex < 0 || atIndex == email.Length - 1)
        {
            return UnknownDomain;
        }

        return email.Substring(atIndex + 1);
    }
}
How It Works
Automatic Handler Registration
The RabbitMQ event bus automatically discovers and registers all handlers from the DI container:
/// <summary>
/// Finds every concrete <c>IEventHandler&lt;TEvent&gt;</c> implementation in the
/// currently loaded assemblies, resolves an instance of each from
/// <paramref name="serviceProvider"/> and registers it with the event bus.
/// Handler types for which the container returns no instance are skipped.
/// </summary>
/// <param name="serviceProvider">Container the handler instances are resolved from.</param>
public void AddAllHandlersFromServiceProvider(IServiceProvider serviceProvider)
{
    // Scans all assemblies for IEventHandler<TEvent> implementations.
    // Interfaces and abstract classes are excluded: they can never be instantiated.
    var eventHandlerTypes = AppDomain.CurrentDomain.GetAssemblies()
        .SelectMany(GetLoadableTypes)
        .Where(t => t.IsClass && !t.IsAbstract && GetHandlerInterfaces(t).Any())
        .ToList();

    // Registers each handler with RabbitMQ
    foreach (var handlerType in eventHandlerTypes)
    {
        // Try the concrete type first, then fall back to the handler interface,
        // because handlers are commonly registered as
        // AddScoped<IEventHandler<TEvent>, THandler>() and are then not resolvable
        // by their concrete type.
        var handler = serviceProvider.GetService(handlerType)
            ?? ResolveThroughHandlerInterface(serviceProvider, handlerType);

        if (handler != null)
        {
            RegisterEventHandler(handler);
        }
    }
}

// Returns the types of an assembly, tolerating assemblies whose dependencies cannot
// be loaded (GetTypes throws ReflectionTypeLoadException but still exposes the
// types that did load; OfType drops the null placeholders).
private static System.Collections.Generic.IEnumerable<Type> GetLoadableTypes(System.Reflection.Assembly assembly)
{
    try
    {
        return assembly.GetTypes();
    }
    catch (System.Reflection.ReflectionTypeLoadException ex)
    {
        return ex.Types.OfType<Type>();
    }
}

// Returns the closed IEventHandler<TEvent> interfaces implemented by the given type.
private static System.Collections.Generic.IEnumerable<Type> GetHandlerInterfaces(Type type)
{
    return type.GetInterfaces()
        .Where(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IEventHandler<>));
}

// Asks the container for IEnumerable<IEventHandler<TEvent>> for each handler interface
// of handlerType and returns the first resolved instance whose runtime type is exactly
// handlerType, or null when there is none.
// NOTE(review): this instantiates every handler registered for that event type;
// acceptable at startup, but confirm handlers have no costly constructors.
private static object? ResolveThroughHandlerInterface(IServiceProvider serviceProvider, Type handlerType)
{
    foreach (var handlerInterface in GetHandlerInterfaces(handlerType))
    {
        var enumerableType = typeof(System.Collections.Generic.IEnumerable<>).MakeGenericType(handlerInterface);

        if (serviceProvider.GetService(enumerableType) is System.Collections.IEnumerable registered)
        {
            foreach (var candidate in registered)
            {
                if (candidate != null && candidate.GetType() == handlerType)
                {
                    return candidate;
                }
            }
        }
    }

    return null;
}
Queue Naming Convention
Each event type gets its own queue:
UserCreatedEvent → UserCreatedEvent_Queue
OrderPlacedEvent → OrderPlacedEvent_Queue
PaymentProcessedEvent → PaymentProcessedEvent_Queue
Message Flow
Publisher (Service A)
↓
[RabbitMQ Exchange]
↓
[UserCreatedEvent_Queue]
↓
Consumers (Services B, C, D...)
Connection Management
Auto-Reconnection
/// <summary>
/// Handles the bus's Disconnected notification: logs the reason and, when
/// <c>ReConnectOnConnectionLost</c> is enabled, drops all handler registrations
/// and registers them again from the service provider.
/// </summary>
/// <param name="sender">Source of the notification.</param>
/// <param name="e">Event data; its <c>Reason</c> is written to the log.</param>
private void Advanced_Disconnected(object sender, DisconnectedEventArgs e)
{
    // Message template instead of an interpolated string, so the reason is logged
    // as a structured value (same form as the other application log calls).
    _logger.LogWarning("Disconnected from RabbitMQ: {Reason}", e.Reason);

    if (!_options.ReConnectOnConnectionLost)
    {
        return;
    }

    _logger.LogInformation("Reconnecting to RabbitMQ...");

    // Re-create every handler registration from scratch.
    UnRegisterAllEventHandlers();
    AddAllHandlersFromServiceProvider(_serviceProvider);

    // NOTE(review): this is logged right after re-registering handlers, inside the
    // Disconnected callback — confirm the connection is really back at this point.
    _logger.LogInformation("Reconnected to RabbitMQ");
}
Connection Events
// Subscribe to the advanced bus's connection notifications.
_bus.Advanced.Connected += Advanced_Connected;
_bus.Advanced.Disconnected += Advanced_Disconnected;
// NOTE(review): presumably raised when the broker returns a published message as
// unroutable — confirm against the EasyNetQ documentation.
_bus.Advanced.MessageReturned += Advanced_MessageReturned;
Advanced Configurations
Multiple RabbitMQ Instances
{
"RabbitMQ": {
"Host": "rabbitmq-cluster.example.com",
"Port": 5672,
"Username": "app-user",
"Password": "secure-password",
"VirtualHost": "/production",
"Timeout": 60,
"PrefetchCount": 20
}
}
Connection String Builder
// Build the connection string from the configured options.
// The result contains the password in clear text (see the sample output below),
// so avoid writing it to logs.
var connectionString = RabbitConnectionStringBuilder
.Init(options)
.Build();
// Produces: host=localhost:5672;virtualHost=/;username=guest;password=guest
Manual Handler Registration
/// <summary>
/// Registers the <see cref="UserCreatedEvent"/> handler with the event bus by hand,
/// for cases where automatic discovery is not used.
/// </summary>
/// <param name="app">The application builder (not used here).</param>
/// <param name="eventBus">Event bus the handler is registered with.</param>
/// <param name="services">Provider the handler instance is resolved from.</param>
public void Configure(IApplicationBuilder app, IEventBus eventBus, IServiceProvider services)
{
    // Manual registration if needed: resolve the handler and hand it straight to the bus.
    eventBus.RegisterEventHandler(
        services.GetRequiredService<IEventHandler<UserCreatedEvent>>());
}
Error Handling and Retries
Automatic Retries
RabbitMQ automatically retries failed messages based on configuration:
/// <summary>
/// Example handler that treats transient and permanent failures differently:
/// transient errors are rethrown, permanent errors are logged and swallowed.
/// </summary>
public class ResilientEventHandler : IEventHandler<OrderPlacedEvent>
{
    private readonly ILogger<ResilientEventHandler> _logger;

    /// <summary>Creates the handler with the logger it writes failures to.</summary>
    /// <param name="logger">Logger for warning and error messages.</param>
    public ResilientEventHandler(ILogger<ResilientEventHandler> logger)
    {
        _logger = logger;
    }

    /// <summary>
    /// Processes the order event. A <c>TransientException</c> is logged and rethrown;
    /// a <c>PermanentException</c> is logged and not rethrown, so the method
    /// completes normally.
    /// </summary>
    /// <param name="event">The order-placed event.</param>
    /// <param name="cancellationToken">Cancellation token (not forwarded by this handler).</param>
    public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken cancellationToken)
    {
        try
        {
            await ProcessOrderAsync(@event);
        }
        catch (TransientException ex)
        {
            // Rethrow so the bus sees the failure.
            // NOTE(review): whether this leads to a retry depends on the bus's
            // consumer error strategy — confirm before relying on it.
            _logger.LogWarning(ex, "Transient error, will retry: {Message}", ex.Message);
            throw;
        }
        catch (PermanentException ex)
        {
            // Log and return (don't throw) to acknowledge message.
            // The exception object is passed so the stack trace is not lost.
            _logger.LogError(ex, "Permanent error, message will be dropped: {Message}", ex.Message);
            // Message is acknowledged and won't be retried
        }
    }
}
Dead Letter Queue
Configure dead letter queues for failed messages:
// Register the RabbitMQ event bus with a dead letter exchange and queue configured.
services.AddRabbitMQEventBus(options =>
{
options.Host = "localhost";
// NOTE(review): presumably the exchange and queue that failed messages are
// routed to — confirm how the bus declares and binds them.
options.DeadLetterExchange = "dlx";
options.DeadLetterQueue = "failed_messages";
});
Scaling
Horizontal Scaling
Run multiple instances of your service - RabbitMQ distributes messages:
Service Instance 1 ─┐
Service Instance 2 ─┼─ [UserCreatedEvent_Queue] ── RabbitMQ
Service Instance 3 ─┘
Each message is delivered to only ONE instance (competing consumers).
Message Prefetch
Control how many messages each consumer processes at once:
{
"RabbitMQ": {
"PrefetchCount": 10
}
}
Monitoring
RabbitMQ Management UI
Access at: http://localhost:15672
- Default credentials: guest/guest
- View queues, exchanges, message rates
- Monitor consumer connections
Application Logging
// Log calls use message templates with named placeholders; the values are passed
// as separate arguments rather than interpolated into the string.
_logger.LogInformation("Connected to RabbitMQ at {host}:{port}", options.Host, options.Port);
_logger.LogDebug("Registered event handler {handler} for {event}", handlerType, eventType);
_logger.LogWarning("Disconnected from RabbitMQ: {reason}", disconnectReason);
Best Practices
- Idempotent Handlers: Design handlers to be idempotent (handle duplicates)
- Event Versioning: Plan for event schema evolution
- Correlation IDs: Include correlation IDs for tracing
- Small Events: Keep event payloads small
- Error Handling: Distinguish transient vs permanent errors
- Connection Pooling: Reuse connections
- Monitoring: Set up alerts for queue depths
Deployment
Docker Compose
version: '3.8'
services:
rabbitmq:
image: rabbitmq:3-management
ports:
- "5672:5672"
- "15672:15672"
environment:
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: password
volumes:
- rabbitmq-data:/var/lib/rabbitmq
user-service:
build: ./UserService
environment:
RabbitMQ__Host: rabbitmq
RabbitMQ__Username: admin
RabbitMQ__Password: password
depends_on:
- rabbitmq
email-service:
build: ./EmailService
environment:
RabbitMQ__Host: rabbitmq
RabbitMQ__Username: admin
RabbitMQ__Password: password
depends_on:
- rabbitmq
volumes:
rabbitmq-data:
Kubernetes
apiVersion: apps/v1
kind: Deployment
metadata:
  name: user-service
spec:
  replicas: 3
  # apps/v1 Deployments require a selector, and it must match the pod template labels.
  selector:
    matchLabels:
      app: user-service
  template:
    metadata:
      labels:
        app: user-service
    spec:
      containers:
        - name: user-service
          image: user-service:latest
          env:
            # Double underscore maps to the ":" separator of .NET configuration keys
            # (RabbitMQ__Host -> RabbitMQ:Host).
            - name: RabbitMQ__Host
              value: "rabbitmq-service"
            # Credentials come from a Secret instead of being written in the manifest.
            - name: RabbitMQ__Username
              valueFrom:
                secretKeyRef:
                  name: rabbitmq-secret
                  key: username
            - name: RabbitMQ__Password
              valueFrom:
                secretKeyRef:
                  name: rabbitmq-secret
                  key: password
Migration from InMemory
Simply change the registration:
// From
services.AddInMemoryEventBus();
// To
// NOTE(review): this passes the configuration object, whereas the Quick Start uses
// an options callback — confirm that this overload exists.
services.AddRabbitMQEventBus(Configuration);
All events and handlers work without modification!
Target Framework
- .NET 10
Dependencies
- Indiko.Blocks.EventBus.Abstractions
- EasyNetQ (8.0+)
- RabbitMQ.Client (6.0+)
License
See LICENSE file in the repository root.
Related Packages
- Indiko.Blocks.EventBus.Abstractions - Core event bus abstractions
- Indiko.Blocks.EventBus.InMemory - In-memory event bus for development
- Indiko.Blocks.Mediation.Abstractions - CQRS and mediator pattern
Resources
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net10.0
- EasyNetQ (>= 7.8.0)
- EasyNetQ.Serialization.NewtonsoftJson (>= 7.8.0)
- Indiko.Blocks.EventBus.Abstractions (>= 2.1.1)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated |
|---|---|---|
| 2.1.1 | 35 | 12/2/2025 |
| 2.1.0 | 39 | 12/2/2025 |
| 2.0.0 | 300 | 9/17/2025 |
| 1.7.23 | 172 | 9/8/2025 |
| 1.7.22 | 169 | 9/8/2025 |
| 1.7.21 | 172 | 8/14/2025 |
| 1.7.20 | 189 | 6/23/2025 |
| 1.7.19 | 187 | 6/3/2025 |
| 1.7.18 | 181 | 5/29/2025 |
| 1.7.17 | 180 | 5/26/2025 |
| 1.7.15 | 133 | 4/12/2025 |
| 1.7.14 | 147 | 4/11/2025 |
| 1.7.13 | 146 | 3/29/2025 |
| 1.7.12 | 166 | 3/28/2025 |
| 1.7.11 | 167 | 3/28/2025 |
| 1.7.10 | 181 | 3/28/2025 |
| 1.7.9 | 165 | 3/28/2025 |
| 1.7.8 | 160 | 3/28/2025 |
| 1.7.5 | 197 | 3/17/2025 |
| 1.7.4 | 190 | 3/16/2025 |
| 1.7.3 | 166 | 3/16/2025 |
| 1.7.2 | 198 | 3/16/2025 |
| 1.7.1 | 213 | 3/11/2025 |
| 1.6.8 | 218 | 3/11/2025 |
| 1.6.7 | 266 | 3/4/2025 |
| 1.6.6 | 140 | 2/26/2025 |
| 1.6.5 | 167 | 2/20/2025 |
| 1.6.4 | 166 | 2/20/2025 |
| 1.6.3 | 150 | 2/5/2025 |
| 1.6.2 | 167 | 1/24/2025 |
| 1.6.1 | 165 | 1/24/2025 |
| 1.6.0 | 128 | 1/16/2025 |
| 1.5.2 | 127 | 1/16/2025 |
| 1.5.1 | 180 | 11/3/2024 |
| 1.5.0 | 146 | 10/26/2024 |
| 1.3.2 | 165 | 10/24/2024 |
| 1.3.0 | 162 | 10/10/2024 |
| 1.2.5 | 183 | 10/9/2024 |
| 1.2.4 | 164 | 10/8/2024 |
| 1.2.1 | 150 | 10/3/2024 |
| 1.2.0 | 145 | 9/29/2024 |
| 1.1.1 | 156 | 9/23/2024 |
| 1.1.0 | 186 | 9/18/2024 |
| 1.0.33 | 192 | 9/15/2024 |
| 1.0.28 | 156 | 8/28/2024 |
| 1.0.27 | 161 | 8/24/2024 |
| 1.0.26 | 149 | 7/7/2024 |
| 1.0.25 | 154 | 7/6/2024 |
| 1.0.24 | 149 | 6/25/2024 |
| 1.0.23 | 157 | 6/1/2024 |
| 1.0.22 | 179 | 5/14/2024 |
| 1.0.21 | 141 | 5/14/2024 |
| 1.0.20 | 169 | 4/8/2024 |
| 1.0.19 | 160 | 4/3/2024 |
| 1.0.18 | 152 | 3/23/2024 |
| 1.0.17 | 184 | 3/19/2024 |
| 1.0.16 | 189 | 3/19/2024 |
| 1.0.15 | 155 | 3/11/2024 |
| 1.0.14 | 166 | 3/10/2024 |
| 1.0.13 | 161 | 3/6/2024 |
| 1.0.12 | 184 | 3/1/2024 |
| 1.0.11 | 195 | 3/1/2024 |
| 1.0.10 | 179 | 3/1/2024 |
| 1.0.9 | 171 | 3/1/2024 |
| 1.0.8 | 154 | 2/19/2024 |
| 1.0.7 | 163 | 2/17/2024 |
| 1.0.6 | 162 | 2/17/2024 |
| 1.0.5 | 167 | 2/17/2024 |
| 1.0.4 | 170 | 2/7/2024 |
| 1.0.3 | 149 | 2/6/2024 |
| 1.0.1 | 174 | 2/6/2024 |
| 1.0.0 | 217 | 1/9/2024 |
| 1.0.0-preview99 | 159 | 12/22/2023 |
| 1.0.0-preview98 | 140 | 12/21/2023 |
| 1.0.0-preview97 | 125 | 12/21/2023 |
| 1.0.0-preview96 | 145 | 12/20/2023 |
| 1.0.0-preview95 | 158 | 12/20/2023 |
| 1.0.0-preview94 | 135 | 12/18/2023 |
| 1.0.0-preview93 | 274 | 12/13/2023 |
| 1.0.0-preview92 | 139 | 12/13/2023 |
| 1.0.0-preview91 | 206 | 12/12/2023 |
| 1.0.0-preview90 | 131 | 12/11/2023 |
| 1.0.0-preview89 | 128 | 12/11/2023 |
| 1.0.0-preview88 | 231 | 12/6/2023 |
| 1.0.0-preview87 | 178 | 12/6/2023 |
| 1.0.0-preview86 | 164 | 12/6/2023 |
| 1.0.0-preview85 | 165 | 12/6/2023 |
| 1.0.0-preview84 | 152 | 12/5/2023 |
| 1.0.0-preview83 | 198 | 12/5/2023 |
| 1.0.0-preview82 | 169 | 12/5/2023 |
| 1.0.0-preview81 | 145 | 12/4/2023 |
| 1.0.0-preview80 | 144 | 12/1/2023 |
| 1.0.0-preview77 | 141 | 12/1/2023 |
| 1.0.0-preview76 | 155 | 12/1/2023 |
| 1.0.0-preview75 | 136 | 12/1/2023 |
| 1.0.0-preview74 | 173 | 11/26/2023 |
| 1.0.0-preview73 | 163 | 11/7/2023 |
| 1.0.0-preview72 | 142 | 11/6/2023 |
| 1.0.0-preview71 | 158 | 11/3/2023 |
| 1.0.0-preview70 | 143 | 11/2/2023 |
| 1.0.0-preview69 | 141 | 11/2/2023 |
| 1.0.0-preview68 | 152 | 11/2/2023 |
| 1.0.0-preview67 | 137 | 11/2/2023 |
| 1.0.0-preview66 | 141 | 11/2/2023 |
| 1.0.0-preview65 | 150 | 11/2/2023 |
| 1.0.0-preview64 | 173 | 11/2/2023 |
| 1.0.0-preview63 | 139 | 11/2/2023 |
| 1.0.0-preview62 | 131 | 11/1/2023 |
| 1.0.0-preview61 | 144 | 11/1/2023 |
| 1.0.0-preview60 | 139 | 11/1/2023 |
| 1.0.0-preview59 | 169 | 11/1/2023 |
| 1.0.0-preview58 | 155 | 10/31/2023 |
| 1.0.0-preview57 | 140 | 10/31/2023 |
| 1.0.0-preview56 | 159 | 10/31/2023 |
| 1.0.0-preview55 | 142 | 10/31/2023 |
| 1.0.0-preview54 | 134 | 10/31/2023 |
| 1.0.0-preview53 | 129 | 10/31/2023 |
| 1.0.0-preview52 | 136 | 10/31/2023 |
| 1.0.0-preview51 | 137 | 10/31/2023 |
| 1.0.0-preview50 | 145 | 10/31/2023 |
| 1.0.0-preview48 | 141 | 10/31/2023 |
| 1.0.0-preview46 | 126 | 10/31/2023 |
| 1.0.0-preview45 | 149 | 10/31/2023 |
| 1.0.0-preview44 | 140 | 10/31/2023 |
| 1.0.0-preview43 | 162 | 10/31/2023 |
| 1.0.0-preview42 | 169 | 10/30/2023 |
| 1.0.0-preview41 | 152 | 10/30/2023 |
| 1.0.0-preview40 | 149 | 10/27/2023 |
| 1.0.0-preview39 | 172 | 10/27/2023 |
| 1.0.0-preview38 | 143 | 10/27/2023 |
| 1.0.0-preview37 | 162 | 10/27/2023 |
| 1.0.0-preview36 | 124 | 10/27/2023 |
| 1.0.0-preview35 | 140 | 10/27/2023 |
| 1.0.0-preview34 | 136 | 10/27/2023 |
| 1.0.0-preview33 | 158 | 10/26/2023 |
| 1.0.0-preview32 | 160 | 10/26/2023 |
| 1.0.0-preview31 | 154 | 10/26/2023 |
| 1.0.0-preview30 | 153 | 10/26/2023 |
| 1.0.0-preview29 | 144 | 10/26/2023 |
| 1.0.0-preview28 | 154 | 10/26/2023 |
| 1.0.0-preview27 | 145 | 10/26/2023 |
| 1.0.0-preview26 | 170 | 10/25/2023 |
| 1.0.0-preview25 | 178 | 10/23/2023 |
| 1.0.0-preview24 | 151 | 10/23/2023 |
| 1.0.0-preview23 | 143 | 10/23/2023 |
| 1.0.0-preview22 | 149 | 10/23/2023 |
| 1.0.0-preview21 | 145 | 10/23/2023 |
| 1.0.0-preview20 | 152 | 10/20/2023 |
| 1.0.0-preview19 | 168 | 10/19/2023 |
| 1.0.0-preview18 | 147 | 10/18/2023 |
| 1.0.0-preview16 | 152 | 10/11/2023 |
| 1.0.0-preview14 | 178 | 10/10/2023 |
| 1.0.0-preview13 | 153 | 10/10/2023 |
| 1.0.0-preview12 | 137 | 10/9/2023 |
| 1.0.0-preview101 | 147 | 1/5/2024 |