CSharpDB.Pipelines 3.8.0

CSharpDB.Pipelines

Package contracts and runtime foundation for CSharpDB ETL pipelines.

Overview

CSharpDB.Pipelines defines portable pipeline packages and a small orchestration runtime for batch ETL work. A pipeline package describes the source, transformations, destination, execution options, and optional incremental state. The built-in runtime can validate packages, serialize them to JSON, execute them in batches, capture checkpoints, and report rejects and run metrics. Packages can also name trusted host commands for lifecycle hooks; command bodies are registered by the process that runs the pipeline and are not serialized into the package. Package JSON includes generated automation metadata that lists the trusted commands and scalar functions a host must register.

Current limitations:

  • Built-in runtime components currently support only CSV and JSON file sources and destinations
  • Built-in transforms support Select, Rename, Cast, Filter, Derive, and Deduplicate
  • CSharpDB table sources/destinations and SQL query sources are modeled in the contracts but are not yet implemented by DefaultPipelineComponentFactory

Features

  • Pipeline package model: strongly typed source, transform, destination, and execution settings
  • Validation: schema-level validation before execution
  • Serialization: save/load pipeline packages as JSON
  • Runtime orchestration: Validate, DryRun, Run, and Resume modes
  • Built-in connectors: CSV and JSON file readers/writers
  • Built-in transforms: select, rename, cast, filter, derive, deduplicate
  • Checkpointing hooks: pluggable checkpoint store and run logger abstractions
  • Trusted command hooks: host-registered commands for run started, batch completed, run succeeded, and run failed events
  • Automation metadata: generated import/export manifest for trusted command and scalar function names
  • Batch metrics: rows read/written/rejected plus batch counts

Usage

End-to-End Example

using CSharpDB.Pipelines.Models;
using CSharpDB.Pipelines.Runtime;
using CSharpDB.Pipelines.Runtime.BuiltIns;
using CSharpDB.Pipelines.Serialization;
using CSharpDB.Pipelines.Validation;
using CSharpDB.Primitives;

Directory.CreateDirectory("data");

await File.WriteAllLinesAsync("data/customers.csv",
[
    "id,name,status",
    "1,Alice,active",
    "1,Alice,active",
    "2,Bob,inactive",
    "3,Carol,active",
]);

var package = new PipelinePackageDefinition
{
    Name = "customers-csv-to-json",
    Version = "1.0.0",
    Description = "Import customers from CSV, clean them, and emit JSON.",
    Source = new PipelineSourceDefinition
    {
        Kind = PipelineSourceKind.CsvFile,
        Path = "data/customers.csv",
        HasHeaderRow = true,
    },
    Transforms =
    [
        new PipelineTransformDefinition
        {
            Kind = PipelineTransformKind.Select,
            SelectColumns = ["id", "name", "status"],
        },
        new PipelineTransformDefinition
        {
            Kind = PipelineTransformKind.Rename,
            RenameMappings =
            [
                new PipelineRenameMapping { Source = "name", Target = "full_name" },
            ],
        },
        new PipelineTransformDefinition
        {
            Kind = PipelineTransformKind.Cast,
            CastMappings =
            [
                new PipelineCastMapping { Column = "id", TargetType = DbType.Integer },
            ],
        },
        new PipelineTransformDefinition
        {
            Kind = PipelineTransformKind.Filter,
            FilterExpression = "status == 'active'",
        },
        new PipelineTransformDefinition
        {
            Kind = PipelineTransformKind.Derive,
            DerivedColumns =
            [
                new PipelineDerivedColumn { Name = "import_source", Expression = "'csv'" },
            ],
        },
        new PipelineTransformDefinition
        {
            Kind = PipelineTransformKind.Deduplicate,
            DeduplicateKeys = ["id"],
        },
    ],
    Destination = new PipelineDestinationDefinition
    {
        Kind = PipelineDestinationKind.JsonFile,
        Path = "data/customers.cleaned.json",
    },
    Options = new PipelineExecutionOptions
    {
        BatchSize = 2,
        CheckpointInterval = 1,
        ErrorMode = PipelineErrorMode.FailFast,
    },
    Hooks =
    [
        new PipelineCommandHookDefinition
        {
            Event = PipelineCommandHookEvent.OnRunSucceeded,
            CommandName = "NotifyPipeline",
            Arguments = new Dictionary<string, object?>
            {
                ["channel"] = "ops",
            },
        },
    ],
};

PipelineValidationResult validation = PipelinePackageValidator.Validate(package);
if (!validation.IsValid)
{
    throw new InvalidOperationException(string.Join(
        Environment.NewLine,
        validation.Errors.Select(error => $"{error.Path}: {error.Message}")));
}

await PipelinePackageSerializer.SaveToFileAsync(package, "data/customers.pipeline.json");
PipelinePackageDefinition loadedPackage =
    await PipelinePackageSerializer.LoadFromFileAsync("data/customers.pipeline.json");

var orchestrator = new PipelineOrchestrator(
    new DefaultPipelineComponentFactory(),
    new NullPipelineCheckpointStore(),
    new NullPipelineRunLogger(),
    DbCommandRegistry.Create(commands =>
    {
        commands.AddAsyncCommand(
            "NotifyPipeline",
            new DbCommandOptions(
                Description: "Publishes pipeline run metrics.",
                Timeout: TimeSpan.FromSeconds(10),
                IsLongRunning: true),
            static async (context, ct) =>
            {
                string pipelineName = context.Metadata["pipelineName"];
                long rowsWritten = context.Arguments["rowsWritten"].AsInteger;
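                // NotifyOpsAsync is a placeholder for your own notification code;
                // it is not defined in this example.
                await NotifyOpsAsync(pipelineName, rowsWritten, ct);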
                return DbCommandResult.Success();
            });
    }));

PipelineRunResult result = await orchestrator.ExecuteAsync(new PipelineRunRequest
{
    Package = loadedPackage,
    Mode = PipelineExecutionMode.Run,
});

Console.WriteLine(
    $"{result.Status}: {result.Metrics.RowsRead} read, " +
    $"{result.Metrics.RowsWritten} written, " +
    $"{result.Metrics.RowsRejected} rejected");

string outputJson = await File.ReadAllTextAsync("data/customers.cleaned.json");
Console.WriteLine(outputJson);

The output file contains the active customers only, with duplicate IDs removed:

[
  {
    "id": 1,
    "status": "active",
    "full_name": "Alice",
    "import_source": "csv"
  },
  {
    "id": 3,
    "status": "active",
    "full_name": "Carol",
    "import_source": "csv"
  }
]
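
To see why only two rows survive, the Filter and Deduplicate steps can be approximated with plain LINQ over the same sample rows. This is a sketch of the semantics only, not the library's implementation; keeping the first row seen for each key is an assumption, and it makes no difference here because the two duplicate rows are identical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// The four data rows written to data/customers.csv in the example above.
var rows = new List<(int Id, string Name, string Status)>
{
    (1, "Alice", "active"),
    (1, "Alice", "active"),
    (2, "Bob", "inactive"),
    (3, "Carol", "active"),
};

var cleaned = rows
    // Filter: keep rows whose status is "active" (drops Bob).
    .Where(r => r.Status == "active")
    // Deduplicate on "id": keep the first row per key (drops the second Alice).
    .DistinctBy(r => r.Id)
    // Rename name -> full_name and derive the literal import_source column.
    .Select(r => new { id = r.Id, status = r.Status, full_name = r.Name, import_source = "csv" })
    .ToList();

foreach (var row in cleaned)
{
    Console.WriteLine($"{row.id} {row.full_name} {row.status} {row.import_source}");
}
// prints:
// 1 Alice active csv
// 3 Carol active csv
```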

Execution Modes

  • Validate: validate the package only, without creating components
  • DryRun: open the source and run transforms, but skip destination writes
  • Run: execute the full pipeline and persist checkpoints
  • Resume: continue a previous run from the checkpoint returned by your IPipelineCheckpointStore
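
As an illustration of the modes above, a host might execute a package in DryRun first and continue to Run only when the dry run reports no rejects. The sketch below reuses only the types and members shown in the end-to-end example. It assumes that a package may omit Hooks and that an empty DbCommandRegistry is acceptable when no hooks are declared; neither is confirmed here. The package name and file paths are illustrative.

```csharp
using System;
using System.IO;
using CSharpDB.Pipelines.Models;
using CSharpDB.Pipelines.Runtime;
using CSharpDB.Pipelines.Runtime.BuiltIns;
using CSharpDB.Primitives;

Directory.CreateDirectory("data");
await File.WriteAllLinesAsync("data/customers.csv",
[
    "id,name,status",
    "1,Alice,active",
    "2,Bob,inactive",
]);

var package = new PipelinePackageDefinition
{
    Name = "customers-dry-run",
    Version = "1.0.0",
    Description = "Dry-run a CSV import before writing JSON.",
    Source = new PipelineSourceDefinition
    {
        Kind = PipelineSourceKind.CsvFile,
        Path = "data/customers.csv",
        HasHeaderRow = true,
    },
    Transforms =
    [
        new PipelineTransformDefinition
        {
            Kind = PipelineTransformKind.Select,
            SelectColumns = ["id", "name", "status"],
        },
    ],
    Destination = new PipelineDestinationDefinition
    {
        Kind = PipelineDestinationKind.JsonFile,
        Path = "data/customers.dryrun.json",
    },
    Options = new PipelineExecutionOptions
    {
        BatchSize = 2,
        CheckpointInterval = 1,
        ErrorMode = PipelineErrorMode.FailFast,
    },
};

// Assumption: no hooks are declared, so the command registry is left empty.
var orchestrator = new PipelineOrchestrator(
    new DefaultPipelineComponentFactory(),
    new NullPipelineCheckpointStore(),
    new NullPipelineRunLogger(),
    DbCommandRegistry.Create(commands => { }));

// DryRun reads the source and applies transforms but skips destination writes.
PipelineRunResult dryRun = await orchestrator.ExecuteAsync(new PipelineRunRequest
{
    Package = package,
    Mode = PipelineExecutionMode.DryRun,
});
Console.WriteLine(
    $"DryRun {dryRun.Status}: {dryRun.Metrics.RowsRead} read, " +
    $"{dryRun.Metrics.RowsRejected} rejected");

// Only perform the real run when the dry run rejected nothing.
if (dryRun.Metrics.RowsRejected == 0)
{
    PipelineRunResult run = await orchestrator.ExecuteAsync(new PipelineRunRequest
    {
        Package = package,
        Mode = PipelineExecutionMode.Run,
    });
    Console.WriteLine($"Run {run.Status}: {run.Metrics.RowsWritten} written");
}
```

Gating on RowsRejected is one possible policy; a host could equally inspect the result status before deciding to proceed.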

Notes

  • DefaultPipelineComponentFactory is the ready-to-run built-in factory for file pipelines
  • Use NullPipelineCheckpointStore and NullPipelineRunLogger when you want a minimal in-process setup
  • Relative source file paths are searched from the current directory and app base directory; relative output paths are written relative to the current directory
  • Derive expressions are intentionally simple today: use a source column name or a literal such as 'csv', 123, true, or null
  • Trusted command hooks are skipped in Validate mode. A missing command registration, a timed-out command, or a failing hook with StopOnFailure = true causes the run to fail; the failure is reported through PipelineRunResult.
  • Pipeline command callbacks receive the run cancellation token; hosts should pass it to async I/O and set DbCommandOptions.Timeout for hooks that call external systems.
  • PipelinePackageSerializer regenerates Automation during save/load and string serialization. Validation accepts legacy packages without automation metadata, but reports stale manifests when present.
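
The cancellation note above applies to any async work a hook performs. A minimal sketch, using only the .NET base class library: the NotifyOpsAsync body below is a hypothetical stand-in for the helper that the end-to-end example leaves undefined (Task.Delay models a slow external call), and the CancellationTokenSource stands in for the run token the pipeline would supply.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical hook body: the delay models a slow external call and observes
// the token, so cancelling the run interrupts it instead of waiting 30 seconds.
static async Task NotifyOpsAsync(string pipelineName, long rowsWritten, CancellationToken ct)
{
    await Task.Delay(TimeSpan.FromSeconds(30), ct);
    Console.WriteLine($"{pipelineName}: {rowsWritten} rows written");
}

// Stand-in for the run cancellation token: cancels itself after 100 ms.
using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100));

bool cancelled = false;
try
{
    await NotifyOpsAsync("customers-csv-to-json", 2, cts.Token);
}
catch (OperationCanceledException)
{
    // Task.Delay throws TaskCanceledException, which derives from OperationCanceledException.
    cancelled = true;
}

Console.WriteLine(cancelled ? "hook cancelled" : "hook completed");
// prints: hook cancelled
```

Had the token not been passed to Task.Delay, the call would have run for the full 30 seconds regardless of cancellation, which is the situation the note warns against.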

Installation

dotnet add package CSharpDB.Pipelines

For the recommended all-in-one package:

dotnet add package CSharpDB

Dependencies

  • CSharpDB.Primitives - shared type system used by cast mappings and pipeline contracts

Package          Description
CSharpDB         Recommended all-in-one package including pipelines, engine, storage, and client APIs
CSharpDB.Engine  Embedded database engine for SQL and collection access
CSharpDB.Client  Client SDK for database, pipeline, and maintenance operations

License

MIT - see LICENSE for details.

Target frameworks

  • Compatible (included in package): net10.0
  • Computed: net10.0-android, net10.0-browser, net10.0-ios, net10.0-maccatalyst, net10.0-macos, net10.0-tvos, net10.0-windows

NuGet packages (2)

Showing the top 2 NuGet packages that depend on CSharpDB.Pipelines:

  • CSharpDB: All-in-one package for CSharpDB application development. Includes the unified client, engine, ADO.NET provider, and diagnostics.
  • CSharpDB.Client: Unified CSharpDB client SDK with pluggable transports (Direct, HTTP, gRPC, TCP, Named Pipes).

Version Downloads Last Updated
3.8.0 122 5/17/2026
3.7.0 138 5/9/2026
3.6.0 126 5/3/2026
3.5.0 121 4/28/2026
3.4.0 119 4/25/2026
3.3.0 118 4/23/2026
3.2.0 119 4/19/2026
3.1.2 109 4/15/2026
3.1.0 110 4/15/2026
3.0.0 140 4/8/2026
2.9.1 123 4/7/2026
2.8.1 110 4/6/2026
2.8.0 112 4/4/2026
2.7.0 108 3/31/2026
2.6.0 116 3/29/2026
2.5.0 209 3/28/2026
2.4.0 112 3/24/2026