DigitTwin.MessageBus.Abstractions
1.0.0
dotnet add package DigitTwin.MessageBus.Abstractions --version 1.0.0
NuGet\Install-Package DigitTwin.MessageBus.Abstractions -Version 1.0.0
<PackageReference Include="DigitTwin.MessageBus.Abstractions" Version="1.0.0" />
<PackageVersion Include="DigitTwin.MessageBus.Abstractions" Version="1.0.0" />
<PackageReference Include="DigitTwin.MessageBus.Abstractions" />
paket add DigitTwin.MessageBus.Abstractions --version 1.0.0
#r "nuget: DigitTwin.MessageBus.Abstractions, 1.0.0"
#:package DigitTwin.MessageBus.Abstractions@1.0.0
#addin nuget:?package=DigitTwin.MessageBus.Abstractions&version=1.0.0
#tool nuget:?package=DigitTwin.MessageBus.Abstractions&version=1.0.0
MessageBus
MessageBus — кросс-платформенная библиотека для работы с брокерами сообщений (RabbitMQ, Kafka) через MassTransit с поддержкой DI, IOptions, HealthCheck, транзакций, подтверждений доставки, восстановления соединения и сериализации сообщений. Позволяет одновременно использовать несколько брокеров без BackgroundService для consumer'ов.
Оглавление
- Возможности
- Архитектура
- Быстрый старт
- Сценарии использования
- Расширенные примеры
- Best practices и советы
- FAQ и troubleshooting
- Ссылки и документация
- Лицензия
- Контакты
Возможности
- Унифицированные абстракции для работы с шиной сообщений
- Поддержка RabbitMQ и Kafka (через MassTransit)
- Интеграция с DI и IOptions
- HealthCheck для мониторинга состояния
- Транзакции и подтверждения доставки
- Восстановление соединения
- Гибкая сериализация сообщений (JSON, XML, кастомные)
- Одновременная работа с несколькими брокерами
- Современные best practices MassTransit и .NET
- Поддержка unit-тестирования
Архитектура
- MessageBus.Abstractions — интерфейсы (IBus, IProducer, IConsumer, IMessage, IBusOptions, IMessageSerializer)
- MessageBus.Core — базовые реализации (BusOptions, JsonMessageSerializer, DefaultProducer, DefaultConsumer)
- MessageBus.MassTransit — адаптер для MassTransit (RabbitMQ/Kafka), HealthCheck, транзакции, подтверждения, восстановление
- MessageBus.DependencyInjection — расширения для DI, поддержка IOptions, регистрация нескольких bus
Принципы работы
- Все взаимодействия асинхронные (
Task
/Task<T>
) - Producer и Consumer регистрируются через DI
- Для RabbitMQ используется собственный адаптер, для Kafka — MassTransit Rider
- Не используются BackgroundService для consumer'ов — обработка сообщений интегрируется напрямую через DI
- Все интерфейсы снабжены XML summary на русском языке
Быстрый старт
1. Установка NuGet-пакетов
Добавьте ссылки на проекты/пакеты:
- MessageBus.Abstractions
- MessageBus.Core
- MessageBus.MassTransit
- MessageBus.DependencyInjection
- MassTransit, MassTransit.RabbitMQ, MassTransit.Kafka (для Kafka)
2. Конфигурация RabbitMQ через DI/IOptions
// appsettings.json
"RabbitMQ": {
"BrokerType": "RabbitMQ",
"ConnectionString": "amqp://guest:guest@localhost:5672/"
}
// Program.cs
builder.Services.AddRabbitMqBus<BusOptions>(builder.Configuration);
3. Конфигурация Kafka через DI/IOptions
Внимание: Для Kafka используйте AddMassTransitRider и AddRider. Пример:
// appsettings.json
"Kafka": {
"BrokerType": "Kafka",
"ConnectionString": "PLAINTEXT://localhost:9092",
"ClientId": "my-app"
}
// Program.cs
builder.Services.AddMassTransit(x =>
{
x.AddRider(rider =>
{
// Конфигурируйте Kafka endpoints
});
});
Сценарии использования
RabbitMQ
- Используйте
AddRabbitMqBus<TOptions>
для регистрации bus и всех зависимостей через DI. - Поддерживаются HealthCheck, транзакции, подтверждения доставки, восстановление соединения.
Kafka
- Используйте
AddMassTransitRider
иAddRider
для интеграции с Kafka. - Для сериализации сообщений используйте стандартные или кастомные сериализаторы.
Producer/Consumer
- Определите сообщение и consumer:
public class MyMessage : IMessage { ... }
public class MyConsumer : IConsumer<MyMessage> { ... }
- Зарегистрируйте consumer через DI:
services.AddSingleton<IConsumer<MyMessage>, MyConsumer>();
- Запустите bus и отправьте сообщение:
await bus.StartAsync();
await bus.PublishAsync(new MyMessage { ... });
HealthCheck
- Включите HealthChecks для мониторинга состояния bus:
builder.Services.AddHealthChecks().AddCheck<MassTransitBus>("bus");
- Можно реализовать кастомный HealthCheck для расширенной диагностики.
Транзакции и подтверждения доставки
- Используйте Outbox через MassTransit для гарантии доставки (exactly-once):
cfg.UseInMemoryOutbox(context);
Кастомная сериализация
- Реализуйте свой сериализатор, например, для XML:
public class XmlMessageSerializer : IMessageSerializer { ... }
services.AddSingleton<IMessageSerializer, XmlMessageSerializer>();
Graceful shutdown и восстановление
- Корректно завершайте работу через
DisposeAsync
илиStopAsync
:
await bus.DisposeAsync();
- Восстановление соединения происходит автоматически при сбоях (RabbitMQ).
Несколько брокеров и множественная регистрация bus
- Зарегистрируйте несколько bus с разными настройками:
builder.Services.AddRabbitMqBus<RabbitOptions>(builder.Configuration, "RabbitMQ");
builder.Services.AddMassTransit(x => { ... });
Расширенные примеры
Полный пример: регистрация и использование Consumer (RabbitMQ)
// 1. Определяем сообщение
public class MyMessage : IMessage { ... }
// 2. Определяем consumer
public class MyConsumer : IConsumer<MyMessage> { ... }
// 3. Регистрация в DI и запуск
var services = new ServiceCollection();
var options = new BusOptions { ... };
services.AddSingleton<IBusOptions>(options);
services.AddRabbitMqBus<BusOptions>(new ConfigurationBuilder().Build());
var consumer = new MyConsumer();
services.AddSingleton<IConsumer<MyMessage>>(consumer);
var provider = services.BuildServiceProvider();
var bus = provider.GetRequiredService<IBus>();
await bus.StartAsync();
// 4. Отправка сообщения
await bus.PublishAsync(new MyMessage { Payload = "Привет, Consumer!" });
await Task.WhenAny(consumer.Received.Task, Task.Delay(5000));
await bus.StopAsync();
Полный пример: регистрация и использование Consumer (Kafka через MassTransit Rider)
// 1. Определяем сообщение
public class KafkaMessage : IMessage { ... }
// 2. Определяем consumer
public class KafkaConsumer : IConsumer<KafkaMessage> { ... }
// 3. Регистрация через MassTransit Rider
var services = new ServiceCollection();
services.AddMassTransit(x =>
{
x.AddConsumer<KafkaConsumer>();
x.AddRider(rider =>
{
rider.AddConsumer<KafkaConsumer>();
rider.UsingKafka((context, k) =>
{
k.Host("localhost:9092");
k.TopicEndpoint<KafkaMessage>("sample-topic", "sample-group", e =>
{
e.ConfigureConsumer<KafkaConsumer>(context);
});
});
});
});
var provider = services.BuildServiceProvider();
var busControl = provider.GetRequiredService<IBusControl>();
await busControl.StartAsync();
// 4. Отправка сообщения
await busControl.Publish(new KafkaMessage { Payload = "Привет, Kafka Consumer!" });
var kafkaConsumer = provider.GetRequiredService<KafkaConsumer>();
await Task.WhenAny(kafkaConsumer.Received.Task, Task.Delay(5000));
await busControl.StopAsync();
Транзакции и подтверждения доставки (RabbitMQ)
builder.Services.AddMassTransit(x =>
{
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("amqp://guest:guest@localhost:5672/");
cfg.UseMessageRetry(r => r.Interval(5, TimeSpan.FromSeconds(10)));
cfg.UseInMemoryOutbox(context); // Гарантия доставки (exactly-once)
});
});
Кастомный HealthCheck
public class CustomBusHealthCheck : IHealthCheck { ... }
builder.Services.AddHealthChecks().AddCheck<CustomBusHealthCheck>("custom-bus");
Кастомная сериализация сообщений
public class XmlMessageSerializer : IMessageSerializer { ... }
builder.Services.AddSingleton<IMessageSerializer, XmlMessageSerializer>();
Graceful shutdown и восстановление соединения
await bus.DisposeAsync(); // Корректное завершение работы
Регистрация нескольких bus с разными настройками
builder.Services.AddRabbitMqBus<RabbitOptions>(builder.Configuration, "RabbitMQ");
builder.Services.AddMassTransit(x => { ... });
Best practices и советы
- Используйте асинхронные методы для всех операций с bus и сообщениями
- Для продакшена обязательно настройте retry, outbox и HealthCheck
- Для Kafka всегда используйте AddMassTransitRider
- Для unit-тестирования используйте InMemory bus
- Не храните чувствительные данные в сообщениях без шифрования
- Для сложных сценариев используйте отдельные топики/очереди для разных типов сообщений
- Следите за актуальностью NuGet-пакетов MassTransit и брокеров
FAQ и troubleshooting
Q: Почему consumer не получает сообщения?
- Проверьте регистрацию consumer в DI
- Убедитесь, что bus запущен (
StartAsync
) - Проверьте настройки брокера и подключения
Q: Как добавить несколько consumer для одного типа сообщения?
- Зарегистрируйте несколько реализаций IConsumer<T> через DI
Q: Как реализовать dead-letter queue?
- Используйте стандартные механизмы MassTransit для DLQ
Q: Как тестировать обработку сообщений?
- Используйте InMemory bus и TaskCompletionSource для ожидания сообщений
Q: Как реализовать отложенную доставку (delay)?
- Используйте механизмы отложенных сообщений MassTransit
Ссылки и документация
Лицензия
MIT
Контакты
- Вопросы и предложения: issues или pull request в репозитории
- Автор: [Ваше имя или команда]
MessageBus — современное решение для интеграции с брокерами сообщений в .NET-проектах с максимальной гибкостью и расширяемостью.
Product | Versions Compatible and additional computed target framework versions. |
---|---|
.NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 was computed. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net8.0
- No dependencies.
NuGet packages (3)
Showing the top 3 NuGet packages that depend on DigitTwin.MessageBus.Abstractions:
Package | Downloads |
---|---|
DigitTwin.MessageBus.MassTransit
Адаптер MessageBus для MassTransit с поддержкой RabbitMQ и Kafka, HealthCheck, транзакций и подтверждений доставки. |
|
DigitTwin.MessageBus.DependecyInjection
Интеграция MessageBus с Dependency Injection и поддержка IOptions. |
|
DigitTwin.MessageBus.Core
Базовые сервисы, сериализация и инфраструктура для MessageBus. |
GitHub repositories
This package is not used by any popular GitHub repositories.
Version | Downloads | Last Updated |
---|---|---|
1.0.0 | 130 | 7/13/2025 |