DigitTwin.MessageBus.DependecyInjection 1.0.0

dotnet add package DigitTwin.MessageBus.DependecyInjection --version 1.0.0
                    
NuGet\Install-Package DigitTwin.MessageBus.DependecyInjection -Version 1.0.0
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="DigitTwin.MessageBus.DependecyInjection" Version="1.0.0" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="DigitTwin.MessageBus.DependecyInjection" Version="1.0.0" />
                    
Directory.Packages.props
<PackageReference Include="DigitTwin.MessageBus.DependecyInjection" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add DigitTwin.MessageBus.DependecyInjection --version 1.0.0
                    
#r "nuget: DigitTwin.MessageBus.DependecyInjection, 1.0.0"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package DigitTwin.MessageBus.DependecyInjection@1.0.0
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=DigitTwin.MessageBus.DependecyInjection&version=1.0.0
                    
Install as a Cake Addin
#tool nuget:?package=DigitTwin.MessageBus.DependecyInjection&version=1.0.0
                    
Install as a Cake Tool

MessageBus

MessageBus — кросс-платформенная библиотека для работы с брокерами сообщений (RabbitMQ, Kafka) через MassTransit с поддержкой DI, IOptions, HealthCheck, транзакций, подтверждений доставки, восстановления соединения и сериализации сообщений. Позволяет одновременно использовать несколько брокеров без BackgroundService для consumer'ов.


Оглавление


Возможности

  • Унифицированные абстракции для работы с шиной сообщений
  • Поддержка RabbitMQ и Kafka (через MassTransit)
  • Интеграция с DI и IOptions
  • HealthCheck для мониторинга состояния
  • Транзакции и подтверждения доставки
  • Восстановление соединения
  • Гибкая сериализация сообщений (JSON, XML, кастомные)
  • Одновременная работа с несколькими брокерами
  • Современные best practices MassTransit и .NET
  • Поддержка unit-тестирования

Архитектура

  • MessageBus.Abstractions — интерфейсы (IBus, IProducer, IConsumer, IMessage, IBusOptions, IMessageSerializer)
  • MessageBus.Core — базовые реализации (BusOptions, JsonMessageSerializer, DefaultProducer, DefaultConsumer)
  • MessageBus.MassTransit — адаптер для MassTransit (RabbitMQ/Kafka), HealthCheck, транзакции, подтверждения, восстановление
  • MessageBus.DependencyInjection — расширения для DI, поддержка IOptions, регистрация нескольких bus

Принципы работы

  • Все взаимодействия асинхронные (Task/Task<T>)
  • Producer и Consumer регистрируются через DI
  • Для RabbitMQ используется собственный адаптер, для Kafka — MassTransit Rider
  • Не используются BackgroundService для consumer'ов — обработка сообщений интегрируется напрямую через DI
  • Все интерфейсы снабжены XML summary на русском языке

Быстрый старт

1. Установка NuGet-пакетов

Добавьте ссылки на проекты/пакеты:

  • MessageBus.Abstractions
  • MessageBus.Core
  • MessageBus.MassTransit
  • MessageBus.DependencyInjection
  • MassTransit, MassTransit.RabbitMQ, MassTransit.Kafka (для Kafka)

2. Конфигурация RabbitMQ через DI/IOptions

// appsettings.json
"RabbitMQ": {
  "BrokerType": "RabbitMQ",
  "ConnectionString": "amqp://guest:guest@localhost:5672/"
}
// Program.cs
builder.Services.AddRabbitMqBus<BusOptions>(builder.Configuration);

3. Конфигурация Kafka через DI/IOptions

Внимание: Для Kafka используйте AddMassTransitRider и AddRider. Пример:

// appsettings.json
"Kafka": {
  "BrokerType": "Kafka",
  "ConnectionString": "PLAINTEXT://localhost:9092",
  "ClientId": "my-app"
}
// Program.cs
builder.Services.AddMassTransit(x =>
{
    x.AddRider(rider =>
    {
        // Конфигурируйте Kafka endpoints
    });
});

Сценарии использования

RabbitMQ

  • Используйте AddRabbitMqBus<TOptions> для регистрации bus и всех зависимостей через DI.
  • Поддерживаются HealthCheck, транзакции, подтверждения доставки, восстановление соединения.

Kafka

  • Используйте AddMassTransitRider и AddRider для интеграции с Kafka.
  • Для сериализации сообщений используйте стандартные или кастомные сериализаторы.

Producer/Consumer

  • Определите сообщение и consumer:
public class MyMessage : IMessage { ... }
public class MyConsumer : IConsumer<MyMessage> { ... }
  • Зарегистрируйте consumer через DI:
services.AddSingleton<IConsumer<MyMessage>, MyConsumer>();
  • Запустите bus и отправьте сообщение:
await bus.StartAsync();
await bus.PublishAsync(new MyMessage { ... });

HealthCheck

  • Включите HealthChecks для мониторинга состояния bus:
builder.Services.AddHealthChecks().AddCheck<MassTransitBus>("bus");
  • Можно реализовать кастомный HealthCheck для расширенной диагностики.

Транзакции и подтверждения доставки

  • Используйте Outbox через MassTransit для гарантии доставки (exactly-once):
cfg.UseInMemoryOutbox(context);

Кастомная сериализация

  • Реализуйте свой сериализатор, например, для XML:
public class XmlMessageSerializer : IMessageSerializer { ... }
services.AddSingleton<IMessageSerializer, XmlMessageSerializer>();

