KafkaIO 1.0.26
KafkaIO - A Fluent Kafka Configuration Library for .NET
Overview
KafkaIO is a .NET library that provides a fluent interface for configuring Kafka producers and consumers through Microsoft's dependency injection container. It simplifies Kafka client setup with built-in support for common patterns such as circuit breakers, retries, batch processing, and the outbox pattern.
Features
- Fluent Configuration API - Easy-to-use builder pattern for setting up producers and consumers
- Dependency Injection Integration - Seamless integration with IServiceCollection
- Built-in Resilience Patterns:
  - Circuit breakers
  - Retry strategies
  - Batch processing
  - Outbox pattern
- Serialization Support - JSON serialization out of the box
- Producer Features:
  - Message batching
  - Error handling
  - Configuration options for all Confluent.Kafka producer settings
- Consumer Features:
  - Topic subscription
  - Message handling
  - Configuration options for all Confluent.Kafka consumer settings
Installation
dotnet add package KafkaIO
Quick Start
Basic Producer/Consumer Setup
using KafkaIO;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
var builder = Host.CreateApplicationBuilder();
builder.Services.AddKafkaIO(kafka => kafka
    .AddBootstrapServers("localhost:9092")
    // Configure producer
    .AddProducer("sample.topic")
    // Configure consumer
    .AddConsumer("sample.group", consumer => consumer
        .AddTopic("sample.topic")));
var app = builder.Build();
await app.StartAsync();
// Get the producer registered under the topic key and send a message
var producer = app.Services.GetKeyedService<IProducer>("sample.topic")!;
await producer.ProduceAsync(new SampleMessage { Text = "Hello Kafka!" });
await app.StopAsync();
// Application-defined message type (not part of KafkaIO)
public class SampleMessage
{
    public string Text { get; set; } = "";
}
Advanced Configuration
builder.Services.AddKafkaIO(kafka => kafka
    .AddBootstrapServers("localhost:9092")
    // Producer for the "orders" topic with serialization, resilience, and batching
    .AddProducer("orders", producer => producer
        .AddSerializer(s => s.UseJsonSerializer())
        .AddCircuitBreaker(cb => cb
            .WithFailureThreshold(5)
            .WithOpenStateDuration(TimeSpan.FromSeconds(30)))
        .AddBatchProducer(batch => batch
            .WithMessageLimit(100)
            .WithTimeLimit(1000))
        .AddMessageRetry(retry => retry
            .WithIncremental(
                retryLimit: 3,
                initialInterval: TimeSpan.FromSeconds(1),
                intervalIncrement: TimeSpan.FromSeconds(2))))
    // Consumer group that reads "orders" and dispatches Order messages to OrderProcessor
    .AddConsumer("order-processors", consumer => consumer
        .AddTopic("orders")
        .AddDeserializer(d => d.UseJsonSerializer())
        .WithAutoOffsetReset(AutoOffsetReset.Earliest)
        .HandleType<Order>(h => h
            .AddHandler<OrderProcessor>())));
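The batch settings above suggest that messages are buffered and flushed once either a message limit or a time limit is reached. The sketch below illustrates that general idea in plain C#. It is not KafkaIO's implementation: `BatchBuffer<T>` is a hypothetical name, and reading the `WithTimeLimit(1000)` value as milliseconds is an assumption.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of KafkaIO: buffers items and hands back a
// completed batch when either the message limit or the time limit is reached.
public sealed class BatchBuffer<T>
{
    private readonly int _messageLimit;
    private readonly TimeSpan _timeLimit;
    private readonly List<T> _items = new();
    private DateTime _batchStartedUtc;

    public BatchBuffer(int messageLimit, TimeSpan timeLimit)
    {
        _messageLimit = messageLimit;
        _timeLimit = timeLimit;
    }

    // Adds an item; returns the completed batch if a limit was hit, otherwise null.
    public IReadOnlyList<T>? Add(T item, DateTime nowUtc)
    {
        // The time window starts when the first item of a batch arrives.
        if (_items.Count == 0) _batchStartedUtc = nowUtc;
        _items.Add(item);

        bool full = _items.Count >= _messageLimit;
        bool expired = nowUtc - _batchStartedUtc >= _timeLimit;
        if (!full && !expired) return null;

        var batch = _items.ToArray();
        _items.Clear();
        return batch;
    }
}
```

This sketch only checks the time limit when a new item arrives. A production batcher would normally also flush from a timer so that a quiet topic does not hold messages indefinitely.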
Configuration Options
Producer Configuration
Method | Description |
---|---|
AddSerializer() | Configure message serialization |
AddCircuitBreaker() | Add circuit breaker pattern |
AddBatchProducer() | Configure batch message processing |
AddMessageRetry() | Configure retry behavior |
AddOutbox() | Add outbox pattern |
WithCompressionType() | Set message compression |
WithTransactionalId() | Configure transactional producer |
WithLingerMs() | Set linger time for batching |
Consumer Configuration
Method | Description |
---|---|
AddDeserializer() | Configure message deserialization |
AddTopic() | Subscribe to topics |
HandleType<T>() | Register message handlers |
WithAutoOffsetReset() | Configure offset behavior |
WithSessionTimeoutMs() | Set session timeout |
WithIsolationLevel() | Set read isolation level |
Resilience Patterns
Circuit Breaker
.AddCircuitBreaker(cb => cb
    .WithFailureThreshold(5)
    .WithOpenStateDuration(TimeSpan.FromSeconds(30)))
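To make the two settings concrete, here is a minimal, self-contained sketch of how a failure threshold and an open-state duration typically interact in a circuit breaker. `SimpleCircuitBreaker` is a hypothetical class written for illustration. KafkaIO's internal behavior may differ, for example in how it counts failures or handles the half-open state.

```csharp
using System;

// Hypothetical sketch, not KafkaIO's implementation.
public sealed class SimpleCircuitBreaker
{
    private readonly int _failureThreshold;
    private readonly TimeSpan _openStateDuration;
    private int _consecutiveFailures;
    private DateTime? _openedAtUtc;

    public SimpleCircuitBreaker(int failureThreshold, TimeSpan openStateDuration)
    {
        _failureThreshold = failureThreshold;
        _openStateDuration = openStateDuration;
    }

    // True when a call may proceed: the circuit is closed, or the open
    // period has elapsed and a trial call is permitted (half-open).
    public bool AllowRequest(DateTime nowUtc)
    {
        if (_openedAtUtc is null) return true;
        return nowUtc - _openedAtUtc.Value >= _openStateDuration;
    }

    // A success closes the circuit and resets the failure count.
    public void RecordSuccess()
    {
        _consecutiveFailures = 0;
        _openedAtUtc = null;
    }

    // A failure at or beyond the threshold opens (or re-opens) the circuit.
    public void RecordFailure(DateTime nowUtc)
    {
        _consecutiveFailures++;
        if (_consecutiveFailures >= _failureThreshold) _openedAtUtc = nowUtc;
    }
}
```

With a threshold of 5 and a duration of 30 seconds, the fifth consecutive failure blocks further calls for 30 seconds. After that, one trial call is allowed, and its outcome either closes the circuit or re-opens it.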
Retry Strategies
// Fixed intervals
.AddMessageRetry(retry => retry
    .WithIntervals(1000, 2000, 3000))
// Incremental intervals
.AddMessageRetry(retry => retry
    .WithIncremental(
        retryLimit: 3,
        initialInterval: TimeSpan.FromSeconds(1),
        intervalIncrement: TimeSpan.FromSeconds(2)))
// Immediate retry
.AddMessageRetry(retry => retry
    .WithImmediate(retryLimit: 5))
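The three strategies differ only in the delay schedule they produce. The sketch below computes those schedules in plain C# under two assumptions that the source does not confirm: that the `WithIntervals` arguments are milliseconds, and that "incremental" means each delay grows linearly by `intervalIncrement`. `RetryDelays` is a hypothetical helper, not a KafkaIO type.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper, not part of KafkaIO: builds the delay before each retry.
public static class RetryDelays
{
    // Fixed list of delays, one per retry attempt (values assumed to be milliseconds).
    public static IReadOnlyList<TimeSpan> Intervals(params int[] milliseconds) =>
        milliseconds.Select(ms => TimeSpan.FromMilliseconds((double)ms)).ToArray();

    // Linear growth: initial, initial + increment, initial + 2 * increment, ...
    public static IReadOnlyList<TimeSpan> Incremental(
        int retryLimit, TimeSpan initialInterval, TimeSpan intervalIncrement) =>
        Enumerable.Range(0, retryLimit)
            .Select(i => initialInterval + TimeSpan.FromTicks(intervalIncrement.Ticks * i))
            .ToArray();

    // No waiting between attempts.
    public static IReadOnlyList<TimeSpan> Immediate(int retryLimit) =>
        Enumerable.Repeat(TimeSpan.Zero, retryLimit).ToArray();
}
```

Under this reading, `Incremental(3, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2))` yields delays of 1 s, 3 s, and 5 s, while `Immediate(5)` yields five zero-length delays.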
Outbox Pattern
.AddOutbox(outbox => outbox
    .UseInMemory()
    .WithPollingPeriod(2000))
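For readers unfamiliar with the pattern, an outbox stores outgoing messages first and lets a background poller publish them later, so a temporary broker outage does not lose messages. The following is a minimal in-memory sketch of that idea. `InMemoryOutbox<T>` and the `tryPublish` callback are hypothetical names, and nothing here reflects how KafkaIO implements `UseInMemory()` or its polling loop.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory outbox, not KafkaIO's implementation.
public sealed class InMemoryOutbox<T>
{
    private readonly Queue<T> _pending = new();
    private readonly object _gate = new();

    public int PendingCount
    {
        get { lock (_gate) return _pending.Count; }
    }

    // Called by application code instead of publishing directly.
    public void Enqueue(T message)
    {
        lock (_gate) _pending.Enqueue(message);
    }

    // Intended to run once per polling period; returns how many messages were published.
    // Stops at the first failure so the failed message is retried on the next poll
    // and ordering is preserved.
    public int Flush(Func<T, bool> tryPublish)
    {
        int published = 0;
        lock (_gate)
        {
            while (_pending.Count > 0)
            {
                if (!tryPublish(_pending.Peek())) break;
                _pending.Dequeue();
                published++;
            }
        }
        return published;
    }
}
```

Holding the lock while publishing keeps the sketch short but would serialize producers in a real system. A durable outbox would also persist messages in the same transaction as the business data rather than in memory.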
Message Handling
Define message handlers by implementing IConsumerHandler<T>:
using Confluent.Kafka; // provides ConsumeResult<TKey, TValue>
public class OrderProcessor : IConsumerHandler<Order>
{
    public Task HandleAsync(ConsumeResult<string, Order> message, CancellationToken cancellationToken)
    {
        // Process the order carried in the consumed message
        Console.WriteLine($"Processing order {message.Value.OrderId}");
        return Task.CompletedTask;
    }
}
Advanced Topics
Custom Serialization
Implement the ISerializer/IDeserializer interfaces for custom serialization:
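public class ProtobufSerializer : ISerializer
{
    // Placeholder body: replace with a call into your Protobuf library
    public byte[] Serialize<T>(T data) =>
        throw new NotImplementedException("Protobuf serialization goes here");
}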
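As a more complete illustration of the same shape, the sketch below implements a serializer/deserializer pair with System.Text.Json. The interfaces `IBytesSerializer` and `IBytesDeserializer` are hypothetical stand-ins that mirror the `Serialize<T>` signature shown above. KafkaIO's actual ISerializer/IDeserializer members are not documented here and may differ, so treat this as a starting point only.

```csharp
using System.Text.Json;

// Hypothetical stand-ins mirroring the Serialize<T> shape shown above;
// KafkaIO's real interfaces may declare different members.
public interface IBytesSerializer
{
    byte[] Serialize<T>(T data);
}

public interface IBytesDeserializer
{
    T? Deserialize<T>(byte[] data);
}

// Serializes values as UTF-8 encoded JSON using the built-in System.Text.Json.
public sealed class SystemTextJsonSerializer : IBytesSerializer, IBytesDeserializer
{
    public byte[] Serialize<T>(T data) =>
        JsonSerializer.SerializeToUtf8Bytes(data);

    public T? Deserialize<T>(byte[] data) =>
        JsonSerializer.Deserialize<T>(data);
}
```

A Protobuf or Avro implementation would follow the same structure, swapping the two method bodies for calls into the corresponding library.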
License
MIT License
Contributing
Contributions are welcome! Please open an issue or submit a pull request.
Product | Versions Compatible and additional computed target framework versions. |
---|---|
.NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 is compatible. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
- net8.0
  - Confluent.Kafka (>= 2.8.0)
  - KeyedHostedServices.Contrib.Microsoft.Extensions.Hosting (>= 1.0.14)
  - Microsoft.IO.RecyclableMemoryStream (>= 3.0.1)
  - Newtonsoft.Json (>= 13.0.1)
- net9.0
  - Confluent.Kafka (>= 2.8.0)
  - KeyedHostedServices.Contrib.Microsoft.Extensions.Hosting (>= 1.0.14)
  - Microsoft.IO.RecyclableMemoryStream (>= 3.0.1)
  - Newtonsoft.Json (>= 13.0.1)
Version | Downloads | Last Updated |
---|---|---|
1.0.36 | 73 | 5/24/2025 |
1.0.35 | 65 | 5/24/2025 |
1.0.34 | 67 | 5/24/2025 |
1.0.33 | 68 | 5/24/2025 |
1.0.32 | 138 | 5/22/2025 |
1.0.31 | 131 | 5/22/2025 |
1.0.30 | 144 | 5/22/2025 |
1.0.29 | 107 | 5/17/2025 |
1.0.28 | 224 | 5/15/2025 |
1.0.27 | 217 | 5/14/2025 |
1.0.26 | 222 | 5/13/2025 |
1.0.24 | 76 | 5/3/2025 |
1.0.23 | 74 | 5/3/2025 |
1.0.22 | 91 | 5/2/2025 |
1.0.21 | 96 | 5/2/2025 |