DigitTwin.MessageBus.Abstractions 1.0.0

dotnet add package DigitTwin.MessageBus.Abstractions --version 1.0.0
                    
NuGet\Install-Package DigitTwin.MessageBus.Abstractions -Version 1.0.0
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="DigitTwin.MessageBus.Abstractions" Version="1.0.0" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="DigitTwin.MessageBus.Abstractions" Version="1.0.0" />
                    
Directory.Packages.props
<PackageReference Include="DigitTwin.MessageBus.Abstractions" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add DigitTwin.MessageBus.Abstractions --version 1.0.0
                    
#r "nuget: DigitTwin.MessageBus.Abstractions, 1.0.0"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package DigitTwin.MessageBus.Abstractions@1.0.0
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=DigitTwin.MessageBus.Abstractions&version=1.0.0
                    
Install as a Cake Addin
#tool nuget:?package=DigitTwin.MessageBus.Abstractions&version=1.0.0
                    
Install as a Cake Tool

MessageBus

MessageBus — кросс-платформенная библиотека для работы с брокерами сообщений (RabbitMQ, Kafka) через MassTransit с поддержкой DI, IOptions, HealthCheck, транзакций, подтверждений доставки, восстановления соединения и сериализации сообщений. Позволяет одновременно использовать несколько брокеров без BackgroundService для consumer'ов.


Оглавление


Возможности

  • Унифицированные абстракции для работы с шиной сообщений
  • Поддержка RabbitMQ и Kafka (через MassTransit)
  • Интеграция с DI и IOptions
  • HealthCheck для мониторинга состояния
  • Транзакции и подтверждения доставки
  • Восстановление соединения
  • Гибкая сериализация сообщений (JSON, XML, кастомные)
  • Одновременная работа с несколькими брокерами
  • Современные best practices MassTransit и .NET
  • Поддержка unit-тестирования

Архитектура

  • MessageBus.Abstractions — интерфейсы (IBus, IProducer, IConsumer, IMessage, IBusOptions, IMessageSerializer)
  • MessageBus.Core — базовые реализации (BusOptions, JsonMessageSerializer, DefaultProducer, DefaultConsumer)
  • MessageBus.MassTransit — адаптер для MassTransit (RabbitMQ/Kafka), HealthCheck, транзакции, подтверждения, восстановление
  • MessageBus.DependencyInjection — расширения для DI, поддержка IOptions, регистрация нескольких bus

Принципы работы

  • Все взаимодействия асинхронные (Task/Task<T>)
  • Producer и Consumer регистрируются через DI
  • Для RabbitMQ используется собственный адаптер, для Kafka — MassTransit Rider
  • Не используются BackgroundService для consumer'ов — обработка сообщений интегрируется напрямую через DI
  • Все интерфейсы снабжены XML summary на русском языке

Быстрый старт

1. Установка NuGet-пакетов

Добавьте ссылки на проекты/пакеты:

  • MessageBus.Abstractions
  • MessageBus.Core
  • MessageBus.MassTransit
  • MessageBus.DependencyInjection
  • MassTransit, MassTransit.RabbitMQ, MassTransit.Kafka (для Kafka)

2. Конфигурация RabbitMQ через DI/IOptions

// appsettings.json
"RabbitMQ": {
  "BrokerType": "RabbitMQ",
  "ConnectionString": "amqp://guest:guest@localhost:5672/"
}
// Program.cs
builder.Services.AddRabbitMqBus<BusOptions>(builder.Configuration);

3. Конфигурация Kafka через DI/IOptions

Внимание: Для Kafka используйте AddMassTransitRider и AddRider. Пример:

// appsettings.json
"Kafka": {
  "BrokerType": "Kafka",
  "ConnectionString": "PLAINTEXT://localhost:9092",
  "ClientId": "my-app"
}
// Program.cs
builder.Services.AddMassTransit(x =>
{
    x.AddRider(rider =>
    {
        // Конфигурируйте Kafka endpoints
    });
});

Сценарии использования

RabbitMQ

  • Используйте AddRabbitMqBus<TOptions> для регистрации bus и всех зависимостей через DI.
  • Поддерживаются HealthCheck, транзакции, подтверждения доставки, восстановление соединения.

Kafka

  • Используйте AddMassTransitRider и AddRider для интеграции с Kafka.
  • Для сериализации сообщений используйте стандартные или кастомные сериализаторы.

Producer/Consumer

  • Определите сообщение и consumer:
public class MyMessage : IMessage { ... }
public class MyConsumer : IConsumer<MyMessage> { ... }
  • Зарегистрируйте consumer через DI:
services.AddSingleton<IConsumer<MyMessage>, MyConsumer>();
  • Запустите bus и отправьте сообщение:
await bus.StartAsync();
await bus.PublishAsync(new MyMessage { ... });

HealthCheck

  • Включите HealthChecks для мониторинга состояния bus:
builder.Services.AddHealthChecks().AddCheck<MassTransitBus>("bus");
  • Можно реализовать кастомный HealthCheck для расширенной диагностики.

Транзакции и подтверждения доставки

  • Используйте Outbox через MassTransit для гарантии доставки (exactly-once):
cfg.UseInMemoryOutbox(context);

Кастомная сериализация

  • Реализуйте свой сериализатор, например, для XML:
public class XmlMessageSerializer : IMessageSerializer { ... }
services.AddSingleton<IMessageSerializer, XmlMessageSerializer>();

Graceful shutdown и восстановление

  • Корректно завершайте работу через DisposeAsync или StopAsync:
await bus.DisposeAsync();
  • Восстановление соединения происходит автоматически при сбоях (RabbitMQ).

Несколько брокеров и множественная регистрация bus

  • Зарегистрируйте несколько bus с разными настройками:
