KafKaStorm 8.1.0
KafkaStorm
A simple .NET client for Kafka, built on Confluent.Kafka.
Features
- Queues messages that could not be sent
- Concurrent consumers
- Producing messages concurrently
Installation
Using the Package Manager Console:
Install-Package KafkaStorm -Version 8.1.0
Usage/Examples
Setup
using Confluent.Kafka;
using KafkaStorm.Extensions;
using KafkaStorm.Interfaces;

builder.Services.AddKafkaStorm(factory =>
{
    factory.AddProducer(prf =>
    {
        prf.ConfigProducer(new ProducerConfig
        {
            // host is your broker address, e.g. "localhost:29092"
            BootstrapServers = host
        });
        // Keep unsent messages in an in-memory queue of at most 65536 entries
        prf.InMemoryQueue();
        prf.SetQueueLimit(65536);
    });

    // Start the hosted service that processes the producer queue
    factory.StartProducerHostedService();

    factory.AddConsumers(crf =>
    {
        // Register HelloConsumer for HelloEvent messages on "topicName"
        crf.AddConsumer<HelloConsumer, HelloEvent>(new ConsumerConfig
        {
            BootstrapServers = "localhost:29092",
            GroupId = "TestGroup"
        }, "topicName");
    });
});
ConsumerConfig here is the same class that Confluent.Kafka provides, so its usual settings are available.
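Since the config object is Confluent.Kafka's own type, its standard properties can be set before handing it to AddConsumer. A minimal sketch follows; the chosen values are illustrative assumptions rather than KafkaStorm defaults, and whether KafkaStorm overrides any setting internally is not stated in this README.

```csharp
using Confluent.Kafka;

// Illustrative values only; adjust them to your cluster.
var config = new ConsumerConfig
{
    BootstrapServers = "localhost:29092",
    GroupId = "TestGroup",
    // Where to start reading when the group has no committed offset yet.
    AutoOffsetReset = AutoOffsetReset.Earliest,
    // How long the broker waits for heartbeats before treating the consumer as dead.
    SessionTimeoutMs = 30000
};
```

The resulting `config` can then be passed wherever the examples in this README pass a `new ConsumerConfig { ... }`.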
New Feature 🎉
Adding consumers is even easier now:
using System.Reflection;
using Confluent.Kafka;
using KafkaStorm.Extensions;
using KafkaStorm.Interfaces;

builder.Services.AddKafkaStorm(factory =>
{
    factory.AddConsumers(crf =>
    {
        var config = new ConsumerConfig { BootstrapServers = "localhost:29092", GroupId = "TestGroup" };
        // Registers every consumer in the assembly together with its corresponding message type
        crf.AddConsumersFromAssembly(Assembly.GetExecutingAssembly(), config);
    });
});
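To give an idea of what assembly scanning involves, here is a minimal reflection sketch. It is not KafkaStorm's implementation: `ConsumerScanner` is a hypothetical helper, and the `IConsumer<TMessage>` interface is redeclared locally (mirroring the shape used in the Consuming section) only so that the sketch compiles without the package.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for KafkaStorm.Interfaces.IConsumer<T>.
public interface IConsumer<TMessage>
{
    Task Handle(TMessage @event, CancellationToken cancellationToken);
}

public static class ConsumerScanner
{
    // Returns a (consumer type, message type) pair for every concrete class
    // in the assembly that implements IConsumer<T>.
    public static List<(Type Consumer, Type Message)> Find(Assembly assembly)
    {
        return assembly.GetTypes()
            .Where(t => t.IsClass && !t.IsAbstract)
            .SelectMany(t => t.GetInterfaces()
                .Where(i => i.IsGenericType &&
                            i.GetGenericTypeDefinition() == typeof(IConsumer<>))
                .Select(i => (Consumer: t, Message: i.GetGenericArguments()[0])))
            .ToList();
    }
}
```

Calling `ConsumerScanner.Find(Assembly.GetExecutingAssembly())` yields the pairs that a registration step could then loop over. How topic names are derived for each pair is not covered by this README, so the sketch leaves that out.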
Consuming
using KafkaStorm.Interfaces;
using Microsoft.Extensions.Logging;

public class HelloConsumer : IConsumer<HelloEvent>
{
    private readonly ILogger<HelloConsumer> _logger;

    public HelloConsumer(ILogger<HelloConsumer> logger)
    {
        _logger = logger;
    }

    // Called for each HelloEvent received from the topic
    public Task Handle(HelloEvent @event, CancellationToken cancellationToken)
    {
        _logger.LogDebug("Message Received");
        // Nothing to await here, so return a completed task instead of marking the method async
        return Task.CompletedTask;
    }
}
Event
Your event (message) can be any class like this:
public class HelloEvent
{
    public HelloEvent(DateTime time)
    {
        Message = "Hello";
        Time = time;
    }

    public string Message { get; }
    public DateTime Time { get; }
}
Attention: if your class contains a property whose type is an interface, deserializing it from JSON may throw an exception.
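The following sketch illustrates that warning with System.Text.Json directly. It assumes KafkaStorm deserializes messages with System.Text.Json, which is listed among the package dependencies but not confirmed by this README. All type names (`IPayload`, `TextPayload`, `BadEvent`, `GoodEvent`, `InterfacePropertyDemo`) are hypothetical.

```csharp
using System;
using System.Text.Json;

// Hypothetical types used only for this illustration.
public interface IPayload { string Text { get; } }
public class TextPayload : IPayload { public string Text { get; set; } = ""; }

// Interface-typed property: the serializer cannot know which class to create.
public class BadEvent { public IPayload? Payload { get; set; } }

// Concrete-typed property: the serializer can construct TextPayload.
public class GoodEvent { public TextPayload? Payload { get; set; } }

public static class InterfacePropertyDemo
{
    private const string Json = "{\"Payload\":{\"Text\":\"hi\"}}";

    // True when deserializing T from the sample JSON throws NotSupportedException.
    public static bool Throws<T>()
    {
        try
        {
            JsonSerializer.Deserialize<T>(Json);
            return false;
        }
        catch (NotSupportedException)
        {
            return true;
        }
    }
}
```

With default serializer options, `InterfacePropertyDemo.Throws<BadEvent>()` should report a failure while `Throws<GoodEvent>()` should not. Declaring event properties with concrete types avoids the problem.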
Producing
Inject IProducer through the constructor like any other service:
using KafkaStorm.Interfaces;

public class TestService
{
    private readonly IProducer _producer;

    // IProducer is resolved from the dependency injection container
    public TestService(IProducer producer)
    {
        _producer = producer;
    }
}
Produce with queue
_producer.Produce(new HelloEvent(DateTime.Now), "topicName");
Produce right now
await _producer.ProduceNowAsync(new HelloEvent(DateTime.Now), "topicName");
Product | Versions (compatible and additional computed target framework versions) |
---|---|
.NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. |
- net8.0
  - Confluent.Kafka (>= 2.3.0)
  - Microsoft.Extensions.DependencyInjection (>= 8.0.0)
  - Microsoft.Extensions.Hosting.Abstractions (>= 8.0.0)
  - System.Text.Json (>= 8.0.2)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.
Version | Downloads | Last updated | |
---|---|---|---|
8.1.0 | 151 | 4/20/2024 | |
8.0.6 | 130 | 4/17/2024 | |
8.0.5 | 134 | 4/17/2024 | |
8.0.4 | 113 | 4/17/2024 | |
8.0.3 | 113 | 4/17/2024 | |
8.0.2 | 198 | 2/28/2024 | |
8.0.1 | 166 | 2/28/2024 | |
8.0.0 | 160 | 2/28/2024 | |
1.9.0 | 456 | 5/6/2023 | |
1.8.1 | 475 | 5/6/2023 | |
1.8.0 | 598 | 12/22/2022 | |
1.7.0 | 815 | 1/19/2022 | |
1.5.0 | 730 | 1/19/2022 | |
1.4.0 | 729 | 1/19/2022 | |
1.3.0 | 743 | 1/18/2022 | |
1.2.3 | 751 | 1/18/2022 | |
1.2.2 | 738 | 1/18/2022 | |
1.2.1 | 720 | 1/18/2022 | |
1.2.0 | 705 | 1/17/2022 | |
1.1.0 | 751 | 1/17/2022 | |
1.0.0 | 758 | 1/2/2022 |