Introduction
Message queuing is an effective and efficient technique for inter-process, or even inter-thread, communication. Microsoft’s MSMQ is a very good implementation of message queuing technology. However, MSMQ is not the right choice when all you need is a simple message queue data structure shared by multiple threads within a single application. MSMQ is also too slow for some real-time distributed applications, for example when a socket reads UDP multicast messages that must be processed on a different thread as soon as possible.
This article demonstrates a technique for implementing a message queue data structure using a counting semaphore.
Background
A basic understanding of wait handles, local semaphores, and threads is recommended in order to follow the code.
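For readers new to these primitives, here is a minimal sketch of how a counting semaphore behaves: WaitOne blocks while the count is zero, and Release increments the count and wakes one waiter. The names SemaphoreBasics and Run are illustrative only and are not part of the article's code.

```csharp
using System;
using System.Threading;

public static class SemaphoreBasics
{
    // Returns true if the worker stayed blocked until the semaphore was released.
    public static bool Run()
    {
        using (Semaphore semaphore = new Semaphore(0, 1)) // initial count 0, maximum 1
        {
            bool workerWoke = false;
            Thread worker = new Thread(() =>
            {
                semaphore.WaitOne();   // sleeps here while the count is 0
                workerWoke = true;
            });
            worker.Start();

            // The count is 0, so a wait with a zero timeout fails immediately.
            bool acquiredWithoutRelease = semaphore.WaitOne(0);

            semaphore.Release();       // count becomes 1 and the worker is woken
            worker.Join();             // wait for the worker to finish

            return !acquiredWithoutRelease && workerWoke;
        }
    }
}
```

While blocked in WaitOne, the worker thread is suspended by the operating system rather than spinning, which is the property the queue in this article relies on.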
Using the Code
The source code of the MessageQueue class is shown below:
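using System;
using System.Collections.Generic;
using System.Threading;

public class MessageQueue<T> : IDisposable
{
    private readonly int _QUEUE_SIZE;
    private readonly Queue<T> _internalQueue;
    private readonly object _syncLock;
    // Counts the messages currently available to consumers (0 to _QUEUE_SIZE).
    private Semaphore _semaphore;

    public MessageQueue(int queueSize)
    {
        _syncLock = new object();
        _QUEUE_SIZE = queueSize;
        _internalQueue = new Queue<T>(_QUEUE_SIZE);
        _semaphore = new Semaphore(0, _QUEUE_SIZE);
    }

    // Discards all pending messages. The semaphore is drained one count per
    // discarded message so that its count stays in step with the queue, even
    // if a consumer has already passed WaitOne and is waiting for the lock.
    public void Reset()
    {
        lock (_syncLock)
        {
            while (_semaphore != null && _semaphore.WaitOne(0))
            {
                _internalQueue.Dequeue();
            }
        }
    }

    // Adds a message, or silently drops it when the queue is full.
    public void EnqueueMessage(T message)
    {
        lock (_syncLock)
        {
            if (_semaphore != null && message != null)
            {
                try
                {
                    // Release() throws once _QUEUE_SIZE messages are pending,
                    // so Enqueue is skipped and the queue never grows further.
                    _semaphore.Release();
                    _internalQueue.Enqueue(message);
                }
                catch (SemaphoreFullException)
                {
                    // Queue is full: the message is dropped.
                }
            }
        }
    }

    // Blocks (without consuming CPU) until a message is available.
    public T DequeueMessage()
    {
        Semaphore semaphore = _semaphore;
        if (semaphore == null)
        {
            throw new ObjectDisposedException("MessageQueue");
        }
        semaphore.WaitOne();
        lock (_syncLock)
        {
            return _internalQueue.Dequeue();
        }
    }

    public void Dispose()
    {
        lock (_syncLock)
        {
            if (_semaphore != null)
            {
                Reset();              // discard pending messages (the lock is reentrant)
                _semaphore.Close();
                _semaphore = null;
            }
        }
    }
}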
The following program demonstrates the usage of this class:
using System;
using System.Threading;

namespace ConsoleApplication1
{
    public class Program
    {
        // Written only by the dequeue thread; volatile so Main reads the latest value.
        static volatile int __receivedCount = 0;
        static MessageQueue<object> __messageQueue = new MessageQueue<object>(1000);

        public static void Main()
        {
            Thread dequeueThread = new Thread(new ThreadStart(_doDequeue));
            // A background thread ends with the process, so Thread.Abort() is not needed.
            dequeueThread.IsBackground = true;
            dequeueThread.Start();

            int messageCount = 10000000;
            for (int i = 0; i < messageCount; i++)
            {
                __messageQueue.EnqueueMessage(new object());
                Console.SetCursorPosition(30, 10);
                Console.WriteLine((i + 1).ToString().PadLeft(10) + " of " +
                    messageCount.ToString());
            }

            Console.WriteLine("Press any key...");
            Console.ReadKey(true);

            // Messages enqueued while the queue was full were dropped and count as lost.
            int receivedPercent = (int)((__receivedCount / (double)messageCount) * 100);
            Console.WriteLine(
                "{0} out of {1} messages received.\nReceived = {2}%\nLost = {3}%",
                __receivedCount, messageCount, receivedPercent, (100 - receivedPercent));
            Console.ReadKey(true);
            // The process exits here; the queue is not disposed while the
            // consumer may still be blocked inside DequeueMessage().
        }

        static void _doDequeue()
        {
            while (true)
            {
                object message = __messageQueue.DequeueMessage();
                __receivedCount++;
            }
        }
    }
}
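The consumer loop above blocks indefinitely inside DequeueMessage, which makes it awkward to stop the thread cleanly. One possible alternative, sketched here as an assumption rather than as part of the article's class, is to wait on the semaphore with a timeout so the loop can periodically check a stop condition. TimedMessageQueue, TryEnqueue, and TryDequeue are hypothetical names introduced for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical variant of the article's queue with a timed dequeue.
public class TimedMessageQueue<T>
{
    private readonly Semaphore _semaphore;
    private readonly Queue<T> _queue = new Queue<T>();
    private readonly object _syncLock = new object();

    public TimedMessageQueue(int queueSize)
    {
        _semaphore = new Semaphore(0, queueSize);
    }

    // Returns false when the queue is full and the message was dropped.
    public bool TryEnqueue(T message)
    {
        lock (_syncLock)
        {
            try
            {
                _semaphore.Release();
            }
            catch (SemaphoreFullException)
            {
                return false;
            }
            _queue.Enqueue(message);
            return true;
        }
    }

    // Waits up to timeoutMs milliseconds; returns false if no message arrived.
    public bool TryDequeue(int timeoutMs, out T message)
    {
        if (!_semaphore.WaitOne(timeoutMs))
        {
            message = default(T);
            return false;
        }
        lock (_syncLock)
        {
            message = _queue.Dequeue();
            return true;
        }
    }
}
```

A consumer could then loop on something like `while (!stopRequested) { if (queue.TryDequeue(100, out msg)) { ... } }`, where stopRequested is a flag set by the main thread, and exit on its own instead of being aborted.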
Points of Interest
With this implementation of MessageQueue, the internal queue never grows beyond the size passed to the constructor, so it cannot grow without bound and cause an OutOfMemoryException. When the queue is full, newly enqueued messages are dropped. Additionally, using a semaphore puts the dequeuing thread to sleep, so it consumes no CPU cycles while there is no message available to be dequeued.
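The size limit comes from the semaphore's maximum count: once the count reaches the maximum, a further Release throws SemaphoreFullException, and MessageQueue skips the enqueue. The following sketch isolates that behavior; BoundedSemaphoreDemo and CountAcceptedReleases are illustrative names, not part of the article's code.

```csharp
using System;
using System.Threading;

public static class BoundedSemaphoreDemo
{
    // Calls Release() 'attempts' times on a semaphore whose maximum count is
    // 'capacity' and returns how many of those calls were accepted.
    public static int CountAcceptedReleases(int capacity, int attempts)
    {
        int accepted = 0;
        using (Semaphore semaphore = new Semaphore(0, capacity))
        {
            for (int i = 0; i < attempts; i++)
            {
                try
                {
                    semaphore.Release();
                    accepted++;
                }
                catch (SemaphoreFullException)
                {
                    // The count is already at its maximum. In MessageQueue this
                    // is the point at which an incoming message is dropped.
                }
            }
        }
        return accepted;
    }
}
```

For example, CountAcceptedReleases(3, 5) returns 3, because the fourth and fifth calls find the count already at its maximum.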
History
- 26th June, 2009: Initial post