Topic-based Publish/Subscribe design pattern implementation in C# - Part I (Using socket programming)

23 Mar 2009, CPOL, 16 min read
Implementation of a topic based Publish Subscribe design pattern using socket programming, and a proprietary messaging protocol.

Introduction

This is Part 1 in a series of articles on writing a topic based Publish/Subscribe design pattern implementation in C#. The series presents the general idea behind the pattern and then implements the same sample program with two approaches: socket programming and WCF.

As we know, whatever technology (Socket/Remoting/WCF) we use to implement the Publish/Subscribe design pattern, the end result will almost be the same. For this reason, at first, each part in this series of articles discusses what the general ideas are to implement the Publish/Subscribe design pattern. Then, it shows the implementation using a specific technology. Here, the Publish/Subscribe design pattern is implemented using two different approaches for the same sample application so that you can compare each approach implementation with others and can pick one which best fits your requirements. The advantages and disadvantages of each approach are also discussed in each part.

Can You Read Each Part Independently?

Here, you can read each part independently. To make each part independently readable, the necessary information is repeated in each part.

Background

I wrote an article about a year ago which discusses a non-topic based Publish/Subscribe design pattern implementation in C#. After that, I got a few requests to make a topic based version of it, and I promised to do so when I got some free time. A few days ago, I had some days off and started to change the non-topic based implementation into a topic based one. But looking at my code and article after a year, the code seemed unnecessarily lengthy and the article not informative enough. So I started to write this two-part series to improve on that work. I have written this series of articles as a learning exercise, and I expect comments from you about the write-up. Please let me know if you have any suggestions.

What is topic based Publish Subscribe design pattern?

In a topic based Publish-Subscribe pattern, sender applications tag each message with the name of a topic, instead of referencing specific receivers. The messaging system then sends the message to all applications that have asked to receive messages on that topic. Message senders need only concern themselves with creating the original message, and can leave the task of servicing the receivers to the messaging infrastructure.

In this pattern, the publisher and subscriber can communicate only via messages. The Publish-Subscribe pattern solves the tight coupling problem. It is a very loosely coupled architecture, in which senders do not even know who their subscribers are.

pubsub.png

Topic-based Publish Subscribe paradigm

There are three types of software applications in the above diagram: Message Publisher, Message Subscriber, and Pub/Sub Server. Here, a message publisher sends messages to the Pub/Sub server without any knowledge of the message subscribers, and message subscribers only get messages of the topics for which they are registered with the server. E.g., say we have three different topics (message types): a, b, and c. Only topic a is of interest to Subscriber 1, topics b and c are of interest to Subscriber 2, and Subscriber 3 wants notifications for topics a and c. So the server will notify Subscriber 1 for topic a, Subscriber 2 for topics b and c, and Subscriber 3 for topics a and c. Here, the message publisher does not know who is receiving the messages, and the message subscriber has no knowledge of the message publisher.

Basic idea about the implementation

Requirement 1: Subscriber applications subscribe in one or more topics and only receive messages that are of interest.

To implement this, we have to keep a record of which subscriber is interested in which topic; based on these records, we decide where to relay an event for a particular topic. So we have to maintain the lists of subscribers topic-wise. For every topic, there will be a list which contains the addresses of the subscribers who have an interest in that topic. When a publisher application sends an event of a particular topic to the pub/sub server, the pub/sub server needs to relay this event to the subscribers who have an interest in that topic. To make this happen, the pub/sub server takes the subscriber list of that topic and sends the event to the addresses in this list. To keep the lists of subscriber addresses topic-wise, a dictionary is a good choice. The keys in the dictionary are the topic names and the values are the lists of subscriber addresses.
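The a/b/c example from the previous section can be sketched with such a dictionary. This is only a minimal, single-threaded illustration: the names TopicTable and TopicTableExample, and the use of plain strings as subscriber addresses, are assumptions made for this sketch (the Filter class later in this article stores EndPoint objects instead).

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration: maps each topic name to the addresses subscribed to it.
public class TopicTable
{
    private readonly Dictionary<string, List<string>> _subscribersByTopic =
        new Dictionary<string, List<string>>();

    public void Subscribe(string topic, string address)
    {
        List<string> subscribers;
        if (!_subscribersByTopic.TryGetValue(topic, out subscribers))
        {
            // First subscriber for this topic: create its list.
            subscribers = new List<string>();
            _subscribersByTopic.Add(topic, subscribers);
        }
        if (!subscribers.Contains(address))
        {
            subscribers.Add(address);
        }
    }

    // Returns the addresses an event of this topic must be relayed to (empty if none).
    public List<string> SubscribersOf(string topic)
    {
        List<string> subscribers;
        return _subscribersByTopic.TryGetValue(topic, out subscribers)
            ? new List<string>(subscribers)
            : new List<string>();
    }
}

public static class TopicTableExample
{
    // Registers the three subscribers from the example above.
    public static TopicTable Build()
    {
        TopicTable table = new TopicTable();
        table.Subscribe("a", "Subscriber 1");
        table.Subscribe("b", "Subscriber 2");
        table.Subscribe("c", "Subscriber 2");
        table.Subscribe("a", "Subscriber 3");
        table.Subscribe("c", "Subscriber 3");
        return table;
    }
}
```

With this table, SubscribersOf("a") yields Subscriber 1 and Subscriber 3, while SubscribersOf("b") yields only Subscriber 2, which is exactly the relay decision the server has to make for each event.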

Requirement 2: Loose coupling between the publisher and the subscriber. The publisher has no knowledge about the subscribers, including how many there are or where they live, and likewise the subscriber has no knowledge about the publishers, including how many there are or where they live.

To implement this, direct communication between publishers and subscribers cannot be allowed. So we have to place a separate entity between them which keeps the topic-wise subscriber address lists, receives events from the publishers, relays the events to the subscribers, and exposes the methods for subscribers to subscribe topic-wise. The subscribers and publishers only know this separate entity. This separate entity is called the Publish/Subscribe Service or Server. The functionality of this separate entity is divided between the publish service and the subscribe service. The publish service will receive the event from the publisher and relay it to the subscribers. The subscribe service will expose the Subscribe/Unsubscribe operations for the subscribers. How do we select the subscribers that should receive an event of a particular topic? For this purpose, an entity named Filter is implemented. The Filter entity is used by both the publish service and the subscribe service.

implementation2.png

Implementation diagram

Functionality of filter unit:

  • Keeps the list of subscribers topic-wise.
  • Returns a list of subscribers for a topic.
  • Exposes a method to add a new subscriber.
  • Exposes a method to remove a subscriber.

Functionality of publish service:

  • Receives event from the applications (publishers) to notify subscribers.
  • Sends event of a particular topic to applications (subscribers) who have been subscribed for that topic.

Functionality of subscribe service:

  • Exposes method which an application (subscriber) can remotely invoke through a communication channel to subscribe topic-wise for getting events.
  • Exposes method which an application (subscriber) can remotely invoke through a communication channel to unsubscribe topic-wise.

Functionality of subscriber:

  • The subscriber application registers itself for one or more topic events.
  • Receives events when the event is sent from the publisher.

Functionality of publisher:

  • Sends events to the publish service for publishing.

Message structure to implement the Publish/Subscribe design pattern using sockets

All the communication between the pub/sub server and the publisher/subscriber will be using messages. All messages are comma delimited strings. We will get a message in parts if we split a message by a comma. The message structure is as follows:

MessageStructure.gif

Message structure.

Here, the command can be Subscribe, UnSubscribe, or Publish; the services compare these strings case-sensitively. The topic name indicates the category of the event. Event data is the data that is sent by the publisher and is received by the subscriber.

Message for subscription

If a subscriber would like to subscribe for topic A, the command will be Subscribe. The topic name will be A. The remaining message parts will remain blank as there is no event data for subscription. So the message will be like the following:

  • Message Part1: Subscribe
  • Message Part2: A
  • Message Part3-N: Blank

Message for publication

If a publisher would like to send an event for topic A and the event data is the text "Your salary will be doubled soon", the command will be Publish, the topic name will be A, and the event data will be "Your salary will be doubled soon". So the message will be like the following:

  • Message Part1: Publish
  • Message Part2: A
  • Message Part3-N: Your salary will be doubled soon
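The message layout above can be sketched as a small helper that builds and parses such strings. The class name PubSubMessage and its methods are hypothetical and are not part of the attached project. Note one deliberate difference: this sketch splits into at most three parts so that commas inside the event data survive, whereas the services shown later split on every comma and re-join the parts.

```csharp
using System;

// Hypothetical helper for the comma-delimited format: command,topic[,event data]
public static class PubSubMessage
{
    public static string Build(string command, string topic, string eventData)
    {
        // Subscription messages carry no event data, so the third part is omitted.
        return string.IsNullOrEmpty(eventData)
            ? command + "," + topic
            : command + "," + topic + "," + eventData;
    }

    // Splits into at most three parts so commas inside the event data are preserved.
    // Returns false when the command or the topic name is missing.
    public static bool TryParse(string message, out string command,
                                out string topic, out string eventData)
    {
        command = topic = eventData = string.Empty;
        if (string.IsNullOrEmpty(message))
        {
            return false;
        }

        string[] parts = message.Split(new[] { ',' }, 3);
        if (parts.Length < 2 || parts[0].Length == 0 || parts[1].Length == 0)
        {
            return false;
        }

        command = parts[0];
        topic = parts[1];
        if (parts.Length == 3)
        {
            eventData = parts[2];
        }
        return true;
    }
}
```

Because the comma is the delimiter, a topic name itself cannot contain a comma in this format.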

Why UDP is used to implement Publish/Subscribe design pattern and not TCP?

UDP is a connectionless protocol whereas TCP is a connection-oriented protocol. Connectionless means that the Pub/Sub server can send a message to subscribers without first establishing a connection. The Pub/Sub server simply puts a message onto the network with a destination address and hopes that the message arrives. With TCP, the Pub/Sub server would have to establish a connection with each subscriber before sending it any message, and then keep that connection open or re-establish it later, which complicates the entire system. UDP also has less overhead than TCP: there is no connection handshake, the Pub/Sub server does not wait for an acknowledgement that a packet was received, and the UDP header (8 bytes) is smaller than the TCP header (at least 20 bytes). The price is that delivery is not guaranteed: a lost datagram is not retransmitted, so this design fits events that a subscriber can afford to miss.
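To make "connectionless" concrete, here is a minimal sketch that sends one datagram over the loopback interface without any connect or accept step. The class name UdpLoopbackDemo and the two-second receive timeout are assumptions made for this illustration only.

```csharp
using System;
using System.Net;
using System.Net.Sockets;
using System.Text;

public static class UdpLoopbackDemo
{
    // Sends one datagram to a receiver on the loopback interface and returns what arrived.
    public static string SendAndReceive(string text)
    {
        using (Socket receiver = new Socket(AddressFamily.InterNetwork,
                                            SocketType.Dgram, ProtocolType.Udp))
        using (Socket sender = new Socket(AddressFamily.InterNetwork,
                                          SocketType.Dgram, ProtocolType.Udp))
        {
            // Port 0 lets the operating system pick a free port.
            receiver.Bind(new IPEndPoint(IPAddress.Loopback, 0));
            // Milliseconds; ReceiveFrom throws a SocketException instead of
            // blocking forever if the datagram never arrives.
            receiver.ReceiveTimeout = 2000;

            // No Connect/Accept step: the datagram is simply addressed and sent.
            sender.SendTo(Encoding.ASCII.GetBytes(text), receiver.LocalEndPoint);

            byte[] buffer = new byte[1024];
            EndPoint from = new IPEndPoint(IPAddress.Any, 0);
            int received = receiver.ReceiveFrom(buffer, ref from);
            return Encoding.ASCII.GetString(buffer, 0, received);
        }
    }
}
```

A TCP version of the same exchange would additionally need a listening socket, a Connect call on the sender, and an Accept call on the receiver before the first byte could be sent.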

Implementation of Publish/Subscribe using sockets

As we know, UDP allows sending and receiving messages directly without establishing any connection. Here, the UDP protocol is used for all the communication between the pub/sub server and the publisher/subscriber. When a subscriber application sends a subscription message to the pub/sub server, the pub/sub server stores the address (IP, port) of the subscriber topic-wise. When the publisher application sends an event message of a particular topic to the pub/sub server, the pub/sub server takes the address list for the corresponding topic and sends the event to every address in the list.

Implementation

Step 1: Making the filter class

The filter class has the following responsibilities and is used by both the Publish Service and the Subscriber Service.

  • Keeps the list of subscribers topic-wise.
  • Exposes a method to add a new subscriber.
  • Exposes a method to remove a subscriber.
  • Returns a list of subscribers for a topic.

C#
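using System;
using System.Collections.Generic;
using System.Net;

class Filter
{
    // Topic name -> endpoints (IP, port) of the subscribers registered for that topic.
    static Dictionary<string, List<EndPoint>> 
      _subscribersList = new Dictionary<string, List<EndPoint>>();

    // The subscribe service, the publish service, and the thread-pool workers
    // run on different threads, so every access to the dictionary is locked.
    static readonly object _syncRoot = new object();

    static public Dictionary<string, List<EndPoint>> SubscribersList
    {
        get { return _subscribersList; }
    }

    static public List<EndPoint> GetSubscribers(String topicName)
    {
        lock (_syncRoot)
        {
            if (SubscribersList.ContainsKey(topicName))
            {
                // Return a copy so the caller can iterate it safely while
                // other threads add or remove subscribers.
                return new List<EndPoint>(SubscribersList[topicName]);
            }
            else
                return null;
        }
    }

    static public void AddSubscriber(String topicName, EndPoint subscriberEndPoint)
    {
        lock (_syncRoot)
        {
            if (SubscribersList.ContainsKey(topicName))
            {
                // Ignore a repeated subscription from the same endpoint.
                if (!SubscribersList[topicName].Contains(subscriberEndPoint))
                {
                    SubscribersList[topicName].Add(subscriberEndPoint);
                }
            }
            else
            {
                // First subscriber for this topic: create its list.
                List<EndPoint> newSubscribersList = new List<EndPoint>();
                newSubscribersList.Add(subscriberEndPoint);
                SubscribersList.Add(topicName, newSubscribersList);
            }
        }
    }

    static public void RemoveSubscriber(String topicName, EndPoint subscriberEndPoint)
    {
        lock (_syncRoot)
        {
            if (SubscribersList.ContainsKey(topicName))
            {
                if (SubscribersList[topicName].Contains(subscriberEndPoint))
                {
                    SubscribersList[topicName].Remove(subscriberEndPoint);
                }
            }
        }
    }
}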

The dictionary _subscribersList keeps the subscribers topic-wise. Here, the key is the topic name (String type). The value is a list of subscriber EndPoints (IP, Port).

The GetSubscribers(String topicName) method returns a list of subscriber EndPoints (IP, Port) based on the topic name. Actually, these EndPoints hold the necessary mechanism (IP, Port) to notify the subscribers. This method first checks whether the topic name exists in the dictionary. If so, then it returns the corresponding list of EndPoints according to that topic. If not, it returns null.

The method named AddSubscriber adds the EndPoint of a subscriber to the corresponding list topic-wise. It has two parameters: the topic name and the EndPoint. This method first checks whether there is a list for that topic. If so, it then adds this EndPoint to that list. If not, it makes a new list for this topic and adds the EndPoint to the new list.

The RemoveSubscriber(String topicName, EndPoint subscriberEndPoint) method removes the EndPoint of a subscriber from the corresponding topic-wise list. It has two parameters: the topic name and the EndPoint. It first checks whether there is a list for this topic. If not, it does nothing. If so, it checks whether this EndPoint exists in the list. If it exists, the method removes it; otherwise, it does nothing.

Step 2: Implementation of subscribe Service

There is a method named StartSubscriberService in the SubscribeService class. You have to call this method to start the subscribe service. Here, the subscribe service and the publish service are hosted in the same process. As we know, a process has only one thread unless you start new ones. For this reason, when the StartSubscriberService() method is called, it creates a separate thread that runs the HostSubscriberService method. Otherwise, the blocking receive loop would make the UI unresponsive.

C#
public void StartSubscriberService()
{
    Thread th = new Thread(new ThreadStart(HostSubscriberService));
    th.IsBackground = true;
    th.Start();
}

To host the Subscriber service, you will need the IP of the host machine and we can get this using the ReturnIPOfHostMachine() method. But here, the IP address “127.0.0.1” is used so that the applications can be run without any hassle. Then, we will create an EndPoint using the IP and port 10001. We will create a UDP socket and bind the socket with the EndPoint.

C#
private void HostSubscriberService()
{
    IPAddress ipV4 = IPAddress.Parse("127.0.0.1");
    IPEndPoint localEP = new IPEndPoint(ipV4, 10001);
    Socket server = new Socket(AddressFamily.InterNetwork, 
                        SocketType.Dgram, ProtocolType.Udp);
    server.Bind(localEP);
    StartListening(server);
}
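The ReturnIPOfHostMachine() method mentioned above is not listed in this article. A minimal sketch of what such a helper might look like, assuming it should return the first IPv4 address of the local machine, follows; the class name HostAddress and the loopback fallback are assumptions of this sketch.

```csharp
using System;
using System.Net;
using System.Net.Sockets;

public static class HostAddress
{
    // Returns the first IPv4 address of this machine, or loopback if none is found.
    public static IPAddress ReturnIPOfHostMachine()
    {
        try
        {
            foreach (IPAddress address in Dns.GetHostAddresses(Dns.GetHostName()))
            {
                // AddressFamily.InterNetwork means IPv4, matching the sockets used here.
                if (address.AddressFamily == AddressFamily.InterNetwork)
                {
                    return address;
                }
            }
        }
        catch (SocketException)
        {
            // The host name could not be resolved; fall through to loopback.
        }
        return IPAddress.Loopback;
    }
}
```

On a machine with several network adapters, the first IPv4 address is not necessarily the one the subscribers can reach, so a configurable address may be the safer choice.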

We will then call the method StartListening to start receiving messages. As we are using UDP here, we do not need to create a connection before receiving any message.

C#
private static void StartListening(Socket server)
{
    EndPoint remoteEP = new IPEndPoint(IPAddress.Any, 0);
    byte[] data = new byte[1024];
    while (true)
    {
        // Blocks until a datagram arrives; remoteEP is set to the sender's address.
        int recv = server.ReceiveFrom(data, ref remoteEP);
        string messageSendFromClient = Encoding.ASCII.GetString(data, 0, recv);
        string[] messageParts = messageSendFromClient.Split(',');

        // A valid request has a command and a topic name; ignore anything else
        // instead of failing with an IndexOutOfRangeException.
        if (messageParts.Length < 2 || string.IsNullOrEmpty(messageParts[1]))
        {
            continue;
        }

        switch (messageParts[0])
        {
            case "Subscribe":
                Filter.AddSubscriber(messageParts[1], remoteEP);
                break;
            case "UnSubscribe":
                Filter.RemoveSubscriber(messageParts[1], remoteEP);
                break;
        }
    }
}

The StartListening method blocks when it calls server.ReceiveFrom(data, ref remoteEP) and stays blocked until data is received; the call keeps the socket waiting for incoming data. As it is a subscription service, only requests for subscription and unsubscription will come to this service. As soon as any data is received, it is converted into a string and we split the string to find the command and the topic name. Here, messageParts[0] represents the command, and it is compared with the "Subscribe" and "UnSubscribe" strings with a switch statement. If a match is found for "Subscribe", then the AddSubscriber method of the Filter class is called with the topic name and the EndPoint. If a match is found for "UnSubscribe", then the RemoveSubscriber method of the Filter class is called with the topic name and the EndPoint.

Step 3: Implementation of the publisher service

There is a method named StartPublisherService in the publish service class. You have to call this method to start the publish service. Here, the subscribe service and the publish service are hosted in the same process. As we know, by default, a process has only one thread unless you start new ones. For this reason, when the StartPublisherService() method is called, it creates a separate thread that runs the HostPublisherService method.

C#
public void StartPublisherService()
{
    Thread th = new Thread(new ThreadStart(HostPublisherService));
    th.IsBackground = true;
    th.Start();
}

To host a Publisher Service, we will need the IP of the host machine and we can get this using the ReturnIPOfHostMachine() method. But here, the IP address “127.0.0.1” is used so that the applications can be run without any hassle. We will then create an EndPoint using this IP and port 10002. Then we will create a UDP socket and bind the socket with the EndPoint.

C#
private void HostPublisherService()
{
    IPAddress ipV4 = IPAddress.Parse("127.0.0.1"); 
    IPEndPoint localEP = new IPEndPoint(ipV4, 10002);
    Socket server = new Socket(AddressFamily.InterNetwork, 
                               SocketType.Dgram, ProtocolType.Udp);
    server.Bind(localEP);
    StartListening(server);
}

We will call the StartListening method to start receiving messages. As we are using UDP here, there is no need to create a connection before receiving messages.

C#
private static void StartListening(Socket server)
{
    EndPoint remoteEP = new IPEndPoint(IPAddress.Any, 0);
    byte[] data = new byte[1024];
    while (true)
    {
        try
        {
            // Blocks until a datagram arrives from a publisher.
            int recv = server.ReceiveFrom(data, ref remoteEP);
            string messageSendFromClient = Encoding.ASCII.GetString(data, 0, recv);
            string[] messageParts = messageSendFromClient.Split(',');

            // A valid request is "Publish,<topic>[,<event data>]".
            if (messageParts.Length < 2)
            {
                continue;
            }
            String command = messageParts[0];
            String topicName = messageParts[1];
            if (command == "Publish" && !string.IsNullOrEmpty(topicName))
            {
                // Drop the command; subscribers receive "<topic>,<event data>".
                List<string> eventParts = new List<string>(messageParts);
                eventParts.RemoveRange(0, 1);
                string message = MakeCommaSeparatedString(eventParts);
                List<EndPoint> subscriberListForThisTopic = 
                                 Filter.GetSubscribers(topicName);
                WorkerThreadParameters workerThreadParameters = 
                                 new WorkerThreadParameters();
                workerThreadParameters.Server = server;
                workerThreadParameters.Message = message;
                workerThreadParameters.SubscriberListForThisTopic = 
                                 subscriberListForThisTopic;

                // Relay on a thread-pool thread so this loop can keep receiving.
                ThreadPool.QueueUserWorkItem(new WaitCallback(Publish), 
                                 workerThreadParameters);
            }
        }
        catch
        {
            // Deliberately ignored so that one bad datagram cannot stop the
            // service; a real deployment should at least log the exception.
        }
    }
}

The StartListening method blocks when it calls server.ReceiveFrom(data, ref remoteEP) and stays blocked until data is received; the call keeps the socket waiting for incoming data. As it is a publication service, only requests for event publishing will come to this service. As soon as any data is received, it is converted into a string and we split the string to find the command and the topic name. messageParts[0] represents the command and it is compared with the "Publish" string. If a match is found, then we will do the following things to publish the message:

  • First, prepare the message for sending to the subscribers. The message sent by the publisher has already been split into message parts, so we remove the first part, which is the command.
  • Then, make a new comma delimited string from the remaining parts (the topic name and the event data), using the MakeCommaSeparatedString(eventParts) method.
  • Then, get the subscriber address list for this topic by calling the GetSubscribers (TopicName) method of the Filter class.
  • After getting the address list of the topic, we send the message asynchronously, because if we do synchronously, it will block the thread until the message is sent to all subscribers. To perform this action asynchronously, we use WaitCallback and thread pool.
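The WorkerThreadParameters class and the MakeCommaSeparatedString method used above are not listed in this article. The following is only a guess at their shape, inferred from how StartListening and Publish use them; the property types and the containing class name MessageHelpers are assumptions.

```csharp
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;

// Hypothetical shape of the state object handed to ThreadPool.QueueUserWorkItem.
public class WorkerThreadParameters
{
    public Socket Server { get; set; }
    public string Message { get; set; }
    public List<EndPoint> SubscriberListForThisTopic { get; set; }
}

public static class MessageHelpers
{
    // Re-joins the remaining message parts (topic name, event data) with commas.
    public static string MakeCommaSeparatedString(List<string> eventParts)
    {
        return string.Join(",", eventParts.ToArray());
    }
}
```

For the message "Publish,A,Your salary will be doubled soon", the parts left after removing the command are "A" and "Your salary will be doubled soon", so the string relayed to the subscribers is "A,Your salary will be doubled soon". Re-joining with commas also restores any commas that were part of the original event data.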

Now we will see how the Publish method works. This method runs on a thread-pool thread and sends the message to every subscriber in the list, using the socket of the publish service.

C#
public static void Publish(object stateInfo)
{
    WorkerThreadParameters workerThreadParameters = (WorkerThreadParameters)stateInfo;
    Socket server = workerThreadParameters.Server;
    List<EndPoint> subscriberListForThisTopic = 
       workerThreadParameters.SubscriberListForThisTopic;

    // Encode once; every subscriber receives the same bytes.
    byte[] payload = Encoding.ASCII.GetBytes(workerThreadParameters.Message);

    // GetSubscribers returns null when nobody has subscribed to the topic.
    if (subscriberListForThisTopic != null)
    {
        foreach (EndPoint endPoint in subscriberListForThisTopic)
        {
            try
            {
                server.SendTo(payload, payload.Length, SocketFlags.None, endPoint);
            }
            catch (SocketException)
            {
                // One unreachable subscriber must not stop delivery to the others.
            }
        }
    }
}

Step 4: Implementation of subscribers

In the constructor, we will get the subscriber service address (IP, Port) from the configuration file and make an EndPoint using this info. Then, we make a UDP socket to subscribe and receive events, using the following line:

C#
_client = new Socket(AddressFamily.InterNetwork, 
                     SocketType.Dgram, ProtocolType.Udp);
_remoteEndPoint = new IPEndPoint(serverIPAddress, serverPort);

When the Subscribe button is clicked, we have to make a message for the subscription. For the subscription message, the command is "Subscribe". We then take the topic name from the textbox named txtTopicName, make the message, and send it using the socket to the subscription service. Here, the remote endpoint is the subscription service's IP and port. In addition to this, we have to start the event receiving thread if it has not started yet. For message receiving, we keep a separate thread so that it does not make the UI unresponsive.

C#
private void button3_Click(object sender, EventArgs e)
{
    ((Button)sender).Visible = false;
    button2.Visible = true;

    string Command = "Subscribe";
    String TopicName = txtTopicName.Text;
    string message = Command + "," + TopicName;
    _client.SendTo(Encoding.ASCII.GetBytes(message), _remoteEndPoint);

    if (_isReceivingStarted == false)
    {
        _isReceivingStarted = true;
        _data = new byte[1024];
        Thread thread1 = new Thread(new ThreadStart(ReceiveDataFromServer));
        thread1.IsBackground = true;
        thread1.Start();
    }
}

When the Unsubscribe button is clicked, we have to make a message for unsubscription. For the unsubscription message, the command is "UnSubscribe"; the capitalization must match the case label in the subscribe service, which compares the command case-sensitively. We then take the topic name from the textbox named txtTopicName, make the message, and send it using the socket to the subscription service. Here, the remote endpoint is the subscription service's IP and port.
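The unsubscribe handler and the ReceiveDataFromServer method are not listed in this article. As a rough illustration of both, here is a console-style sketch without any Windows Forms code; the class SubscriberClient and its method names are hypothetical, and the receive logic is an assumption about what the receiving thread does for each event.

```csharp
using System;
using System.Net;
using System.Net.Sockets;
using System.Text;

// Hypothetical console-style subscriber; the article's version lives in a Windows Forms class.
public class SubscriberClient : IDisposable
{
    private readonly Socket _client;
    private readonly EndPoint _remoteEndPoint;

    public SubscriberClient(IPAddress serverIPAddress, int serverPort)
    {
        _client = new Socket(AddressFamily.InterNetwork,
                             SocketType.Dgram, ProtocolType.Udp);
        _remoteEndPoint = new IPEndPoint(serverIPAddress, serverPort);
    }

    public void Subscribe(string topicName)
    {
        Send("Subscribe", topicName);
    }

    // "UnSubscribe" must match the case label in the subscribe service exactly.
    public void Unsubscribe(string topicName)
    {
        Send("UnSubscribe", topicName);
    }

    // Blocks until one event arrives and returns it as text ("topic,event data").
    // Call only after Subscribe: the first SendTo binds the socket to a local port.
    public string ReceiveOne()
    {
        byte[] data = new byte[1024];
        EndPoint from = new IPEndPoint(IPAddress.Any, 0);
        int received = _client.ReceiveFrom(data, ref from);
        return Encoding.ASCII.GetString(data, 0, received);
    }

    private void Send(string command, string topicName)
    {
        byte[] bytes = Encoding.ASCII.GetBytes(command + "," + topicName);
        _client.SendTo(bytes, _remoteEndPoint);
    }

    public void Dispose()
    {
        _client.Close();
    }
}
```

A receiving thread would call ReceiveOne in a loop and hand each event to the UI. The same socket is used for subscribing and for receiving because the server relays events to the address (IP, port) from which the subscription message was sent.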

Step 5: Implementation of the publisher

In the constructor, we get the publisher service address (IP, Port) from the configuration file and make an EndPoint using this info. Then, we make a UDP socket to publish an event.

C#
private void SendASingleEvent()
{
    String topicName = textBox1.Text;
    string eventData = textBox2.Text;
    string message = _command + "," + topicName + "," + eventData;
    _client.SendTo(Encoding.ASCII.GetBytes(message), _remoteEndPoint);
    _noOfEventsFired++;
    txtEventCount.Text = _noOfEventsFired.ToString();
}

When the Publish button is clicked, we have to make a message for event publishing. To publish the message, the command is "Publish". In the code above, the topic name is taken from textBox1 and the event data from textBox2. We make the message and send it using the socket to the publication service. Here, the remote endpoint is the publication service's IP and port.

Socket implementation vs. WCF implementation

  • The socket based implementation sends a compact, comma separated plain text message, whereas the WCF implementation sends a SOAP message, which is verbose.
  • The socket based implementation is not based on any standard. A proprietary protocol is not a good choice for interoperability, whereas SOAP is a standard.
  • The socket based implementation does not need the WCF runtime, so it can be used for embedded programming; the WCF implementation cannot.
  • A WCF implementation exposes its operations in an object oriented fashion, but a socket implementation exchanges raw strings that each side has to build and parse itself.
  • A WCF implementation can support multiple protocols easily just by adding extra endpoints. In a socket implementation, supporting multiple protocols takes considerable work, because you have to write a different implementation for each protocol.
  • Adding security features to a socket based implementation takes a lot of work, and the result will not be based on a standard. In WCF, you can add message security easily, based on a standard.
  • Socket based implementations are fast and generate less network traffic than WCF implementations.

Sample code

A project has been attached which shows a topic based Publish/Subscribe design pattern implementation in C# (using socket programming).

Conclusion

Thanks for reading this write up. I hope this article will be helpful for some people. If you guys have any questions, I would love to answer. I always appreciate comments.

History

  • Initial release – 22/03/09.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)