DotnetKit.DataflowBuilder 1.0.0-preview.0.8

This is a prerelease version of DotnetKit.DataflowBuilder.
There is a newer version of this package available.
See the version list below for details.
dotnet add package DotnetKit.DataflowBuilder --version 1.0.0-preview.0.8
                    
NuGet\Install-Package DotnetKit.DataflowBuilder -Version 1.0.0-preview.0.8
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="DotnetKit.DataflowBuilder" Version="1.0.0-preview.0.8" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="DotnetKit.DataflowBuilder" Version="1.0.0-preview.0.8" />
                    
Directory.Packages.props
<PackageReference Include="DotnetKit.DataflowBuilder" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add DotnetKit.DataflowBuilder --version 1.0.0-preview.0.8
                    
#r "nuget: DotnetKit.DataflowBuilder, 1.0.0-preview.0.8"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package DotnetKit.DataflowBuilder@1.0.0-preview.0.8
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=DotnetKit.DataflowBuilder&version=1.0.0-preview.0.8&prerelease
                    
Install as a Cake Addin
#tool nuget:?package=DotnetKit.DataflowBuilder&version=1.0.0-preview.0.8&prerelease
                    
Install as a Cake Tool

DataflowBuilder

internal public Dotnetkit.DataflowBuilder

Overview

DataflowBuilder simplifies the creation of dataflow pipelines, making it easier to build, manage, and test complex data processing workflows. With its fluent API, you can quickly set up pipelines that handle various data processing tasks efficiently.

One of the key features of DataflowBuilder is its support for parallel asynchronous tasks executions.

By leveraging dataflow blocks, you can process data concurrently, making efficient use of system resources and improving the performance of your applications. This is especially useful for CPU-intensive or I/O-bound operations where tasks can benefit from parallelism.

Getting Started

Installation

To use DataflowBuilder in your project, install it via NuGet Package Manager DotnetKit.DataflowBuilder NUGET

Install-Package DotnetKit.DataflowBuilder

Or using the .NET CLI:

dotnet add package DotnetKit.DataflowBuilder

How it works

  • Define typed item source and build another blocks with fluent API builder
  • Build the pipeline
  • Send item

blocks could transform, enrich, group each data (item) sent to the pipeline

Sequential pipeline flow
flowchart LR
    Source --> Process1[Enrich] --> Process2[Transform] --> Target
Pipeline flow with parallelization
flowchart LR
    Source --> Process1[Enrich] --> Process2[Parallel processing] --> Store1
    Process2[Parallel processing] --> Store2
    Process2[Parallel processing] --> Store3
    Store1 --> Target
    Store2 --> Target
    Store3 --> Target

Basic Usage

Here is a simple example to get you started with DataflowBuilder.

Create a Pipeline:

using DataflowBuilder.Core.Pipeline;
using System.Threading.Tasks;

public class Example
{
    public async Task RunPipeline()
    {
        var pipeline = DataFlowPipelineBuilder.FromSource<int>()
            .Process(a => a * 2)
            .ToTarget(a =>
            {
                Console.WriteLine(a);
            })
            .Build();

        await pipeline.SendAsync(10);
        await pipeline.CompleteAsync();
    }
}

Run the Pipeline:

public static async Task Main(string[] args)
{
    var example = new Example();
    await example.RunPipeline();
}

Advanced Usage

DataflowBuilder supports advanced operations like batching and grouping.

Batching
var pipeline = DataFlowPipelineBuilder.FromSource<int>()
    .Batch(5)
    .ToTarget(batch =>
    {
        Console.WriteLine($"Batch received: {string.Join(",", batch)}");
    })
    .Build();

for (int i = 1; i <= 10; i++)
{
    await pipeline.SendAsync(i);
}

await pipeline.CompleteAsync();
Grouping
var pipeline = DataFlowPipelineBuilder.FromSource<char[]>()
    .ProcessMany(chars => chars.GroupBy(c => c).Select(g => g.Key))
    .ToTarget(c =>
    {
        Console.Write(c);
    })
    .Build();

await pipeline.SendAsync("hello world".ToCharArray());
await pipeline.CompleteAsync();
Real world example with parallelization
flowchart TD
    A[Sensor item] -->|Process| B(Enriched sensor)
    B --> |batch| C(process each 1000 items with 3 // tasks )
    C -->|processing task1| D[Bulk insert 1000 sensors]
    C -->|processing task2| E[Bulk insert 1000 sensors]
    C -->|processing task3| F[Bulk insert 1000 sensors]
    D --> |task1 processed| G[Target log event]
    E --> |task2 processed| G[Target ]
    F --> |task3 processed| G[Publish notification]

//Configuration stage (DI, startup process, or specific lifetime )
//Building sensor bulk insertion pipeline
var pipeline = DataFlowPipelineBuilder.FromSource<SensorEntity>()
    .Process(sensor => sensor.EnrichAsync())
    .Batch(1000)
    .ProcessAsync(async enrichedSensors => {
       await mongoDBClient.BulkInsertAsync(enrichedSensors)
    }, maxDegreeOfParallelism: 3)
    .ToTargetAsync(enrichedSensors =>
    {
     await servicebusDomainTopic.PublishCreatedSensorsNotificationAsync();
    })
    .Build();

//Execution stage (API or Event trigger )
//it could be billion of sensors
await foreach(var sensorPage in GetAllSensorsAsync())
    {
        await _pipeline.SendAsync(sensor);
    }
await _pipeline.CompleteAsync();

Contributing

Contributions are welcome! Feel free to open issues or submit pull requests.

License

This project is licensed under the MIT License.

Product Compatible and additional computed target framework versions.
.NET net5.0 is compatible.  net5.0-windows was computed.  net6.0 is compatible.  net6.0-android was computed.  net6.0-ios was computed.  net6.0-maccatalyst was computed.  net6.0-macos was computed.  net6.0-tvos was computed.  net6.0-windows was computed.  net7.0 was computed.  net7.0-android was computed.  net7.0-ios was computed.  net7.0-maccatalyst was computed.  net7.0-macos was computed.  net7.0-tvos was computed.  net7.0-windows was computed.  net8.0 is compatible.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed.  net9.0 was computed.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed.  net10.0 was computed.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
.NET Core netcoreapp3.0 was computed.  netcoreapp3.1 was computed. 
.NET Standard netstandard2.1 is compatible. 
MonoAndroid monoandroid was computed. 
MonoMac monomac was computed. 
MonoTouch monotouch was computed. 
Tizen tizen60 was computed. 
Xamarin.iOS xamarinios was computed. 
Xamarin.Mac xamarinmac was computed. 
Xamarin.TVOS xamarintvos was computed. 
Xamarin.WatchOS xamarinwatchos was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
1.1.1-preview.0.5 82 2/27/2025
1.1.1-preview.0.2 71 12/11/2024
1.1.1-preview.0.1 64 12/6/2024
1.1.0 110 12/5/2024
1.0.0-preview.0.10 76 11/8/2024
1.0.0-preview.0.9 72 11/8/2024
1.0.0-preview.0.8 69 11/8/2024
1.0.0-preview.0.7 63 11/8/2024
1.0.0-preview.0.6 63 11/8/2024
1.0.0-preview.0.5 58 11/7/2024
1.0.0-preview.0.4 63 12/5/2024
1.0.0-preview.0.2 66 11/28/2024
1.0.0-preview.0 66 11/15/2024