Snail.Toolkit.SignalR.Reactive
1.0.1
There is a newer version of this package available.
See the version list below for details.
See the version list below for details.
dotnet add package Snail.Toolkit.SignalR.Reactive --version 1.0.1
NuGet\Install-Package Snail.Toolkit.SignalR.Reactive -Version 1.0.1
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="Snail.Toolkit.SignalR.Reactive" Version="1.0.1" />
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="Snail.Toolkit.SignalR.Reactive" Version="1.0.1" />
<PackageReference Include="Snail.Toolkit.SignalR.Reactive" />
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add Snail.Toolkit.SignalR.Reactive --version 1.0.1
The NuGet Team does not provide support for this client. Please contact its maintainers for support.
#r "nuget: Snail.Toolkit.SignalR.Reactive, 1.0.1"
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package Snail.Toolkit.SignalR.Reactive@1.0.1
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=Snail.Toolkit.SignalR.Reactive&version=1.0.1
#tool nuget:?package=Snail.Toolkit.SignalR.Reactive&version=1.0.1
The NuGet Team does not provide support for this client. Please contact its maintainers for support.
Toolkit.SignalR.Reactive
A high-performance SignalR solution for reactive binary data streaming between clients with chunking support, JWT authentication, and automatic reconnection.
Features
- Reactive Streaming using
IObservable<T>andIAsyncEnumerable<T> - Configurable Chunking (default 8KB chunks, adjustable per transfer)
- JWT Authentication with WebSocket-compatible token support
- Automatic Reconnection with configurable retry policies
- Memory-Efficient streaming with backpressure support
- Pending Transfer Handling for offline recipients (24h cache default)
- MessagePack Protocol for compact binary serialization
- Thread-Safe implementation for concurrent transfers
Installation
dotnet add package Snail.Toolkit.SignalR.Reactive
Server Setup
1. Configure Services
Add to your Startup.cs or equivalent:
/// <summary>
/// Adds and configures SignalR services with reactive transfer capabilities
/// </summary>
public static class ServiceCollectionExtensions
{
/// <summary>
/// Configures SignalR with reactive transfer support including authentication,
/// message packing, compression, and caching
/// </summary>
/// <param name="services">The service collection</param>
/// <param name="configuration">Application configuration</param>
/// <param name="configureHub">Optional action to configure HubOptions</param>
/// <param name="configureJwt">Optional action to configure JWT options</param>
/// <remarks>
/// Includes:
/// - JWT authentication with SignalR WebSocket support
/// - MessagePack protocol for binary serialization
/// - Response compression for binary streams
/// - Memory cache configuration
/// </remarks>
public static void AddReactiveTransferSignalR(
this IServiceCollection services,
IConfiguration configuration,
Action<HubOptions>? configureHub = null,
Action<JwtBearerOptions>? configureJwt = null)
{
// Configure JWT authentication for SignalR
services.AddAuthJwtBearer(configuration, options =>
{
// Allow token in query string for WebSocket connections
options.Events = new JwtBearerEvents
{
OnMessageReceived = context =>
{
var accessToken = context.Request.Query["access_token"];
var path = context.HttpContext.Request.Path;
if (!string.IsNullOrEmpty(accessToken) &&
path.StartsWithSegments("/reactiveTransfer"))
{
context.Token = accessToken;
}
return Task.CompletedTask;
}
};
// Apply additional JWT configuration if provided
configureJwt?.Invoke(options);
});
// Configure SignalR with MessagePack protocol
services.AddSignalR(options =>
{
// Default configuration optimized for binary transfers
options.MaximumReceiveMessageSize = 1024 * 92; // 92KB
options.StreamBufferCapacity = 100; // Number of chunks to buffer
options.EnableDetailedErrors = true; // Better error messages
// Apply additional hub configuration if provided
configureHub?.Invoke(options);
}).AddMessagePackProtocol();
// Add response compression for binary streams
services.AddResponseCompression(opts =>
{
opts.MimeTypes = ResponseCompressionDefaults.MimeTypes
.Concat(["application/octet-stream"]);
});
// Add caching service with configuration
services.AddCacheService(configuration);
}
}
builder.Services.AddReactiveTransferSignalR(builder.Configuration);
2. Configure Endpoints
public static class EndpointRouteBuilderExtensions
{
public static void MapReactiveTransferHub(this IEndpointRouteBuilder app)
{
ArgumentNullException.ThrowIfNull(app);
app.MapHub<ReactiveTransferHub>("/reactiveTransfer")
.RequireAuthorization();
}
}
app.UseAuthentication();
app.UseAuthorization();
app.UseResponseCompression();
app.MapReactiveTransferHub();
Client Implementation
1. Configure Hub Connection
public static void AddHubConnection(this IServiceCollection services, string baseAddress)
{
var hubConnection = new HubConnectionBuilder()
.WithUrl(baseAddress + "reactiveTransfer", options =>
{
options.AccessTokenProvider = () => Task.FromResult(AccessToken);
})
.AddMessagePackProtocol()
.ConfigureLogging(logging => logging.SetMinimumLevel(LogLevel.Information))
.WithAutomaticReconnect()
.Build();
services.AddSingleton(hubConnection);
services.AddReactivePipeline();
}
builder.Services.AddHubConnection(builder.HostEnvironment.BaseAddress);
2. Send Files
// Send as byte array (auto-chunked)
await _sender.SendAsync("client123", fileBytes, chunkSize: 16384, channel: "secure-channel");
// Send as observable stream
var fileStream = Observable.FromAsync(() => File.ReadAllBytesAsync("largefile.bin"));
await _sender.SendAsync("client123", fileStream);
3. Receive Data
transferReceiver.SetChannel("secure-channel");
// Subscribe to chunks
transferReceiver.OnChunkReceived += async (transferId, chunk) =>
{
await _buffer.WriteAsync(chunk);
};
// Handle completed transfers
transferReceiver.TransferCompleted += async (transferId, data) =>
{
await File.WriteAllBytesAsync($"{transferId}.bin", data);
};
Configuration Options
Server Options (appsettings.json)
{
"MemoryCacheOptions": {
"PendingTransferCacheDuration": 24, // Hours
"UserIdentifierCacheDuration": 1 // Hours
}
}
Performance Tuning
| Setting | Recommended Value | Description |
|---|---|---|
| Chunk Size | 8KB-64KB | Balance between overhead and latency |
| Buffer Capacity | 50-200 | Concurrent chunks in flight |
| Cache Duration | 1-24h | Offline transfer availability |
Security Features
- End-to-End Encryption: JWT-secured connections
- Query String Tokens: WebSocket-compatible auth
- Input Validation: All parameters validated
- Resource Isolation: Separate streams per session
Advanced Usage
Custom Serialization
services.AddSignalR()
.AddMessagePackProtocol(options =>
{
options.SerializerOptions = MessagePackSerializerOptions.Standard
.WithCompression(MessagePackCompression.Lz4Block);
});
Error Handling
try
{
await _sender.SendAsync(...);
}
catch (TransferException ex)
{
_logger.LogError("Transfer failed: {Error}", ex.TransferError);
await _retryPolicy.ExecuteAsync(() => _sender.SendAsync(...));
}
Examples
Large File Transfer
// Server-side
app.UseResponseCompression(opts =>
{
opts.Providers.Add<GzipCompressionProvider>();
opts.MimeTypes = ["application/octet-stream"];
});
// Client-side
var fileStream = Observable.Create<byte[]>(async observer =>
{
using var file = File.OpenRead("4k-video.mp4");
var buffer = new byte[65536]; // 64KB chunks
int bytesRead;
while ((bytesRead = await file.ReadAsync(buffer)) > 0)
{
var chunk = new byte[bytesRead];
Buffer.BlockCopy(buffer, 0, chunk, 0, bytesRead);
observer.OnNext(chunk);
}
observer.OnCompleted();
});
await _sender.SendAsync("client456", fileStream);
Benchmarks
| Scenario | Throughput | Memory Usage |
|---|---|---|
| 1MB File | 1200 MB/s | 12 MB |
| 100MB Stream | 950 MB/s | 18 MB |
| 1GB Chunked | 850 MB/s | 25 MB |
Troubleshooting
Q: Transfers fail with large files
# Increase message size limits
services.AddSignalR(options =>
{
options.MaximumReceiveMessageSize = 1024 * 1024 * 100; // 100MB
});
Q: WebSocket token issues
# Ensure token provider is configured
.WithUrl(baseAddress + "reactiveTransfer", options =>
{
options.AccessTokenProvider = () => GetUserTokenAsync();
})
License
Toolkit.SignalR.Reactive is a free and open source project, released under the permissible MIT license.
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net9.0 is compatible. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.
-
net9.0
- Microsoft.AspNetCore.SignalR.Client (>= 9.0.6)
- Microsoft.AspNetCore.SignalR.Core (>= 1.2.0)
- Microsoft.AspNetCore.SignalR.Protocols.MessagePack (>= 9.0.6)
- Microsoft.Extensions.Caching.Memory (>= 9.0.6)
- Microsoft.Extensions.Configuration (>= 9.0.6)
- Microsoft.Extensions.Options.ConfigurationExtensions (>= 9.0.6)
- System.Linq.Async (>= 6.0.1)
- System.Reactive.Linq (>= 6.0.1)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.