Kafka.EventLoop 1.0.0-rc1

This is a prerelease version of Kafka.EventLoop.
dotnet add package Kafka.EventLoop --version 1.0.0-rc1
NuGet\Install-Package Kafka.EventLoop -Version 1.0.0-rc1
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="Kafka.EventLoop" Version="1.0.0-rc1" />
For projects that support PackageReference, copy this XML node into the project file to reference the package.
paket add Kafka.EventLoop --version 1.0.0-rc1
#r "nuget: Kafka.EventLoop, 1.0.0-rc1"
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
// Install Kafka.EventLoop as a Cake Addin
#addin nuget:?package=Kafka.EventLoop&version=1.0.0-rc1&prerelease

// Install Kafka.EventLoop as a Cake Tool
#tool nuget:?package=Kafka.EventLoop&version=1.0.0-rc1&prerelease

Kafka.EventLoop

Use this library in your .NET Worker Service to have continuous processing of Kafka messages.

The library implements the "Event Loop" design pattern:

  • consumes messages from Kafka,
  • accumulates them until a certain condition is met,
  • sends accumulated messages to your controller,
  • waits until your controller is done with message processing,
  • commits offsets to Kafka,
  • consumes the next messages from Kafka, and so on.

It is built on top of Confluent's .NET Client for Apache Kafka and implements all the necessary infrastructure for message consumption, error handling, throttling, etc. so that you can focus on message processing only.

Compatibility

Compatible with .NET 6 and higher.

Supports both Microsoft and Autofac IoC containers.

Features:

Below is a summary of features that the library provides.

For detailed explanation and code examples please follow the Wiki.

  • Kafka consuming - the library primarily functions as a Kafka consumer, utilizing Confluent's Kafka client to handle essential operations. These operations include subscribing to a Kafka topic, consuming messages, committing offsets, participating in a group re-balance, etc.

  • Parallel processing - you can specify the number of parallel consumers that will consume messages from your topic partitions and send them to your controller in parallel. This will allow you to increase the message throughput.

  • Intake strategies - you can use different strategies to decide when it is time to stop accumulating messages and send them to your controller for message processing, e.g. "fixed-size", "fixed-interval", etc.

  • Error handling - there are different ways to react to errors that may occur in your controller during message processing.

  • Dead-lettering - the library can send messages to a separate topic in case of non-transient errors. This allows you to avoid blocking the main topic from consuming new messages. You can also consume and process "dead" messages in the same Worker Service separately.

  • Throttling - you can limit the rate at which messages are being consumed and sent to your controller. This might be useful when you want to avoid putting your external components (database, API, etc.) at excessive load during message spikes or when you want to process Kafka messages from previous days, etc.

  • One-to-one streaming - the library allows you to send X messages to a separate topic "B" for every X consumed messages from the main topic "A" and preserve their original order. This is useful in scenarios when you have "raw" messages in the topic "A" which you want to enrich with additional data and send to the topic "B" for further processing.

Simplified code example:

  • appsettings.json:

    {
        "Kafka": {
            "ConnectionString": "xxx",
            "ConsumerGroups": [{
                "GroupId": "xxx",
                "TopicName": "xxx",
                "ParallelConsumers": 10,
                "Intake": {
                    "Strategy": {
                        "Name": "FixedInterval",
                        "IntervalInMs": 5000
                    }
                }
            }]
        }
    }
    
  • Program.cs:

    Host
        .CreateDefaultBuilder(args)
        .ConfigureServices((ctx, services) =>
        {
            services.AddKafkaEventLoop(ctx.Configuration, o => o
                .HasConsumerGroup("xxx", cgOptions => cgOptions
                    .HasMessageType<MyMessage>()
                    .HasJsonMessageDeserializer()
                    .HasController<MyController>()
                    .Build())
                .Build());
        })
        .Build()
        .Run();
    
  • MyController.cs:

    public class MyController : IKafkaController<MyMessage>
    {
        public Task ProcessAsync(MessageInfo<MyMessage>[] messages, CancellationToken token)
        {
            // process your messages
        }
    }
    

See more examples on Wiki

Contributing

You are welcome to contribute or create an issue.

Current contributors:

Product Compatible and additional computed target framework versions.
.NET net6.0 is compatible.  net6.0-android was computed.  net6.0-ios was computed.  net6.0-maccatalyst was computed.  net6.0-macos was computed.  net6.0-tvos was computed.  net6.0-windows was computed.  net7.0 was computed.  net7.0-android was computed.  net7.0-ios was computed.  net7.0-maccatalyst was computed.  net7.0-macos was computed.  net7.0-tvos was computed.  net7.0-windows was computed.  net8.0 was computed.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages (1)

Showing the top 1 NuGet packages that depend on Kafka.EventLoop:

Package Downloads
Kafka.EventLoop.Autofac

Use this library in your .NET Worker Service with Autofac IoC container to have continuous processing of Kafka messages

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last updated
1.0.0-rc1 73 5/29/2023