ReactiveLock.Distributed.Redis
0.0.22
See the version list below for details.
.NET CLI:
dotnet add package ReactiveLock.Distributed.Redis --version 0.0.22
Package Manager:
NuGet\Install-Package ReactiveLock.Distributed.Redis -Version 0.0.22
PackageReference:
<PackageReference Include="ReactiveLock.Distributed.Redis" Version="0.0.22" />
Central Package Management:
<PackageVersion Include="ReactiveLock.Distributed.Redis" Version="0.0.22" />
<PackageReference Include="ReactiveLock.Distributed.Redis" />
Paket CLI:
paket add ReactiveLock.Distributed.Redis --version 0.0.22
Script & Interactive:
#r "nuget: ReactiveLock.Distributed.Redis, 0.0.22"
File-based apps:
#:package ReactiveLock.Distributed.Redis@0.0.22
Cake:
#addin nuget:?package=ReactiveLock.Distributed.Redis&version=0.0.22
#tool nuget:?package=ReactiveLock.Distributed.Redis&version=0.0.22
ReactiveLock
ReactiveLock is a .NET 9 library for reactive, distributed lock coordination. It allows multiple application instances to track busy/idle state and react to state changes using async handlers.
It supports both in-process and distributed synchronization. Redis is the stable distributed backend.
Packages
| Package Name | Description |
|---|---|
| ReactiveLock.Core | Core abstractions and in-process lock coordination |
| ReactiveLock.DependencyInjection | Adds DI and named resolution for distributed backends |
| ReactiveLock.Distributed.Redis | Redis-based distributed lock synchronization |
| ReactiveLock.Distributed.Grpc | gRPC-based distributed lock synchronization |
Use only ReactiveLock.Core if you don't need distributed coordination.
Installation
In-process only:
dotnet add package ReactiveLock.Core
Distributed with Redis:
dotnet add package ReactiveLock.Core
dotnet add package ReactiveLock.DependencyInjection
dotnet add package ReactiveLock.Distributed.Redis
Distributed with Grpc:
dotnet add package ReactiveLock.Core
dotnet add package ReactiveLock.DependencyInjection
dotnet add package ReactiveLock.Distributed.Grpc
Core architecture
ReactiveLock is designed with an in-memory-first awareness model; actual lock control depends on the configured mode:
- In local-only mode, all lock transitions (`IncrementAsync`, `DecrementAsync`, etc.) are performed entirely in memory, with no external calls.
- In distributed mode, lock transitions are resolved through the distributed backend (such as Redis or gRPC), and only then is the local state updated. This ensures consistent coordination across all instances.
This design enables responsive, high-performance event-driven behavior while supporting multi-instance environments through external synchronization.
Consistency and Usage Considerations
- It is designed for reactive and near real-time lock coordination, propagation, and notification.
- It offers a practical alternative to traditional eventual consistency, supporting preemptive orchestration of processes before critical events.
- Lock propagation delays may occur due to workload, thread pool pressure, or (in distributed mode) Redis / Grpc latency.
- For workloads requiring strong consistency, ReactiveLock should be combined with transactional layers or used as a complementary coordination mechanism, not as the sole source of truth.
Note: Distributed failure and contention mitigation features are a work in progress. Use distributed mode with awareness of its current limitations.
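As a sketch of the preemptive-orchestration idea above, an instance can wait for all tracked work to drain before running a critical step. This uses the tracker-state API shown in the usage examples below; the `factory` variable, the "http" tracker name, and `RunCriticalMaintenanceAsync` are assumptions for illustration:

```csharp
// Resolve the shared state for the (assumed) "http" tracker.
var state = factory.GetTrackerState("http");

// Block until no instance reports busy for this tracker, then run the
// critical step while traffic is quiesced. Note this does not prevent
// new work from starting afterwards; pair it with your own gating logic.
await state.WaitIfBlockedAsync();
await RunCriticalMaintenanceAsync(); // hypothetical application callback
```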
Architecture Diagram
Usage
Simpler approach – Local-only (in-process)
Use this when you want a lightweight, in-memory, thread-coordinated lock mechanism within a single process.
using MichelOliveira.Com.ReactiveLock.Core;
// Create a new tracker state instance
var state = new ReactiveLockTrackerState();
// Set the local state as blocked (simulates a lock being held)
await state.SetLocalStateBlockedAsync();
// Start 3 tasks that will each wait for the state to become unblocked
var tasks = Enumerable.Range(1, 3).Select(i =>
Task.Run(async () => {
Console.WriteLine($"[Task {i}] Waiting...");
// Each task will wait here until the state becomes unblocked
await state.WaitIfBlockedAsync();
// Once unblocked, this message will print
Console.WriteLine($"[Task {i}] Proceeded.");
})
).ToArray();
// Simulate a delay before unblocking the state
await Task.Delay(1000);
// Unblock the state (releases all waiting tasks)
await state.SetLocalStateUnblockedAsync();
// Wait for all tasks to complete
await Task.WhenAll(tasks);
// Indicate completion
Console.WriteLine("Done.");
Controller-based (Increment / Decrement) local-only sample
Use this when you prefer reference-counted control using a controller abstraction (IncrementAsync / DecrementAsync), ideal for more complex coordination.
using MichelOliveira.Com.ReactiveLock.Core;
using System;
using System.Linq;
using System.Threading.Tasks;
var state = new ReactiveLockTrackerState();
var store = new InMemoryReactiveLockTrackerStore(state);
var controller = new ReactiveLockTrackerController(store);
// Initially block the state by incrementing (e.g. lock acquired)
await controller.IncrementAsync(); // Blocked
var tasks = Enumerable.Range(1, 3).Select(i =>
Task.Run(async () =>
{
Console.WriteLine($"[Task {i}] Waiting...");
await state.WaitIfBlockedAsync(); // Wait while blocked
Console.WriteLine($"[Task {i}] Proceeded.");
})
).ToArray();
// Simulate some delay before unblocking
await Task.Delay(1000);
// Decrement to unblock (lock released)
await controller.DecrementAsync(); // Unblocked
await Task.WhenAll(tasks);
Console.WriteLine("Done.");
Expected Output (both examples; task ordering may vary)
[Task 3] Waiting...
[Task 1] Waiting...
[Task 2] Waiting...
[Task 3] Proceeded.
[Task 2] Proceeded.
[Task 1] Proceeded.
Distributed HTTP Client Request Counter (Redis)
Setup for Redis
builder.Services.InitializeDistributedRedisReactiveLock(Dns.GetHostName());
builder.Services.AddDistributedRedisReactiveLock("http");
builder.Services.AddTransient<CountingHandler>();
builder.Services.AddHttpClient("http", client =>
client.BaseAddress = new Uri(builder.Configuration.GetConnectionString("http")!))
.AddHttpMessageHandler<CountingHandler>();
var app = builder.Build();
await app.UseDistributedRedisReactiveLockAsync();
CountingHandler (Redis and / or Grpc)
public class CountingHandler : DelegatingHandler
{
private readonly IReactiveLockTrackerController _controller;
public CountingHandler(IReactiveLockTrackerFactory factory)
{
_controller = factory.GetTrackerController("http");
}
protected override async Task<HttpResponseMessage> SendAsync(
HttpRequestMessage request, CancellationToken cancellationToken)
{
await _controller.IncrementAsync();
try
{
return await base.SendAsync(request, cancellationToken);
}
finally
{
await _controller.DecrementAsync();
}
}
}
Expected Behavior
- Each HTTP request increments the "http" lock counter.
- On response, the counter is decremented.
- Lock state is shared across all application instances.
- You can use the lock state to:
- Check if any requests are active.
- Wait for all requests to complete.
Use Case Example (Redis and / or Grpc)
var state = factory.GetTrackerState("http");
if (await state.IsBlockedAsync())
{
Console.WriteLine("HTTP requests active.");
}
await state.WaitIfBlockedAsync();
Console.WriteLine("No active HTTP requests.");
Thread Safety and Lock Integrity
All calls to `ReactiveLockTrackerState` and `ReactiveLockTrackerController` are thread-safe.
However, you are responsible for maintaining lock integrity across your application logic. This means:
- If you call `IncrementAsync()` / `DecrementAsync()` (or `SetLocalStateBlockedAsync()` / `SetLocalStateUnblockedAsync()`) out of order, prematurely, or inconsistently, it may result in an inaccurate lock state.
- In distributed scenarios, this inconsistency will propagate to all other instances, leading to incorrect coordination behavior across your application cluster.
To maintain proper lock semantics:
- Always match every `IncrementAsync()` with a corresponding `DecrementAsync()`.
- Do not bypass controller logic if using `TrackerController`; use `SetLocalStateBlockedAsync()` / `SetLocalStateUnblockedAsync()` only for direct state control when you fully understand its implications.
- Treat lock transitions as critical sections in your own logic and enforce deterministic, exception-safe usage patterns (e.g. `try`/`finally` blocks).
ReactiveLock provides safety mechanisms, but you must ensure correctness of your lock protocol.
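A minimal exception-safe sketch of the matching rule above, using the controller API from the earlier samples (`DoGuardedWorkAsync` is a hypothetical placeholder for the operation being coordinated):

```csharp
await controller.IncrementAsync(); // mark busy before the guarded work
try
{
    await DoGuardedWorkAsync(); // hypothetical operation being coordinated
}
finally
{
    // Always runs, even if the work throws, so the counter stays accurate
    // and the unblocked state is eventually propagated.
    await controller.DecrementAsync();
}
```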
gRPC Usage Example
This example demonstrates setting up a .NET 9 WebApplication with gRPC-based ReactiveLock and registering trackers for distributed coordination in memory.
Note: To use this example, you must have a running gRPC backend that the ReactiveLock clients can connect to. Without a backend, the trackers will not synchronize across instances.
The backend can also store lock state in another persistent location, such as a database, to maintain state beyond in-memory coordination.
Multiple backends can be configured for replication, allowing lock state to be synchronized across more than one backend for redundancy and high availability.
Setup for Grpc
using MichelOliveira.Com.ReactiveLock.Core;
using MichelOliveira.Com.ReactiveLock.DependencyInjection;
using MichelOliveira.Com.ReactiveLock.Distributed.Grpc;
var grpcReady = false;
var builder = WebApplication.CreateSlimBuilder(args);
// Configure Kestrel for HTTP/1 and HTTP/2
builder.WebHost.ConfigureKestrel(options =>
{
options.ListenAnyIP(8081, listenOptions =>
listenOptions.Protocols = Microsoft.AspNetCore.Server.Kestrel.Core.HttpProtocols.Http2);
options.ListenAnyIP(8080, listenOptions =>
listenOptions.Protocols = Microsoft.AspNetCore.Server.Kestrel.Core.HttpProtocols.Http1);
});
// Initialize distributed gRPC ReactiveLock with main and / or replica servers
builder.Services.InitializeDistributedGrpcReactiveLock(
instanceName: Dns.GetHostName(),
mainGrpcServer: builder.Configuration["rpc_local_server"],
replicaGrpcServers: builder.Configuration["rpc_replica_server"]
);
// Register distributed trackers
builder.Services.AddDistributedGrpcReactiveLock("http");
// Register gRPC services
builder.Services.AddGrpc();
builder.Services.AddSingleton<ReactiveLockGrpcService>();
var app = builder.Build();
app.Use(async (context, next) =>
{
if (context.Connection.LocalPort == 8080)
{
if (!grpcReady)
{
context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
return;
}
}
await next();
});
// Map gRPC services
app.MapGrpcService<ReactiveLockGrpcService>();
// Wait until distributed ReactiveLock is ready before serving requests
_ = Task.Run(async () =>
{
await app.UseDistributedGrpcReactiveLockAsync();
grpcReady = true;
});
app.Run();
ReactiveLockGrpcService
using System.Collections.Concurrent;
using Grpc.Core;
using Google.Protobuf.WellKnownTypes;
using ReactiveLock.Distributed.Grpc;
public class ReactiveLockGrpcService : ReactiveLockGrpc.ReactiveLockGrpcBase
{
private ConcurrentDictionary<string, LockGroup> Groups { get; } = [];
public override async Task<Empty> SetStatus(LockStatusRequest request, ServerCallContext context)
{
var group = Groups.GetOrAdd(request.LockKey, _ => new LockGroup());
group.InstanceStates[request.InstanceId] =
new InstanceLockStatus()
{
IsBusy = request.IsBusy,
LockData = request.LockData
};
await BroadcastAsync(request.LockKey, group);
return new Empty();
}
public override async Task SubscribeLockStatus(IAsyncStreamReader<LockStatusRequest> requestStream,
IServerStreamWriter<LockStatusNotification> responseStream,
ServerCallContext context)
{
await foreach (var req in requestStream.ReadAllAsync(context.CancellationToken).ConfigureAwait(false))
{
var group = Groups.GetOrAdd(req.LockKey, _ => new LockGroup());
group.Subscribers.Add(responseStream);
await responseStream.WriteAsync(new LockStatusNotification
{
LockKey = req.LockKey,
InstancesStatus = { group.InstanceStates }
}).ConfigureAwait(false);
break;
}
await Task.Delay(Timeout.Infinite, context.CancellationToken).ConfigureAwait(false);
}
private async Task BroadcastAsync(string lockKey, LockGroup group)
{
var notification = new LockStatusNotification
{
LockKey = lockKey,
InstancesStatus = { group.InstanceStates }
};
foreach (var subscriber in group.Subscribers.ToArray())
{
try
{
await subscriber.WriteAsync(notification).ConfigureAwait(false);
}
catch
{
group.Subscribers.TryTake(out _);
}
}
}
}
Key Points:
- `InitializeDistributedGrpcReactiveLock` sets up the ReactiveLock client/server connections.
- Each tracker (`AddDistributedGrpcReactiveLock`) represents a lockable resource or counter.
- `UseDistributedGrpcReactiveLockAsync` starts background synchronization with other instances.
- `ReactiveLockGrpcService` handles the gRPC messages for distributed coordination.
This approach ensures multiple app instances coordinate lock states in real time using gRPC streams. Note: The same `CountingHandler` shown in the previous example can be reused here.
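As a sketch, reusing the earlier `CountingHandler` with the gRPC tracker only changes the registration calls; the client name "http" and the base address below are assumptions for illustration:

```csharp
// Register the "http" tracker against the gRPC backend instead of Redis.
builder.Services.AddDistributedGrpcReactiveLock("http");

// Same handler registration pattern as in the Redis example.
builder.Services.AddTransient<CountingHandler>();
builder.Services.AddHttpClient("http", client =>
        client.BaseAddress = new Uri("http://downstream:8080")) // assumed address
    .AddHttpMessageHandler<CountingHandler>();
```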
Requirements
- .NET 9 SDK
License
MIT © Michel Oliveira
| Product | Compatible and additional computed target framework versions |
|---|---|
| .NET | net9.0 is compatible. net9.0-android, net9.0-browser, net9.0-ios, net9.0-maccatalyst, net9.0-macos, net9.0-tvos, net9.0-windows, net10.0, net10.0-android, net10.0-browser, net10.0-ios, net10.0-maccatalyst, net10.0-macos, net10.0-tvos, and net10.0-windows were computed. |
Dependencies (net9.0):
- Microsoft.AspNetCore.Http.Abstractions (>= 2.3.0)
- StackExchange.Redis (>= 2.8.58)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.
Version | Downloads | Last Updated |
---|---|---|
0.0.23-beta.1 | 0 | 8/22/2025 |
0.0.22 | 58 | 8/19/2025 |
0.0.22-beta.13 | 8 | 8/19/2025 |
0.0.22-beta.10 | 9 | 8/19/2025 |
0.0.22-beta.9 | 7 | 8/19/2025 |
0.0.22-beta.8 | 9 | 8/19/2025 |
0.0.22-beta.7 | 41 | 8/16/2025 |
0.0.22-beta.6 | 96 | 8/13/2025 |
0.0.22-beta.5 | 94 | 8/11/2025 |
0.0.22-beta.4 | 101 | 8/10/2025 |
0.0.22-beta.3 | 91 | 8/10/2025 |
0.0.22-beta.2 | 76 | 8/9/2025 |
0.0.22-beta.1 | 73 | 8/9/2025 |
0.0.21 | 1,252 | 7/30/2025 |
0.0.20 | 95 | 7/30/2025 |
0.0.19 | 115 | 7/29/2025 |
0.0.17 | 94 | 7/29/2025 |
0.0.16 | 87 | 7/29/2025 |
0.0.15 | 174 | 7/26/2025 |
0.0.13 | 140 | 7/26/2025 |
0.0.12 | 205 | 7/26/2025 |
0.0.11 | 276 | 7/25/2025 |
0.0.9 | 281 | 7/25/2025 |
0.0.8 | 297 | 7/25/2025 |
0.0.7 | 300 | 7/25/2025 |
0.0.6 | 306 | 7/25/2025 |
0.0.5 | 324 | 7/25/2025 |
0.0.4 | 324 | 7/25/2025 |
0.0.3 | 321 | 7/25/2025 |
0.0.2 | 330 | 7/25/2025 |
0.0.1 | 328 | 7/25/2025 |