Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles / Languages / C#

Multiple Publishers - Multiple Subscribers Communication

4.67/5 (3 votes)
24 Oct 2010CPOL2 min read 16K   65  
Simple example showing how to implement the communication scenario where subscribing applications can receive notification messages from more publishing applications.

Summary

This is a simple example showing how to implement the communication scenario where subscribing applications can receive notification messages from more publishing applications.

The example uses Eneter Messaging Framework.
(The framework is free and can be downloaded from www.eneter.net. The online help for developers can be found at http://www.eneter.net/OnlineHelp/EneterMessagingFramework/Index.html.)

Introduction

The article is a free continuation of Public-Subscribe Communication where one application (publisher) sends notifications to more applications (subscribers).
Now I would like to describe the scenario where subscribing applications need to receive notifications from more publishing applications.
The main question in this scenario is how to connect publishing applications with subscribing applications. If subscribers are connected directly to individual publishers, then we can quickly get many connections that can be difficult to manage.

Image 1

To reduce the complexity and number of connections, we can implement one application as Broker that will maintain registered subscribers and forward them messages from publishers. Subscribers are so connected only to the broker and do not have to handle connections with publishers. Publishers send messages only to the broker and do not have to register/unregister subscribers.

Image 2

The following example shows how to implement this scenario with using Eneter Messaging Framework.

Broker

The Broker is responsible for receiving notification messages from publishing applications and for forwarding them to all subscribed receivers.

The implementation of broker application is really trivial with Eneter Messaging Framework.
The whole implementation is here:

C#
using System;
using Eneter.Messaging.DataProcessing.Serializing;
using Eneter.Messaging.MessagingSystems.MessagingSystemBase;
using Eneter.Messaging.MessagingSystems.TcpMessagingSystem;
using Eneter.Messaging.Nodes.Broker;

namespace BrokerApplication
{
    class Program
    {
        static void Main(string[] args)
        {
            // Create the broker.
            IDuplexBrokerFactory aBrokerFactory = 
                new DuplexBrokerFactory(new XmlStringSerializer());
            IDuplexBroker aBroker = aBrokerFactory.CreateBroker();

            // Create the Input channel receiving messages via Tcp.
            // Note: You can also choose NamedPipes or Http.
            //       (if you choose http, do not forget 
            //        to execute it with sufficient user rights)
            IMessagingSystemFactory aMessaging = new TcpMessagingSystemFactory();
            IDuplexInputChannel aBrokerInputChannel = 
               aMessaging.CreateDuplexInputChannel("127.0.0.1:7091");

            // Attach the input channel to the broker and start listening.
            Console.WriteLine("The broker application is running.");
            aBroker.AttachDuplexInputChannel(aBrokerInputChannel);
        }
    }
}

Publisher

The Publisher is responsible for sending messages to the broker application. The broker application then forwards messages to subscribed receivers.
The whole implementation is here:

C#
using System;
using System.Windows.Forms;
using Eneter.Messaging.DataProcessing.Serializing;
using Eneter.Messaging.MessagingSystems.MessagingSystemBase;
using Eneter.Messaging.MessagingSystems.TcpMessagingSystem;
using Eneter.Messaging.Nodes.Broker;

namespace Publisher
{
    public partial class Form1 : Form
    {
        // Notification message 1
        public class NotifyMsg1
        {
            public DateTime CurrentTime { get; set; }
        }

        // Notification message 2
        public class NotifyMsg2
        {
            public int Number { get; set; }
        }

        // Notification message 3
        public class NotifyMsg3
        {
            public string TextMessage { get; set; }
        }

        public Form1()
        {
            InitializeComponent();

            // Create broker client responsible for sending messages to the broker.
            IDuplexBrokerFactory aBrokerFactory = 
                      new DuplexBrokerFactory(new XmlStringSerializer());
            myBrokerClient = aBrokerFactory.CreateBrokerClient();

            // Create output channel to send messages via Tcp.
            IMessagingSystemFactory aMessaging = new TcpMessagingSystemFactory();
            myOutputChannel = aMessaging.CreateDuplexOutputChannel("127.0.0.1:7091");

            // Attach the output channel to the broker client 
            // to be able to send messages.
            myBrokerClient.AttachDuplexOutputChannel(myOutputChannel);
        }

        // Correctly close the output channel.
        private void Form1_FormClosed(object sender, FormClosedEventArgs e)
        {
            // Note: The duplex output channel can receive response messages too.
            //       Therefore we must close it to stop the thread 
            //       receiving response messages.
            //       If the thread is not closed then the application 
            //       could not be correctly closed.
            myBrokerClient.DetachDuplexOutputChannel();
            myOutputChannel.CloseConnection();
        }

        // Send NotifyMsg1
        private void Notify1Btn_Click(object sender, EventArgs e)
        {
            NotifyMsg1 aMsg = new NotifyMsg1();
            aMsg.CurrentTime = DateTime.Now;

            object aSerializedMsg = mySerializer.Serialize<NotifyMsg1>(aMsg);

            myBrokerClient.SendMessage("MyNotifyMsg1", aSerializedMsg);
        }

        // Send NotifyMsg2
        private void Notify2Btn_Click(object sender, EventArgs e)
        {
            NotifyMsg2 aMsg = new NotifyMsg2();
            aMsg.Number = 12345;

            object aSerializedMsg = mySerializer.Serialize<NotifyMsg2>(aMsg);

            myBrokerClient.SendMessage("MyNotifyMsg2", aSerializedMsg);
        }

        // Send NotifyMsg3
        private void Notify3Btn_Click(object sender, EventArgs e)
        {
            NotifyMsg3 aMsg = new NotifyMsg3();
            aMsg.TextMessage = "My notifying text message.";

            object aSerializedMsg = mySerializer.Serialize<NotifyMsg3>(aMsg);

            myBrokerClient.SendMessage("MyNotifyMsg3", aSerializedMsg);
        }

        // Broker client is used to send messages to the broker,
        // that forwards messages to subscribers.
        private IDuplexBrokerClient myBrokerClient;

        // The output channel used by the broker client to send messages to the broker.
        private IDuplexOutputChannel myOutputChannel;

        // Serializer used to serialize notification messages.
        // Note: It is possible to use BinarySerializer too.
        //       In that case the messages would have to be declared in a
        //       separate library that would be linked by subscribers too.
        private XmlStringSerializer mySerializer = new XmlStringSerializer();
    }
}

Subscriber

The Subscriber is responsible for its registering in the broker application to receive desired notification messages.

The whole implementation is here:

C#
using System;
using System.Windows.Forms;
using Eneter.Messaging.DataProcessing.Serializing;
using Eneter.Messaging.MessagingSystems.MessagingSystemBase;
using Eneter.Messaging.MessagingSystems.TcpMessagingSystem;
using Eneter.Messaging.Nodes.Broker;

namespace Subscriber
{
    public partial class Form1 : Form
    {
        // Notification message 1
        public class NotifyMsg1
        {
            public DateTime CurrentTime { get; set; }
        }

        // Notification message 2
        public class NotifyMsg2
        {
            public int Number { get; set; }
        }

        // Notification message 3
        public class NotifyMsg3
        {
            public string TextMessage { get; set; }
        }

        public Form1()
        {
            InitializeComponent();

            // Create the broker client that will receive notification messages.
            IDuplexBrokerFactory aBrokerFactory = 
                  new DuplexBrokerFactory(new XmlStringSerializer());
            myBrokerClient = aBrokerFactory.CreateBrokerClient();
            myBrokerClient.BrokerMessageReceived += OnNotificationMessageReceived;

            // Create the Tcp messaging for the communication with the publisher.
            // Note: For the interprocess communication you can use: 
            // Tcp, NamedPipes and Http.
            IMessagingSystemFactory aMessagingFactory = new TcpMessagingSystemFactory();

            // Create duplex output channel for the communication with the publisher.
            // Note: The duplex output channel can send requests and receive responses.
            //       In our case, the broker client will send 
            //       requests to subscribe/unsubscribe
            //       and receive notifications as response messages.
            myOutputChannel = aMessagingFactory.CreateDuplexOutputChannel
                        ("127.0.0.1:7091");

            // Attach the output channel to the broker client
            myBrokerClient.AttachDuplexOutputChannel(myOutputChannel);
        }

        // Correctly close the communication.
        // Note: If the communication is not correctly closed, the thread listening to
        //       response messages will not be closed.
        private void Form1_FormClosed(object sender, FormClosedEventArgs e)
        {
            myBrokerClient.DetachDuplexOutputChannel();
            myOutputChannel.CloseConnection();
        }

        // Method processing notification messages from the publisher.
        private void OnNotificationMessageReceived
        (object sender, BrokerMessageReceivedEventArgs e)
        {
            // The notification event does not come in UI thread.
            // Therefore, if we want to work with UI controls 
            // we must execute it in the UI thread.
            InvokeInUIThread(() =>
                {
                    if (e.ReceivingError == null)
                    {
                        if (e.MessageTypeId == "MyNotifyMsg1")
                        {
                            NotifyMsg1 aDeserializedMsg = 
                                mySerializer.Deserialize<NotifyMsg1>(e.Message);
                            Received1TextBox.Text = 
                                aDeserializedMsg.CurrentTime.ToString();
                        }
                        else if (e.MessageTypeId == "MyNotifyMsg2")
                        {
                            NotifyMsg2 aDeserializedMsg = 
                            mySerializer.Deserialize<NotifyMsg2>(e.Message);
                            Received2TextBox.Text = aDeserializedMsg.Number.ToString();
                        }
                        else if (e.MessageTypeId == "MyNotifyMsg3")
                        {
                            NotifyMsg3 aDeserializedMsg = 
                            mySerializer.Deserialize<NotifyMsg3>(e.Message);
                            Received3TextBox.Text = aDeserializedMsg.TextMessage;
                        }
                    }
                });
        }

        // Subscribe to notification message 1
        private void Subscribe1Btn_Click(object sender, EventArgs e)
        {
            myBrokerClient.Subscribe("MyNotifyMsg1");
        }

        // Unsubscribe from notification message 1
        private void Unsubscribe1Btn_Click(object sender, EventArgs e)
        {
            Received1TextBox.Text = "";
            myBrokerClient.Unsubscribe("MyNotifyMsg1");
        }

        // Subscribe to notification message 2
        private void Subscribe2Btn_Click(object sender, EventArgs e)
        {
            myBrokerClient.Subscribe("MyNotifyMsg2");
        }

        // Unsubscribe from notification message 2
        private void Unsubscribe2Btn_Click(object sender, EventArgs e)
        {
            Received2TextBox.Text = "";
            myBrokerClient.Unsubscribe("MyNotifyMsg2");
        }

        // Subscribe to notification message 3
        private void Subscribe3Btn_Click(object sender, EventArgs e)
        {
            myBrokerClient.Subscribe("MyNotifyMsg3");
        }

        // Unsubscribe from notification message 3
        private void Unsubscribe3Btn_Click(object sender, EventArgs e)
        {
            Received3TextBox.Text = "";
            myBrokerClient.Unsubscribe("MyNotifyMsg3");
        }

        // Helper method to invoke some functionality in UI thread.
        private void InvokeInUIThread(Action uiMethod)
        {
            // If we are not in the UI thread then we must synchronize 
            // via the invoke mechanism.
            if (InvokeRequired)
            {
                Invoke(uiMethod);
            }
            else
            {
                uiMethod();
            }
        }

        // BrokerClient provides the communication with the broker.
        private IDuplexBrokerClient myBrokerClient;

        // The output channel used by the broker client to send messages to the broker.
        private IDuplexOutputChannel myOutputChannel;

        // Serializer used to serialize notification messages.
        // Note: It is possible to use BinarySerializer too.
        //       In that case the messages would have to be declared in a
        //       separate library that would be linked by publisher too.
        private XmlStringSerializer mySerializer = new XmlStringSerializer();
    }
}

And for the completeness, here are publishers and subscribers running together with the broker.

Image 3


I hope you found the article useful. If you have any comments or questions, please let me know.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)