
Publish-Subscribe Communication

19 Oct 2010
Summary

This is a simple example showing how to implement a communication scenario where one application sends notifications to other applications.

Introduction

The scenario where one application needs to inform other applications about some particular event is very common.
The application that wants to be informed about events subscribes to get notification messages from the publishing application. Then, when the event occurs, the publishing application notifies all subscribed applications.
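
The subscribe/notify flow described above can be pictured with a few lines of plain C#. This is only an in-process illustration under our own assumptions: `SimpleBroker`, `BrokerDemo` and their methods are hypothetical names and are not part of any messaging framework. A real framework also has to handle channels, serialization and interprocess transport.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-process broker. It illustrates the pattern only.
public class SimpleBroker
{
    // Maps a message type id to the callbacks subscribed to it.
    private readonly Dictionary<string, List<Action<string>>> mySubscribers =
        new Dictionary<string, List<Action<string>>>();

    public void Subscribe(string messageTypeId, Action<string> handler)
    {
        List<Action<string>> handlers;
        if (!mySubscribers.TryGetValue(messageTypeId, out handlers))
        {
            handlers = new List<Action<string>>();
            mySubscribers[messageTypeId] = handlers;
        }
        handlers.Add(handler);
    }

    public void Unsubscribe(string messageTypeId, Action<string> handler)
    {
        List<Action<string>> handlers;
        if (mySubscribers.TryGetValue(messageTypeId, out handlers))
        {
            handlers.Remove(handler);
        }
    }

    // Forwards the message to every handler subscribed to the given id
    // and returns the number of handlers that were notified.
    public int Publish(string messageTypeId, string message)
    {
        List<Action<string>> handlers;
        if (!mySubscribers.TryGetValue(messageTypeId, out handlers))
        {
            return 0;
        }

        // Iterate over a snapshot so a handler may unsubscribe while notified.
        Action<string>[] snapshot = handlers.ToArray();
        foreach (Action<string> handler in snapshot)
        {
            handler(message);
        }
        return snapshot.Length;
    }
}

public static class BrokerDemo
{
    // Returns the messages that reached the single subscriber.
    public static List<string> Run()
    {
        var received = new List<string>();
        var broker = new SimpleBroker();
        Action<string> subscriber = message => received.Add(message);

        broker.Subscribe("MyNotifyMsg1", subscriber);
        broker.Publish("MyNotifyMsg1", "first");   // delivered
        broker.Publish("MyNotifyMsg2", "other");   // nobody subscribed to this id
        broker.Publish("MyNotifyMsg1", "second");  // delivered

        broker.Unsubscribe("MyNotifyMsg1", subscriber);
        broker.Publish("MyNotifyMsg1", "third");   // no longer delivered
        return received;
    }
}
```

Calling `BrokerDemo.Run()` returns two entries, "first" and "second": the message with the other id is never delivered, and the subscriber is removed before the third notification is published.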

The example below shows how to implement this scenario using the Eneter Messaging Framework.
(The framework is free and can be downloaded from www.eneter.net. The online help for developers can be found at http://www.eneter.net/OnlineHelp/EneterMessagingFramework/Index.html.)

Publisher

The publishing application in our example is very simple. It provides just three notification events, each sent when the corresponding button is clicked.

To publish these events, the publisher uses the Broker component. The broker component receives messages via the input channel and forwards them to all subscribed receivers.
In our scenario, the broker receives a message when a button is clicked and then forwards it to the subscribed receivers.
Therefore, the broker must be able to communicate in two ways:

  • Internally - to receive internal messages (from the same process) when one of the notify buttons is clicked.
  • Interprocess - to register/unregister subscribers and to forward notifications to subscribers.

To enable communication via two different channels, we can use the Dispatcher component.
The dispatcher receives messages and forwards them to all attached receivers. In our case, the dispatcher receives messages from two channels (internal messages and interprocess messages) and forwards them to only one receiver, the broker.
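
The role of the dispatcher can be pictured with a small in-process sketch: several inputs feed one component, which passes everything on to its attached receivers. The names `SimpleDispatcher`, `CreateInput` and `AddReceiver` are hypothetical and chosen for illustration only. They are not the Eneter dispatcher API, and real channels are replaced here by plain delegates.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch of the dispatcher idea. It illustrates the concept only.
public class SimpleDispatcher
{
    // Receivers get the name of the originating input and the message.
    private readonly List<Action<string, string>> myReceivers =
        new List<Action<string, string>>();

    public void AddReceiver(Action<string, string> receiver)
    {
        myReceivers.Add(receiver);
    }

    // Returns a delegate standing in for one input channel. Anything sent
    // through it is forwarded to all attached receivers.
    public Action<string> CreateInput(string channelName)
    {
        return message =>
        {
            foreach (Action<string, string> receiver in myReceivers)
            {
                receiver(channelName, message);
            }
        };
    }
}

public static class DispatcherDemo
{
    public static List<string> Run()
    {
        var received = new List<string>();
        var dispatcher = new SimpleDispatcher();

        // A single receiver, standing in for the broker.
        dispatcher.AddReceiver(
            (channel, message) => received.Add(channel + ": " + message));

        // Two inputs: one for in-process messages, one for interprocess messages.
        Action<string> internalInput = dispatcher.CreateInput("internal");
        Action<string> tcpInput = dispatcher.CreateInput("tcp");

        internalInput("notify MyNotifyMsg1");
        tcpInput("subscribe MyNotifyMsg1");
        return received;
    }
}
```

In this sketch the receiver sees messages from both inputs in the order they were sent, which mirrors how the broker receives both local notifications and remote subscribe requests through one dispatcher.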

The whole implementation is very simple:

C#
using System;
using System.Windows.Forms;
using Eneter.Messaging.DataProcessing.Serializing;
using Eneter.Messaging.Infrastructure.ConnectionProvider;
using Eneter.Messaging.MessagingSystems.MessagingSystemBase;
using Eneter.Messaging.MessagingSystems.SynchronousMessagingSystem;
using Eneter.Messaging.MessagingSystems.TcpMessagingSystem;
using Eneter.Messaging.Nodes.Broker;
using Eneter.Messaging.Nodes.Dispatcher;

namespace Publisher
{
    public partial class Form1 : Form
    {
        // Notification message 1
        public class NotifyMsg1
        {
            public DateTime CurrentTime { get; set; }
        }

        // Notification message 2
        public class NotifyMsg2
        {
            public int Number { get; set; }
        }

        // Notification message 3
        public class NotifyMsg3
        {
            public string TextMessage { get; set; }
        }

        public Form1()
        {
            InitializeComponent();

            // Create internal messaging system that will be used 
            // for the internal communication.
            IMessagingSystemFactory anInternalMessaging = 
                new SynchronousMessagingSystemFactory();

            //*******************************************************************************
            // Create communication components that will model our communication scenario
            // -> DuplexBroker: to forward notification messages to subscribers
            // -> DuplexDispatcher: to connect the broker with messaging systems: Tcp, Internal
            // -> BrokerClient: to send notifications from this application to the broker
            //*******************************************************************************

            // Create broker responsible for forwarding notification messages.
            IDuplexBrokerFactory aBrokerFactory =
                new DuplexBrokerFactory(new XmlStringSerializer());
            myBroker = aBrokerFactory.CreateBroker();
            
            // Create broker client used to send notification 
            // messages from this application.
            myBrokerClient = aBrokerFactory.CreateBrokerClient();

            // Create Dispatcher that will receive messages from 
            // Tcp messaging and also from the internal
            // messaging and send them to the broker.
            // Note: Internal messages will come from myBrokerClient
            //       Tcp messages will come from other applications 
            //       subscribed for notifications.
            IDuplexDispatcherFactory aDispatcherFactory =
                new DuplexDispatcherFactory(anInternalMessaging);
            myDispatcher = aDispatcherFactory.CreateDuplexDispatcher();

            //*******************************************************************************
            // "Click" communication components together
            // -> connect the broker with the dispatcher
            // -> connect the dispatcher with the 'broker client'
            // -> connect the dispatcher to Tcp messaging
            //*******************************************************************************

            // Create helper to create connections for 'internal messaging'.
            // Note: It internally creates input/output channels and
            //       connects them to components.
            IConnectionProviderFactory aConnectionProviderFactory =
                new ConnectionProviderFactory();
            IConnectionProvider aConnectionProvider =
                aConnectionProviderFactory.CreateConnectionProvider(anInternalMessaging);

            // Connect the broker to the internal messaging via the duplex input channel.
            aConnectionProvider.Attach(myBroker, "MyBrokerChannelId");

            // Tell dispatcher to send messages to the broker.
            myDispatcher.AddDuplexOutputChannel("MyBrokerChannelId");

            // Connect dispatcher with the 'broker client'.
            aConnectionProvider.Connect(
                myDispatcher, myBrokerClient, "MyInternalDispatcherChannelId");

            // Create Tcp messaging for the communication with subscribed applications.
            IMessagingSystemFactory aMessaging = new TcpMessagingSystemFactory();

            // Attach Tcp duplex input channel to the dispatcher and start listening.
            IDuplexInputChannel aTcpInputChannel =
                aMessaging.CreateDuplexInputChannel("127.0.0.1:8091");
            myDispatcher.AttachDuplexInputChannel(aTcpInputChannel);
        }

        // Correctly close listening to Tcp messages.
        private void Form1_FormClosed(object sender, FormClosedEventArgs e)
        {
            myDispatcher.DetachDuplexInputChannel();
        }

        // Send NotifyMsg1
        private void Notify1Btn_Click(object sender, EventArgs e)
        {
            NotifyMsg1 aMsg = new NotifyMsg1();
            aMsg.CurrentTime = DateTime.Now;

            object aSerializedMsg = mySerializer.Serialize<NotifyMsg1>(aMsg);

            myBrokerClient.SendMessage("MyNotifyMsg1", aSerializedMsg);
        }

        // Send NotifyMsg2
        private void Notify2Btn_Click(object sender, EventArgs e)
        {
            NotifyMsg2 aMsg = new NotifyMsg2();
            aMsg.Number = 12345;

            object aSerializedMsg = mySerializer.Serialize<NotifyMsg2>(aMsg);

            myBrokerClient.SendMessage("MyNotifyMsg2", aSerializedMsg);
        }

        // Send NotifyMsg3
        private void Notify3Btn_Click(object sender, EventArgs e)
        {
            NotifyMsg3 aMsg = new NotifyMsg3();
            aMsg.TextMessage = "My notifying text message.";

            object aSerializedMsg = mySerializer.Serialize<NotifyMsg3>(aMsg);

            myBrokerClient.SendMessage("MyNotifyMsg3", aSerializedMsg);
        }

        // Broker used to forward messages to subscribers.
        private IDuplexBroker myBroker;

        private IDuplexDispatcher myDispatcher;

        // Broker client is used to send messages to the broker,
        // that forwards messages to subscribers.
        private IDuplexBrokerClient myBrokerClient;

        // Serializer used to serialize notification messages.
        // Note: It is possible to use BinarySerializer too.
        //       In that case the messages would have to be declared in a
        //       separate library that would be linked by subscribers too.
        private XmlStringSerializer mySerializer = new XmlStringSerializer();
    }
}
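
To give a rough idea of what turning a notification message into an XML string and back involves, here is a sketch that uses the standard .NET `XmlSerializer`. It is an assumption that this resembles what the framework's `XmlStringSerializer` does; its exact output format may differ. `XmlRoundTrip` is a hypothetical helper, and `NotifyMsg2` is repeated here as a top-level class only to keep the sketch self-contained.

```csharp
using System;
using System.IO;
using System.Xml.Serialization;

// Same shape as the NotifyMsg2 notification message used in the publisher.
public class NotifyMsg2
{
    public int Number { get; set; }
}

// Hypothetical helper built on the standard .NET XmlSerializer.
public static class XmlRoundTrip
{
    // Serializes the value into an XML string.
    public static string Serialize<T>(T value)
    {
        var serializer = new XmlSerializer(typeof(T));
        using (var writer = new StringWriter())
        {
            serializer.Serialize(writer, value);
            return writer.ToString();
        }
    }

    // Reconstructs the value from an XML string produced by Serialize.
    public static T Deserialize<T>(string xml)
    {
        var serializer = new XmlSerializer(typeof(T));
        using (var reader = new StringReader(xml))
        {
            return (T)serializer.Deserialize(reader);
        }
    }
}
```

For example, `XmlRoundTrip.Serialize(new NotifyMsg2 { Number = 12345 })` produces an XML document containing a `<Number>12345</Number>` element, and passing that string to `XmlRoundTrip.Deserialize<NotifyMsg2>` yields an object whose `Number` is again 12345. Because only the XML shape has to match, both sides can declare the message class independently, as the publisher and subscriber in this article do.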

Subscriber

The subscribing application in this example is very simple too. It can subscribe and unsubscribe via buttons and simply displays the incoming notification messages.

To subscribe, unsubscribe and receive messages, the subscriber uses the BrokerClient component.

The whole implementation is very simple:

C#
using System;
using System.Windows.Forms;
using Eneter.Messaging.DataProcessing.Serializing;
using Eneter.Messaging.MessagingSystems.MessagingSystemBase;
using Eneter.Messaging.MessagingSystems.TcpMessagingSystem;
using Eneter.Messaging.Nodes.Broker;

namespace Subscriber
{
    public partial class Form1 : Form
    {
        // Notification message 1
        public class NotifyMsg1
        {
            public DateTime CurrentTime { get; set; }
        }

        // Notification message 2
        public class NotifyMsg2
        {
            public int Number { get; set; }
        }

        // Notification message 3
        public class NotifyMsg3
        {
            public string TextMessage { get; set; }
        }

        public Form1()
        {
            InitializeComponent();

            // Create the broker client that will receive notification messages.
            IDuplexBrokerFactory aBrokerFactory =
                new DuplexBrokerFactory(new XmlStringSerializer());
            myBrokerClient = aBrokerFactory.CreateBrokerClient();
            myBrokerClient.BrokerMessageReceived += OnNotificationMessageReceived;

            // Create the Tcp messaging for the communication with the publisher.
            // Note: For the interprocess communication you can use: 
            // Tcp, NamedPipes and Http.
            IMessagingSystemFactory aMessagingFactory = new TcpMessagingSystemFactory();

            // Create duplex output channel for the communication with the publisher.
            // Note: The duplex output channel can send requests and receive responses.
            //       In our case, the broker client will send
            //       requests to subscribe/unsubscribe
            //       and receive notifications as response messages.
            IDuplexOutputChannel anOutputChannel =
                aMessagingFactory.CreateDuplexOutputChannel("127.0.0.1:8091");

            // Attach the output channel to the broker client
            myBrokerClient.AttachDuplexOutputChannel(anOutputChannel);
        }

        // Correctly close the communication.
        // Note: If the communication is not correctly closed, the thread listening to
        //       response messages will not be closed.
        private void Form1_FormClosed(object sender, FormClosedEventArgs e)
        {
            myBrokerClient.DetachDuplexOutputChannel();
        }

        // Method processing notification messages from the publisher.
        private void OnNotificationMessageReceived(
            object sender, BrokerMessageReceivedEventArgs e)
        {
            // The notification event is not raised in the UI thread.
            // Therefore, if we want to work with UI controls,
            // we must execute the code in the UI thread.
            InvokeInUIThread(() =>
                {
                    if (e.ReceivingError == null)
                    {
                        if (e.MessageTypeId == "MyNotifyMsg1")
                        {
                            NotifyMsg1 aDeserializedMsg =
                                mySerializer.Deserialize<NotifyMsg1>(e.Message);
                            Received1TextBox.Text = aDeserializedMsg.CurrentTime.ToString();
                        }
                        else if (e.MessageTypeId == "MyNotifyMsg2")
                        {
                            NotifyMsg2 aDeserializedMsg =
                                mySerializer.Deserialize<NotifyMsg2>(e.Message);
                            Received2TextBox.Text = aDeserializedMsg.Number.ToString();
                        }
                        else if (e.MessageTypeId == "MyNotifyMsg3")
                        {
                            NotifyMsg3 aDeserializedMsg =
                                mySerializer.Deserialize<NotifyMsg3>(e.Message);
                            Received3TextBox.Text = aDeserializedMsg.TextMessage;
                        }
                    }
                });
        }

        // Subscribe to notification message 1
        private void Subscribe1Btn_Click(object sender, EventArgs e)
        {
            myBrokerClient.Subscribe("MyNotifyMsg1");
        }

        // Unsubscribe from notification message 1
        private void Unsubscribe1Btn_Click(object sender, EventArgs e)
        {
            Received1TextBox.Text = "";
            myBrokerClient.Unsubscribe("MyNotifyMsg1");
        }

        // Subscribe to notification message 2
        private void Subscribe2Btn_Click(object sender, EventArgs e)
        {
            myBrokerClient.Subscribe("MyNotifyMsg2");
        }

        // Unsubscribe from notification message 2
        private void Unsubscribe2Btn_Click(object sender, EventArgs e)
        {
            Received2TextBox.Text = "";
            myBrokerClient.Unsubscribe("MyNotifyMsg2");
        }

        // Subscribe to notification message 3
        private void Subscribe3Btn_Click(object sender, EventArgs e)
        {
            myBrokerClient.Subscribe("MyNotifyMsg3");
        }

        // Unsubscribe from notification message 3
        private void Unsubscribe3Btn_Click(object sender, EventArgs e)
        {
            Received3TextBox.Text = "";
            myBrokerClient.Unsubscribe("MyNotifyMsg3");
        }

        // Helper method to invoke some functionality in UI thread.
        private void InvokeInUIThread(Action uiMethod)
        {
            // If we are not in the UI thread then we must synchronize 
            // via the invoke mechanism.
            if (InvokeRequired)
            {
                Invoke(uiMethod);
            }
            else
            {
                uiMethod();
            }
        }

        // BrokerClient provides the communication with the broker.
        private IDuplexBrokerClient myBrokerClient;


        // Serializer used to deserialize notification messages.
        // Note: It is possible to use BinarySerializer too.
        //       In that case the messages would have to be declared in a
        //       separate library that would be linked by publisher too.
        private XmlStringSerializer mySerializer = new XmlStringSerializer();
    }
}
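
The subscriber above selects the handling code by comparing the message type id in an if/else chain. With more message types, one possible alternative is a lookup table from type id to handler. The sketch below shows the idea in plain C#; `NotificationRouter`, `Register` and `Route` are hypothetical names and not part of the framework, and the handlers receive the serialized message as a string for simplicity.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical router that replaces an if/else chain with a lookup table.
public class NotificationRouter
{
    // Maps a message type id to the code that processes that message.
    private readonly Dictionary<string, Action<string>> myHandlers =
        new Dictionary<string, Action<string>>();

    // Registers (or replaces) the handler for the given message type id.
    public void Register(string messageTypeId, Action<string> handler)
    {
        myHandlers[messageTypeId] = handler;
    }

    // Invokes the handler registered for the id.
    // Returns false if no handler is registered for it.
    public bool Route(string messageTypeId, string serializedMessage)
    {
        Action<string> handler;
        if (myHandlers.TryGetValue(messageTypeId, out handler))
        {
            handler(serializedMessage);
            return true;
        }
        return false;
    }
}

public static class RouterDemo
{
    // Returns the text produced by the handler for "MyNotifyMsg3".
    public static string Run()
    {
        string displayed = "";
        var router = new NotificationRouter();
        router.Register("MyNotifyMsg3", message => displayed = "Text: " + message);

        router.Route("MyNotifyMsg3", "hello");
        router.Route("MyNotifyMsg9", "ignored"); // no handler registered
        return displayed;
    }
}
```

Here `RouterDemo.Run()` returns "Text: hello", since the second call finds no handler and leaves the displayed text unchanged. In the subscriber, each registered handler would deserialize the message and update the matching text box inside the UI-thread helper.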

You can run several subscribers at the same time and subscribe or unsubscribe each of them for the desired events.

I hope you found the article helpful and if you have any questions about Eneter Messaging Framework, feel free to ask.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)