builder.Services.AddRabbitMqBus<RabbitOptions>(builder.Configuration, "RabbitMQ");
builder.Services.AddMassTransit(x => { ... });

Расширенные примеры

Полный пример: регистрация и использование Consumer (RabbitMQ)

// 1. Определяем сообщение
public class MyMessage : IMessage { ... }
// 2. Определяем consumer
public class MyConsumer : IConsumer<MyMessage> { ... }
// 3. Регистрация в DI и запуск
var services = new ServiceCollection();
var options = new BusOptions { ... };
services.AddSingleton<IBusOptions>(options);
services.AddRabbitMqBus<BusOptions>(new ConfigurationBuilder().Build());
var consumer = new MyConsumer();
services.AddSingleton<IConsumer<MyMessage>>(consumer);
var provider = services.BuildServiceProvider();
var bus = provider.GetRequiredService<IBus>();
await bus.StartAsync();
// 4. Отправка сообщения
await bus.PublishAsync(new MyMessage { Payload = "Привет, Consumer!" });
await Task.WhenAny(consumer.Received.Task, Task.Delay(5000));
await bus.StopAsync();

Полный пример: регистрация и использование Consumer (Kafka через MassTransit Rider)

// 1. Определяем сообщение
public class KafkaMessage : IMessage { ... }
// 2. Определяем consumer
public class KafkaConsumer : IConsumer<KafkaMessage> { ... }
// 3. Регистрация через MassTransit Rider
var services = new ServiceCollection();
services.AddMassTransit(x =>
{
    x.AddConsumer<KafkaConsumer>();
    x.AddRider(rider =>
    {
        rider.AddConsumer<KafkaConsumer>();
        rider.UsingKafka((context, k) =>
        {
            k.Host("localhost:9092");
            k.TopicEndpoint<KafkaMessage>("sample-topic", "sample-group", e =>
            {
                e.ConfigureConsumer<KafkaConsumer>(context);
            });
        });
    });
});
var provider = services.BuildServiceProvider();
var busControl = provider.GetRequiredService<IBusControl>();
await busControl.StartAsync();
// 4. Отправка сообщения
await busControl.Publish(new KafkaMessage { Payload = "Привет, Kafka Consumer!" });
var kafkaConsumer = provider.GetRequiredService<KafkaConsumer>();
await Task.WhenAny(kafkaConsumer.Received.Task, Task.Delay(5000));
await busControl.StopAsync();

Транзакции и подтверждения доставки (RabbitMQ)

builder.Services.AddMassTransit(x =>
{
    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host("amqp://guest:guest@localhost:5672/");
        cfg.UseMessageRetry(r => r.Interval(5, TimeSpan.FromSeconds(10)));
        cfg.UseInMemoryOutbox(context); // Гарантия доставки (exactly-once)
    });
});

Кастомный HealthCheck

public class CustomBusHealthCheck : IHealthCheck { ... }
builder.Services.AddHealthChecks().AddCheck<CustomBusHealthCheck>("custom-bus");

Кастомная сериализация сообщений

public class XmlMessageSerializer : IMessageSerializer { ... }
builder.Services.AddSingleton<IMessageSerializer, XmlMessageSerializer>();

Graceful shutdown и восстановление соединения

await bus.DisposeAsync(); // Корректное завершение работы

Регистрация нескольких bus с разными настройками

builder.Services.AddRabbitMqBus<RabbitOptions>(builder.Configuration, "RabbitMQ");
builder.Services.AddMassTransit(x => { ... });

Best practices и советы

  • Используйте асинхронные методы для всех операций с bus и сообщениями
  • Для продакшена обязательно настройте retry, outbox и HealthCheck
  • Для Kafka всегда используйте AddMassTransitRider
  • Для unit-тестирования используйте InMemory bus
  • Не храните чувствительные данные в сообщениях без шифрования
  • Для сложных сценариев используйте отдельные топики/очереди для разных типов сообщений
  • Следите за актуальностью NuGet-пакетов MassTransit и брокеров

FAQ и troubleshooting

Q: Почему consumer не получает сообщения?

  • Проверьте регистрацию consumer в DI
  • Убедитесь, что bus запущен (StartAsync)
  • Проверьте настройки брокера и подключения

Q: Как добавить несколько consumer для одного типа сообщения?

  • Зарегистрируйте несколько реализаций IConsumer<T> через DI

Q: Как реализовать dead-letter queue?

  • Используйте стандартные механизмы MassTransit для DLQ

Q: Как тестировать обработку сообщений?

  • Используйте InMemory bus и TaskCompletionSource для ожидания сообщений

Q: Как реализовать отложенную доставку (delay)?

  • Используйте механизмы отложенных сообщений MassTransit

Ссылки и документация


Лицензия

MIT


Контакты

  • Вопросы и предложения: issues или pull request в репозитории
  • Автор: [Ваше имя или команда]

MessageBus — современное решение для интеграции с брокерами сообщений в .NET-проектах с максимальной гибкостью и расширяемостью.

Product Compatible and additional computed target framework versions.
.NET net8.0 is compatible.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed.  net9.0 was computed.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed.  net10.0 was computed.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.
  • net8.0

    • No dependencies.

NuGet packages (3)

Showing the top 3 NuGet packages that depend on DigitTwin.MessageBus.Abstractions:

Package Downloads
DigitTwin.MessageBus.MassTransit

Адаптер MessageBus для MassTransit с поддержкой RabbitMQ и Kafka, HealthCheck, транзакций и подтверждений доставки.

DigitTwin.MessageBus.DependecyInjection

Интеграция MessageBus с Dependency Injection и поддержка IOptions.

DigitTwin.MessageBus.Core

Базовые сервисы, сериализация и инфраструктура для MessageBus.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
1.0.0 130 7/13/2025