MinimalKafka 0.10.2

dotnet add package MinimalKafka --version 0.10.2                
NuGet\Install-Package MinimalKafka -Version 0.10.2                
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="MinimalKafka" Version="0.10.2" />                
For projects that support PackageReference, copy this XML node into the project file to reference the package.
paket add MinimalKafka --version 0.10.2                
#r "nuget: MinimalKafka, 0.10.2"                
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
// Install MinimalKafka as a Cake Addin
#addin nuget:?package=MinimalKafka&version=0.10.2

// Install MinimalKafka as a Cake Tool
#tool nuget:?package=MinimalKafka&version=0.10.2                

MinimalKafka

Github Release GitHub Actions Workflow Status GitHub License Github Issues Open Github Pull Request Open Scheduled Code Security Testing

MinimalKafka is a Kafka consumer library designed to consume messages similarly to a Minimal API. This repository provides a streamlined and efficient way to work with Kafka consumers in .NET 8.0.

Alt text

Features

  • Simple and minimalistic Kafka consumer setup
  • Built to integrate seamlessly with .NET 8.0 applications
  • Utilizes the Confluent.Kafka for robust Kafka interactions

Installation

To install MinimalKafka, use the NuGet package manager:

dotnet add package MinimalKafka

Usage

Setting Up a Kafka Consumer

Below is a basic example of how to set up a Kafka consumer using MinimalKafka. For more detailed examples, refer to the example folder in the repository.

using MinimalKafka;

var builder = WebApplication.CreateBuilder(args);

.Services.AddMinimalKafka(config => 
{ 
    config.WithBootstrapServers("localhost:9092")
          .WithGroupId(Guid.NewGuid().ToString())
          .WithOffsetReset(AutoOffsetReset.Earliest); 
});

var app = builder.Build();

app.MapTopic("topic.name", (string key, string value) => {
    
    // Implement your code here
    return Task.CompletedTask;

}).WithGroupId("Topic 2");

await app.RunAsync();

Kafka Stream Processing

Often we want to join 2 topics and produce the outcome of this into a new topic

flowchart LR
    A[Topic A] -->|Consume| C
    B[Topic B] -->|Consume| C
    C{{Processor}} -->|Produce| D[Topic C]
Join Streams

This can be achieved with the following code


// Join 2 streams
app.MapStream<Guid, DatamodelA>("topic-a")
   .Join<Guid, DatamodelB>("topic-b").OnKey()
   .Into(async (c, k, v) =>
    {
        await c.ProduceAsync<Guid, DatamodelC>("topic-c", k, new(k, v.Item1.DataA, v.Item2.DataB));
    });
Store Projection

Each service who is interested in this can consume this model and store this for later use.

flowchart LR
    A[Topic C] -->|Consume| C
    C{{Processor}} --> D[(Database)]
// Project to local storage
app.MapStream<Guid, DatamodelC>("topic-c")
    .Into(async (c, _, v) =>
    {
        var store = c.RequestServices.GetRequiredService<IStore>();
        await store.SaveAsync(v);
    });

Stream Proccessing

Or some other complicated stuff and produce to other topic

flowchart LR

subgraph Process Logic
    Logic{DoStuff} <--> Database[(Database)]
    Logic <--> HttpClient    
end

subgraph Stream Process
    Consumer[Topic-C] -->|Consume| Processor
    Processor{{Processor}} --> Producer
    Producer[Topic]
end

Processor <--> Logic
app.MapStream<Guid, DatamodelC>("topic-c")
    .Into(async (c, _, v) =>
    {
        // Some Extreme Complicated task.
        var result = DoStuff();

        await c.ProduceAsync("other-topic", result.Id, result);
    });

Stream Branching

You could also branch a topic in different topics for different processes

flowchart LR
    A[Topic-A] -->|Consume| B{{Processor}}
    B -->|branch| C[/branch v1/]
    B -->|branch| D[/branch v2/]
    B -->|branch| E[/branch v3/]
    B -->|branch| F[/branch ??/]
    C -->|Produce| G[Topic-1]
    D -->|Produce| H[Topic-2]
    E -->|Produce| I[Topic-3]
    F -->|Produce| J[Unknown]

app.MapStream<Guid, DatamodelA>("topic-a")
   .SplitInto(x =>
   {
       x.Branch((_, v) => v.DataA == "v1", (c, k, v) => c.ProduceAsync("topic-1", k, v));
       x.Branch((_, v) => v.DataA == "v2", (c, k, v) => c.ProduceAsync("topic-2", k, v));
       x.Branch((_, v) => v.DataA == "v3", (c, k, v) => c.ProduceAsync("topic-3", k, v));
       x.DefaultBranch((c, k, v) => c.ProduceAsync("unknown", k, v));
   });

Streaming Command Proccessor

flowchart LR
A[Topic-A] -->|Consume| C{{Processor}}
B[Topic-B] -->|Produce| C
C -->|branch| D[/Create/] 
C -->|branch| E[/Addition/]
C -->|branch| F[/Subtract/]
D -->|Produce| G[Topic-B]
E -->|Produce| G[Topic-B]
F -->|Produce| G[Topic-B]
app.MapStream<Guid, DatamodelA>("topic-a")
   .Join<Guid, DatamodelB>("topic-b").OnKey()
   .SplitInto(x =>
   {
       // Initial Create Command
       x.Branch((_, v) => v.Item1?.DataA == "Create", async (c, k, v) => {
           if(v.Item2 != null) 
            {
               return;
           }
           await c.ProduceAsync("topic-b", k, new DatamodelB(k, 1));
       });

       // Add Command
       x.Branch((_, v) => v.Item1?.DataA == "Addition", async (c, k, v) =>
       {
           if(v.Item2 == null) 
           { 
               return;
           }

           await c.ProduceAsync("topic-b", k, v.Item2 with { DataB = v.Item2.DataB + 1 });
       });
       
       // Subtract Command
       x.Branch((_, v) => v.Item1?.DataA == "Subtract", async (c, k, v) =>
       {
           if (v.Item2 == null)
           {
               return;
           }

           await c.ProduceAsync("topic-b", k, v.Item2 with { DataB = v.Item2.DataB - 1 });
       });
   });

In this way you can chain topics together without interfering chancing

Contribution

Contributions are welcome! Please submit a pull request or open an issue to discuss your ideas or improvements.

License

This project is licensed under the MIT License.

Contact

For any questions or support, please open an issue in the repository.

Product Compatible and additional computed target framework versions.
.NET net8.0 is compatible.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed.  net9.0 was computed.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last updated
0.10.2 242 3/4/2025
0.10.2-alpha2 182 3/4/2025
0.10.2-alpha1 180 3/3/2025
0.10.1 81 3/2/2025
0.10.1-alpha3 103 3/2/2025
0.10.1-alpha2 80 3/2/2025
0.10.1-alpha1 74 3/1/2025
0.10.0 85 3/1/2025
0.10.0-alpha2 71 3/1/2025
0.9.0 335 1/6/2025
0.8.0 228 11/20/2024
0.7.1 111 11/14/2024
0.7.0 110 11/13/2024
0.6.0 164 10/31/2024
0.5.1 345 10/23/2024
0.5.0 96 10/22/2024
0.4.0 96 10/22/2024
0.3.1 131 10/12/2024
0.3.0 192 8/29/2024
0.2.2 134 8/22/2024
0.2.1 325 7/23/2024
0.2.0 100 7/23/2024
0.1.0 100 7/22/2024