Introduction
This article shows how to create a very simple client-server solution using Microsoft Message Queuing (MSMQ). There seems to be a need for something very simple: just the basics required to get you up and running with a moderately performing solution.
This solution can easily handle 500 000 moderately complex messages in three minutes, which works out to roughly 2800 messages per second.
A follow-up article, "Microsoft Message Queuing – Log Trade information using Microsoft SQL Server", takes this solution a few steps further by illustrating an approach for saving incoming data to Microsoft SQL Server.
Screenshot of the Microsoft Message Queuing client
Background
Over in the Q&A section, a question was posted requesting help with a solution that needed to be capable of handling 1500 messages each minute. 1500 messages shouldn’t be a problem – but a simple answer on how to achieve this is still somewhat more complex than what I’d like to post as a reply to a question. I searched a bit around CodeProject, and to my surprise I didn’t find an existing article illustrating the approach I had in mind.
Screenshot of the Microsoft Message Queuing server
Usually, the server part would be implemented as a Windows service.
A Brief Look at Some of the Code
Objects of the Payload class are what we are going to send from the client to the server. The class is simple, but not overly so.
[Serializable]
public class Payload
{
    private Guid id;
    private string text;
    private DateTime timeStamp;
    private byte[] buffer;

    public Payload()
    {
    }

    public Payload(string text, byte bufferFillValue, int bufferSize)
    {
        id = Guid.NewGuid();
        this.text = text;
        timeStamp = DateTime.UtcNow;
        buffer = new byte[bufferSize];
        for (int i = 0; i < bufferSize; i++)
        {
            buffer[i] = bufferFillValue;
        }
    }
}
Calling the UI Thread
Most of the interesting stuff happens in worker threads. To interact with components in the UI thread, we use the familiar InvokeRequired/Invoke pattern.
private delegate void LogMessageDelegate(string text);

private void LogMessage(string text)
{
    if (InvokeRequired)
    {
        Invoke(new LogMessageDelegate(LogMessage), text);
    }
    else
    {
        messageTextBox.AppendText(text + Environment.NewLine);
    }
}
Initializing the Message Queue on the Server
In this example, we use the BinaryMessageFormatter and rely on the .NET serialization mechanisms. You’ll get a significant speed improvement by rolling your own serialization/deserialization code working directly on byte arrays.
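To give an idea of what hand-rolled serialization working directly on byte arrays could look like, here is a minimal sketch using BinaryWriter and BinaryReader. The PayloadData and PayloadCodec names are hypothetical and are not part of the article's download; PayloadData simply mirrors the fields of Payload with public access so that the example is self-contained.

```csharp
using System;
using System.IO;
using System.Text;

// Hypothetical stand-in for the article's Payload class, with public fields
// so the codec below can reach them.
public sealed class PayloadData
{
    public Guid Id;
    public string Text;
    public DateTime TimeStamp;
    public byte[] Buffer;
}

// Hypothetical helper: writes the fields in a fixed order and reads them
// back in the same order.
public static class PayloadCodec
{
    public static byte[] ToBytes(PayloadData payload)
    {
        using (MemoryStream stream = new MemoryStream())
        using (BinaryWriter writer = new BinaryWriter(stream, Encoding.UTF8))
        {
            writer.Write(payload.Id.ToByteArray());        // always 16 bytes
            writer.Write(payload.Text ?? string.Empty);    // length-prefixed UTF-8
            writer.Write(payload.TimeStamp.ToBinary());    // 8 bytes, keeps DateTimeKind
            int length = payload.Buffer == null ? 0 : payload.Buffer.Length;
            writer.Write(length);                          // 4-byte buffer length
            if (length > 0)
            {
                writer.Write(payload.Buffer);              // raw buffer bytes
            }
            writer.Flush();
            return stream.ToArray();
        }
    }

    public static PayloadData FromBytes(byte[] data)
    {
        using (MemoryStream stream = new MemoryStream(data))
        using (BinaryReader reader = new BinaryReader(stream, Encoding.UTF8))
        {
            PayloadData payload = new PayloadData();
            payload.Id = new Guid(reader.ReadBytes(16));
            payload.Text = reader.ReadString();
            payload.TimeStamp = DateTime.FromBinary(reader.ReadInt64());
            int length = reader.ReadInt32();
            // A null buffer on the sending side comes back as an empty array.
            payload.Buffer = reader.ReadBytes(length);
            return payload;
        }
    }
}
```

One way to put such a byte array on the queue would be to wrap it in a MemoryStream and assign it to the BodyStream property of a System.Messaging.Message, which bypasses the formatter. Treat that as a suggestion to try out and measure, not as a description of what the downloadable code does.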
On the server, I like to use asynchronous handling of incoming messages. We assign an event handling method to the ReceiveCompleted event, and call BeginReceive() to tell the message queue component to start processing messages and call our OnReceiveCompleted event handler.
private void InitializeQueue()
{
    receivedCounter = 0;
    string queuePath = Constants.QueueName;
    if (!MessageQueue.Exists(queuePath))
    {
        messageQueue = MessageQueue.Create(queuePath);
    }
    else
    {
        messageQueue = new MessageQueue(queuePath);
    }
    isRunning = true;
    messageQueue.Formatter = new BinaryMessageFormatter();
    messageQueue.ReceiveCompleted += OnReceiveCompleted;
    messageQueue.BeginReceive();
}
Receiving Messages
Below, you’ll see our OnReceiveCompleted method that handles incoming messages.
It’s fairly simple, and doesn’t do anything useful with the incoming messages except deserializing them and logging every 10 000th message to the UI thread. The really important point is to call BeginReceive() again to tell the message queue component that we are ready to receive the next message. If we forget this, our server will only handle a single message, which is not exactly what we had in mind.
private void OnReceiveCompleted(Object source,
    ReceiveCompletedEventArgs asyncResult)
{
    try
    {
        MessageQueue mq = (MessageQueue)source;
        if (mq != null)
        {
            try
            {
                System.Messaging.Message message = null;
                try
                {
                    message = mq.EndReceive(asyncResult.AsyncResult);
                }
                catch (Exception ex)
                {
                    LogMessage(ex.Message);
                }
                if (message != null)
                {
                    Payload payload = message.Body as Payload;
                    if (payload != null)
                    {
                        receivedCounter++;
                        if ((receivedCounter % 10000) == 0)
                        {
                            string messageText =
                                string.Format("Received {0} messages", receivedCounter);
                            LogMessage(messageText);
                        }
                    }
                }
            }
            finally
            {
                if (isRunning)
                {
                    mq.BeginReceive();
                }
            }
        }
        return;
    }
    catch (Exception exc)
    {
        LogMessage(exc.Message);
    }
}
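As noted in the Background section, the server part would usually be implemented as a Windows service. The following is a rough sketch of how the same receive loop might be hosted in a class derived from System.ServiceProcess.ServiceBase. The QueueListenerService name and the queue path are assumptions made for illustration (the article's server takes its path from Constants.QueueName), and the sketch leaves out service installation entirely.

```csharp
using System;
using System.Messaging;
using System.ServiceProcess;

// Hypothetical service class; not part of the article's download.
public class QueueListenerService : ServiceBase
{
    // Assumed queue path, used here only to keep the sketch self-contained.
    private const string QueuePath = @".\Private$\SimpleQueueDemo";

    private MessageQueue messageQueue;
    private volatile bool isRunning;

    public QueueListenerService()
    {
        ServiceName = "QueueListenerService";
    }

    protected override void OnStart(string[] args)
    {
        messageQueue = MessageQueue.Exists(QueuePath)
            ? new MessageQueue(QueuePath)
            : MessageQueue.Create(QueuePath);
        messageQueue.Formatter = new BinaryMessageFormatter();
        messageQueue.ReceiveCompleted += OnReceiveCompleted;
        isRunning = true;
        messageQueue.BeginReceive();
    }

    protected override void OnStop()
    {
        // Stop re-arming the receive before releasing the queue.
        isRunning = false;
        messageQueue.ReceiveCompleted -= OnReceiveCompleted;
        messageQueue.Close();
    }

    private void OnReceiveCompleted(object source, ReceiveCompletedEventArgs e)
    {
        MessageQueue mq = (MessageQueue)source;
        try
        {
            Message message = mq.EndReceive(e.AsyncResult);
            object body = message.Body; // deserialize; process the body here
        }
        catch (Exception ex)
        {
            // A receive that is still pending when the service stops is
            // expected to fail, so only log while running.
            if (isRunning)
            {
                EventLog.WriteEntry(ex.Message);
            }
        }
        finally
        {
            if (isRunning)
            {
                mq.BeginReceive(); // ready for the next message
            }
        }
    }

    public static void Main()
    {
        ServiceBase.Run(new QueueListenerService());
    }
}
```

A service has no text box to log to, so this sketch writes errors to the event log through the EventLog property that ServiceBase provides. The isRunning flag plays the same role as in the Windows Forms server: once it is cleared, the handler no longer calls BeginReceive().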
Initializing the Message Queue on the Client
Our client side message queue initialization is even simpler than the server side initialization. Just remember to use the same formatter on both ends, in this case BinaryMessageFormatter.
private void InitializeQueue()
{
    string queuePath = Constants.QueueName;
    if (!MessageQueue.Exists(queuePath))
    {
        messageQueue = MessageQueue.Create(queuePath);
    }
    else
    {
        messageQueue = new MessageQueue(queuePath);
    }
    messageQueue.Formatter = new BinaryMessageFormatter();
}
Sending Messages
If you want to use multiple threads for sending messages, you need to initialize multiple MessageQueue components. In this example, we use a single MessageQueue component, and disable and enable the “Send Messages” button to keep the user from queuing messages from several worker threads at the same time. EnableSend() is called from the worker thread to enable the “Send Messages” button when the required number of messages has been sent.
private delegate void EnableSendDelegate();

private void EnableSend()
{
    if (InvokeRequired)
    {
        Invoke(new EnableSendDelegate(EnableSend));
    }
    else
    {
        sendButton.Enabled = true;
    }
}
Here is what we do to actually send a message. If you remove the code for measuring the performance of the function, there really isn't much left. As long as the object is serializable by .NET using the assigned formatter, all we need to do is to call Send, passing an instance of our payload object. It doesn't get much simpler than this. :)
private void SendMessages(int count)
{
    Random random = new Random(count);
    string message = string.Format("Sending {0} messages", count);
    LogMessage(message);
    DateTime start = DateTime.Now;
    for (int i = 0; i < count; i++)
    {
        byte b = Convert.ToByte(random.Next(128));
        int size = random.Next(1024);
        string text = string.Format("Message: Fill {0} {1}", b, size);
        Payload payload = new Payload(text, b, size);
        messageQueue.Send(payload);
    }
    DateTime end = DateTime.Now;
    TimeSpan ts = end - start;
    message = string.Format("{0} messages sent in {1}", count, ts);
    LogMessage(message);
}
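For the multi-threaded case mentioned at the start of this section, a sketch along the following lines gives each sending thread its own MessageQueue component. The ParallelSender class and the queue path are hypothetical, the queue is assumed to exist already, and plain strings are sent only to keep the example self-contained. In the article's application you would send Payload instances instead, since the server ignores anything that is not a Payload.

```csharp
using System;
using System.Messaging;
using System.Threading;

// Hypothetical helper; not part of the article's download.
public static class ParallelSender
{
    // Assumed queue path; the article's client uses Constants.QueueName.
    private const string QueuePath = @".\Private$\SimpleQueueDemo";

    public static void SendFromThreads(int threadCount, int messagesPerThread)
    {
        Thread[] threads = new Thread[threadCount];
        for (int t = 0; t < threadCount; t++)
        {
            int threadIndex = t; // private copy captured by the lambda
            threads[t] = new Thread(() =>
            {
                // Each thread creates, uses and disposes its own MessageQueue.
                using (MessageQueue queue = new MessageQueue(QueuePath))
                {
                    queue.Formatter = new BinaryMessageFormatter();
                    for (int i = 0; i < messagesPerThread; i++)
                    {
                        string body = string.Format(
                            "Thread {0}, message {1}", threadIndex, i);
                        queue.Send(body);
                    }
                }
            });
            threads[t].Start();
        }

        // Wait until every sender has finished.
        foreach (Thread thread in threads)
        {
            thread.Join();
        }
    }
}
```

Calling ParallelSender.SendFromThreads(4, 1000) would queue 4000 messages from four threads. Whether this is actually faster than a single sender depends on the machine and the queue configuration, so it is worth measuring before adopting it.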
Maintain a Responsive UI
Having our UI freeze up while we are dispatching the required number of messages makes users unhappy. By leveraging System.Threading.ThreadPool.QueueUserWorkItem, we keep our UI responsive in a simple and straightforward manner, again without exactly exerting ourselves. :)
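private void AsychSendMessages(object countAsObject)
{
    try
    {
        int count = (int)countAsObject;
        SendMessages(count);
    }
    catch (Exception ex)
    {
        // An unhandled exception on a thread pool thread would take
        // down the process, so report it instead.
        LogMessage(ex.Message);
    }
    finally
    {
        // Re-enable the button even if sending failed.
        EnableSend();
    }
}

private void sendButton_Click(object sender, EventArgs e)
{
    sendButton.Enabled = false;
    int count = Convert.ToInt32(messageCountNumericUpDown.Value);
    System.Threading.ThreadPool.QueueUserWorkItem(
        new System.Threading.WaitCallback(AsychSendMessages), count);
}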
Conclusion
Client/server development doesn’t get much easier than this, and with a few tweaks the performance can be significantly improved. FastSerializer represents a ready-to-use, high-performance serialization/deserialization solution.
So in the hope that it may prove useful: play around with it.
Microsoft Message Queuing is both efficient and very easy to use for .NET developers. Obviously, this article has just scratched the surface of the technology - but given its performance, I'm pretty sure some of you will find it highly satisfactory.
Best regards,
Espen Harlinn
History
- 12th March, 2011 - Initial posting