FluentSignals.SignalBus 2.1.2

FluentSignals.SignalBus

A lightweight, high-performance publish-subscribe messaging system for .NET applications. Designed to work seamlessly with both Blazor WebAssembly and Blazor Server, as well as any other .NET application.

Features

Core Features ✅

  • Type-safe messaging - Strongly typed publish/subscribe pattern
  • Async/Sync handlers - Support for both Action<T> and Func<T, Task>
  • Thread-safe - Built with ConcurrentDictionary and SemaphoreSlim
  • Error handling - Built-in error events and resilience
  • Memory efficient - Automatic cleanup with IDisposable and IAsyncDisposable
  • Statistics - Track messages, subscriptions, and errors
  • Blazor compatible - Works in both WASM and Server modes

Installation

dotnet add package FluentSignals.SignalBus

Quick Start

1. Register the SignalBus

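// In Program.cs or Startup.cs
builder.Services.AddSignalBus();

// Or with middleware configuration.
// This snippet assumes `using System.Diagnostics;` (for Stopwatch) and
// `using Microsoft.Extensions.Logging;` (for LogLevel), and that `logger`
// is an ILogger instance already in scope.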
builder.Services.AddSignalBus(options =>
{
    options.EnableStatistics = true;
    options.EnableCorrelationId = true;
    
    options.UseMiddleware(pipeline => pipeline
        .UseCorrelationId()
        .UseLogging(LogLevel.Information)
        .UsePerformanceTracking(
            slowMessageThreshold: TimeSpan.FromMilliseconds(100),
            onSlowMessage: (context, elapsed) => 
            {
                logger.LogWarning("Slow message {Type} took {Elapsed}ms", 
                    context.MessageType.Name, elapsed.TotalMilliseconds);
            })
        .UseExceptionHandling(
            swallowExceptions: false,
            onException: (ex, context) =>
            {
                logger.LogError(ex, "Error in SignalBus for {Type}", context.MessageType.Name);
            })
        .UseValidation(validation => validation
            .RegisterValidator<UserLoggedIn>(msg => !string.IsNullOrEmpty(msg.UserId))
            .RegisterValidator<DataUpdated>(msg => msg.NewValue != null))
        .UseCustom("timing", async (context, next) =>
        {
            var sw = Stopwatch.StartNew();
            await next(context);
            logger.LogDebug("Message processed in {Elapsed}ms", sw.ElapsedMilliseconds);
        })
    );
});

2. Define Message Types

public record UserLoggedIn(string UserId, DateTime Timestamp);
public record DataUpdated(string EntityId, object NewValue);

3. Publish Messages

public class LoginService
{
    private readonly ISignalBus _signalBus;

    public LoginService(ISignalBus signalBus)
    {
        _signalBus = signalBus;
    }

    public async Task<bool> LoginAsync(string userId, string password)
    {
        // Perform login logic...
        
        // Publish event
        await _signalBus.PublishAsync(new UserLoggedIn(userId, DateTime.UtcNow));
        
        return true;
    }
}

4. Subscribe to Messages

public class NotificationService : IDisposable
{
    private readonly ISignalBus _signalBus;
    private readonly List<SignalBusSubscription> _subscriptions = new();

    public NotificationService(ISignalBus signalBus)
    {
        _signalBus = signalBus;
    }

    // Call once after construction and await it. Returning Task (instead of
    // async void) lets the caller observe failures and know when the
    // subscriptions are in place.
    public async Task InitializeAsync()
    {
        // Sync handler
        var sub1 = await _signalBus.Subscribe<UserLoggedIn>(user =>
        {
            Console.WriteLine($"User {user.UserId} logged in at {user.Timestamp}");
        });
        _subscriptions.Add(sub1);

        // Async handler
        var sub2 = await _signalBus.SubscribeAsync<DataUpdated>(async data =>
        {
            await ProcessDataUpdateAsync(data);
        });
        _subscriptions.Add(sub2);
    }

    private async Task ProcessDataUpdateAsync(DataUpdated data)
    {
        // Process the update...
        await Task.Delay(100);
    }

    public void Dispose()
    {
        foreach (var subscription in _subscriptions)
        {
            subscription.Dispose();
        }
    }
}

Advanced Usage

Single Subscription (Prevent Duplicates)

// Only one subscription per target type is allowed
await _signalBus.SubscribeSingle<UserLoggedIn>(HandleLogin);
await _signalBus.SubscribeSingleAsync<DataUpdated>(HandleDataUpdateAsync);

Error Handling

// Subscribe to errors
_signalBus.ErrorOccurred += (sender, context) =>
{
    _logger.LogError(context.Exception, 
        "Error processing {MessageType}: {Message}", 
        context.MessageType.Name, 
        context.Message);
};

// Messages continue to other subscribers even if one fails
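
The isolation described in the comment above can be illustrated without the library. The following is a minimal sketch in plain C#, not FluentSignals' actual implementation; `IsolatedDispatcher` and `Dispatch` are hypothetical names chosen for this example.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration; not part of FluentSignals.
public static class IsolatedDispatcher
{
    // Invokes every handler in order. A handler that throws is reported
    // through onError and does not stop delivery to the remaining handlers.
    // Returns the number of handlers that completed without throwing.
    public static int Dispatch<T>(
        T message,
        IEnumerable<Action<T>> handlers,
        Action<Exception> onError)
    {
        var delivered = 0;
        foreach (var handler in handlers)
        {
            try
            {
                handler(message);
                delivered++;
            }
            catch (Exception ex)
            {
                onError(ex);
            }
        }
        return delivered;
    }
}
```

In this sketch the `onError` callback plays the role that the `ErrorOccurred` event plays above: failures are surfaced to one place instead of propagating to the publisher.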

Statistics and Monitoring

var stats = _signalBus.GetStatistics();

Console.WriteLine($"Total messages: {stats.TotalMessagesPublished}");
Console.WriteLine($"Active subscriptions: {stats.ActiveSubscriptions}");
Console.WriteLine($"Total errors: {stats.TotalErrors}");

foreach (var (type, count) in stats.MessagesByType)
{
    Console.WriteLine($"{type}: {count} messages");
}

Async Disposal

await using var subscription = await _signalBus.SubscribeAsync<MyMessage>(HandleAsync);
// Subscription is automatically removed when disposed
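
For readers curious how a subscription handle can support both `using` and `await using`, here is a minimal sketch. `DemoSubscription` is a hypothetical type written for this example; the real `SignalBusSubscription` may be implemented differently.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical token type; FluentSignals' SignalBusSubscription may differ.
public sealed class DemoSubscription : IDisposable, IAsyncDisposable
{
    private Action? _unsubscribe;

    public DemoSubscription(Action unsubscribe) => _unsubscribe = unsubscribe;

    // Runs the unsubscribe callback at most once, even if the token is
    // disposed repeatedly or from several threads.
    public void Dispose() => Interlocked.Exchange(ref _unsubscribe, null)?.Invoke();

    // The removal in this sketch is synchronous, so the async path simply
    // delegates to Dispose and returns a completed ValueTask.
    public ValueTask DisposeAsync()
    {
        Dispose();
        return ValueTask.CompletedTask;
    }
}
```

Making disposal idempotent means a handle can safely be disposed both by `await using` and by an explicit cleanup path.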

Middleware Pipeline

The SignalBus supports a middleware pipeline that runs on every PublishAsync call. Middleware can:

  • Log messages
  • Add correlation IDs
  • Validate messages
  • Track performance
  • Handle exceptions
  • Cancel message delivery
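
To make the idea of a pipeline concrete, the sketch below composes middleware into a single delegate in plain C#. It is an illustration under assumptions, not the library's code: `DemoContext`, `DemoDelegate`, and `DemoPipeline` are hypothetical stand-ins for the FluentSignals types.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical types for illustration only; FluentSignals' own
// SignalBusContext and SignalBusDelegate may differ.
public sealed class DemoContext
{
    public List<string> Trace { get; } = new();
}

public delegate Task DemoDelegate(DemoContext context);

public static class DemoPipeline
{
    // Builds one delegate from a list of middleware. The first middleware
    // in the list is the outermost, so it runs first and finishes last.
    public static DemoDelegate Build(
        IReadOnlyList<Func<DemoContext, DemoDelegate, Task>> middleware,
        DemoDelegate terminal)
    {
        var next = terminal;
        for (var i = middleware.Count - 1; i >= 0; i--)
        {
            var current = middleware[i];
            var downstream = next; // fresh variable per iteration for the closure
            next = context => current(context, downstream);
        }
        return next;
    }
}
```

With two middleware A and B registered in that order, code before `await next(context)` runs as A then B, the terminal delegate runs, and code after `await next(context)` runs as B then A. A middleware that returns without calling `next` stops everything downstream of it.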

Built-in Middleware

builder.Services.AddSignalBus(options =>
{
    options.UseMiddleware(pipeline => pipeline
        // Add correlation ID to messages
        .UseCorrelationId()
        
        // Log all messages
        .UseLogging(LogLevel.Debug)
        
        // Track slow messages
        .UsePerformanceTracking(TimeSpan.FromMilliseconds(50))
        
        // Handle exceptions
        .UseExceptionHandling(swallowExceptions: true)
        
        // Validate messages
        .UseValidation(v => v.RegisterValidator<MyMessage>(IsValid))
    );
});
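
The snippet above references `MyMessage` and `IsValid` without defining them. A possible shape is sketched here, assuming (as the lambda-based validators in the Quick Start suggest) that `RegisterValidator<T>` accepts a predicate returning `bool`. The record's properties and the `MessageValidators` class are hypothetical.

```csharp
// Hypothetical message shape; the README does not define MyMessage.
public record MyMessage(string Id, int Quantity);

public static class MessageValidators
{
    // Returns true only when the message has a non-blank Id and a
    // positive Quantity.
    public static bool IsValid(MyMessage message) =>
        !string.IsNullOrWhiteSpace(message.Id) && message.Quantity > 0;
}
```

With these definitions, the registration would read `v.RegisterValidator<MyMessage>(MessageValidators.IsValid)`.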

Custom Middleware

public class CustomMiddleware : ISignalBusMiddleware
{
    public async Task InvokeAsync(SignalBusContext context, SignalBusDelegate next)
    {
        // Before message processing
        Console.WriteLine($"Processing {context.MessageType.Name}");
        
        // Call next middleware
        await next(context);
        
        // After message processing
        Console.WriteLine($"Processed {context.MessageType.Name}");
    }
}

// Register it
options.UseMiddleware(pipeline => pipeline.Use<CustomMiddleware>());

Accessing Middleware Context

options.UseMiddleware(pipeline => pipeline
    .Use(async (context, next) =>
    {
        // Access context properties
        var messageType = context.MessageType;
        var correlationId = context.CorrelationId;
        var subscriberCount = context.SubscriberCount;
        
        // Store data for other middleware
        context.Items["ProcessingStart"] = DateTime.UtcNow;
        
        await next(context);
        
        // Cancel further processing
        // (someCondition is a placeholder for your own check)
        if (someCondition)
            context.IsCancelled = true;
    })
);

Blazor Integration

In Blazor Components

@implements IDisposable
@inject ISignalBus SignalBus

<div>
    <h3>Notifications</h3>
    @foreach (var notification in _notifications)
    {
        <div>@notification</div>
    }
</div>

@code {
    private List<string> _notifications = new();
    private SignalBusSubscription? _subscription;

    protected override async Task OnInitializedAsync()
    {
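        _subscription = await SignalBus.SubscribeAsync<UserLoggedIn>(async user =>
        {
            // Update state on the renderer's synchronization context so the
            // list is not modified while the component is rendering.
            await InvokeAsync(() =>
            {
                _notifications.Add($"User {user.UserId} logged in");
                StateHasChanged();
            });
        });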
    }

    public void Dispose()
    {
        _subscription?.Dispose();
    }
}

Performance Considerations

  1. Thread Safety: All operations are thread-safe, and the implementation uses locks sparingly
  2. Memory: Subscriptions hold strong references by default - always dispose them
  3. Async Publishing: Messages are processed in parallel for better performance
  4. WASM Compatibility: No timers or background threads that could cause issues
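
Point 3 can be pictured with a short sketch of concurrent handler dispatch using `Task.WhenAll`. This is one common way to run async handlers together and is an assumption about the general technique, not a description of FluentSignals internals; `ParallelDispatch` is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical illustration; not part of FluentSignals.
public static class ParallelDispatch
{
    // Starts every async handler without waiting for the previous one to
    // finish, then returns a task that completes when all of them have.
    public static Task PublishAsync<T>(T message, IEnumerable<Func<T, Task>> handlers)
    {
        var running = handlers.Select(handler => handler(message)).ToArray();
        return Task.WhenAll(running);
    }
}
```

Because handlers overlap in time, a slow handler does not delay the start of the others. The trade-off is that handlers must not rely on running in subscription order.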

Roadmap

  • Weak reference support
  • Message filtering and routing
  • Priority queues
  • Batching support
  • Debugging dashboard
  • Distributed messaging (separate package)

License

MIT License

Target frameworks

Compatible (included in package): net9.0
Computed as compatible: net9.0-android, net9.0-browser, net9.0-ios, net9.0-maccatalyst, net9.0-macos, net9.0-tvos, net9.0-windows, net10.0, net10.0-android, net10.0-browser, net10.0-ios, net10.0-maccatalyst, net10.0-macos, net10.0-tvos, net10.0-windows

NuGet packages (1)

The following NuGet package depends on FluentSignals.SignalBus:

FluentSignals.Blazor

Blazor integration for FluentSignals - A reactive state management library. Includes SignalBus for component communication, HTTP resource components, typed resource factories, and Blazor-specific helpers.

Version  Downloads  Last Updated
2.1.5    119        7/17/2025
2.1.4    122        7/17/2025
2.1.3    123        7/15/2025
2.1.2    155        7/9/2025
2.1.1    145        7/8/2025
2.1.0    148        7/8/2025
2.0.0    147        6/29/2025

v2.1.2 - Added middleware pipeline support for message processing, statistics tracking, error handling improvements, and comprehensive async/sync handler support.