NTDLS.CatMQ.Shared
1.0.4
See the version list below for details.
dotnet add package NTDLS.CatMQ.Shared --version 1.0.4
NuGet\Install-Package NTDLS.CatMQ.Shared -Version 1.0.4
<PackageReference Include="NTDLS.CatMQ.Shared" Version="1.0.4" />
paket add NTDLS.CatMQ.Shared --version 1.0.4
#r "nuget: NTDLS.CatMQ.Shared, 1.0.4"
// Install NTDLS.CatMQ.Shared as a Cake Addin #addin nuget:?package=NTDLS.CatMQ.Shared&version=1.0.4 // Install NTDLS.CatMQ.Shared as a Cake Tool #tool nuget:?package=NTDLS.CatMQ.Shared&version=1.0.4
CatMQ
CatMQ is a high-performance and reliable persistent message queue designed for efficient inter-process communication, task queuing, load balancing, and data buffering over TCP/IP.
Another Message Queue?! Why?
CatMQ is not “fully featured”, as in it does not natively support clustering, it is not multi-protocol (no AMQP nor MQTT), and it is not friendly to non-dot-net subscribers. Ok, then what’s the deal? Well, we needed a MQ that was slim, straight forward and free of fat-runtimes such as java or additional dependencies such as erlang. We went on an internet fishing expedition and came up empty.
So, we built one. Welcome to CatMQ: a reliable yet slim message queue.
Packages 📦
- Server Nuget package: https://www.nuget.org/packages/NTDLS.CatMQ.Server
- Client Nuget package: https://www.nuget.org/packages/NTDLS.CatMQ.Client
- Dedicated server install and web UI: https://github.com/NTDLS/CatMQ/releases
Server
Running the server is as simple as downloading and installing the dedicated CatMQ Service, which is a platform independent service that includes a web management interface.
Alternatively, the server can be run in-process using the nuget package NTDLS.CatMQ.Server. Running the server in-process is simple and configurable. The server process does not have to be dedicated as it can also be one of the processes that is involved in inner-process-communication.
internal class Program
{
static void Main()
{
var server = new CMqServer();
//Listen for queue clients on port 45784
server.StartAsync(45784);
Console.WriteLine("Press [enter] to shutdown.");
Console.ReadLine();
server.Stop();
}
}
Client
With the client, we can interact with the server. Create/delete/purge queues, subscribe and of course send and receive messages. Messages are sent by simply passing a serializable class instance that inherits ICMqMessage.
internal class MyMessage(string text) : ICMqMessage
{
public string Text { get; set; } = text;
}
static void Main()
{
var client = new CMqClient(); //Create an instance of the client.
client.Connect("127.0.0.1", 45784); //Connect to the queue server.
client.OnReceived += Client_OnReceived; //Wire up an event to listen for messages.
//Create a queue. These are highly configurable.
client.CreateQueue(new CMqQueueConfiguration("MyFirstQueue")
{
Persistence = PMqPersistence.Ephemeral
});
//Subscribe to the queue we just created.
client.Subscribe("MyFirstQueue", OnMessageReceived);
//Enqueue a few messages, note that the message is just a class and it must inherit from ICMqMessage.
for (int i = 0; i < 10; i++)
{
client.Enqueue("MyFirstQueue", new MyMessage($"Test message {i++:n0}"));
}
Console.WriteLine("Press [enter] to shutdown.");
Console.ReadLine();
//Cleanup.
client.Disconnect();
}
private static bool OnMessageReceived(CMqClient client, CMqReceivedMessage rawMessage)
{
var message = rawMessage.Deserialize();
//Here we receive the messages for the queue(s) we are subscribed to
// and we can use pattern matching to determine what message was received.
if (message is MyMessage myMessage)
{
Console.WriteLine($"Received: '{myMessage.Text}'");
}
else
{
Console.WriteLine($"Received unknown message type.");
}
return true;
}
Web API
When enabled, CatMQ also allows managing queues by the way of Web API, you'll first need to login to the web management UI, create a user and generate a API key. This API key will need to be passed in the "x-catmq-api-Key" header value.
Currently supported WebAPI calls
- Enqueue/{queueName}/{objectType} [json in body]
- CreateQueue/{queueName}
- CreateQueue [CMqQueueConfiguration json in body]
- Purge/{queueName}
- DeleteQueue/{queueName}
Example creating a queue using default settings with WebAPI via cURL
URL: /api/CreateQueue/{queueName}
curl --location --request POST 'http://127.0.0.1:45783/api/CreateQueue/MyDefault' \
--header 'x-catmq-api-Key: kk4IajpGUJHMR1dFlzXmvnt0VlvGhp'
Example creating a queue using custom settings with WebAPI via cURL
URL: /api/CreateQueue/{queueName}
curl --location 'http://127.0.0.1:45783/api/CreateQueue' \
--header 'x-catmq-api-Key: kk4IajpGUJHMR1dFlzXmvnt0VlvGhp' \
--header 'Content-Type: application/json' \
--data '{
"QueueName": "MyQueue",
"BatchDeliveryInterval": "00:00:00",
"DeliveryThrottle": "00:00:00",
"MaxDeliveryAttempts": 5,
"MaxMessageAge": "01:00:00",
"ConsumptionScheme": "Delivered",
"DeliveryScheme": "Random",
"PersistenceScheme": "Persistent"
}'
Example enqueuing a message with WebAPI via cURL
URL: /api/Enqueue/{queueName}/{assemblyQualifiedTypeName}
curl --location 'http://127.0.0.1:45783/api/Enqueue/MyFirstQueue/Test.QueueClient.Program%2BMyMessage%2C%20Test.QueueClient' \
--header 'x-catmq-api-Key: kk4IajpGUJHMR1dFlzXmvnt0VlvGhp' \
--header 'Content-Type: application/json' \
--data '{
"Text": "This is a test message"
}'
Notes about Assembly Qualified Type names
Messages are automatically deserialized by the QueueClient, so its necessary to provide the fully assembly qualified type name when enqueuing a message.
Assembly Qualified Type Name format:
- Test.QueueClient.MyMessage, Test.QueueClient
- Explanation: {ClassName}, {AssemblyName}
Assembly Qualified Type Name format for nested classes:
- Test.QueueClient.Program+MyMessage, Test.QueueClient
- Explanation: {EnclosingClass}+{Class}, {AssemblyName}
Technologies
CatMQ is based heavily on internally built technologies that leverage the works by people much smarter than me. Eternally grateful to all those for making my development a walk in the park.
- Light-weight thread scheduling with sub-pooling: NTDLS.DelegateThreadPooling.
- Nullabily and formatting: NTDLS.Helpers.
- Based heavily on the standalone in-memory message queue: NTDLS.MemoryQueue.
- Round-trip messaging with compression, encryption, checksum and reliability notifications: NTDLS.ReliableMessaging.
- Stream framing for packets reconstruction and fragmentation: NTDLS.StreamFraming.
- Resource protection and concurrency because threads like to bite: NTDLS.Semaphore
- Polymorphic deserialization provided by Newtonsoft.Json because Microsoft refused to add it for "reasons".
- Mega-tight communication enabled by protobuf-net.
- Message persistence provided by rocksdb-sharp.
- Logging, because otherwise we'd be blind: serilog.
- Windows service magic: Topshelf.
Screenshots
Home view
(yes, that's over 1-billion messages). 👀
Queue view
Messages view
Message view
License
Product | Versions Compatible and additional computed target framework versions. |
---|---|
.NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 is compatible. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. |
-
net8.0
- NTDLS.ReliableMessaging (>= 1.11.4)
-
net9.0
- NTDLS.ReliableMessaging (>= 1.11.4)
NuGet packages (2)
Showing the top 2 NuGet packages that depend on NTDLS.CatMQ.Shared:
Package | Downloads |
---|---|
NTDLS.CatMQ.Client
A high-performance and reliable persistent message queue designed for efficient inter-process communication, task queuing, load balancing, and data buffering over TCP/IP. |
|
NTDLS.CatMQ.Server
A high-performance and reliable persistent message queue designed for efficient inter-process communication, task queuing, load balancing, and data buffering over TCP/IP. |
GitHub repositories
This package is not used by any popular GitHub repositories.
Version | Downloads | Last updated |
---|---|---|
2.3.1 | 33 | 1/25/2025 |
2.3.0 | 85 | 1/19/2025 |
2.2.0 | 88 | 1/17/2025 |
2.1.1 | 109 | 1/15/2025 |
2.1.0 | 105 | 1/15/2025 |
2.0.4 | 47 | 1/14/2025 |
2.0.3 | 67 | 1/13/2025 |
2.0.2 | 71 | 1/13/2025 |
2.0.1 | 66 | 1/13/2025 |
2.0.0 | 76 | 1/13/2025 |
1.0.15 | 48 | 1/8/2025 |
1.0.14 | 82 | 1/5/2025 |
1.0.13 | 73 | 1/5/2025 |
1.0.12 | 63 | 1/5/2025 |
1.0.11 | 72 | 1/5/2025 |
1.0.10 | 93 | 1/4/2025 |
1.0.9 | 83 | 1/4/2025 |
1.0.8 | 62 | 1/3/2025 |
1.0.7 | 91 | 1/2/2025 |
1.0.6 | 69 | 1/2/2025 |
1.0.5 | 75 | 1/1/2025 |
1.0.4 | 95 | 1/1/2025 |
1.0.3 | 93 | 1/1/2025 |
1.0.2 | 108 | 12/31/2024 |
1.0.1 | 105 | 12/31/2024 |
Initial release.