MinimalKafka 0.10.2
dotnet add package MinimalKafka --version 0.10.2
NuGet\Install-Package MinimalKafka -Version 0.10.2
<PackageReference Include="MinimalKafka" Version="0.10.2" />
paket add MinimalKafka --version 0.10.2
#r "nuget: MinimalKafka, 0.10.2"
// Install MinimalKafka as a Cake Addin #addin nuget:?package=MinimalKafka&version=0.10.2 // Install MinimalKafka as a Cake Tool #tool nuget:?package=MinimalKafka&version=0.10.2
MinimalKafka
MinimalKafka is a Kafka consumer library designed to consume messages similarly to a Minimal API. This repository provides a streamlined and efficient way to work with Kafka consumers in .NET 8.0.
Features
- Simple and minimalistic Kafka consumer setup
- Built to integrate seamlessly with .NET 8.0 applications
- Utilizes the Confluent.Kafka for robust Kafka interactions
Installation
To install MinimalKafka, use the NuGet package manager:
dotnet add package MinimalKafka
Usage
Setting Up a Kafka Consumer
Below is a basic example of how to set up a Kafka consumer using MinimalKafka. For more detailed examples, refer to the example
folder in the repository.
using MinimalKafka;
var builder = WebApplication.CreateBuilder(args);
.Services.AddMinimalKafka(config =>
{
config.WithBootstrapServers("localhost:9092")
.WithGroupId(Guid.NewGuid().ToString())
.WithOffsetReset(AutoOffsetReset.Earliest);
});
var app = builder.Build();
app.MapTopic("topic.name", (string key, string value) => {
// Implement your code here
return Task.CompletedTask;
}).WithGroupId("Topic 2");
await app.RunAsync();
Kafka Stream Processing
Often we want to join 2 topics and produce the outcome of this into a new topic
flowchart LR
A[Topic A] -->|Consume| C
B[Topic B] -->|Consume| C
C{{Processor}} -->|Produce| D[Topic C]
Join Streams
This can be achieved with the following code
// Join 2 streams
app.MapStream<Guid, DatamodelA>("topic-a")
.Join<Guid, DatamodelB>("topic-b").OnKey()
.Into(async (c, k, v) =>
{
await c.ProduceAsync<Guid, DatamodelC>("topic-c", k, new(k, v.Item1.DataA, v.Item2.DataB));
});
Store Projection
Each service who is interested in this can consume this model and store this for later use.
flowchart LR
A[Topic C] -->|Consume| C
C{{Processor}} --> D[(Database)]
// Project to local storage
app.MapStream<Guid, DatamodelC>("topic-c")
.Into(async (c, _, v) =>
{
var store = c.RequestServices.GetRequiredService<IStore>();
await store.SaveAsync(v);
});
Stream Proccessing
Or some other complicated stuff and produce to other topic
flowchart LR
subgraph Process Logic
Logic{DoStuff} <--> Database[(Database)]
Logic <--> HttpClient
end
subgraph Stream Process
Consumer[Topic-C] -->|Consume| Processor
Processor{{Processor}} --> Producer
Producer[Topic]
end
Processor <--> Logic
app.MapStream<Guid, DatamodelC>("topic-c")
.Into(async (c, _, v) =>
{
// Some Extreme Complicated task.
var result = DoStuff();
await c.ProduceAsync("other-topic", result.Id, result);
});
Stream Branching
You could also branch a topic in different topics for different processes
flowchart LR
A[Topic-A] -->|Consume| B{{Processor}}
B -->|branch| C[/branch v1/]
B -->|branch| D[/branch v2/]
B -->|branch| E[/branch v3/]
B -->|branch| F[/branch ??/]
C -->|Produce| G[Topic-1]
D -->|Produce| H[Topic-2]
E -->|Produce| I[Topic-3]
F -->|Produce| J[Unknown]
app.MapStream<Guid, DatamodelA>("topic-a")
.SplitInto(x =>
{
x.Branch((_, v) => v.DataA == "v1", (c, k, v) => c.ProduceAsync("topic-1", k, v));
x.Branch((_, v) => v.DataA == "v2", (c, k, v) => c.ProduceAsync("topic-2", k, v));
x.Branch((_, v) => v.DataA == "v3", (c, k, v) => c.ProduceAsync("topic-3", k, v));
x.DefaultBranch((c, k, v) => c.ProduceAsync("unknown", k, v));
});
Streaming Command Proccessor
flowchart LR
A[Topic-A] -->|Consume| C{{Processor}}
B[Topic-B] -->|Produce| C
C -->|branch| D[/Create/]
C -->|branch| E[/Addition/]
C -->|branch| F[/Subtract/]
D -->|Produce| G[Topic-B]
E -->|Produce| G[Topic-B]
F -->|Produce| G[Topic-B]
app.MapStream<Guid, DatamodelA>("topic-a")
.Join<Guid, DatamodelB>("topic-b").OnKey()
.SplitInto(x =>
{
// Initial Create Command
x.Branch((_, v) => v.Item1?.DataA == "Create", async (c, k, v) => {
if(v.Item2 != null)
{
return;
}
await c.ProduceAsync("topic-b", k, new DatamodelB(k, 1));
});
// Add Command
x.Branch((_, v) => v.Item1?.DataA == "Addition", async (c, k, v) =>
{
if(v.Item2 == null)
{
return;
}
await c.ProduceAsync("topic-b", k, v.Item2 with { DataB = v.Item2.DataB + 1 });
});
// Subtract Command
x.Branch((_, v) => v.Item1?.DataA == "Subtract", async (c, k, v) =>
{
if (v.Item2 == null)
{
return;
}
await c.ProduceAsync("topic-b", k, v.Item2 with { DataB = v.Item2.DataB - 1 });
});
});
In this way you can chain topics together without interfering chancing
Contribution
Contributions are welcome! Please submit a pull request or open an issue to discuss your ideas or improvements.
License
This project is licensed under the MIT License.
Contact
For any questions or support, please open an issue in the repository.
Product | Versions Compatible and additional computed target framework versions. |
---|---|
.NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 was computed. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. |
-
net8.0
- Confluent.Kafka (>= 2.8.0)
- Microsoft.AspNetCore.Http.Abstractions (>= 2.3.0)
- Microsoft.Extensions.Hosting (>= 8.0.1)
- Microsoft.Extensions.Logging.Abstractions (>= 8.0.2)
- System.Linq.Async (>= 6.0.1)
- System.Threading.Tasks.Dataflow (>= 8.0.1)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.
Version | Downloads | Last updated |
---|---|---|
0.10.2 | 242 | 3/4/2025 |
0.10.2-alpha2 | 182 | 3/4/2025 |
0.10.2-alpha1 | 180 | 3/3/2025 |
0.10.1 | 81 | 3/2/2025 |
0.10.1-alpha3 | 103 | 3/2/2025 |
0.10.1-alpha2 | 80 | 3/2/2025 |
0.10.1-alpha1 | 74 | 3/1/2025 |
0.10.0 | 85 | 3/1/2025 |
0.10.0-alpha2 | 71 | 3/1/2025 |
0.9.0 | 335 | 1/6/2025 |
0.8.0 | 228 | 11/20/2024 |
0.7.1 | 111 | 11/14/2024 |
0.7.0 | 110 | 11/13/2024 |
0.6.0 | 164 | 10/31/2024 |
0.5.1 | 345 | 10/23/2024 |
0.5.0 | 96 | 10/22/2024 |
0.4.0 | 96 | 10/22/2024 |
0.3.1 | 131 | 10/12/2024 |
0.3.0 | 192 | 8/29/2024 |
0.2.2 | 134 | 8/22/2024 |
0.2.1 | 325 | 7/23/2024 |
0.2.0 | 100 | 7/23/2024 |
0.1.0 | 100 | 7/22/2024 |