This article presents a C# implementation of the publisher-subscriber pattern. The main difference from similar implementations is the possibility to transfer messages between processes. I'll describe the main features and give code samples on how to use the library.
Introduction
In my current working project, we started to experience issues with the number of relationships between software modules and decided to use the publisher-subscriber pattern to transfer events and commands and reduce coupling. As is often the case, none of the existing implementations suited us and I began to develop my own library.
The implementation of the pattern turned out to be not a very difficult task. The first version was used in our project and has proved itself very well. But since our application consists of several processes and services, we immediately wanted to link them into a single whole according to the same publisher-subscriber principle. So, it was made a decision to extend the library for interprocess communications also.
During the development, the following requirements were imposed:
- Ease of use - everything should work the same as for the local PubSub module and subscribers, in principle, may not even know where the messages came from - from their own process or remotely.
- Delivery must be guaranteed - if the message cannot be transmitted, the publisher immediately receives an exception.
- All exceptions that occurred during the processing of a message must also be serialized and transmitted back to the sender.
- Messages must be processed independently of each other and are not queued.
- The asynchronous calling model (async/await) must be supported.
- Server and client hubs act as a single unit - regardless of which process the publisher or subscriber is in.
- Connection between processes should be restored automatically.
What is PubSub?
First of all, it is worth understanding what the publisher-subscriber pattern is and how to use it. As Wikipedia says:
In software architecture, publish–subscribe is a messaging pattern where senders of messages, called publishers, do not program the messages to be sent directly to specific receivers, called subscribers, but instead categorize published messages into classes without knowledge of which subscribers, if any, there may be. Similarly, subscribers express interest in one or more classes and only receive messages that are of interest, without knowledge of which publishers, if any, there are.
In my module, called MetaPubSub
, the most common basic and some advanced features were implemented:
- Bi-directional communication
- Near real-time interaction with consumers
- The independent order of the
Subscribe
and Publish
methods - Awaitable methods, for example, you can
await Publish
and wait until all subscribers have finished processing the message - At least once delivery check - you can opt in to have an exception if no one subscribed to your message
- Message filtering - you can define a predicate to subscribe only those messages you want to process
- Timeout to wait for a subscriber - your message can be queued and wait until someone subscribed and processed it
- Scheduling a message - your message can be queued and published after a time delay
- Asynchronous waiting for a specified message by a single method call, without the need to Subscribe/Unsubscribe to this message (method
When
) - Request-response pattern - send a message and wait for the response as a single awaitable method, without need to Subscribe/Unsubscribe to the response message (method
Process
) - Cancellation token support - you can cancel scheduling or waiting for the message
- Exceptions handling - all exceptions raised when a message processing by subscribers can be caught by the publisher as an
AggregateException
How-to Use Examples
Preparing a Message Class
Each message class must be derived from the IPubSubMessage
interface:
public interface IPubSubMessage
{
bool DeliverAtLeastOnce { get; }
int Timeout { get; }
string RemoteConnectionId { get; set; }
}
Also, you can derive it from PubSubMessageBase
, which declaration is as follows:
public class PubSubMessageBase : IPubSubMessage
{
public bool DeliverAtLeastOnce { get; set; }
public int Timeout { get; set; }
public string RemoteConnectionId { get; set; }
}
Or you can define your own base class:
public class MessageBase : IPubSubMessage
{
public bool DeliverAtLeastOnce => true;
public int Timeout => 1000;
public string RemoteConnectionId { get; set; }
}
So, your message declaration should look like this:
public class MyMessage : MessageBase
{
public string SomeData { get; }
public MyMessage(string data)
{
SomeData = data;
}
}
Hub Creation
var hub = new MetaPubSub();
hub.Subscribe<MyMessage>(OnMyMessage);
await hub.Publish(new MyMessage());
hub.Unsubscribe<MyMessage>(OnMyMessage);
Exceptions Handling
var hub = new MetaPubSub();
try
{
var message = new MyMessage
{
DeliverAtLeastOnce = true,
};
hub.Subscribe<MyMessage>(OnMyMessageHandlerWithException);
await hub.Publish(message);
}
catch (NoSubscribersException ex)
{
Console.WriteLine($"Exception {ex.GetType()}: {ex.Message}");
}
catch (TimeoutException ex)
{
Console.WriteLine($"Exception {ex.GetType()}: {ex.Message}");
}
catch (AggregateException ex)
{
Console.WriteLine($"Exception {ex.GetType()}: {ex.Message}");
foreach (var innerEx in ex.InnerExceptions)
{
Console.WriteLine($"\tInner Exception {innerEx.GetType()}: {innerEx.Message}");
}
}
await hub.Unsubscribe<MyMessage>(OnMyMessageHandlerWithException);
At Least Once Delivery Check
var hub = new MetaPubSub();
var message = new MyMessage
{
DeliverAtLeastOnce = true
};
try
{
await hub.Publish(message);
}
catch (NoSubscribersException ex)
{
Console.WriteLine($"Exception {ex.GetType()}: {ex.Message}");
}
hub.Subscribe<MyMessage>(OnMyMessage);
await hub.Publish(message);
hub.Unsubscribe<MyMessage>(OnMyMessage);
You can see the full list of examples on the project GitHub page.
Adding the Interprocess Communication Layer
For our hub to start working as a means of transferring messages between processes, you need to connect them through some kind of data transfer mechanism. It can be Sockets, COM, Shared File, DDE, RPC, or any other way. But I still settled on named-pipes - it is quite simple, convenient and its performance and reliability were fine with me.
So, one hub should become a server, and all the others - clients. To do this, on the server-side, you need to call the StartServer
method:
var serverHub = new MetaPubSub();
serverHub.StartServer("Meta");
On the client-side, you need to connect to the server:
var clientHub = new MetaPubSub();
await clientHub.ConnectToServer("Meta");
Now, if the client wants to receive an event from the server, he must subscribe to it:
await clientHub.SubscribeOnServer<MyMessage>(Handler);
and if the client wants to send an event to the server, he does it this way:
await clientHub.PublishOnServer(new MyMessage());
If the server wants to publish a message, then it does so in the same way as for a local message:
await serverHub.Publish(new MyMessage());
and if the server wants to receive messages, then it subscribes like for a local message:
serverHub.Subscribe<MyMessage>(Handler);
In this case, the Subscribe
call is not awaited because of no time-consuming operations such as network transmission or message processing.
How It Works?
Each message gets a GUID, serialized to JSON, and passed through a named-pipe. To wait for the result, a TaskCompletionSource
created and awaited by the calling code. The receiving party deserializes the message and sends it to the subscribers for processing. After processing, a result is generated that contains the same GUID, all exceptions if any, and is sent back to the sender's side. The result is deserialized again and passed to the TaskCompletionSource
. As a result, the call ends or an exception is thrown.
Send and receive via named-pipe occurs independently of each other on different threads. Message responses may come in any order, regardless of the order in which the requests were sent. All MetaPubSub
methods are thread-safe.
Connection Handling
The client re-establishes a lost connection with the server automatically. But what if the connection to the server is not yet established when the client starts? The client will get an exception when trying to call the SubscribeOnServer
method. Next, we can implement the logic of waiting for a connection and subscribe for all the necessary events. But this must be done for each class that subscribes to events on the server, which leads to source code growth and duplication. In this case, the TryConnect
and TrySubscribeOnServer
methods come to the rescue.
TryConnect
- connection to the server will be established as soon as it is available. It returns true
if the connection has been established immediately, otherwise returns false
. TrySubscribeOnServer
- Subscribes to a message locally and then trying to subscribe on the server. If the server is not connected, returns false
. Subscription will be made automatically after the server connection.
Using these methods, you no longer need to worry about whether there is a connection to the server at the time of the method call.
Restrictions on Use
The publisher-subscriber pattern is a very convenient and useful architectural pattern, but it has its own narrow scope. For example, here are a few cases where you might be better off using the Message Queue pattern:
- When the order in which messages are processed is important
- If every message must be guaranteed to be processed, even if communication between the client and the server has been interrupted
- When you need to parallelize the flow of messages between several handlers
- When you need to persistence all unprocessed messages to disk in case of the server shutdown
- When message prioritization (QoS) is needed
Conclusion
The MetaPubSub
module has been tested in my working project, and although I cannot guarantee that it is 100% bug-free, I believe that it can already be used in production code. But it should be borne in mind that the authorization system has not yet been implemented when connecting a client to a server. In development, I tried to pay special attention to the speed of execution of requests and make all the code lock-free. It would be nice to conduct tests and compare the performance with other similar systems, perhaps someone from the users will be able to do this and share the results.
You can view the source code, examples, leave an issue, or submit a pull request in the GitHub repository.
History
- 15th October, 2020: Initial version