CSharpDB.Pipelines
3.8.0
Package contracts and runtime foundation for CSharpDB ETL pipelines.
Overview
CSharpDB.Pipelines defines portable pipeline packages and a small orchestration
runtime for batch ETL work. A pipeline package describes the source,
transformations, destination, execution options, and optional incremental state.
The built-in runtime can validate packages, serialize them to JSON, execute them
in batches, capture checkpoints, and report rejects and run metrics.
Packages can also name trusted host commands for lifecycle hooks; command bodies
are registered by the process that runs the pipeline and are not serialized into
the package. Package JSON includes generated automation metadata that lists the
trusted commands and scalar functions a host must register.
Current boundary:
- Built-in runtime components currently support CSV and JSON file sources/destinations
- Built-in transforms support Select, Rename, Cast, Filter, Derive, and Deduplicate
- CSharpDB table sources/destinations and SQL query sources are modeled in the contracts but are not implemented by DefaultPipelineComponentFactory yet
Features
- Pipeline package model: strongly typed source, transform, destination, and execution settings
- Validation: schema-level validation before execution
- Serialization: save/load pipeline packages as JSON
- Runtime orchestration: Validate, DryRun, Run, and Resume modes
- Built-in connectors: CSV and JSON file readers/writers
- Built-in transforms: select, rename, cast, filter, derive, deduplicate
- Checkpointing hooks: pluggable checkpoint store and run logger abstractions
- Trusted command hooks: host-registered commands for run started, batch completed, run succeeded, and run failed events
- Automation metadata: generated import/export manifest for trusted command and scalar function names
- Batch metrics: rows read/written/rejected plus batch counts
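To make the batching, checkpointing, and metrics features more concrete, here is a minimal standalone sketch of the general idea. It does not use CSharpDB.Pipelines and is not the library's implementation. The variable names are illustrative, and treating a checkpoint interval as "every N completed batches" is an assumption.

```csharp
using System;
using System.Linq;

// Illustrative values only; not read from a pipeline package.
const int batchSize = 2;
const int checkpointInterval = 1; // assumption: checkpoint every N completed batches

string[] rows = { "row-1", "row-2", "row-3", "row-4", "row-5" };

long rowsRead = 0;
int batchCount = 0;
int checkpointsTaken = 0;

// Enumerable.Chunk (.NET 6+) splits the input into arrays of at most batchSize items.
foreach (string[] batch in rows.Chunk(batchSize))
{
    rowsRead += batch.Length;
    batchCount++;

    if (batchCount % checkpointInterval == 0)
    {
        // A real runtime would persist its position here so a later run can resume.
        checkpointsTaken++;
    }
}

Console.WriteLine($"{rowsRead} rows, {batchCount} batches, {checkpointsTaken} checkpoints");
// prints: 5 rows, 3 batches, 3 checkpoints
```

The last batch holds a single row, which is why the batch count is 3 rather than 2.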
Usage
End-to-End Example
```csharp
using CSharpDB.Pipelines.Models;
using CSharpDB.Pipelines.Runtime;
using CSharpDB.Pipelines.Runtime.BuiltIns;
using CSharpDB.Pipelines.Serialization;
using CSharpDB.Pipelines.Validation;
using CSharpDB.Primitives;

// Create a small sample CSV file; note the duplicated row for id 1.
Directory.CreateDirectory("data");
await File.WriteAllLinesAsync("data/customers.csv",
[
    "id,name,status",
    "1,Alice,active",
    "1,Alice,active",
    "2,Bob,inactive",
    "3,Carol,active",
]);

// Describe the pipeline: source, transforms, destination, options, and hooks.
var package = new PipelinePackageDefinition
{
    Name = "customers-csv-to-json",
    Version = "1.0.0",
    Description = "Import customers from CSV, clean them, and emit JSON.",
    Source = new PipelineSourceDefinition
    {
        Kind = PipelineSourceKind.CsvFile,
        Path = "data/customers.csv",
        HasHeaderRow = true,
    },
    Transforms =
    [
        new PipelineTransformDefinition
        {
            Kind = PipelineTransformKind.Select,
            SelectColumns = ["id", "name", "status"],
        },
        new PipelineTransformDefinition
        {
            Kind = PipelineTransformKind.Rename,
            RenameMappings =
            [
                new PipelineRenameMapping { Source = "name", Target = "full_name" },
            ],
        },
        new PipelineTransformDefinition
        {
            Kind = PipelineTransformKind.Cast,
            CastMappings =
            [
                new PipelineCastMapping { Column = "id", TargetType = DbType.Integer },
            ],
        },
        new PipelineTransformDefinition
        {
            Kind = PipelineTransformKind.Filter,
            FilterExpression = "status == 'active'",
        },
        new PipelineTransformDefinition
        {
            Kind = PipelineTransformKind.Derive,
            DerivedColumns =
            [
                new PipelineDerivedColumn { Name = "import_source", Expression = "'csv'" },
            ],
        },
        new PipelineTransformDefinition
        {
            Kind = PipelineTransformKind.Deduplicate,
            DeduplicateKeys = ["id"],
        },
    ],
    Destination = new PipelineDestinationDefinition
    {
        Kind = PipelineDestinationKind.JsonFile,
        Path = "data/customers.cleaned.json",
    },
    Options = new PipelineExecutionOptions
    {
        BatchSize = 2,
        CheckpointInterval = 1,
        ErrorMode = PipelineErrorMode.FailFast,
    },
    Hooks =
    [
        new PipelineCommandHookDefinition
        {
            Event = PipelineCommandHookEvent.OnRunSucceeded,
            CommandName = "NotifyPipeline",
            Arguments = new Dictionary<string, object?>
            {
                ["channel"] = "ops",
            },
        },
    ],
};

// Validate the package before saving or running it.
PipelineValidationResult validation = PipelinePackageValidator.Validate(package);
if (!validation.IsValid)
{
    throw new InvalidOperationException(string.Join(
        Environment.NewLine,
        validation.Errors.Select(error => $"{error.Path}: {error.Message}")));
}

// Round-trip the package through JSON.
await PipelinePackageSerializer.SaveToFileAsync(package, "data/customers.pipeline.json");
PipelinePackageDefinition loadedPackage =
    await PipelinePackageSerializer.LoadFromFileAsync("data/customers.pipeline.json");

// Build the orchestrator and register the trusted command named by the hook.
var orchestrator = new PipelineOrchestrator(
    new DefaultPipelineComponentFactory(),
    new NullPipelineCheckpointStore(),
    new NullPipelineRunLogger(),
    DbCommandRegistry.Create(commands =>
    {
        commands.AddAsyncCommand(
            "NotifyPipeline",
            new DbCommandOptions(
                Description: "Publishes pipeline run metrics.",
                Timeout: TimeSpan.FromSeconds(10),
                IsLongRunning: true),
            static async (context, ct) =>
            {
                string pipelineName = context.Metadata["pipelineName"];
                long rowsWritten = context.Arguments["rowsWritten"].AsInteger;
                // NotifyOpsAsync is a host-defined helper that is not shown here.
                await NotifyOpsAsync(pipelineName, rowsWritten, ct);
                return DbCommandResult.Success();
            });
    }));

// Execute the loaded package in Run mode.
PipelineRunResult result = await orchestrator.ExecuteAsync(new PipelineRunRequest
{
    Package = loadedPackage,
    Mode = PipelineExecutionMode.Run,
});

Console.WriteLine(
    $"{result.Status}: {result.Metrics.RowsRead} read, " +
    $"{result.Metrics.RowsWritten} written, " +
    $"{result.Metrics.RowsRejected} rejected");

string outputJson = await File.ReadAllTextAsync("data/customers.cleaned.json");
Console.WriteLine(outputJson);
```
The output file contains the active customers only, with duplicate IDs removed:
```json
[
  {
    "id": 1,
    "status": "active",
    "full_name": "Alice",
    "import_source": "csv"
  },
  {
    "id": 3,
    "status": "active",
    "full_name": "Carol",
    "import_source": "csv"
  }
]
```
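To see why only two rows survive, the same cleaning steps can be reproduced with plain LINQ on the sample rows. This is a standalone sketch for checking the expected result. It is not how CSharpDB.Pipelines implements its transforms, and the tuple field names are illustrative.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Sample rows matching data/customers.csv (header row omitted).
var rows = new List<(string Id, string Name, string Status)>
{
    ("1", "Alice", "active"),
    ("1", "Alice", "active"),
    ("2", "Bob", "inactive"),
    ("3", "Carol", "active"),
};

var cleaned = rows
    .Select(r => (Id: int.Parse(r.Id), FullName: r.Name, r.Status)) // Cast id, Rename name
    .Where(r => r.Status == "active")                                // Filter
    .Select(r => (r.Id, r.FullName, r.Status, ImportSource: "csv"))  // Derive
    .DistinctBy(r => r.Id)                                           // Deduplicate on id
    .ToList();

foreach (var r in cleaned)
{
    Console.WriteLine($"{r.Id} {r.FullName} {r.Status} {r.ImportSource}");
}
// prints:
// 1 Alice active csv
// 3 Carol active csv
```

Bob is dropped by the filter, and the second Alice row is dropped by the deduplication step, which keeps the first row seen for each id.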
Execution Modes
- Validate: validate the package only, without creating components
- DryRun: open the source and run transforms, but skip destination writes
- Run: execute the full pipeline and persist checkpoints
- Resume: continue a previous run from the checkpoint returned by your IPipelineCheckpointStore
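As a hedged illustration of switching modes, the fragment below previews a package without writing the destination file. It continues the End-to-End Example and is not standalone: `orchestrator` and `loadedPackage` are defined there. The enum member `PipelineExecutionMode.DryRun` is assumed from the mode names listed above and should be checked against the package's API.

```csharp
// Continues the End-to-End Example: `orchestrator` and `loadedPackage` come from there.
// PipelineExecutionMode.DryRun is an assumption based on the documented mode names.
PipelineRunResult preview = await orchestrator.ExecuteAsync(new PipelineRunRequest
{
    Package = loadedPackage,
    Mode = PipelineExecutionMode.DryRun,
});

// Inspect what the source and transforms produced before committing to a full run.
Console.WriteLine(
    $"{preview.Status}: {preview.Metrics.RowsRead} read, " +
    $"{preview.Metrics.RowsRejected} rejected");
```

Running a preview like this before a full run is one way to catch rejects early, since destination writes are skipped.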
Notes
- DefaultPipelineComponentFactory is the ready-to-run built-in factory for file pipelines
- Use NullPipelineCheckpointStore and NullPipelineRunLogger when you want a minimal in-process setup
- Relative source file paths are searched from the current directory and app base directory; relative output paths are written relative to the current directory
- Derive expressions are intentionally simple today: use a source column name or a literal such as 'csv', 123, true, or null
- Trusted command hooks are skipped in Validate mode. Missing command registration, a timed-out command, or a failing hook with StopOnFailure = true fails the run through PipelineRunResult.
- Pipeline command callbacks receive the run cancellation token; hosts should pass it to async I/O and set DbCommandOptions.Timeout for hooks that call external systems.
- PipelinePackageSerializer regenerates Automation during save/load and string serialization. Validation accepts legacy packages without automation metadata, but reports stale manifests when present.
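The note on relative source paths can be pictured with a small standalone probe. `ResolveSourcePath` is a hypothetical helper written for this sketch and is not part of CSharpDB.Pipelines. The probe order (current directory first, then app base directory) follows the wording of the note but is an assumption about the actual behavior.

```csharp
using System;
using System.IO;

// Hypothetical helper, not part of CSharpDB.Pipelines: returns the first existing
// candidate for a source path, or null when no candidate exists.
static string? ResolveSourcePath(string path)
{
    if (Path.IsPathRooted(path))
    {
        return File.Exists(path) ? path : null;
    }

    // Assumed probe order: current directory first, then the app base directory.
    string[] roots = { Directory.GetCurrentDirectory(), AppContext.BaseDirectory };
    foreach (string root in roots)
    {
        string candidate = Path.GetFullPath(Path.Combine(root, path));
        if (File.Exists(candidate))
        {
            return candidate;
        }
    }

    return null;
}

string? resolved = ResolveSourcePath("data/customers.csv");
Console.WriteLine(resolved ?? "data/customers.csv was not found in either location");
```

A check like this can help diagnose a missing source file when the host process starts from a different working directory than expected.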
Installation
dotnet add package CSharpDB.Pipelines
For the recommended all-in-one package:
dotnet add package CSharpDB
Dependencies
- CSharpDB.Primitives: shared type system used by cast mappings and pipeline contracts
Related Packages
| Package | Description |
|---|---|
| CSharpDB | Recommended all-in-one package including pipelines, engine, storage, and client APIs |
| CSharpDB.Engine | Embedded database engine for SQL and collection access |
| CSharpDB.Client | Client SDK for database, pipeline, and maintenance operations |
License
MIT - see LICENSE for details.
| Product | Versions (compatible and additional computed target framework versions) |
|---|---|
| .NET | net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
- net10.0
  - CSharpDB.Primitives (>= 3.8.0)
NuGet packages (2)
Showing the top 2 NuGet packages that depend on CSharpDB.Pipelines:
| Package | Downloads |
|---|---|
| CSharpDB: All-in-one package for CSharpDB application development. Includes the unified client, engine, ADO.NET provider, and diagnostics. | |
| CSharpDB.Client: Unified CSharpDB client SDK with pluggable transports (Direct, HTTP, gRPC, TCP, Named Pipes). | |
GitHub repositories
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated |
|---|---|---|
| 3.8.0 | 122 | 5/17/2026 |
| 3.7.0 | 138 | 5/9/2026 |
| 3.6.0 | 126 | 5/3/2026 |
| 3.5.0 | 121 | 4/28/2026 |
| 3.4.0 | 119 | 4/25/2026 |
| 3.3.0 | 118 | 4/23/2026 |
| 3.2.0 | 119 | 4/19/2026 |
| 3.1.2 | 109 | 4/15/2026 |
| 3.1.0 | 110 | 4/15/2026 |
| 3.0.0 | 140 | 4/8/2026 |
| 2.9.1 | 123 | 4/7/2026 |
| 2.8.1 | 110 | 4/6/2026 |
| 2.8.0 | 112 | 4/4/2026 |
| 2.7.0 | 108 | 3/31/2026 |
| 2.6.0 | 116 | 3/29/2026 |
| 2.5.0 | 209 | 3/28/2026 |
| 2.4.0 | 112 | 3/24/2026 |