DataFlow.Core 1.0.0

DataFlow

High-performance ETL pipeline library for .NET that actually gets out of your way.

What is this?

DataFlow is a streaming data pipeline library designed for processing large datasets without blowing up your memory. Think LINQ but for ETL operations - read data from anywhere, transform it, and write it somewhere else. No XML configs, no enterprise architect nonsense. Just simple, chainable operations that work.

Installation

dotnet add package DataFlow.Core

Or grab it from NuGet if you're using Visual Studio.

Basic Usage

Process a CSV file in three lines:

DataFlow.From.Csv("input.csv")
    .Filter(row => row.GetValue<string>("Status") == "Active")
    .WriteToCsv("output.csv");

That's the entire API philosophy. Read, transform, write.

Common Scenarios

Reading Data

// CSV files
var csvPipeline = DataFlow.From.Csv("data.csv");

// JSON files
var jsonPipeline = DataFlow.From.Json("data.json");

// Excel spreadsheets
var excelPipeline = DataFlow.From.Excel("report.xlsx", sheet: "Sales");

// SQL databases
var sqlPipeline = DataFlow.From.Sql(connectionString)
    .Query("SELECT * FROM Orders WHERE Created > @date", new { date = DateTime.Today });

// In-memory collections
var listPipeline = DataFlow.From.Collection(myList);

Transforming Data

Chain operations like you would with LINQ:

pipeline
    .Filter(row => row.GetValue<decimal>("Price") > 0)
    .Map(row => new {
        Product = row["Name"],
        Revenue = row.GetValue<decimal>("Price") * row.GetValue<int>("Quantity"),
        Category = row["Category"]
    })
    .GroupBy(x => x.Category)
    .Select(group => new {
        Category = group.Key,
        TotalRevenue = group.Sum(x => x.Revenue),
        ProductCount = group.Count()
    })
    .OrderByDescending(x => x.TotalRevenue)
    .Take(10);

Writing Results

// To CSV
pipeline.WriteToCsv("output.csv");

// To JSON
pipeline.WriteToJson("output.json");

// To Excel
pipeline.WriteToExcel("report.xlsx", "Results");

// To SQL
pipeline.WriteToSql(connectionString, "TargetTable");

// To collection
var results = pipeline.ToList();
var array = pipeline.ToArray();

Real World Examples

ETL: Database to CSV

Export active customers with their order totals:

DataFlow.From.Sql(connectionString)
    .Query(@"
        SELECT c.Id, c.Name, c.Email,
               COUNT(o.Id) AS OrderCount,
               COALESCE(SUM(o.Total), 0) AS TotalSpent
        FROM Customers c
        LEFT JOIN Orders o ON c.Id = o.CustomerId
        WHERE c.IsActive = 1
        GROUP BY c.Id, c.Name, c.Email")
    .Map(row => new {
        CustomerId = row["Id"],
        Name = row["Name"],
        Email = row["Email"],
        OrderCount = row.GetValue<int>("OrderCount"),
        // Typed value so the sort below compares numbers, not boxed objects
        TotalSpent = row.GetValue<decimal>("TotalSpent"),
        CustomerValue = row.GetValue<int>("OrderCount") > 10 ? "High" : "Normal"
    })
    .OrderByDescending(x => x.TotalSpent)
    .WriteToCsv("customer_report.csv");

Data Cleaning

Clean messy CSV data:

// Requires: using System.Text.RegularExpressions;
DataFlow.From.Csv("raw_data.csv")
    .RemoveDuplicates("Id")
    .FillMissing("Email", "no-email@unknown.com")
    .FillMissing("Country", "USA")
    .Map(row => {
        row["Email"] = row["Email"].ToString().Trim().ToLowerInvariant();
        // Guard against rows with no Phone value, then keep digits only
        row["Phone"] = Regex.Replace(row["Phone"]?.ToString() ?? "", @"\D", "");
        return row;
    })
    // IsValidEmail is your own helper, not part of DataFlow
    .Filter(row => IsValidEmail(row["Email"].ToString()))
    .WriteToCsv("cleaned_data.csv");
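
The cleaning pipeline above calls IsValidEmail without defining it. One possible version is sketched below. The helper name and its behavior are assumptions for illustration, not something DataFlow provides. It leans on MailAddress.TryCreate from System.Net.Mail (.NET 5 and later), which is a lenient syntax check rather than proof that the mailbox exists.

```csharp
using System;
using System.Net.Mail;

// Hypothetical helper for the Filter step above; not part of DataFlow.
static bool IsValidEmail(string value)
{
    if (string.IsNullOrWhiteSpace(value))
        return false;

    // TryCreate parses without throwing. Comparing Address to the input
    // rejects display-name forms such as "Bob <bob@example.com>".
    return MailAddress.TryCreate(value, out var parsed)
        && string.Equals(parsed.Address, value, StringComparison.OrdinalIgnoreCase);
}

Console.WriteLine(IsValidEmail("jane@example.com"));
Console.WriteLine(IsValidEmail("not-an-email"));
```

If you need stricter rules (allowed domains, no placeholder addresses), add them inside the helper so the pipeline itself stays unchanged.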

Parallel Processing

Speed up CPU-intensive operations:

DataFlow.From.Csv("large_dataset.csv")
    .AsParallel(maxDegreeOfParallelism: Environment.ProcessorCount)
    .Map(row => {
        // Some expensive operation
        row["Hash"] = ComputeExpensiveHash(row["Data"]);
        row["Processed"] = true;
        return row;
    })
    .WriteToCsv("processed.csv");
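
For comparison, here is roughly what the same idea looks like in plain PLINQ from the standard library. This is a sketch only: whether DataFlow's AsParallel is built on PLINQ, and whether it preserves row order, is not stated here, and ComputeExpensiveHash is a hypothetical stand-in for your own CPU-bound work.

```csharp
using System;
using System.Linq;
using System.Security.Cryptography;
using System.Text;

// Hypothetical stand-in for the ComputeExpensiveHash helper used above.
static string ComputeExpensiveHash(object data)
{
    var bytes = Encoding.UTF8.GetBytes(data?.ToString() ?? string.Empty);
    // Re-hash repeatedly to simulate CPU-bound work.
    for (var i = 0; i < 1_000; i++)
        bytes = SHA256.HashData(bytes);
    return Convert.ToHexString(bytes);
}

var inputs = Enumerable.Range(0, 100).Select(i => $"record-{i}");

// AsOrdered keeps results in input order; drop it if order does not matter.
var hashes = inputs
    .AsParallel()
    .AsOrdered()
    .WithDegreeOfParallelism(Environment.ProcessorCount)
    .Select(value => ComputeExpensiveHash(value))
    .ToList();

Console.WriteLine($"{hashes.Count} hashes computed");
```

Parallelism only pays off when the per-row work is expensive. For cheap transforms the coordination overhead usually costs more than it saves.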

Data Validation

Validate and handle errors:

var validator = new DataValidator()
    .Required("Id", "Name", "Email")
    .Email("Email")
    .Range("Age", min: 0, max: 150)
    .Regex("Phone", @"^\d{10}$")
    // TryParse fails validation on unparseable dates instead of throwing
    .Custom("StartDate", value => DateTime.TryParse(value, out var date) && date <= DateTime.Now);

// Keep a reference to the pipeline so its errors can be read after the write
var pipeline = DataFlow.From.Csv("users.csv")
    .Validate(validator)
    .OnInvalid(ErrorStrategy.LogAndSkip);  // or ThrowException, Fix, Collect

pipeline.WriteToCsv("valid_users.csv");

// Access validation errors
var errors = pipeline.ValidationErrors;

Streaming Large Files

Process multi-gigabyte files with constant memory usage:

DataFlow.From.Csv("10gb_log_file.csv")
    .Filter(row => row.GetValue<string>("Level") == "ERROR")
    .Select(row => new {
        Timestamp = row["Timestamp"],
        Message = row["Message"],
        Source = row["Source"]
    })
    .WriteToCsv("errors_only.csv");  // Streams directly, uses ~50MB RAM
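
The constant-memory behavior comes from lazy evaluation: each row is read, filtered, and written before the next one is touched. The sketch below shows that general technique with only the standard library. It is not DataFlow's implementation, and the naive Split(',') ignores quoted fields, so treat it as an illustration rather than a CSV parser.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;

// File.ReadLines yields one line at a time instead of loading the whole file.
static IEnumerable<string> ErrorLines(string path) =>
    File.ReadLines(path)
        .Skip(1) // skip the header row
        .Where(line => line.Split(',')[0] == "ERROR");

// Tiny demo file standing in for a multi-gigabyte log.
var input = Path.GetTempFileName();
var output = Path.GetTempFileName();
File.WriteAllLines(input, new[]
{
    "Level,Message",
    "INFO,started",
    "ERROR,disk full",
    "ERROR,timeout",
});

// WriteAllLines pulls from the lazy sequence as it writes,
// so only the current line is held in memory.
File.WriteAllLines(output, ErrorLines(input));
Console.WriteLine($"Wrote {File.ReadAllLines(output).Length} error rows");
```

Operations that need to see every row before producing output, such as sorting or grouping, cannot stream this way in general, so expect memory to grow with input size once you add them.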

Advanced Features

Custom Data Sources

Implement IDataSource for custom sources:

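// Requires: using MongoDB.Bson; using MongoDB.Driver;
public class MongoDataSource : IDataSource
{
    private readonly IMongoCollection<BsonDocument> collection;
    private readonly FilterDefinition<BsonDocument> filter;

    public MongoDataSource(IMongoCollection<BsonDocument> collection,
                           FilterDefinition<BsonDocument> filter)
    {
        this.collection = collection;
        this.filter = filter;
    }

    public IEnumerable<DataRow> Read()
    {
        // ToEnumerable() pulls documents from the cursor one at a time
        foreach (var doc in collection.Find(filter).ToEnumerable())
        {
            yield return new DataRow(doc.ToDictionary());
        }
    }
}

// Use it
var pipeline = DataFlow.From.Custom(new MongoDataSource(collection, filter));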

Custom Transformations

Create reusable transformations:

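public static class MyTransformations
{
    // Works on DataRow pipelines, since it reads and writes columns by name
    public static IPipeline<DataRow> NormalizePhoneNumbers(this IPipeline<DataRow> pipeline)
    {
        return pipeline.Map(row => {
            if (row.ContainsKey("Phone"))
            {
                // NormalizePhone is your own helper, not part of DataFlow
                row["Phone"] = NormalizePhone(row["Phone"].ToString());
            }
            return row;
        });
    }
}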

// Use it
pipeline.NormalizePhoneNumbers().WriteToCsv("output.csv");
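
NormalizePhone is left undefined above. A minimal version might look like the following. The helper and its rules are assumptions for illustration: it expects North American numbers and reduces them to 10 digits, which matches the 10-digit pattern used in the validation example.

```csharp
using System;
using System.Linq;

// Hypothetical helper; not part of DataFlow.
static string NormalizePhone(string raw)
{
    // Keep ASCII digits only, dropping spaces, dashes, dots, and parentheses.
    var digits = new string((raw ?? string.Empty)
        .Where(c => c is >= '0' and <= '9')
        .ToArray());

    // Drop a leading country code 1 from 11-digit numbers.
    if (digits.Length == 11 && digits[0] == '1')
        digits = digits.Substring(1);

    return digits;
}

Console.WriteLine(NormalizePhone("+1 (555) 123-4567"));
```

Numbers that do not reduce to 10 digits are returned as-is in digit form, so a later validation step can flag them.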

Progress Tracking

Monitor long-running operations:

var progress = new Progress<int>(percent => 
    Console.WriteLine($"Processing: {percent}%"));

DataFlow.From.Csv("large_file.csv")
    .WithProgress(progress)
    .Filter(row => ComplexFilter(row))
    .WriteToCsv("filtered.csv");

Performance

Benchmarks on 1M records (Intel i7, 16GB RAM):

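| Operation                    | Memory Usage | Time | Records/sec |
|------------------------------|--------------|------|-------------|
| CSV Read + Filter + Write    | 42 MB        | 3.2s | 312,500     |
| JSON Parse + Transform       | 156 MB       | 5.1s | 196,078     |
| SQL Read + Group + Export    | 89 MB        | 4.7s | 212,765     |
| Parallel Transform (8 cores) | 203 MB       | 1.4s | 714,285     |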

Memory usage stays constant regardless of file size when streaming.

Configuration

Global settings via DataFlowConfig:

DataFlowConfig.Configure(config => {
    config.DefaultCsvDelimiter = ';';
    config.DefaultDateFormat = "yyyy-MM-dd";
    config.BufferSize = 8192;
    config.EnableAutoTypeConversion = true;
    config.ThrowOnMissingColumns = false;
});

License

MIT - Do whatever you want with it.


Built because I got tired of writing the same ETL code over and over.

Target frameworks: net9.0 is compatible. The platform-specific net9.0 targets (android, browser, ios, maccatalyst, macos, tvos, windows), net10.0, and the matching net10.0 platform targets were computed.


| Version | Downloads | Last Updated |
|---------|-----------|--------------|
| 1.0.0   | 175       | 8/28/2025    |