Summary
This is a simple example showing how to implement a communication scenario in which subscribing applications receive notification messages from multiple publishing applications.
The example uses Eneter Messaging Framework.
(The framework is free and can be downloaded from www.eneter.net. The online help for developers can be found at http://www.eneter.net/OnlineHelp/EneterMessagingFramework/Index.html.)
Introduction
This article loosely follows on from Publish-Subscribe Communication, where one application (the publisher) sends notifications to multiple applications (the subscribers).
Here I describe the scenario where subscribing applications need to receive notifications from multiple publishing applications.
The main question in this scenario is how to connect publishing applications with subscribing applications. If subscribers connect directly to individual publishers, the number of connections grows quickly and becomes difficult to manage.
To reduce the complexity and the number of connections, we can implement one application as a Broker that maintains the registered subscribers and forwards messages from publishers to them. Subscribers are thus connected only to the broker and do not have to handle connections with publishers. Publishers send messages only to the broker and do not have to register or unregister subscribers.
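To make the reduction concrete, here is a small sketch of the arithmetic. It assumes the worst case, where every subscriber is interested in every publisher; the class and method names are only illustrative and are not part of Eneter.

```csharp
using System;

static class ConnectionCount
{
    // Direct wiring: every subscriber connects to every publisher.
    public static int Direct(int publishers, int subscribers)
    {
        return publishers * subscribers;
    }

    // With a broker: each publisher and each subscriber holds one connection to the broker.
    public static int ViaBroker(int publishers, int subscribers)
    {
        return publishers + subscribers;
    }

    static void Main()
    {
        Console.WriteLine(ConnectionCount.Direct(5, 20));     // prints 100
        Console.WriteLine(ConnectionCount.ViaBroker(5, 20));  // prints 25
    }
}
```

With 5 publishers and 20 subscribers, direct wiring needs 100 connections, while the broker topology needs 25.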
The following example shows how to implement this scenario using Eneter Messaging Framework.
Broker
The Broker is responsible for receiving notification messages from publishing applications and forwarding them to all subscribed receivers.
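The broker's role can be sketched without any networking. The following is a hypothetical in-memory stand-in, not the Eneter implementation: the InMemoryBroker class and its callback-based Subscribe signature are assumptions made for illustration, and only the idea of routing by message type id is taken from the example in this article.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for the broker role (not the Eneter implementation).
class InMemoryBroker
{
    // Message type id -> callbacks of the subscribers registered for it.
    private readonly Dictionary<string, List<Action<object>>> mySubscribers =
        new Dictionary<string, List<Action<object>>>();

    public void Subscribe(string messageTypeId, Action<object> subscriber)
    {
        if (!mySubscribers.TryGetValue(messageTypeId, out List<Action<object>> aCallbacks))
        {
            aCallbacks = new List<Action<object>>();
            mySubscribers[messageTypeId] = aCallbacks;
        }
        aCallbacks.Add(subscriber);
    }

    public void Unsubscribe(string messageTypeId, Action<object> subscriber)
    {
        if (mySubscribers.TryGetValue(messageTypeId, out List<Action<object>> aCallbacks))
        {
            aCallbacks.Remove(subscriber);
        }
    }

    // Called by any publisher. Forwards the message to every subscriber of
    // the given type and returns how many subscribers were notified.
    public int SendMessage(string messageTypeId, object message)
    {
        if (!mySubscribers.TryGetValue(messageTypeId, out List<Action<object>> aCallbacks))
        {
            return 0;
        }

        // Iterate over a copy so a callback may unsubscribe while being notified.
        Action<object>[] aSnapshot = aCallbacks.ToArray();
        foreach (Action<object> aCallback in aSnapshot)
        {
            aCallback(message);
        }
        return aSnapshot.Length;
    }
}

static class BrokerDemo
{
    static void Main()
    {
        var aBroker = new InMemoryBroker();
        var aReceived = new List<string>();

        Action<object> aSubscriberA = m => aReceived.Add("A got " + m);
        Action<object> aSubscriberB = m => aReceived.Add("B got " + m);

        aBroker.Subscribe("MyNotifyMsg1", aSubscriberA);
        aBroker.Subscribe("MyNotifyMsg2", aSubscriberA);
        aBroker.Subscribe("MyNotifyMsg2", aSubscriberB);

        // Publishers only need to know the broker, never the subscribers.
        aBroker.SendMessage("MyNotifyMsg1", "time from publisher 1");
        aBroker.SendMessage("MyNotifyMsg2", 12345);

        aBroker.Unsubscribe("MyNotifyMsg2", aSubscriberB);
        aBroker.SendMessage("MyNotifyMsg2", 67890);

        foreach (string aLine in aReceived)
        {
            Console.WriteLine(aLine);
        }
    }
}
```

In this sketch, subscriber B stops receiving "MyNotifyMsg2" after it unsubscribes, while subscriber A keeps receiving it. A real broker does the same bookkeeping, but per connected client rather than per callback.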
With Eneter Messaging Framework, the implementation of the broker application is trivial.
The whole implementation is here:
using System;
using Eneter.Messaging.DataProcessing.Serializing;
using Eneter.Messaging.MessagingSystems.MessagingSystemBase;
using Eneter.Messaging.MessagingSystems.TcpMessagingSystem;
using Eneter.Messaging.Nodes.Broker;

namespace BrokerApplication
{
    class Program
    {
        static void Main(string[] args)
        {
            // Create the broker.
            IDuplexBrokerFactory aBrokerFactory =
                new DuplexBrokerFactory(new XmlStringSerializer());
            IDuplexBroker aBroker = aBrokerFactory.CreateBroker();

            // Create the input channel that receives messages via TCP.
            IMessagingSystemFactory aMessaging = new TcpMessagingSystemFactory();
            IDuplexInputChannel aBrokerInputChannel =
                aMessaging.CreateDuplexInputChannel("127.0.0.1:7091");

            // Attach the input channel to the broker so that it starts
            // receiving messages from publishers and subscribers.
            Console.WriteLine("The broker application is running.");
            aBroker.AttachDuplexInputChannel(aBrokerInputChannel);
        }
    }
}
Publisher
The Publisher is responsible for sending messages to the broker application. The broker application then forwards the messages to the subscribed receivers.
The whole implementation is here:
using System;
using System.Windows.Forms;
using Eneter.Messaging.DataProcessing.Serializing;
using Eneter.Messaging.MessagingSystems.MessagingSystemBase;
using Eneter.Messaging.MessagingSystems.TcpMessagingSystem;
using Eneter.Messaging.Nodes.Broker;

namespace Publisher
{
    public partial class Form1 : Form
    {
        // Notification messages. The subscriber declares the same classes.
        public class NotifyMsg1
        {
            public DateTime CurrentTime { get; set; }
        }

        public class NotifyMsg2
        {
            public int Number { get; set; }
        }

        public class NotifyMsg3
        {
            public string TextMessage { get; set; }
        }

        public Form1()
        {
            InitializeComponent();

            // Create the broker client used to send messages to the broker.
            IDuplexBrokerFactory aBrokerFactory =
                new DuplexBrokerFactory(new XmlStringSerializer());
            myBrokerClient = aBrokerFactory.CreateBrokerClient();

            // Create the TCP output channel pointing at the broker address
            // and attach it to the broker client.
            IMessagingSystemFactory aMessaging = new TcpMessagingSystemFactory();
            myOutputChannel = aMessaging.CreateDuplexOutputChannel("127.0.0.1:7091");
            myBrokerClient.AttachDuplexOutputChannel(myOutputChannel);
        }

        private void Form1_FormClosed(object sender, FormClosedEventArgs e)
        {
            // Detach the channel and close the connection to the broker.
            myBrokerClient.DetachDuplexOutputChannel();
            myOutputChannel.CloseConnection();
        }

        // Each button serializes one message and sends it under its message type id.
        private void Notify1Btn_Click(object sender, EventArgs e)
        {
            NotifyMsg1 aMsg = new NotifyMsg1();
            aMsg.CurrentTime = DateTime.Now;
            object aSerializedMsg = mySerializer.Serialize<NotifyMsg1>(aMsg);
            myBrokerClient.SendMessage("MyNotifyMsg1", aSerializedMsg);
        }

        private void Notify2Btn_Click(object sender, EventArgs e)
        {
            NotifyMsg2 aMsg = new NotifyMsg2();
            aMsg.Number = 12345;
            object aSerializedMsg = mySerializer.Serialize<NotifyMsg2>(aMsg);
            myBrokerClient.SendMessage("MyNotifyMsg2", aSerializedMsg);
        }

        private void Notify3Btn_Click(object sender, EventArgs e)
        {
            NotifyMsg3 aMsg = new NotifyMsg3();
            aMsg.TextMessage = "My notifying text message.";
            object aSerializedMsg = mySerializer.Serialize<NotifyMsg3>(aMsg);
            myBrokerClient.SendMessage("MyNotifyMsg3", aSerializedMsg);
        }

        private IDuplexBrokerClient myBrokerClient;
        private IDuplexOutputChannel myOutputChannel;
        private XmlStringSerializer mySerializer = new XmlStringSerializer();
    }
}
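The publisher and the subscriber each declare their own copy of the message classes, so the serialized form is what has to match between them. The sketch below illustrates that with the standard .NET XmlSerializer. This is a stand-in chosen for illustration: it assumes that Eneter's XmlStringSerializer behaves similarly, which I have not verified here. The PublisherSide and SubscriberSide namespaces and the XmlRoundTrip helper are hypothetical names.

```csharp
using System;
using System.IO;
using System.Xml.Serialization;

namespace PublisherSide
{
    public class NotifyMsg2
    {
        public int Number { get; set; }
    }
}

namespace SubscriberSide
{
    // A separate declaration with the same class name and the same property.
    public class NotifyMsg2
    {
        public int Number { get; set; }
    }
}

// Hypothetical helper built on the standard .NET XmlSerializer.
public static class XmlRoundTrip
{
    public static string Serialize<T>(T message)
    {
        var aSerializer = new XmlSerializer(typeof(T));
        using (var aWriter = new StringWriter())
        {
            aSerializer.Serialize(aWriter, message);
            return aWriter.ToString();
        }
    }

    public static T Deserialize<T>(string xml)
    {
        var aSerializer = new XmlSerializer(typeof(T));
        using (var aReader = new StringReader(xml))
        {
            return (T)aSerializer.Deserialize(aReader);
        }
    }
}

static class RoundTripDemo
{
    static void Main()
    {
        // The publisher serializes its own type ...
        var aSent = new PublisherSide.NotifyMsg2 { Number = 12345 };
        string aXml = XmlRoundTrip.Serialize(aSent);

        // ... and the subscriber reads the XML into its own matching type.
        SubscriberSide.NotifyMsg2 aReceived =
            XmlRoundTrip.Deserialize<SubscriberSide.NotifyMsg2>(aXml);
        Console.WriteLine(aReceived.Number);
    }
}
```

With XmlSerializer, the XML carries the class name and the property names but not the .NET namespace, so two independently declared classes work as long as those names agree.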
Subscriber
The Subscriber registers itself with the broker application to receive the notification messages it is interested in.
The whole implementation is here:
using System;
using System.Windows.Forms;
using Eneter.Messaging.DataProcessing.Serializing;
using Eneter.Messaging.MessagingSystems.MessagingSystemBase;
using Eneter.Messaging.MessagingSystems.TcpMessagingSystem;
using Eneter.Messaging.Nodes.Broker;

namespace Subscriber
{
    public partial class Form1 : Form
    {
        // Notification messages. They match the classes declared in the publisher.
        public class NotifyMsg1
        {
            public DateTime CurrentTime { get; set; }
        }

        public class NotifyMsg2
        {
            public int Number { get; set; }
        }

        public class NotifyMsg3
        {
            public string TextMessage { get; set; }
        }

        public Form1()
        {
            InitializeComponent();

            // Create the broker client and handle notifications forwarded by the broker.
            IDuplexBrokerFactory aBrokerFactory =
                new DuplexBrokerFactory(new XmlStringSerializer());
            myBrokerClient = aBrokerFactory.CreateBrokerClient();
            myBrokerClient.BrokerMessageReceived += OnNotificationMessageReceived;

            // Create the TCP output channel pointing at the broker address
            // and attach it to the broker client.
            IMessagingSystemFactory aMessagingFactory = new TcpMessagingSystemFactory();
            myOutputChannel = aMessagingFactory.CreateDuplexOutputChannel("127.0.0.1:7091");
            myBrokerClient.AttachDuplexOutputChannel(myOutputChannel);
        }

        private void Form1_FormClosed(object sender, FormClosedEventArgs e)
        {
            // Detach the channel and close the connection to the broker.
            myBrokerClient.DetachDuplexOutputChannel();
            myOutputChannel.CloseConnection();
        }

        private void OnNotificationMessageReceived(object sender, BrokerMessageReceivedEventArgs e)
        {
            // The text boxes are updated here, so the handler runs in the UI thread.
            InvokeInUIThread(() =>
            {
                if (e.ReceivingError == null)
                {
                    // Deserialize according to the message type id and display the content.
                    if (e.MessageTypeId == "MyNotifyMsg1")
                    {
                        NotifyMsg1 aDeserializedMsg =
                            mySerializer.Deserialize<NotifyMsg1>(e.Message);
                        Received1TextBox.Text =
                            aDeserializedMsg.CurrentTime.ToString();
                    }
                    else if (e.MessageTypeId == "MyNotifyMsg2")
                    {
                        NotifyMsg2 aDeserializedMsg =
                            mySerializer.Deserialize<NotifyMsg2>(e.Message);
                        Received2TextBox.Text = aDeserializedMsg.Number.ToString();
                    }
                    else if (e.MessageTypeId == "MyNotifyMsg3")
                    {
                        NotifyMsg3 aDeserializedMsg =
                            mySerializer.Deserialize<NotifyMsg3>(e.Message);
                        Received3TextBox.Text = aDeserializedMsg.TextMessage;
                    }
                }
            });
        }

        // Each pair of buttons subscribes to or unsubscribes from one message type.
        private void Subscribe1Btn_Click(object sender, EventArgs e)
        {
            myBrokerClient.Subscribe("MyNotifyMsg1");
        }

        private void Unsubscribe1Btn_Click(object sender, EventArgs e)
        {
            Received1TextBox.Text = "";
            myBrokerClient.Unsubscribe("MyNotifyMsg1");
        }

        private void Subscribe2Btn_Click(object sender, EventArgs e)
        {
            myBrokerClient.Subscribe("MyNotifyMsg2");
        }

        private void Unsubscribe2Btn_Click(object sender, EventArgs e)
        {
            Received2TextBox.Text = "";
            myBrokerClient.Unsubscribe("MyNotifyMsg2");
        }

        private void Subscribe3Btn_Click(object sender, EventArgs e)
        {
            myBrokerClient.Subscribe("MyNotifyMsg3");
        }

        private void Unsubscribe3Btn_Click(object sender, EventArgs e)
        {
            Received3TextBox.Text = "";
            myBrokerClient.Unsubscribe("MyNotifyMsg3");
        }

        // Runs the given method in the UI thread, marshaling the call if needed.
        private void InvokeInUIThread(Action uiMethod)
        {
            if (InvokeRequired)
            {
                Invoke(uiMethod);
            }
            else
            {
                uiMethod();
            }
        }

        private IDuplexBrokerClient myBrokerClient;
        private IDuplexOutputChannel myOutputChannel;
        private XmlStringSerializer mySerializer = new XmlStringSerializer();
    }
}
For completeness, here are the publishers and subscribers running together with the broker.
I hope you found the article useful. If you have any comments or questions, please let me know.