MM.PipeBlocks 1.0.8

dotnet add package MM.PipeBlocks --version 1.0.8
                    
NuGet\Install-Package MM.PipeBlocks -Version 1.0.8
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="MM.PipeBlocks" Version="1.0.8" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="MM.PipeBlocks" Version="1.0.8" />
                    
Directory.Packages.props
<PackageReference Include="MM.PipeBlocks" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add MM.PipeBlocks --version 1.0.8
                    
#r "nuget: MM.PipeBlocks, 1.0.8"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package MM.PipeBlocks@1.0.8
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=MM.PipeBlocks&version=1.0.8
                    
Install as a Cake Addin
#tool nuget:?package=MM.PipeBlocks&version=1.0.8
                    
Install as a Cake Tool

PipeBlocks

A composable pipeline library for defining process flows with sequential execution, branching, try/catch handling, and mixed sync/async support. Each process step is encapsulated as a "block," enabling modular and unit-testable workflows. The pipeline follows a two-rail system, breaking early on failure, with results wrapped in an Either monad for clear success/failure state management.

Purpose

Pipeline-oriented programming is an effort to simplify the process flows in programming by having the process flow in one direction only. This package supports mono-directional flow with branching.

Features

  • Modular Design: Encapsulate functionality within individual blocks, promoting reusability and testability.
  • Composable Pipelines: Chain blocks together to define complex workflows in a linear and understandable fashion.
  • Enhanced Readability: By structuring code into blocks, the overall logic becomes more transparent and easier to follow.

Getting Started

Installation

Clone or include the project into your solution

git clone https://github.com/mmintoff/PipeBlocks.git

Define a Context, Value and FailureState model

public class MyValueModel
{
    public DateTime RetrievedAt { get; set; }
    public string? TextRetrieved { get; set; }
    public DateTime ProcessedAt { get; set; }
    public int WordCount { get; set; }
}

public class MyContextModel(MyValueModel value) : IContext<MyValueModel>
{
    public Guid CorrelationId { get; set; }
    public Either<IFailureState<MyValueModel>, MyValueModel> Value { get; set; } = value;
    public bool IsFinished { get; set; }
    public bool IsFlipped { get; set; }

    public string? RequestUrl { get; set; }
}

public class MyFailureState : IFailureState<MyValueModel>
{
    public MyValueModel Value { get; set; }
    public Guid CorrelationId { get; set; }
    public string? FailureReason { get; set; }
}

Define Code Blocks

public class RetrieveTextBlockAsync : AsyncCodeBlock<MyContextModel, MyValueModel>
{
    protected override async ValueTask<MyContextModel> ExecuteAsync(MyContextModel context, MyValueModel value)
    {
        try
        {
            value.TextRetrieved = await new HttpClient(new HttpClientHandler
            {
                AutomaticDecompression = System.Net.DecompressionMethods.GZip
            }).GetStringAsync(context.RequestUrl);
            value.RetrievedAt = DateTime.UtcNow;
        }
        catch (Exception ex)
        {
            context.SignalBreak(new MyFailureState
            {
                Value = value,
                CorrelationId = context.CorrelationId,
                FailureReason = ex.Message
            });
        }
        return context;
    }
}

public class WordCountBlock : CodeBlock<MyContextModel, MyValueModel>
{
    protected override MyContextModel Execute(MyContextModel context, MyValueModel value)
    {
        char[] tokens = [.. value.TextRetrieved!.Select(c => char.IsLetter(c) ? c : ' ')];
        value.WordCount = new string(tokens).Split([' '], StringSplitOptions.RemoveEmptyEntries).Length;
        value.ProcessedAt = DateTime.UtcNow;
        return context;
    }
}

Define a Block Resolver

public class ServiceProviderBackedResolver<C, V>(IServiceProvider hostProvider) : IBlockResolver<C, V>
    where C : IContext<V>
{
    public X ResolveInstance<X>() where X : IBlock<C, V>
        => hostProvider.GetRequiredService<X>();
}

Setting up Dependency Injection

var serviceCollection = new ServiceCollection();
serviceCollection.AddTransient<IBlockResolver<MyContextModel, MyValueModel>, ServiceProviderBackedResolver<MyContextModel, MyValueModel>>();
serviceCollection.AddTransient<BlockBuilder<MyContextModel, MyValueModel>>();
serviceCollection.AddTransient<RetrieveTextBlockAsync>();
serviceCollection.AddTransient<WordCountBlock>();
serviceCollection.AddLogging(configure =>
{
    configure.ClearProviders();
    configure.AddConsole();
    configure.SetMinimumLevel(LogLevel.Trace);
});

var serviceProvider = serviceCollection.BuildServiceProvider();

Create a Pipe

var builder = serviceProvider.GetRequiredService<BlockBuilder<MyContextModel, MyValueModel>>();
var pipe = builder.CreatePipe("Word Counter")
    .Then(builder.ReturnIf(
        condition: c => string.IsNullOrWhiteSpace(c.RequestUrl),
        doThis: (c, v) => c.SignalBreak(new MyFailureState
        {
            CorrelationId = c.CorrelationId,
            Value = v,
            FailureReason = "Request Url empty"
        })
    ))
    .Then<RetrieveTextBlockAsync>()
    .Then(builder.Run((c, v) => WriteToConsole(v.RetrievedAt)))
    .Then(builder.Run(c =>
    {
        Console.WriteLine();
        return c;
    }))
    .Then<WordCountBlock>()
    .Then(builder.Run((c, v) => WriteToConsole(v.ProcessedAt)))
    ;
	
void WriteToConsole(DateTime dt)
{
    Console.WriteLine(dt.ToString("yyyyMMdd HHmmss"));
}

Execute Pipe (Expected Failure)

