PiBox.Plugins.Messaging.Kafka.Flow
1.0.79
dotnet add package PiBox.Plugins.Messaging.Kafka.Flow --version 1.0.79
NuGet\Install-Package PiBox.Plugins.Messaging.Kafka.Flow -Version 1.0.79
<PackageReference Include="PiBox.Plugins.Messaging.Kafka.Flow" Version="1.0.79" />
paket add PiBox.Plugins.Messaging.Kafka.Flow --version 1.0.79
#r "nuget: PiBox.Plugins.Messaging.Kafka.Flow, 1.0.79"
// Install PiBox.Plugins.Messaging.Kafka.Flow as a Cake Addin
#addin nuget:?package=PiBox.Plugins.Messaging.Kafka.Flow&version=1.0.79
// Install PiBox.Plugins.Messaging.Kafka.Flow as a Cake Tool
#tool nuget:?package=PiBox.Plugins.Messaging.Kafka.Flow&version=1.0.79
Kafka Flow Plugin
This plugin provides the KafkaFlow NuGet packages as a PiBox plugin.
The containers you need are provided in the docker-compose.yaml file. You only need to configure the consumer(s) and the producer(s) and use them accordingly.
Installation
Install the plugin via NuGet:
dotnet add package PiBox.Plugins.Messaging.Kafka.Flow
or add it as a package reference to your .csproj:
<PackageReference Include="PiBox.Plugins.Messaging.Kafka.Flow" Version=""/>
Appsettings.yml
Configure your appsettings.yml accordingly.
kafka:
  client:
    bootstrapServers: "localhost:9092,localhost:9093"
    # securityProtocol: "SaslPlaintext"
    # saslPassword: "asdf"
    # saslUsername: "asdf"
    # saslMechanism: "Plain"
    # sslCaLocation: ""
    # enableSslCertificateVerification: "false"
  schemaRegistry:
    url: "localhost:8081"
    # basicAuthUserInfo: "developer:SECRET"
    # enableSslCertificateVerification: "false"
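The plugin receives these settings through IConfiguration, where nested YAML keys are addressed with colon-separated paths. The following is a minimal sketch of that mapping, not the plugin's own loading code: KafkaSettingsSketch and its two methods are hypothetical names, the in-memory provider stands in for the real appsettings.yml, and it assumes bootstrapServers sits under kafka → client as the key order above suggests.

```csharp
using System;
using System.Collections.Generic;
using Microsoft.Extensions.Configuration;

// Hypothetical helper, for illustration only; not part of the plugin.
public static class KafkaSettingsSketch
{
    // Builds an in-memory configuration that mirrors the YAML hierarchy:
    // kafka -> client -> bootstrapServers becomes "kafka:client:bootstrapServers".
    public static IConfiguration BuildSampleConfiguration() =>
        new ConfigurationBuilder()
            .AddInMemoryCollection(new Dictionary<string, string?>
            {
                ["kafka:client:bootstrapServers"] = "localhost:9092,localhost:9093",
            })
            .Build();

    // Splits the comma-separated broker list into individual host:port entries.
    public static string[] GetBootstrapServers(IConfiguration configuration) =>
        (configuration["kafka:client:bootstrapServers"] ?? string.Empty)
            .Split(',', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries);
}
```

Calling KafkaSettingsSketch.GetBootstrapServers(KafkaSettingsSketch.BuildSampleConfiguration()) yields the two entries localhost:9092 and localhost:9093, which is a quick way to check that a key path matches the intended nesting.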
Containers
The docker-compose.yaml will run the following containers:
- zookeeper
- broker
- schema-registry
- control-center
To run all containers
docker-compose up
To stop and remove the currently running containers
docker-compose down
Usage
Plugin configuration
public class KafkaFlowExamplePlugin : IPluginServiceConfiguration
{
    private readonly IConfiguration _configuration;
    private readonly ILogger? _logger;
    public KafkaFlowExamplePlugin(IConfiguration configuration, ILogger<KafkaFlowExamplePlugin>? logger)
    {
        _configuration = configuration;
        _logger = logger;
    }
    // Configure your consumers and producers
    public void ConfigureServices(IServiceCollection serviceCollection)
    {
        serviceCollection.ConfigureKafka(_configuration, _logger, kafkaFlowBuilder => kafkaFlowBuilder
            // Possible configurations:
            // No producerConfig is passed.
            // The producer is added to a Dictionary<Type, Action<IProducerConfigurationBuilder>>;
            // the key is typeof(TMessage).
            .AddTypedProducer<TMessage>("protobuf-topic")
            // The producer is added to a Dictionary<Type, Action<IProducerConfigurationBuilder>>;
            // the key is typeof(TMessage).
            .AddTypedProducer<TMessage>("protobuf-topic", producerConfig)
            // No producerConfig is passed.
            // The producer is added to a List<(string, Action<IProducerConfigurationBuilder>)>;
            // the string is typeof(TProducer).Name.
            .AddProducer<TProducer>("protobuf-topic")
            // The producer is added to a List<(string, Action<IProducerConfigurationBuilder>)>;
            // the string is typeof(TProducer).Name.
            .AddProducer<TProducer>("protobuf-topic", producerConfig)
            // The consumer is added to a List<Action<IConsumerConfigurationBuilder>>.
            .AddConsumer<TMessageHandler>("protobuf-topic", "mygroup")
            // The consumer is added to a List<Action<IConsumerConfigurationBuilder>>.
            // If a message cannot be processed successfully, a dead letter message is produced on the dead letter topic.
            .AddConsumerWithDeadLetter<TMessageHandler, TMessage, TDeadLetterMessage>("protobuf-topic", "dead-letter-topic", "mygroup"));
    }
}
For more information, see the documentation for the IServiceCollection interface.
Protobuf message format example
syntax = "proto3";
message TMessage {
    string Message = 1;
    int32 Code = 2;
}
For more information, see the protobuf documentation.
Consumer usage
public class ProtobufMessageHandler : IMessageHandler<ProtobufLogMessage>
{
    public Task Handle(IMessageContext context, ProtobufLogMessage message)
    {
        // Do something with the message
        return Task.CompletedTask;
    }
}
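As an illustration of what a handler body might do, the sketch below logs each message together with the topic, partition, and offset it came from. SampleLogMessage and LoggingMessageHandler are hypothetical names, the message class only stands in for whatever type your serializer produces, and the example assumes that handlers are resolved from the dependency injection container so the logger can be constructor-injected. The using directives match KafkaFlow 2.x, where IMessageHandler lives in KafkaFlow.TypedHandler.

```csharp
using System.Threading.Tasks;
using KafkaFlow;
using KafkaFlow.TypedHandler;
using Microsoft.Extensions.Logging;

// Hypothetical message type standing in for the deserialized protobuf message.
public class SampleLogMessage
{
    public string Message { get; set; } = string.Empty;
    public int Code { get; set; }
}

// Hypothetical handler that logs the payload and where it was consumed from.
public class LoggingMessageHandler : IMessageHandler<SampleLogMessage>
{
    private readonly ILogger<LoggingMessageHandler> _logger;

    public LoggingMessageHandler(ILogger<LoggingMessageHandler> logger)
    {
        _logger = logger;
    }

    public Task Handle(IMessageContext context, SampleLogMessage message)
    {
        // ConsumerContext exposes the metadata of the consumed record.
        _logger.LogInformation(
            "Received code {Code} with text {Text} from {Topic} [{Partition}] at offset {Offset}",
            message.Code,
            message.Message,
            context.ConsumerContext.Topic,
            context.ConsumerContext.Partition,
            context.ConsumerContext.Offset);

        // No asynchronous work is done here, so return a completed task.
        return Task.CompletedTask;
    }
}
```

Under these assumptions the handler would be registered like any other, for example with AddConsumer<LoggingMessageHandler>("protobuf-topic", "mygroup").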
Consumer with dead letter message producer usage
// DltMessageHandler implements IMessageHandler<TMessage> (used to create a message handler).
// It also produces a dead letter message if ProcessMessageAsync throws an exception.
public class ProtobufDltMessageHandler : DltMessageHandler<TMessage, TDeadLetterMessage>
{
    protected override async Task ProcessMessageAsync(IMessageContext context, TMessage message)
    {
        // Do something with the message
    }
    protected override TDeadLetterMessage HandleError(IMessageContext context, TMessage message, Error error)
    {
        // Build and return the dead letter message to produce on the dead letter topic
    }
}
Producer usage (optional)
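// ExecuteAsync is an override, so the class must derive from BackgroundService.
public class SampleProducer : BackgroundService
{
    private readonly IMessageProducer _producer;
    public SampleProducer(IProducerAccessor producerAccessor)
    {
        // The name matches typeof(TProducer).Name used when the producer was registered.
        _producer = producerAccessor.GetProducer("TProducer");
    }
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // messageKey and messageValue are placeholders for your own key and message.
        await _producer.ProduceAsync("protobuf-topic", messageKey, messageValue);
    }
}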
Product | Versions (compatible and additional computed target framework versions) |
---|---|
.NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. |
net8.0
- KafkaFlow (>= 2.5.0)
- KafkaFlow.Extensions.Hosting (>= 2.5.0)
- KafkaFlow.LogHandler.Microsoft (>= 2.5.0)
- KafkaFlow.Microsoft.DependencyInjection (>= 2.5.0)
- KafkaFlow.SchemaRegistry (>= 2.5.0)
- KafkaFlow.Serializer (>= 2.5.0)
- KafkaFlow.Serializer.ProtobufNet (>= 2.5.0)
- KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf (>= 2.5.0)
- KafkaFlow.TypedHandler (>= 2.5.0)
- PiBox.Hosting.Abstractions (>= 1.0.79)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.
Version | Downloads | Last updated |
---|---|---|
1.0.79 | 81 | 12/20/2024 |
1.0.73 | 89 | 10/22/2024 |
1.0.66 | 79 | 10/15/2024 |
1.0.64 | 90 | 10/14/2024 |
1.0.61 | 86 | 10/1/2024 |
1.0.60 | 94 | 9/27/2024 |
1.0.54 | 139 | 4/30/2024 |
1.0.51 | 125 | 2/27/2024 |
1.0.49 | 94 | 2/27/2024 |
1.0.47 | 127 | 2/21/2024 |
1.0.45 | 118 | 2/20/2024 |
1.0.43 | 115 | 2/13/2024 |
1.0.41 | 119 | 2/13/2024 |
1.0.39 | 126 | 2/8/2024 |
1.0.38 | 122 | 2/8/2024 |
1.0.37 | 119 | 2/8/2024 |
1.0.35 | 115 | 2/2/2024 |
1.0.32 | 114 | 1/30/2024 |
1.0.25 | 195 | 12/27/2023 |
1.0.23 | 132 | 12/19/2023 |
1.0.22 | 126 | 12/19/2023 |
1.0.21 | 126 | 12/19/2023 |
1.0.19 | 148 | 12/11/2023 |
1.0.17 | 189 | 11/23/2023 |
1.0.7 | 147 | 11/23/2023 |
1.0.5 | 160 | 11/23/2023 |
1.0.3 | 154 | 11/23/2023 |
1.0.0 | 131 | 11/21/2023 |
0.9.9 | 153 | 11/21/2023 |
0.9.7 | 146 | 11/21/2023 |