Kafka.Bentanik
1.0.1
There is a newer version of this package available.
See the version list below for details.
dotnet add package Kafka.Bentanik --version 1.0.1
NuGet\Install-Package Kafka.Bentanik -Version 1.0.1
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="Kafka.Bentanik" Version="1.0.1" />
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="Kafka.Bentanik" Version="1.0.1" />
<PackageReference Include="Kafka.Bentanik" />
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add Kafka.Bentanik --version 1.0.1
The NuGet Team does not provide support for this client. Please contact its maintainers for support.
#r "nuget: Kafka.Bentanik, 1.0.1"
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package Kafka.Bentanik@1.0.1
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=Kafka.Bentanik&version=1.0.1
#tool nuget:?package=Kafka.Bentanik&version=1.0.1
Kafka.Bentanik
A lightweight Kafka library for .NET applications, supporting:
- Kafka Publisher with flexible configuration
- Kafka Subscriber with retry, DLQ and scoped DI
- Outbox pattern integration with plug-in DB support (MongoDB, PostgreSQL, MSSQL, etc.)
Features
- ✅ Publish messages with retries, compression, idempotence, batching
- ✅ Background subscriber with scoped handler resolution
- ✅ Outbox Pattern for transactional publishing (supports any database)
- ✅ DLQ (Dead Letter Queue) support on subscriber and outbox
- ✅ Plug-in architecture: bring your own DB layer
Installation
From NuGet:
dotnet add package Kafka.Bentanik
From a local package source (for example, a ./libs folder):
dotnet add package Kafka.Bentanik --source ./libs
Kafka Publisher
Register:
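// Acks and CompressionType are assumed here to be the Confluent.Kafka enums,
// which requires: using Confluent.Kafka;
builder.Services.AddKafkaPublisher(cfg =>
{
    cfg.BootstrapServers = "localhost:9092";
    cfg.Acks = Acks.All;                          // wait for all in-sync replicas
    cfg.EnableIdempotence = true;
    cfg.LingerMs = 100;                           // batch for up to 100 ms before sending
    cfg.CompressionType = CompressionType.Gzip;
});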
Use:
public class MyService
{
    private readonly IKafkaBentanikPublisher _publisher;

    public MyService(IKafkaBentanikPublisher publisher)
        => _publisher = publisher;

    public async Task SendAsync()
    {
        await _publisher.PublishAsync("my-topic", new { Name = "Test", Created = DateTime.UtcNow });
    }
}
Kafka Subscriber
Define message + handler:
public class MyMessage
{
    public string Id { get; set; }
}

public class MyHandler : IKafkaBentanikSubscriber<MyMessage>
{
    public Task HandleAsync(MyMessage message, CancellationToken ct)
    {
        Console.WriteLine($"Received message {message.Id}");
        return Task.CompletedTask;
    }
}
Register subscriber:
builder.Services.AddKafkaSubscriber<MyMessage, MyHandler>(opt =>
{
    opt.Topic = "my-topic";
    opt.GroupId = "my-group";
    opt.BootstrapServers = "localhost:9092";
    opt.MaxRetryAttempts = 3;
    opt.RetryDelay = TimeSpan.FromSeconds(2);
    opt.DeadLetterTopic = "my-topic-dlq";
});
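The retry and dead-letter options above can be read as "retry a failing handler a bounded number of times, then hand the message to the DLQ". The following is a minimal sketch of that behaviour, not the library's actual implementation. `RetryThenDeadLetter`, `ProcessAsync`, and the `sendToDeadLetter` callback are hypothetical names, and the sketch assumes one initial attempt plus `maxRetryAttempts` retries; the library may count attempts differently.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of Kafka.Bentanik: illustrates retry-then-DLQ semantics.
public static class RetryThenDeadLetter
{
    // Runs the handler once, then retries up to maxRetryAttempts times.
    // Returns true if the handler succeeded, false if the message was dead-lettered.
    public static async Task<bool> ProcessAsync<T>(
        T message,
        Func<T, CancellationToken, Task> handler,
        Func<T, Exception, Task> sendToDeadLetter,
        int maxRetryAttempts,
        TimeSpan retryDelay,
        CancellationToken ct = default)
    {
        Exception? lastError = null;

        for (var attempt = 0; attempt <= maxRetryAttempts; attempt++)
        {
            try
            {
                await handler(message, ct);
                return true;
            }
            catch (Exception ex) when (ex is not OperationCanceledException)
            {
                lastError = ex;

                // Wait before the next attempt, but not after the final failure.
                if (attempt < maxRetryAttempts)
                    await Task.Delay(retryDelay, ct);
            }
        }

        // All attempts failed: hand the message over (for example, publish it to the DLQ topic).
        await sendToDeadLetter(message, lastError!);
        return false;
    }
}
```

Cancellation is deliberately not treated as a handler failure here, so shutting down the host does not push in-flight messages to the dead-letter topic.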
Outbox Pattern
The outbox supports custom databases through the IOutboxStore interface.
OutboxMessage Model
public class OutboxMessage
{
    public string Id { get; set; } = Guid.NewGuid().ToString();
    public string EventType { get; set; } = default!;
    public string Payload { get; set; } = default!;
    public string Topic { get; set; } = default!;
    public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
    public bool Processed { get; set; } = false;
    public int RetryCount { get; set; } = 0;
    public string? Status { get; set; } = "Pending"; // Pending, Failed, Success, DeadLettered
    public DateTime? ProcessedAt { get; set; }
    public bool IsDeadLettered => Status == "DeadLettered";
}
Interface
public interface IOutboxStore
{
    Task<List<OutboxMessage>> GetUnsentMessagesAsync(int maxCount, CancellationToken ct);
    Task MarkAsSentAsync(Guid messageId, CancellationToken ct);
    Task IncrementRetryAsync(Guid messageId, CancellationToken ct);
    Task MoveToDeadLetterAsync(Guid messageId, CancellationToken ct);
}
Register Outbox:
builder.Services.AddKafkaOutbox<MyOutboxStore>();
Once registered, OutboxProcessor runs in the background: it polls the store for unsent messages, publishes them to Kafka, and updates the store with the result.
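To make that polling cycle concrete, here is a minimal sketch of a single processing pass. It is an illustration under stated assumptions, not the library's OutboxProcessor. `PendingMessage`, `IPendingStore`, `InMemoryPendingStore`, `OutboxPass`, the `publish` delegate, and the `maxRetries` threshold are all hypothetical stand-ins so that the example compiles on its own. `IPendingStore` mirrors the four IOutboxStore methods shown above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-ins for OutboxMessage and IOutboxStore.
public sealed record PendingMessage(Guid Id, string Topic, string Payload, int RetryCount);

public interface IPendingStore
{
    Task<List<PendingMessage>> GetUnsentMessagesAsync(int maxCount, CancellationToken ct);
    Task MarkAsSentAsync(Guid messageId, CancellationToken ct);
    Task IncrementRetryAsync(Guid messageId, CancellationToken ct);
    Task MoveToDeadLetterAsync(Guid messageId, CancellationToken ct);
}

// In-memory store, useful for trying the pass out in a unit test.
public sealed class InMemoryPendingStore : IPendingStore
{
    public List<PendingMessage> Pending { get; } = new();
    public List<Guid> Sent { get; } = new();
    public List<Guid> DeadLettered { get; } = new();

    public Task<List<PendingMessage>> GetUnsentMessagesAsync(int maxCount, CancellationToken ct)
        => Task.FromResult(Pending.Take(maxCount).ToList()); // copy, so callers can mutate the store

    public Task MarkAsSentAsync(Guid messageId, CancellationToken ct)
    {
        Pending.RemoveAll(m => m.Id == messageId);
        Sent.Add(messageId);
        return Task.CompletedTask;
    }

    public Task IncrementRetryAsync(Guid messageId, CancellationToken ct)
    {
        var i = Pending.FindIndex(m => m.Id == messageId);
        if (i >= 0) Pending[i] = Pending[i] with { RetryCount = Pending[i].RetryCount + 1 };
        return Task.CompletedTask;
    }

    public Task MoveToDeadLetterAsync(Guid messageId, CancellationToken ct)
    {
        Pending.RemoveAll(m => m.Id == messageId);
        DeadLettered.Add(messageId);
        return Task.CompletedTask;
    }
}

public static class OutboxPass
{
    // One polling pass; returns the number of messages published successfully.
    public static async Task<int> RunOnceAsync(
        IPendingStore store,
        Func<string, string, CancellationToken, Task> publish, // (topic, payload, ct)
        int batchSize,
        int maxRetries,
        CancellationToken ct = default)
    {
        var sent = 0;
        foreach (var msg in await store.GetUnsentMessagesAsync(batchSize, ct))
        {
            try
            {
                await publish(msg.Topic, msg.Payload, ct);
                await store.MarkAsSentAsync(msg.Id, ct);
                sent++;
            }
            catch (Exception ex) when (ex is not OperationCanceledException)
            {
                // RetryCount holds the failures recorded before this pass.
                if (msg.RetryCount + 1 >= maxRetries)
                    await store.MoveToDeadLetterAsync(msg.Id, ct);
                else
                    await store.IncrementRetryAsync(msg.Id, ct);
            }
        }
        return sent;
    }
}
```

A hosted background service would call a pass like this in a loop with a delay between iterations. Because a message is marked as sent only after the publish succeeds, a crash between the two steps can cause the message to be published again, so consumers should tolerate duplicates.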
Example Mongo Implementation
// Requires the MongoDB.Driver package: using MongoDB.Driver;
public class MongoOutboxStore : IOutboxStore
{
    private readonly IMongoCollection<OutboxMessage> _collection;

    public MongoOutboxStore(IMongoDatabase db)
    {
        _collection = db.GetCollection<OutboxMessage>("outbox");
    }

    public Task<List<OutboxMessage>> GetUnsentMessagesAsync(int maxCount, CancellationToken ct)
        => _collection.Find(m => !m.Processed && m.Status != "DeadLettered")
            .Limit(maxCount)
            .ToListAsync(ct);

    public Task MarkAsSentAsync(Guid messageId, CancellationToken ct)
        => UpdateAsync(messageId, Builders<OutboxMessage>.Update
            .Set(x => x.Processed, true)
            .Set(x => x.Status, "Success")
            .Set(x => x.ProcessedAt, DateTime.UtcNow), ct);

    public Task IncrementRetryAsync(Guid messageId, CancellationToken ct)
        => UpdateAsync(messageId, Builders<OutboxMessage>.Update.Inc(x => x.RetryCount, 1), ct);

    public Task MoveToDeadLetterAsync(Guid messageId, CancellationToken ct)
        => UpdateAsync(messageId, Builders<OutboxMessage>.Update
            .Set(x => x.Processed, true)
            .Set(x => x.Status, "DeadLettered")
            .Set(x => x.ProcessedAt, DateTime.UtcNow), ct);

    // IOutboxStore passes Guid ids, while OutboxMessage.Id stores the Guid as a
    // string, so the id is converted before it is used in the filter.
    private Task UpdateAsync(Guid messageId, UpdateDefinition<OutboxMessage> update, CancellationToken ct)
    {
        var id = messageId.ToString();
        return _collection.UpdateOneAsync(x => x.Id == id, update, cancellationToken: ct);
    }
}
License
MIT
Roadmap
- Add exponential backoff
- Add metrics
- Add CLI to re-publish DLQ messages
- Add schema registry support
Contributing
PRs welcome!
Product | Versions Compatible and additional computed target framework versions. |
---|---|
.NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 was computed. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
- net8.0
  - Confluent.Kafka (>= 2.8.0)
  - Microsoft.Extensions.DependencyInjection.Abstractions (>= 8.0.0)
  - Microsoft.Extensions.Hosting (>= 8.0.0)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.