var result = pipe.Execute(new MyContextModel(new())
{
    CorrelationId = Guid.NewGuid(),
    RequestUrl = null
});

result.Value.Match(
    failure => Console.WriteLine($"Failure: {failure.FailureReason}"),
    success => Console.WriteLine($"Success: {success.WordCount} words"));
info: MM.PipeBlocks.PipeBlock[0]
      Created pipe: 'Word Counter'
trce: MM.PipeBlocks.PipeBlock[0]
      Added block: 'MM.PipeBlocks.Blocks.BranchBlock`2[pbTest.MyContextModel,pbTest.MyValueModel]' to pipe: 'Word Counter'
trce: MM.PipeBlocks.PipeBlock[0]
      Added block: 'pbTest.RetrieveTextBlockAsync' to pipe: 'Word Counter'
trce: MM.PipeBlocks.PipeBlock[0]
      Added block: 'MM.PipeBlocks.Blocks.FuncBlock`2[pbTest.MyContextModel,pbTest.MyValueModel] (Method: Program+<>c.<<Main>$>b__0_3)' to pipe: 'Word Counter'
trce: MM.PipeBlocks.PipeBlock[0]
      Added block: 'MM.PipeBlocks.Blocks.FuncBlock`2[pbTest.MyContextModel,pbTest.MyValueModel] (Method: Program+<>c.<<Main>$>b__0_4)' to pipe: 'Word Counter'
trce: MM.PipeBlocks.PipeBlock[0]
      Added block: 'pbTest.WordCountBlock' to pipe: 'Word Counter'
trce: MM.PipeBlocks.PipeBlock[0]
      Added block: 'MM.PipeBlocks.Blocks.FuncBlock`2[pbTest.MyContextModel,pbTest.MyValueModel] (Method: Program+<>c.<<Main>$>b__0_5)' to pipe: 'Word Counter'
trce: MM.PipeBlocks.PipeBlock[0]
      Executing pipe: 'Word Counter' synchronously for context: c84e5d9a-db87-4f8e-b98c-248a5bc1d3a6
info: MM.PipeBlocks.Blocks.ReturnBlock[0]
      Context c84e5d9a-db87-4f8e-b98c-248a5bc1d3a6 terminated in Return Block
trce: MM.PipeBlocks.PipeBlock[0]
      Stopping synchronous pipe: 'Word Counter' execution at step: 1 for context: c84e5d9a-db87-4f8e-b98c-248a5bc1d3a6
trce: MM.PipeBlocks.PipeBlock[0]
      Completed synchronous pipe: 'Word Counter' execution for context: c84e5d9a-db87-4f8e-b98c-248a5bc1d3a6
Failure: Request Url empty

Execute Pipe (Expected Success)

var result = pipe.Execute(new MyContextModel(new())
{
    CorrelationId = Guid.NewGuid(),
    RequestUrl = "https://www.gutenberg.org/cache/epub/11/pg11.txt"
});

result.Value.Match(
    failure => Console.WriteLine($"Failure: {failure.FailureReason}"),
    success => Console.WriteLine($"Success: {success.WordCount} words"));
info: MM.PipeBlocks.PipeBlock[0]
      Created pipe: 'Word Counter'
trce: MM.PipeBlocks.PipeBlock[0]
      Added block: 'MM.PipeBlocks.Blocks.BranchBlock`2[pbTest.MyContextModel,pbTest.MyValueModel]' to pipe: 'Word Counter'
trce: MM.PipeBlocks.PipeBlock[0]
      Added block: 'pbTest.RetrieveTextBlockAsync' to pipe: 'Word Counter'
trce: MM.PipeBlocks.PipeBlock[0]
      Added block: 'MM.PipeBlocks.Blocks.FuncBlock`2[pbTest.MyContextModel,pbTest.MyValueModel] (Method: Program+<>c.<<Main>$>b__0_3)' to pipe: 'Word Counter'
trce: MM.PipeBlocks.PipeBlock[0]
      Added block: 'MM.PipeBlocks.Blocks.FuncBlock`2[pbTest.MyContextModel,pbTest.MyValueModel] (Method: Program+<>c.<<Main>$>b__0_4)' to pipe: 'Word Counter'
trce: MM.PipeBlocks.PipeBlock[0]
      Added block: 'pbTest.WordCountBlock' to pipe: 'Word Counter'
trce: MM.PipeBlocks.PipeBlock[0]
      Added block: 'MM.PipeBlocks.Blocks.FuncBlock`2[pbTest.MyContextModel,pbTest.MyValueModel] (Method: Program+<>c.<<Main>$>b__0_5)' to pipe: 'Word Counter'
trce: MM.PipeBlocks.PipeBlock[0]
      Executing pipe: 'Word Counter' synchronously for context: 1d31f160-bad4-4553-b3b3-a7358ba3f775
20250428 074750

20250428 074750
trce: MM.PipeBlocks.PipeBlock[0]
      Completed synchronous pipe: 'Word Counter' execution for context: 1d31f160-bad4-4553-b3b3-a7358ba3f775
Success: 30475 words
Product Compatible and additional computed target framework versions.
.NET net8.0 is compatible.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed.  net9.0 is compatible.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed.  net10.0 was computed.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages (1)

Showing the top 1 NuGet packages that depend on MM.PipeBlocks:

Package Downloads
MM.PipeBlocks.Extensions

Additional functionality

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
1.0.8 144 6/23/2025
1.0.7 292 6/12/2025
1.0.6 151 5/29/2025
1.0.5 148 5/28/2025
1.0.4 167 4/28/2025
1.0.3 165 4/28/2025
1.0.2 141 4/25/2025
1.0.1 138 4/25/2025
1.0.0 172 4/24/2025