DotnetKit.DataflowBuilder
1.0.0-preview.0.8
See the version list below for details.
.NET CLI:
dotnet add package DotnetKit.DataflowBuilder --version 1.0.0-preview.0.8
Package Manager:
NuGet\Install-Package DotnetKit.DataflowBuilder -Version 1.0.0-preview.0.8
PackageReference:
<PackageReference Include="DotnetKit.DataflowBuilder" Version="1.0.0-preview.0.8" />
Central Package Management (Directory.Packages.props, then the project file):
<PackageVersion Include="DotnetKit.DataflowBuilder" Version="1.0.0-preview.0.8" />
<PackageReference Include="DotnetKit.DataflowBuilder" />
Paket CLI:
paket add DotnetKit.DataflowBuilder --version 1.0.0-preview.0.8
Script & Interactive:
#r "nuget: DotnetKit.DataflowBuilder, 1.0.0-preview.0.8"
File-based apps:
#:package DotnetKit.DataflowBuilder@1.0.0-preview.0.8
Cake:
#addin nuget:?package=DotnetKit.DataflowBuilder&version=1.0.0-preview.0.8&prerelease
#tool nuget:?package=DotnetKit.DataflowBuilder&version=1.0.0-preview.0.8&prerelease
DataflowBuilder
Overview
DataflowBuilder simplifies the creation of dataflow pipelines, making it easier to build, manage, and test complex data processing workflows. With its fluent API, you can quickly set up pipelines that handle various data processing tasks efficiently.
One of the key features of DataflowBuilder is its support for parallel execution of asynchronous tasks.
By leveraging dataflow blocks, you can process data concurrently, making efficient use of system resources and improving the performance of your applications. This is especially useful for CPU-intensive or I/O-bound operations where tasks can benefit from parallelism.
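To make the idea of concurrent processing concrete, here is a minimal sketch written directly against System.Threading.Tasks.Dataflow, the library this package lists as a dependency. It does not use the DataflowBuilder API; the class and method names (ParallelBlockSketch, SquareAllAsync) are hypothetical, and the assumption is only that the builder configures blocks in a broadly similar way.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical illustration; not part of DotnetKit.DataflowBuilder.
public static class ParallelBlockSketch
{
    // Squares each input, with up to three items in flight at the same time.
    public static async Task<List<int>> SquareAllAsync(IEnumerable<int> inputs)
    {
        var square = new TransformBlock<int, int>(
            async n =>
            {
                await Task.Delay(10); // stand-in for I/O-bound work
                return n * n;
            },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 });

        var results = new List<int>();
        // This ActionBlock keeps the default parallelism of 1, so the list needs no lock.
        var collect = new ActionBlock<int>(n => results.Add(n));

        square.LinkTo(collect, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var n in inputs)
        {
            await square.SendAsync(n);
        }

        square.Complete();
        await collect.Completion;
        return results;
    }
}
```

Because TPL Dataflow blocks preserve input order by default (EnsureOrdered is true), the results come back in the order the items were sent even though they are processed concurrently.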
Getting Started
Installation
To use DataflowBuilder in your project, install the DotnetKit.DataflowBuilder package via the NuGet Package Manager:
Install-Package DotnetKit.DataflowBuilder
Or using the .NET CLI:
dotnet add package DotnetKit.DataflowBuilder
How it works
- Define a typed item source and add further blocks with the fluent API builder
- Build the pipeline
- Send items
Blocks can transform, enrich, or group each item sent to the pipeline.
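The three steps above can be sketched with plain TPL Dataflow blocks. This is an illustration of the general pattern, not the DataflowBuilder implementation; SequentialPipelineSketch, RunAsync, and the "item-N" labels are hypothetical names chosen for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical illustration; not part of DotnetKit.DataflowBuilder.
public static class SequentialPipelineSketch
{
    public static async Task<List<string>> RunAsync(IEnumerable<int> items)
    {
        var output = new List<string>();

        // 1. Define the typed source and the blocks that follow it.
        var source = new BufferBlock<int>();
        var enrich = new TransformBlock<int, (int Id, string Label)>(
            id => (id, $"item-{id}"));
        var transform = new TransformBlock<(int Id, string Label), string>(
            x => x.Label.ToUpperInvariant());
        var target = new ActionBlock<string>(s => output.Add(s));

        // 2. Build the pipeline by linking the blocks; completion flows downstream.
        var link = new DataflowLinkOptions { PropagateCompletion = true };
        source.LinkTo(enrich, link);
        enrich.LinkTo(transform, link);
        transform.LinkTo(target, link);

        // 3. Send items, then signal completion and wait for the target to drain.
        foreach (var item in items)
        {
            await source.SendAsync(item);
        }
        source.Complete();
        await target.Completion;

        return output;
    }
}
```

Calling SequentialPipelineSketch.RunAsync(new[] { 1, 2 }) yields "ITEM-1" and "ITEM-2". A fluent builder presumably exists to hide the linking and completion plumbing shown in step 2.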
Sequential pipeline flow
flowchart LR
Source --> Process1[Enrich] --> Process2[Transform] --> Target
Pipeline flow with parallelization
flowchart LR
Source --> Process1[Enrich] --> Process2[Parallel processing] --> Store1
Process2[Parallel processing] --> Store2
Process2[Parallel processing] --> Store3
Store1 --> Target
Store2 --> Target
Store3 --> Target
Basic Usage
Here is a simple example to get you started with DataflowBuilder.
Create a Pipeline:
using System;
using System.Threading.Tasks;
using DataflowBuilder.Core.Pipeline;

public class Example
{
    public async Task RunPipeline()
    {
        // Source of int -> double each value -> print it
        var pipeline = DataFlowPipelineBuilder.FromSource<int>()
            .Process(a => a * 2)
            .ToTarget(a =>
            {
                Console.WriteLine(a);
            })
            .Build();

        await pipeline.SendAsync(10);
        // Signal that no more items will be sent and wait for processing to finish
        await pipeline.CompleteAsync();
    }
}
Run the Pipeline:
public static async Task Main(string[] args)
{
    var example = new Example();
    await example.RunPipeline();
}
Advanced Usage
DataflowBuilder supports advanced operations like batching and grouping.
Batching
var pipeline = DataFlowPipelineBuilder.FromSource<int>()
    .Batch(5)
    .ToTarget(batch =>
    {
        Console.WriteLine($"Batch received: {string.Join(",", batch)}");
    })
    .Build();

for (int i = 1; i <= 10; i++)
{
    await pipeline.SendAsync(i);
}
await pipeline.CompleteAsync();
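For comparison, the sketch below shows how batching behaves with the BatchBlock from System.Threading.Tasks.Dataflow. Whether the Batch step of DataflowBuilder wraps BatchBlock is an assumption; the names BatchingSketch and RunAsync are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical illustration; not part of DotnetKit.DataflowBuilder.
public static class BatchingSketch
{
    // Sends 1..itemCount through a BatchBlock and returns each batch as a comma-joined string.
    public static async Task<List<string>> RunAsync(int itemCount, int batchSize)
    {
        var batches = new List<string>();

        var batch = new BatchBlock<int>(batchSize);
        var target = new ActionBlock<int[]>(b => batches.Add(string.Join(",", b)));
        batch.LinkTo(target, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 1; i <= itemCount; i++)
        {
            await batch.SendAsync(i);
        }

        // Completing the block flushes any remaining items as a final, smaller batch.
        batch.Complete();
        await target.Completion;

        return batches;
    }
}
```

With ten items and a batch size of 5, this produces "1,2,3,4,5" and "6,7,8,9,10". With seven items and a batch size of 3, the last batch contains only "7", which is why completing the pipeline matters when the item count is not a multiple of the batch size.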
Grouping
var pipeline = DataFlowPipelineBuilder.FromSource<char[]>()
    .ProcessMany(chars => chars.GroupBy(c => c).Select(g => g.Key))
    .ToTarget(c =>
    {
        Console.Write(c);
    })
    .Build();

await pipeline.SendAsync("hello world".ToCharArray());
await pipeline.CompleteAsync();
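The grouping step relies on standard LINQ: GroupBy yields groups in the order their keys first appear, so selecting the keys gives the distinct characters in first-seen order. The sketch below isolates that expression outside any pipeline; GroupingSketch and DistinctInOrder are hypothetical names.

```csharp
using System;
using System.Linq;

// Hypothetical illustration of the LINQ expression used in the grouping example.
public static class GroupingSketch
{
    // Returns the distinct characters of the input, in order of first appearance.
    public static string DistinctInOrder(string input) =>
        new string(input.GroupBy(c => c).Select(g => g.Key).ToArray());
}
```

GroupingSketch.DistinctInOrder("hello world") returns "helo wrd". Assuming ProcessMany forwards each element of the returned sequence as a separate item, the pipeline above would write the same characters one at a time.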
Real world example with parallelization
flowchart TD
A[Sensor item] -->|Process| B(Enriched sensor)
B --> |batch| C(process each batch of 1000 items with 3 parallel tasks)
C -->|processing task1| D[Bulk insert 1000 sensors]
C -->|processing task2| E[Bulk insert 1000 sensors]
C -->|processing task3| F[Bulk insert 1000 sensors]
D --> |task1 processed| G[Target log event]
E --> |task2 processed| G
F --> |task3 processed| G[Publish notification]
// Configuration stage (DI, startup process, or a specific lifetime)
// Build the sensor bulk-insertion pipeline.
// _pipeline is kept in a field so that the execution stage can reuse it.
_pipeline = DataFlowPipelineBuilder.FromSource<SensorEntity>()
    .Process(sensor => sensor.EnrichAsync())
    .Batch(1000)
    .ProcessAsync(async enrichedSensors =>
    {
        await mongoDBClient.BulkInsertAsync(enrichedSensors);
    }, maxDegreeOfParallelism: 3)
    .ToTargetAsync(async enrichedSensors =>
    {
        await servicebusDomainTopic.PublishCreatedSensorsNotificationAsync();
    })
    .Build();

// Execution stage (API or event trigger)
// There could be billions of sensors
await foreach (var sensor in GetAllSensorsAsync())
{
    await _pipeline.SendAsync(sensor);
}
await _pipeline.CompleteAsync();
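The same shape (enrich, batch, parallel bulk insert, notify) can be sketched with raw TPL Dataflow blocks and an in-memory stand-in for the database. Everything here is hypothetical: BulkInsertSketch, RunAsync, and the Task.Delay that simulates the insert are assumptions made for illustration, not the package's implementation.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical illustration; not part of DotnetKit.DataflowBuilder.
public static class BulkInsertSketch
{
    // Returns how many items were "stored" and how many batch notifications were raised.
    public static async Task<(int Stored, int Notifications)> RunAsync(int itemCount, int batchSize)
    {
        int stored = 0;
        int notifications = 0;

        var enrich = new TransformBlock<int, string>(id => $"sensor-{id}");
        var batch = new BatchBlock<string>(batchSize);

        // Up to three batches are "inserted" concurrently.
        var insert = new TransformBlock<string[], int>(
            async items =>
            {
                await Task.Delay(5); // stand-in for a bulk insert call
                return items.Length;
            },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 });

        // Single-threaded target, so plain counters are safe here.
        var notify = new ActionBlock<int>(count =>
        {
            stored += count;
            notifications++;
        });

        var link = new DataflowLinkOptions { PropagateCompletion = true };
        enrich.LinkTo(batch, link);
        batch.LinkTo(insert, link);
        insert.LinkTo(notify, link);

        for (int i = 0; i < itemCount; i++)
        {
            await enrich.SendAsync(i);
        }
        enrich.Complete();
        await notify.Completion;

        return (stored, notifications);
    }
}
```

Sending 2500 items with a batch size of 1000 stores all 2500 and raises three notifications: two full batches and one final batch of 500 flushed on completion.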
Contributing
Contributions are welcome! Feel free to open issues or submit pull requests.
License
This project is licensed under the MIT License.
Product | Versions (compatible and additional computed target framework versions) |
---|---|
.NET | net5.0 is compatible. net5.0-windows was computed. net6.0 is compatible. net6.0-android was computed. net6.0-ios was computed. net6.0-maccatalyst was computed. net6.0-macos was computed. net6.0-tvos was computed. net6.0-windows was computed. net7.0 was computed. net7.0-android was computed. net7.0-ios was computed. net7.0-maccatalyst was computed. net7.0-macos was computed. net7.0-tvos was computed. net7.0-windows was computed. net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 was computed. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
.NET Core | netcoreapp3.0 was computed. netcoreapp3.1 was computed. |
.NET Standard | netstandard2.1 is compatible. |
MonoAndroid | monoandroid was computed. |
MonoMac | monomac was computed. |
MonoTouch | monotouch was computed. |
Tizen | tizen60 was computed. |
Xamarin.iOS | xamarinios was computed. |
Xamarin.Mac | xamarinmac was computed. |
Xamarin.TVOS | xamarintvos was computed. |
Xamarin.WatchOS | xamarinwatchos was computed. |
- .NETStandard 2.1
  - System.Threading.Tasks.Dataflow (>= 8.0.1)
- net5.0
  - No dependencies.
- net6.0
  - No dependencies.
- net8.0
  - No dependencies.
Version | Downloads | Last Updated |
---|---|---|
1.1.1-preview.0.5 | 82 | 2/27/2025 |
1.1.1-preview.0.2 | 71 | 12/11/2024 |
1.1.1-preview.0.1 | 64 | 12/6/2024 |
1.1.0 | 110 | 12/5/2024 |
1.0.0-preview.0.10 | 76 | 11/8/2024 |
1.0.0-preview.0.9 | 72 | 11/8/2024 |
1.0.0-preview.0.8 | 69 | 11/8/2024 |
1.0.0-preview.0.7 | 63 | 11/8/2024 |
1.0.0-preview.0.6 | 63 | 11/8/2024 |
1.0.0-preview.0.5 | 58 | 11/7/2024 |
1.0.0-preview.0.4 | 63 | 12/5/2024 |
1.0.0-preview.0.2 | 66 | 11/28/2024 |
1.0.0-preview.0 | 66 | 11/15/2024 |