Graceful shutdown и восстановление

  • Корректно завершайте работу через DisposeAsync или StopAsync:
await bus.DisposeAsync();
  • Восстановление соединения происходит автоматически при сбоях (RabbitMQ).

Несколько брокеров и множественная регистрация bus

  • Зарегистрируйте несколько bus с разными настройками:
builder.Services.AddRabbitMqBus<RabbitOptions>(builder.Configuration, "RabbitMQ");
builder.Services.AddMassTransit(x => { ... });

Расширенные примеры

Полный пример: регистрация и использование Consumer (RabbitMQ)

// 1. Определяем сообщение
public class MyMessage : IMessage { ... }
// 2. Определяем consumer
public class MyConsumer : IConsumer<MyMessage> { ... }
// 3. Регистрация в DI и запуск
var services = new ServiceCollection();
var options = new BusOptions { ... };
services.AddSingleton<IBusOptions>(options);
services.AddRabbitMqBus<BusOptions>(new ConfigurationBuilder().Build());
var consumer = new MyConsumer();
services.AddSingleton<IConsumer<MyMessage>>(consumer);
var provider = services.BuildServiceProvider();
var bus = provider.GetRequiredService<IBus>();
await bus.StartAsync();
// 4. Отправка сообщения
await bus.PublishAsync(new MyMessage { Payload = "Привет, Consumer!" });
await Task.WhenAny(consumer.Received.Task, Task.Delay(5000));
await bus.StopAsync();

Полный пример: регистрация и использование Consumer (Kafka через MassTransit Rider)

// 1. Определяем сообщение
public class KafkaMessage : IMessage { ... }
// 2. Определяем consumer
public class KafkaConsumer : IConsumer<KafkaMessage> { ... }
// 3. Регистрация через MassTransit Rider
var services = new ServiceCollection();
services.AddMassTransit(x =>
{
    x.AddConsumer<KafkaConsumer>();
    x.AddRider(rider =>
    {
        rider.AddConsumer<KafkaConsumer>();
        rider.UsingKafka((context, k) =>
        {
            k.Host("localhost:9092");
            k.TopicEndpoint<KafkaMessage>("sample-topic", "sample-group", e =>
            {
                e.ConfigureConsumer<KafkaConsumer>(context);
            });
        });
    });
});
var provider = services.BuildServiceProvider();
var busControl = provider.GetRequiredService<IBusControl>();
await busControl.StartAsync();
// 4. Отправка сообщения
await busControl.Publish(new KafkaMessage { Payload = "Привет, Kafka Consumer!" });
var kafkaConsumer = provider.GetRequiredService<KafkaConsumer>();
await Task.WhenAny(kafkaConsumer.Received.Task, Task.Delay(5000));
await busControl.StopAsync();

Транзакции и подтверждения доставки (RabbitMQ)

builder.Services.AddMassTransit(x =>
{
    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host("amqp://guest:guest@localhost:5672/");
        cfg.UseMessageRetry(r => r.Interval(5, TimeSpan.FromSeconds(10)));
        cfg.UseInMemoryOutbox(context); // Гарантия доставки (exactly-once)
    });
});

Кастомный HealthCheck

public class CustomBusHealthCheck : IHealthCheck { ... }
builder.Services.AddHealthChecks().AddCheck<CustomBusHealthCheck>("custom-bus");

Кастомная сериализация сообщений

public class XmlMessageSerializer : IMessageSerializer { ... }
builder.Services.AddSingleton<IMessageSerializer, XmlMessageSerializer>();

Graceful shutdown и восстановление соединения

await bus.DisposeAsync(); // Корректное завершение работы

Регистрация нескольких bus с разными настройками

builder.Services.AddRabbitMqBus<RabbitOptions>(builder.Configuration, "RabbitMQ");
builder.Services.AddMassTransit(x => { ... });

Best practices и советы

  • Используйте асинхронные методы для всех операций с bus и сообщениями
  • Для продакшена обязательно настройте retry, outbox и HealthCheck
  • Для Kafka всегда используйте AddMassTransitRider
  • Для unit-тестирования используйте InMemory bus
  • Не храните чувствительные данные в сообщениях без шифрования
  • Для сложных сценариев используйте отдельные топики/очереди для разных типов сообщений
  • Следите за актуальностью NuGet-пакетов MassTransit и брокеров

FAQ и troubleshooting

Q: Почему consumer не получает сообщения?

  • Проверьте регистрацию consumer в DI
  • Убедитесь, что bus запущен (StartAsync)
  • Проверьте настройки брокера и подключения

Q: Как добавить несколько consumer для одного типа сообщения?

  • Зарегистрируйте несколько реализаций IConsumer<T> через DI

Q: Как реализовать dead-letter queue?

  • Используйте стандартные механизмы MassTransit для DLQ

Q: Как тестировать обработку сообщений?

  • Используйте InMemory bus и TaskCompletionSource для ожидания сообщений

Q: Как реализовать отложенную доставку (delay)?

  • Используйте механизмы отложенных сообщений MassTransit

Ссылки и документация


Лицензия

MIT


Контакты

  • Вопросы и предложения: issues или pull request в репозитории
  • Автор: [Ваше имя или команда]

MessageBus — современное решение для интеграции с брокерами сообщений в .NET-проектах с максимальной гибкостью и расширяемостью.

Product Compatible and additional computed target framework versions.
.NET net8.0 is compatible.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed.  net9.0 was computed.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed.  net10.0 was computed.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
1.0.0 137 7/13/2025