Frank.PulseFlow
2.0.0
See the version list below for details.
dotnet add package Frank.PulseFlow --version 2.0.0
NuGet\Install-Package Frank.PulseFlow -Version 2.0.0
<PackageReference Include="Frank.PulseFlow" Version="2.0.0" />
paket add Frank.PulseFlow --version 2.0.0
#r "nuget: Frank.PulseFlow, 2.0.0"
// Install Frank.PulseFlow as a Cake Addin #addin nuget:?package=Frank.PulseFlow&version=2.0.0 // Install Frank.PulseFlow as a Cake Tool #tool nuget:?package=Frank.PulseFlow&version=2.0.0
PulseFlow (Local Messaging With Channels)
PulseFlow Local Messaging is a lightweight, high-performance messaging system that enables seamless communication, and thread-safe data transfer between different parts of an application. It's designed to be simple, flexible, and scalable, allowing for easy integration into any system architecture.
Table of Contents
Acknowledgements
This is a very thin abstraction layer on top of System.Threading.Channels, which is a NuGet package that gets shipped along with every release of .NET.
The reason for this abstraction layer is to make it easier to use System.Threading.Channels
in a Dependency Injection scenario, and to make it easier to use System.Threading.Channels
in a thread-safe manner.
Thanks to the .NET team for making System.Threading.Channels
, and in particular Stephen Toub for his blog post on System.Threading.Channels
, and as part of the rest of the team.
Overview
PulseFlow Local Messaging is a lightweight, high-performance messaging system that enables seamless communication between different parts of an application. It's designed to be simple, flexible, and scalable, allowing for easy integration into any system architecture.
This library does have a dependency on Frank.Channels.DependencyInjection
, which is a simple registration of System. Threading.Channels.Channel<T>
in the Dependency Injection container. This is done to make it easier to use System. Threading.Channels
in a Dependency Injection scenario, and to make it easier to use System.Threading.Channels
in a
thread-safe manner. This includes a ChannelWriter<T>
and a ChannelReader<T>
that are a thread-safe singlteton, and can
be called directly from the Dependency Injection container without having to call Channel<T>
, and then Channel<T>.Writer
or
Channel<T>.Reader
directly. This is done to make it easier to use System.Threading.Channels
in a Dependency Injection
behind the scenes in PulseFlow. This will also make it possible to "intercept" the ChannelWriter<T>
and ChannelReader<T>
for debugging, logging, or other purposes.
Key Features
- Lightweight: PulseFlow is a lightweight messaging system, with a small footprint and minimal resource requirements.
- High Performance: It's designed for high performance and scalability, capable of handling a vast volume of messages simultaneously.
- Asynchronous Communication: PulseFlow supports asynchronous data flow, allowing for non-blocking and concurrent message transmissions.
- Flexible and Adaptable: It's flexible and adaptable, capable of handling various types of messages and adapting its processing logic based on the nature and requirements of each message.
- Simple and Easy to Use: PulseFlow is simple and easy to use, with a straightforward API and minimal configuration requirements. This is assuming Dependency Injection is used. If not, then it's a bit more complicated, and not supported at this time.
- Thread-Safe: It's thread-safe, ensuring that messages are processed in a safe and reliable manner.
- Open Source: PulseFlow is open-source, with the source code available on GitHub.
- Cross-Platform: It's cross-platform, supporting multiple operating systems and platforms.
- Supports multiple consumers: PulseFlow supports multiple consumers, thereby allowing for parallel processing of messages. This is particularly useful in scenarios where there's a need to save messages to some form of audit trail or log.
Illustration
graph TB
subgraph "Transmission"
EmailPulse[Email : IPulse] -->|transmitted via| Conduit[IConduit]
FtpPulse[FTP : IPulse] -->|transmitted via| Conduit[IConduit]
ApiPulse[API : IPulse] -->|transmitted via| Conduit[IConduit]
end
subgraph "Delivery"
Conduit -->|delivered to| Channel[Channel<IPulse<T>>]
end
subgraph "Consumption and Routing"
Channel -->|consumed and routed by| Nexus[Nexus]
Nexus -->|typeof==Email| EmailFlow[EmailFlow : IFlow]
Nexus -->|typeof==API| Flow[FtpAndApiFlow : IFlow]
Nexus -->|typeof==FTP| Flow[FtpAndApiFlow : IFlow]
Nexus -->|typeof==?| AuditFlow[AuditFlow : IFlow]
AuditFlow -->|audit trail by| ILogger[ILogger]
end
In this Mermaid diagram:
- IPulse is the interface for the Pulse.
- IConduit is the interface for the Conduit, which is the pathway through which messages are transmitted.
- Nexus is the central processing service, which handles the pulse messages and routes them to their respective destinations.
- IFlow is the interface for the a flow, which is the mechanism that handles/consumes the pulse messages.
- ILogger is the interface for the generic logger in dotnet.
When you include this in a GitHub Markdown file, GitHub will render the Mermaid diagram as a visual graph. Remember to remove the extra backticks (```) in the beginning and end when adding this to your README.
Use Cases
PulseFlow is a general-purpose messaging system that can be used in a wide variety of applications. It's particularly useful in scenarios where there's a need for asynchronous communication between different parts of the system.
Example 1 - Web API
A use case is when you need multiple threads to do some non-multiple-thread-safe work, like saving to a log-file. Example: You have a web API and you want to save a all requests' route, method and user to a log-file. You can use PulseFlow to do this in a thread-safe manner, because PulseFlow's IConduit
and IChannel
are thread-safe. You can have multiple threads saving to the same log-file, and PulseFlow will ensure that the log-file is not written to by multiple threads at the same time, and that the writing happens in the order the messages are received.
graph TB
subgraph Transmission
ApiPulse1[ApiMessage : IPulse] -->|transmitted via| Conduit[IConduit]
ApiPulse2[ApiMessage : IPulse] -->|transmitted via| Conduit[IConduit]
ApiPulse3[ApiMessage : IPulse] -->|transmitted via| Conduit[IConduit]
ApiPulse4[ApiMessage : IPulse] -->|transmitted via| Conduit[IConduit]
ApiPulse5[ApiMessage : IPulse] -->|transmitted via| Conduit[IConduit]
ApiPulse6[ApiMessage : IPulse] -->|transmitted via| Conduit[IConduit]
end
subgraph Delivery
Conduit -->|delivered to| IChannel[Channel]
end
subgraph Consumption and Routing
IChannel -->|consumed and routed by| Nexus[Nexus]
Nexus -->|typeof==API| Flow[FileLoggerFlow : IFlow]
end
Example 2 - Priority Queue
Another use case is when you need to prioritize messages. You might have some user-input that needs to be processed, and you want to prioritize some messages over others. You can use PulseFlow to do this,whithout having to worry about thread-safety. You can have multiple threads processing the messages, and PulseFlow will ensure that the messages are processed in the order they are received. And so if you have a different type for each priority, you can have multiple threads processing each priority, and so when you have 10-15 "normal" messages, your single "premium" message will get processed at in its own thread.
graph LR
PrioritizedWork[StandardMessage : IPulse] --> Conduit[IConduit]
PrioritizedWork[StandardMessage : IPulse] -->|standard| Conduit[IConduit]
PrioritizedWork[StandardMessage : IPulse] --> Conduit[IConduit]
PrioritizedWork[PremiumMessage : IPulse] -->|premium| Conduit[IConduit]
Conduit -->|delivered to| IChannel[Channel]
IChannel --> Nexus[Nexus]
subgraph parallel processing
Nexus --> StandardFlow[StandardFlow : IFlow]
Nexus -->|typeof==StandardMessage| StandardFlow[StandardFlow : IFlow]
Nexus --> StandardFlow[StandardFlow : IFlow]
Nexus -->|typeof==PremiumMessage| PremiumFlow[PremiumFlow : IFlow]
end
StandardFlow --> SomeResource[SomeResource]
PremiumFlow --> SomeResource[SomeResource]
Example 3 - Complex Data Categorization
If you have layered data, and you want to categorize the data based on some of the layers, you can use PulseFlow to do this. Suppesed you have a data ingest system and all you get is a byte-array, and what you want is to parse the bytes to data that could represent an invoice. This can be done in "classical" logic structures, but this can mean large if/else or switch statements, and this can be hard to maintain or change. With PulseFlow you can get a more modular structure, where each step is a separate
class, and you can easily add new steps, or change the order of the steps, or even add parallel steps. And you can do this without having to worry about thread-safety unless you want to through access to some shared resource instead of using the IFlow
s to pass data between the steps, e.g. if you write to a log-file from different steps. (This is not recommended, but it is possible. Correct way is to use IFlow
s to pass data between steps, and then have a separate IFlow
that writes to the log-file.)
graph TB
File[Bytes : IPulse] --> |PulseFlow| X1[IdentifierFlow]
X1 -.->|.xml| XmlFlow[XmlFlow]
X1 -.->|.json| JsonFlow[JsonFlow]
X1 -.->|.csv| CsvFlow[CsvFlow]
XmlFlow -->|PulseFlow| XDocumentFlow
XDocumentFlow -->|PulseFlow| XDocumentIdentifer
XDocumentIdentifer -->|PEPPOL-PulseFlow| PeppolFlow -->|UBL-PulseFlow| UblFlow
XDocumentIdentifer -->|UBL-PulseFlow| UblFlow
UblFlow -->|PulseFlow| UblIdentifer
UblIdentifer -->|PulseFlow| UblInvoiceFlow -->|PulseFlow| InvoiceFlow
UblIdentifer -->|PulseFlow| UblCreditNoteFlow
UblIdentifer -->|PulseFlow| UblDebitNoteFlow -->|PulseFlow| InvoiceFlow
UblIdentifer -->|PulseFlow| UblReminderFlow -->|PulseFlow| InvoiceFlow
JsonFlow -->|PulseFlow| JsonIdentifer
JsonIdentifer -->|PulseFlow| JsonInvoiceFlow
JsonInvoiceFlow -->|PulseFlow| InvoiceFlow
InvoiceFlow -->|PulseFlow| InvoicePaymentFlow
subgraph Pulse_Flow
subgraph Sender
PulseA -.->|Send| IConduit[IConduit]
PulseB -.->|Send| IConduit[IConduit]
PulseC -.->|Send| IConduit[IConduit]
end
IConduit --> IChannel[Channel]
IChannel --> Nexus[Nexus]
Nexus --> FlowX[IFlow]
Nexus --> FlowY[IFlow]
end
Getting Started
This section provides a quick guide on how to get started with PulseFlow Local Messaging.
Installation
PulseFlow is available as a NuGet package, which can be installed using the following command:
dotnet add package Frank.PulseFlow
Once installed, you can start using PulseFlow by adding the following using statement to your code:
using Frank.PulseFlow;
Basic Usage
The following code snippet shows a basic example of how to use PulseFlow:
public class Program
{
public static async Task Main(string[] args)
{
IHostBuilder builder = Host.CreateDefaultBuilder();
builder.ConfigureServices((context, services) =>
{
services.AddPulseFlow(messagingBuilder =>
{
messagingBuilder.AddFlow<TextPulseFlow>();
});
services.AddHostedService<TestingService>();
});
IHost app = builder.Build();
await app.RunAsync();
}
}
public class TextFlow : IFlow
{
private readonly ILogger<TextFlow> _logger;
public TextFlow(ILogger<TextFlow> logger) => _logger = logger;
public async Task HandleAsync(IPulse message, CancellationToken cancellationToken)
{
if (message is TextPulse textMessage)
_logger.LogInformation("Received text message: {Text}", textMessage.Text);
await Task.CompletedTask;
}
public bool CanHandle(Type pulseType) => pulseType == typeof(TextPulse);
}
public class TestingService : BackgroundService
{
private readonly IConduit _messenger;
public TestingService(IConduit messenger) => _messenger = messenger;
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await Task.Delay(2000, stoppingToken);
while (!stoppingToken.IsCancellationRequested)
{
await _messenger.SendAsync(new TextPulse { Id = Guid.NewGuid(), Text = "Hello World" });
await Task.Delay(1000, stoppingToken);
}
}
}
public class TextPulse : BasePulse
{
public string Text { get; set; }
}
Concepts
This section provides an in-depth explanation of the key concepts and components within the system: Nexus, Conduit, Pulse, and PulseFlow. Understanding these concepts is crucial for grasping how the system operates and interacts with data.
Nexus
The Nexus is the central hub of our messaging system, analogous to a neural network's core. It serves as the primary processing service, where all data messages, or 'Pulses', are received, interpreted, and routed to their respective destinations.
- Role: Nexus acts as the orchestrator within the system, managing the flow of messages and ensuring that each one is processed according to predefined rules and logic.
- Functionality: It handles various tasks like message validation, transformation, and decision-making on how and where messages should be directed post-processing.
- Scalability and Performance: Designed for high performance and scalability, Nexus can handle a vast volume of messages simultaneously, ensuring minimal latency and high throughput in data processing.
Conduit
The Conduit represents the pathway through which messages, or 'Pulses', are transmitted within the system. It's the messenger that ensures the delivery of data from one point to another.
- Mechanism: Conduit facilitates the smooth and efficient transport of messages across different parts of the system.
- Reliability and Integrity: Ensuring data integrity, Conduit maintains the fidelity of the messages as they traverse through various processes.
- Asynchronous Communication: It supports asynchronous data flow, allowing for non-blocking and concurrent message transmissions, which is key for a responsive and efficient system.
Pulse
Pulse is the term used to describe the individual units of data or messages that flow through the system.
- Data Encapsulation: Each Pulse is a packet of information, encapsulating the necessary data in a well-defined format.
- Types and Variability: Pulses can vary in type and structure, ranging from simple text messages to complex data structures, each tailored to carry specific information relevant to its intended process.
- Lifecycle: The lifecycle of a Pulse includes its creation, transmission through the Conduit, processing in the Nexus, and final delivery or action as dictated by the system's logic.
Flow
Flow is the sophisticated mechanism responsible for handling and manipulating the Pulses as they move through the system.
- Message Handling: It's specifically designed to process each Pulse, applying necessary transformations, routing, and any other required operations.
- Flexibility and Adaptability: PulseFlow is adept at handling various types of Pulses, capable of adapting its processing logic based on the nature and requirements of each message.
- Integration Point: Acting as a key integration point within the system, it ensures that Pulses are managed efficiently and effectively, readying them for their next phase in the data journey.
License
This project is licensed under the MIT License - see the LICENSE file for details.
Contributing
Contributions are welcome! Please see CONTRIBUTING.md for more details.
Product | Versions Compatible and additional computed target framework versions. |
---|---|
.NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 was computed. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. |
-
net8.0
- Frank.Channels.DependencyInjection (>= 1.1.0)
- Microsoft.Extensions.Hosting.Abstractions (>= 8.0.0)
NuGet packages (1)
Showing the top 1 NuGet packages that depend on Frank.PulseFlow:
Package | Downloads |
---|---|
Frank.PulseFlow.Logging
This is a ILogger{T} compatible "sink" for logs that can be "hooked into", to allow to listen to logging however you wish. PulseFlow uses Channel -mechanism for internal messaging |
GitHub repositories
This package is not used by any popular GitHub repositories.
Version | Downloads | Last updated |
---|---|---|
2.0.8-preview | 110 | 3/3/2024 |
2.0.0 | 385 | 2/4/2024 |
1.7.7-preview | 92 | 2/4/2024 |
1.7.0 | 521 | 1/21/2024 |
1.6.5-preview | 107 | 1/21/2024 |
1.6.4-preview | 112 | 1/21/2024 |
1.6.3-preview | 105 | 1/19/2024 |
1.6.0 | 274 | 1/14/2024 |
1.5.0 | 556 | 12/31/2023 |
1.4.1-preview | 119 | 12/31/2023 |
1.4.0 | 143 | 12/19/2023 |
1.3.46-preview | 118 | 12/19/2023 |
1.3.44-preview | 110 | 12/15/2023 |
1.3.0 | 564 | 12/15/2023 |
1.2.36-preview | 119 | 12/14/2023 |
1.2.1.41-preview | 117 | 12/15/2023 |
1.2.1.39-preview | 115 | 12/15/2023 |
1.2.1 | 142 | 12/14/2023 |
1.1.0 | 137 | 12/14/2023 |
1.0.28-preview | 108 | 12/14/2023 |
1.0.15-preview | 132 | 12/14/2023 |
1.0.10-preview | 126 | 12/14/2023 |
1.0.8-preview | 125 | 12/14/2023 |
1.0.0 | 135 | 12/14/2023 |