DataFlow.Core 1.0.0
DataFlow
High-performance ETL pipeline library for .NET that actually gets out of your way.
What is this?
DataFlow is a streaming data pipeline library designed for processing large datasets without blowing up your memory. Think LINQ, but for ETL operations: read data from anywhere, transform it, and write it somewhere else. No XML configs, no enterprise-architect nonsense. Just simple, chainable operations that work.
Installation
dotnet add package DataFlow.Core
Or grab it from NuGet if you're using Visual Studio.
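If you prefer editing the project file by hand, the equivalent reference from the package page is:

<PackageReference Include="DataFlow.Core" Version="1.0.0" />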
Basic Usage
Process a CSV file in three lines:
DataFlow.From.Csv("input.csv")
.Filter(row => row["Status"] == "Active")
.WriteToCsv("output.csv");
That's the entire API philosophy. Read, transform, write.
Common Scenarios
Reading Data
// CSV files
var pipeline = DataFlow.From.Csv("data.csv");
// JSON files
var pipeline = DataFlow.From.Json("data.json");
// Excel spreadsheets
var pipeline = DataFlow.From.Excel("report.xlsx", sheet: "Sales");
// SQL databases
var pipeline = DataFlow.From.Sql(connectionString)
    .Query("SELECT * FROM Orders WHERE Created > @date", new { date = DateTime.Today });
// In-memory collections
var pipeline = DataFlow.From.Collection(myList);
Transforming Data
Chain operations like you would with LINQ:
pipeline
    .Filter(row => row.GetValue<decimal>("Price") > 0)
    .Map(row => new {
        Product = row["Name"],
        Revenue = row.GetValue<decimal>("Price") * row.GetValue<int>("Quantity"),
        Category = row["Category"]
    })
    .GroupBy(x => x.Category)
    .Select(group => new {
        Category = group.Key,
        TotalRevenue = group.Sum(x => x.Revenue),
        ProductCount = group.Count()
    })
    .OrderByDescending(x => x.TotalRevenue)
    .Take(10);
Writing Results
// To CSV
pipeline.WriteToCsv("output.csv");
// To JSON
pipeline.WriteToJson("output.json");
// To Excel
pipeline.WriteToExcel("report.xlsx", "Results");
// To SQL
pipeline.WriteToSql(connectionString, "TargetTable");
// To collection
var results = pipeline.ToList();
var array = pipeline.ToArray();
Real World Examples
ETL: Database to CSV
Export active customers with their order totals:
DataFlow.From.Sql(connectionString)
    .Query(@"
        SELECT c.Id, c.Name, c.Email,
               COUNT(o.Id) AS OrderCount,
               ISNULL(SUM(o.Total), 0) AS TotalSpent
        FROM Customers c
        LEFT JOIN Orders o ON c.Id = o.CustomerId
        WHERE c.IsActive = 1
        GROUP BY c.Id, c.Name, c.Email")
    .Map(row => new {
        CustomerId = row["Id"],
        Name = row["Name"],
        Email = row["Email"],
        OrderCount = row["OrderCount"],
        TotalSpent = row["TotalSpent"],
        CustomerValue = row.GetValue<int>("OrderCount") > 10 ? "High" : "Normal"
    })
    .OrderByDescending(x => x.TotalSpent)
    .WriteToCsv("customer_report.csv");
Data Cleaning
Clean messy CSV data:
DataFlow.From.Csv("raw_data.csv")
.RemoveDuplicates("Id")
.FillMissing("Email", "no-email@unknown.com")
.FillMissing("Country", "USA")
.Map(row => {
row["Email"] = row["Email"].ToString().ToLower().Trim();
row["Phone"] = Regex.Replace(row["Phone"].ToString(), @"[^\d]", "");
return row;
})
.Filter(row => IsValidEmail(row["Email"].ToString()))
.WriteToCsv("cleaned_data.csv");
Parallel Processing
Speed up CPU-intensive operations:
DataFlow.From.Csv("large_dataset.csv")
.AsParallel(maxDegreeOfParallelism: Environment.ProcessorCount)
.Map(row => {
// Some expensive operation
row["Hash"] = ComputeExpensiveHash(row["Data"]);
row["Processed"] = true;
return row;
})
.WriteToCsv("processed.csv");
Data Validation
Validate and handle errors:
var validator = new DataValidator()
    .Required("Id", "Name", "Email")
    .Email("Email")
    .Range("Age", min: 0, max: 150)
    .Regex("Phone", @"^\d{10}$")
    .Custom("StartDate", value => DateTime.Parse(value) <= DateTime.Now);

var pipeline = DataFlow.From.Csv("users.csv")
    .Validate(validator)
    .OnInvalid(ErrorStrategy.LogAndSkip); // or ThrowException, Fix, Collect

pipeline.WriteToCsv("valid_users.csv");

// Access validation errors
var errors = pipeline.ValidationErrors;
Streaming Large Files
Process multi-gigabyte files with constant memory usage:
DataFlow.From.Csv("10gb_log_file.csv")
.Filter(row => row["Level"] == "ERROR")
.Select(row => new {
Timestamp = row["Timestamp"],
Message = row["Message"],
Source = row["Source"]
})
.WriteToCsv("errors_only.csv"); // Streams directly, uses ~50MB RAM
Advanced Features
Custom Data Sources
Implement IDataSource for custom sources:
public class MongoDataSource : IDataSource
{
    public IEnumerable<DataRow> Read()
    {
        // Your MongoDB reading logic
        foreach (var doc in collection.Find(filter))
        {
            yield return new DataRow(doc.ToDictionary());
        }
    }
}

// Use it
var pipeline = DataFlow.From.Custom(new MongoDataSource());
Custom Transformations
Create reusable transformations:
public static class MyTransformations
{
    public static IPipeline<DataRow> NormalizePhoneNumbers(this IPipeline<DataRow> pipeline)
    {
        return pipeline.Map(row => {
            if (row.ContainsKey("Phone"))
            {
                row["Phone"] = NormalizePhone(row["Phone"].ToString());
            }
            return row;
        });
    }
}
// Use it
pipeline.NormalizePhoneNumbers().WriteToCsv("output.csv");
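NormalizePhone is likewise your own helper rather than something DataFlow ships. A minimal sketch that strips everything except digits:

static string NormalizePhone(string raw)
{
    // Hypothetical helper: keep digits only, e.g. "(555) 123-4567" -> "5551234567".
    return System.Text.RegularExpressions.Regex.Replace(raw ?? string.Empty, @"\D", "");
}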
Progress Tracking
Monitor long-running operations:
var progress = new Progress<int>(percent =>
    Console.WriteLine($"Processing: {percent}%"));

DataFlow.From.Csv("large_file.csv")
    .WithProgress(progress)
    .Filter(row => ComplexFilter(row))
    .WriteToCsv("filtered.csv");
Performance
Benchmarks on 1M records (Intel i7, 16GB RAM):
Operation | Memory Usage | Time | Records/sec |
---|---|---|---|
CSV Read + Filter + Write | 42 MB | 3.2s | 312,500 |
JSON Parse + Transform | 156 MB | 5.1s | 196,078 |
SQL Read + Group + Export | 89 MB | 4.7s | 212,765 |
Parallel Transform (8 cores) | 203 MB | 1.4s | 714,285 |
Memory usage stays constant regardless of file size when streaming.
Configuration
Global settings via DataFlowConfig:
DataFlowConfig.Configure(config => {
    config.DefaultCsvDelimiter = ';';
    config.DefaultDateFormat = "yyyy-MM-dd";
    config.BufferSize = 8192;
    config.EnableAutoTypeConversion = true;
    config.ThrowOnMissingColumns = false;
});
License
MIT - Do whatever you want with it.
Built because I got tired of writing the same ETL code over and over.
Frameworks
Product | Compatible and computed target frameworks |
---|---|
.NET | net9.0 is compatible. net10.0 and the platform-specific targets of net9.0 and net10.0 (android, browser, ios, maccatalyst, macos, tvos, windows) were computed. |
Dependencies (net9.0)
- ClosedXML (>= 0.105.0)
- Microsoft.Data.SqlClient (>= 6.1.1)
- System.Data.Common (>= 4.3.0)
- System.Text.Json (>= 9.0.8)
Version History
Version | Downloads | Last Updated |
---|---|---|
1.0.0 | 175 | 8/28/2025 |