Introduction
This is Part 2 in a series of articles on writing Topic based publish/subscribe design pattern implementation in C#. This series shows publish/subscribe design pattern implementation in two approaches with the same sample program along with the general idea of publish/subscribe design pattern implementation. The approaches are using Socket programming and Using WCF.
As we know, whatever technology (Socket/Remoting/WCF) we use to implement publish/subscribe design pattern, the end result will be almost the same. For this reason, at first, each part in this series of articles discusses what are the general ideas to implement publish/subscribe design pattern, then it shows the implementation using a specific technology. Here, publish/subscribe design pattern is implemented using two different approaches for the same sample application so that you can compare each approach implementation with others and can pick one which best fits your requirements. The advantages and disadvantages of each approach are also discussed in each part.
Can You Read Each Part Independently?
Here, you can read each part independently. To make each part independently readable, necessary information is repeated in each part.
Background
I have written an article about a year ago which discusses Non Topic based publish/subscribe design pattern implementation in C#. After that, I got few requests to make a Topic based version of this. I gave them word, I will surely do this when I will get free time. Few days ago, I got some off days and started to change the implementation for Non Topic based to Topic based. But after seeing my code and article after 1 year, it seems to me the code is unnecessary lengthy and article is not enough informative. Then, I started to write this two series article to make my work better. I have written this series of two part articles as a learning exercise, and I expect comments from you about the write up. Please let me know if you have any suggestion.
What is Topic Based Publish Subscribe Design Pattern?
In a Topic based publish-subscribe pattern, sender applications tag each message with the name of a topic, instead of referencing specific receivers. The messaging system then sends the message to all applications that have asked to receive messages on that topic. Message senders need only concern themselves with creating the original message, and can leave the task of servicing receivers to the messaging infrastructure.
In this pattern, the publisher and subscriber can communicate only via messages. The Publish-Subscribe Pattern solves the tight coupling problem. It is a very loosely coupled architecture, in which senders do not even know who their subscribers are.
Figure: Topic-based Publish Subscribe paradigm
There are three types of software applications in the above diagram: Message Publisher, Message Subscriber, and Pub/Sub Server. Here, a Message publisher sends messages to the Message Server without any knowledge of Message subscribers and Message Subscribers will only get the messages of types for which he is registered with the server. E.g., say we have 3 different Topics (message type): a, b, c. but only topic a is of interest to Subscriber 1, topics b and c are of interest to Subscriber 2 and Subscriber 3 want notification of topics a and c. So subscriber 1 will be notified for topic a, Subscriber 2 will be notified for topics b and c and Subscriber 3 will be notified for topics a and c by Server. Here, Message Publisher does not know who is receiving the messages and Message subscriber does not have knowledge about Message publisher.
Basic Idea about Implementation
Requirement 1: Subscriber Applications subscribe in one or more topics and only receive messages that are of interest.
To implement so, we have to keep the record of which subscriber is interested for which topic, then based on these records, we will decide where to relay an event of a particular topic. So we have to maintain the lists of subscribers topic wise. For every topic, there will be a list which contains the address of subscribers who have the interest in that topic. When a publisher application sends an event of a particular topic to pub/sub server, pub/sub server needs to relay this event to subscribers who has interest in that topic. To make this happen, pub/sub server will take the subscriber list of that topic and will send the event to the addresses of this list. To keep the lists of subscriber’s addresses topic wise, dictionary is a good choice. Key of the dictionary represents topic name and value represents subscribers address list.
Requirement 2: Loose coupling between the publisher and the subscriber. The publisher has no knowledge about the subscribers, including how many there are or where they live as well as the subscriber has no knowledge about the publisher, including how many there are or where they live.
To implement so, direct communication between publishers and subscribers cannot be allowed. So we have to place a separate entity among them who will keep the topic wise subscribers address list and receives the event from subscriber and relay the event to subscribers and expose methods for the subscriber to subscribe topic wise. The subscribers and publisher only know this separate entity. This separate entity is called the Publish/Subscribe Service or Server. The functionality of this separate entity is divided between publish service and Subscribe Service. Publish service will receive the event from publisher and relay it to subscribers. Subscriber service will expose Subscribe/Unsubscribe operations for the subscribers. How do we filter out subscribers to relay event of a particular topic? To make filtering out subscribers to relay event of a particular topic, an entity named Filter is implemented. Filter entity is used by both Publish Service and Subscriber Service.
Figure: Implementation diagram
Functionality of Filter Unit:
- Keeps the list of subscriber topic wise
- Returns a list of subscribers for a topic
- Exposes method to add new subscriber
- Exposes method to remove subscriber
Functionality of Publish Service:
- Receives event from the Applications (Publishers) to notify Subscribers
- Sends event of a particular topic to Applications (Subscribers) who has been subscribed for that topic
Functionality of Subscribe Service:
- Exposes method which an application (
Subscriber
) can remotely invoke through communication channel to subscribe topic wise for getting events - Exposes method which an application (
Subscriber
) can remotely invoke through communication channel to Unsubscribe topic wise
Functionality of Subscriber:
- Subscriber application registers itself for one or more topics event
- Receives event when event is sent from publisher
Functionality of Publisher:
- Sends event to publisher Service for publishing
Implementation of pub/sub in WCF
Here, WCF callback is used to notify the subscribers about an event. Every subscriber implements a callback interface. When the subscriber sends request to subscribe to Subscriber Service, its callback reference is kept topic wise so that when a later time an event comes of that topic, server can use that callback reference to notify the corresponding subscriber. When a publisher sends an event of a particular topic by invoking a method of publisher service, Publisher service takes the list of callback references of that topic and notify the subscribers who have interest on that topic by invoking a method named publish of callback references.
Steps of Implementation
Step 1: Making the Filter Class
Filter
class has the following responsibilities and is used by both Publish Service and Subscriber Service.
- Keeps the list of subscriber topic wise and
- Exposes method to add new subscriber
- Exposes method to remove subscriber
- Returns a list of subscribers for a Topic
class Filter
{
static Dictionary<string, List<IPublishing>> _subscribersList =
new Dictionary<string, List<IPublishing>>();
static public Dictionary<string, List<IPublishing>> SubscribersList
{
get {
lock (typeof(Filter))
{
return _subscribersList;
}
}
}
static public List<IPublishing> GetSubscribers(String topicName)
{
lock (typeof(Filter))
{
if (SubscribersList.ContainsKey(topicName))
{
return SubscribersList[topicName];
}
else
return null;
}
}
static public void AddSubscriber(String topicName, IPublishing subscriberCallbackReference)
{
lock (typeof(Filter))
{
if (SubscribersList.ContainsKey(topicName))
{
if (!SubscribersList[topicName].Contains(subscriberCallbackReference))
{
SubscribersList[topicName].Add(subscriberCallbackReference);
}
}
else
{
List<IPublishing> newSubscribersList = new List<IPublishing>();
newSubscribersList.Add(subscriberCallbackReference);
SubscribersList.Add(topicName, newSubscribersList);
}
}
}
static public void RemoveSubscriber(String topicName, IPublishing subscriberCallbackReference)
{
lock (typeof(Filter))
{
if (SubscribersList.ContainsKey(topicName))
{
if (SubscribersList[topicName].Contains(subscriberCallbackReference))
{
SubscribersList[topicName].Remove(subscriberCallbackReference);
}
}
}
}
}
The dictionary _subscribersList
keeps the subscriber topic wise. Here, the key is topic name (String type).Value
is a List
of subscriber’s callback references (List<IPublishing>
).
The GetSubscribers(String topicName)
method returns a list of callback references based on the Topic Name. Callback references hold necessary mechanism to notify subscribers. This method first checks whether the topic name exists in the dictionary. If it exists, then it returns the corresponding list of callback references of that topic. If not, then returns null
.
The method named AddSubscriber
adds the callback Reference of a subscriber to a list topic wise. It has two parameters: First one is topic name and second one is callback reference. This method first checks whether there is a list for that topic. If it exists, then adds this callback reference to that list. If not, then makes a new list for this topic and adds the callback reference to the new list.
The method named RemoveSubscriber(String topicName, IPublishing subscriberCallbackReference)
removes the callback Reference of a subscriber from the corresponding topic wise list. It has two parameters: First one is topic name and second one is callback reference. It first checks whether there is a list of this topic. If not, it does nothing. If it exists, then it checks whether this callback reference exists in the list. If it exists, removes it, otherwise does nothing.
Step 2: Implementation of Subscribe Service
Subscription service implements the ISubscription
interface. This service methods are called by subscriber application to make it subscribed and unsubscribed. During the subscription of a subscriber, Subscribe Service keeps the callback reference of the subscriber so that later time publisher can notify the Subscriber
using the callback reference.
Now the question is how we can get callback Reference of a subscriber?
When a subscriber invoke a method call in the subscription service, in that call using the following line Subscriber
Service can get callback reference:
IPublishing subscriber = OperationContext.Current.GetCallbackChannel<IPublishing>();
We get the call back reference using the above call as every subscriber implements the callback Interface.
[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]
class Subscription : ISubscription
{
#region ISubscription Members
public void Subscribe(string topicName)
{
IPublishing subscriber = OperationContext.Current.GetCallbackChannel<IPublishing>();
Filter.AddSubscriber(topicName, subscriber);
}
public void UnSubscribe(string topicName)
{
IPublishing subscriber = OperationContext.Current.GetCallbackChannel<IPublishing>();
Filter.RemoveSubscriber(topicName, subscriber);
}
#endregion
}
When the method Subscribe(string topicName)
is called from the Subscriber application, it first gets the callback reference by calling method GetCallbackChannel
and then passes the topic name and callback reference to filter class to be registered.
When the method UnSubscribe(string topicName)
is called from the Subscriber
application, it first gets the callback reference by calling method GetCallbackChannel
and then calls the method RemoveSubscriber
of filter class with the parameters: topic name, callback reference to be Unsubscribed
.
Step 3: Implementation of Publisher Service
Publisher Service implements IPublishing
interface. When a publisher invokes the method named Publish(Message e, string topicName)
of Publisher Service, then the publish
method calls the method Filter.GetSubscribers(topicName)
to get list of callback references of subscribers for this topic. Then publish
method gets the method information of "Publish
”. Then for each call back references, fire the method "Publish
" using the following line of code with appropriate parameters.
publishMethodInfo.Invoke(subscriber, new object[] { e, topicName });
This call notifies the subscribers about the event who are interested in the topic.
[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)]
class Publishing : IPublishing
{
#region IPublishing Members
public void Publish(Message e, string topicName)
{
List<IPublishing> subscribers = Filter.GetSubscribers(topicName);
Type type = typeof(IPublishing);
MethodInfo publishMethodInfo = type.GetMethod("Publish");
foreach (IPublishing subscriber in subscribers)
{
try
{
publishMethodInfo.Invoke(subscriber, new object[] { e, topicName });
}
catch
{
}
}
}
#endregion
}
Step 4: How callback is implemented in subscriber Application
WCF has the mechanism to call back a client. To implement callback in WCF, WCF service exposes a callback contract that all client applications must implement and requires communication over duplex channel. You also have to use Duplex channelFactory
to construct the communication channel and also need to provide a Reference Context. netTcpBinding
, wsdualHttpBinding
and namedPipeBinding
support callback as they are bidirectional.
- To do so, at first you have to expose a callback contract. The callback contract is in the following:
[ServiceContract]
public interface IPublishing
{
[OperationContract(IsOneWay = true)]
void Publish(Message e, string topicName);
}
- Then, specify the callback contract in service contract. A service contract can have at most one callback contract.
[ServiceContract(CallbackContract = typeof(IPublishing))]
public interface ISubscription
{
[OperationContract]
void Subscribe(string topicName);
[OperationContract]
void UnSubscribe(string topicName);
}
- Then implement the callback contract in
Subscriber
application as Clients of duplex services must implement a callback contract class. In this project, it is done using the following code:
public partial class Subscriber : Form, IPublishing
{
.........(Code)
#region IMyEvents Members
public void Publish(Message e, String topicName)
{
if (e != null)
{
int itemNum = (lstEvents.Items.Count < 1) ? 0 : lstEvents.Items.Count;
lstEvents.Items.Add(itemNum.ToString());
lstEvents.Items[itemNum].SubItems.AddRange(new string[]
{ e.TopicName.ToString(), e.EventData });
_eventCount += 1;
txtAstaEventCount.Text = _eventCount.ToString();
}
}
#endregion
}
- Create an instance of the
Subscriber
(callback contract implementation class) and use it to create the System.ServiceModel.InstanceContext
object that you will pass to DuplexChannelFactory
class as constructor parameter.
public void MakeProxy(string EndpoindAddress, object callbackinstance)
{
NetTcpBinding netTcpbinding = new NetTcpBinding(SecurityMode.None);
EndpointAddress endpointAddress = new EndpointAddress(EndpoindAddress);
InstanceContext context = new InstanceContext(callbackinstance);
DuplexChannelFactory<ISubscription> channelFactory =
new DuplexChannelFactory<ISubscription>(
new InstanceContext(this),netTcpbinding, endpointAddress);
_proxy = channelFactory.CreateChannel();
}
- Then, Invoke operations of Service contract (Subscription Service) and handle callback operations in client code.
Step 5: How publisher is implemented
It constructs the communication channel with publishing service using ChannelFactory
. The following code is used for this purpose:
private void CreateProxy()
{
string endpointAddressInString = ConfigurationManager.AppSettings["EndpointAddress"];
EndpointAddress endpointAddress = new EndpointAddress(endpointAddressInString);
NetTcpBinding netTcpBinding = new NetTcpBinding();
_proxy = ChannelFactory<IPublishing>.CreateChannel(netTcpBinding, endpointAddress);
}
Then using this proxy reference, it can invoke the publish
method of publishing service to send an event using the following code:
_proxy.Publish(alertData, topicName);
Hosting of Publisher and Subscriber Service
The following lines are used to host Publishing Service. In this endpoint, address is net.tcp://localhost:7001/Pub. Binding is netTcpBinding
and contract is IPublishing
. If you would like to add multiple protocols support, then you have to add multiple Endpoints using AddServiceEndpoint
method.
private void HostPublishService()
{
_publishServiceHost = new ServiceHost(typeof(Publishing));
NetTcpBinding tcpBindingpublish = new NetTcpBinding();
_publishServiceHost.AddServiceEndpoint(typeof(IPublishing), tcpBindingpublish,
"net.tcp://localhost:7001/Pub");
_publishServiceHost.Open();
}
The following three lines are used to host Subscription Service. In this endpoint, address is net.tcp://localhost:7002/Sub. Binding is netTcpBinding
and contract is IPublishing
. If you would like to add multiple protocols support, then you have to add multiple Endpoints using AddServiceEndpoint
method.
private void HostSubscriptionService()
{
_subscribeServiceHost = new ServiceHost(typeof(Subscription));
NetTcpBinding tcpBinding = new NetTcpBinding(SecurityMode.None);
_subscribeServiceHost.AddServiceEndpoint(typeof(ISubscription), tcpBinding,
"net.tcp://localhost:7002/Sub");
_subscribeServiceHost.Open();
}
Why is Publish Method One Way?
Publish
method has no returned values and publisher does not need the acknowledgement of this method invocation.
Why is Channel Factory Used to Construct Communication Channel Here?
If we use proxy class for publisher and subscriber, then when we will change data contract, Service contract, callback contract we need to regenerate the proxy for publisher and subscriber. If we use channel factory to construct the channel, we do not need to change channel factory code when there is a change in data contract, service contract and callback contract. Here, to use Channel Factory to construct channel between server and publisher/subscriber, all the contracts are kept in a separate DLL that is shared by server, publisher and subscribe.
Socket Implementation VS WCF Implementation
- Socket based implementation sends comma separated plain text message which is not verbose whereas WCF implementation send SOAP message which is verbose.
- Socket Based implementation is not based on any standard. To achieve interoperability, proprietary protocol is not a good choice whereas SOAP is a standard.
- Socket based implementation does not need WCF runtime so it can be used for embedded programming but WCF implementation does not.
- WCF Implementation is in object oriented fashion but Socket implementation is not.
- WCF implementation can support multiple protocols easily just adding extra endpoint. But in socket implementation if you want to add multiple protocol support, you have to do considerable works like you have to write different implementation for each protocol.
- To give security feature in socket based implementation, you have to do lot of works and it will not be based on standard. But in WCF, you can give message security easily based on standard.
- Socket based implementation is fast and consumes less traffic than WCF implementation.
Sample Code
Here, a project has been attached which shows topic based publish/subscribe design pattern implementation in C# (using WCF Callback).
Conclusion
Thanks for reading this write up. I hope that this article will be helpful for some people. If you guys have any questions, I would love to answer them. I always appreciate comments.
History
- 22/03/09: Initial release
References