Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles / Languages / C#

MSMQ Backed FIFO Queue (C# .NET)

0.00/5 (No votes)
20 May 2011CPOL 12.9K  
A standard in-memory generic .NET queue that, when the configurable overflow limit is reached, will start using MSMQ as a backend to mitigate the issues with memory usage

Needing a queue that is both high-performance and able to handle a large number of entries without blowing my system's memory out of the water, I created the following utility queue.

In short, it is a standard in-memory generic .NET queue that, once the configurable overflow limit is reached, starts using MSMQ as a backing store to keep memory usage in check. In a low-volume scenario, the queue behaves like a normal generic queue.

If you have any questions or comments, feel free to email me.

Also, see my article on CodeProject here.

C#
/// <summary>
/// A FIFO queue that holds items in an in-memory <see cref="Queue{T}"/> until
/// <c>maxInternalQueueSize</c> is reached, then spills further items to an MSMQ
/// queue. Spilled items are pulled back into memory as room becomes available,
/// either on every <see cref="Dequeue"/> call or by a background thread.
/// </summary>
/// <typeparam name="T">
/// Item type. Items are sent to MSMQ through a <c>BinaryMessageFormatter</c>.
/// </typeparam>
public class OverflowQueue<T> : IDisposable where T : class
{
     // Also used as the lock object guarding the queue, both size counters'
     // memory-side updates, and itemDequeued.
     readonly Queue<T> internalQueue = new Queue<T>();

     readonly BinaryMessageFormatter messageFormatter = new BinaryMessageFormatter();

     // Number of dequeues after which the in-memory queue is trimmed.
     internal static int compactOverflowQueue =
       AppConfigHelper.GetValue("CompactOverflowQueueAfterMessagesDequeued", 10000);

     // In-memory item count at which new items start going to MSMQ.
     internal static int maxInternalQueueSize =
       AppConfigHelper.GetValue("MaxInternalOverflowQueueSize", 10000);

     readonly QueuePath queuePath;

     readonly MessageQueue overflowMSMQ;

     // 64-bit counters: always read through Interlocked.Read so that reads
     // cannot tear on 32-bit runtimes.
     long currentQueueSize;
     long currentMSMQSize;

     readonly bool useBackgroundMsmqPull;
     readonly Thread backgroundMsmqPullThread;

     volatile bool pushingToMSMQ;
     volatile bool disposed;

     // Serializes PullFromMSMQ so only one thread at a time receives from MSMQ.
     readonly object msmqPullLock = new object();

     // Guards sends to MSMQ together with changes to pushingToMSMQ.
     readonly object msmqLock = new object();

     // Dequeues since the last TrimExcess; only touched under lock (internalQueue).
     int itemDequeued;

     /// <summary>Number of items currently held in the in-memory queue.</summary>
     public long CurrentMemoryQueueSize
     {
          get
          {
               return Interlocked.Read(ref currentQueueSize);
          }
     }

     /// <summary>Number of items this instance has sent to MSMQ and not yet pulled back.</summary>
     public long CurrentMSMQSize
     {
          get
          {
               return Interlocked.Read(ref currentMSMQSize);
          }
     }

     /// <summary>
     /// Sum of the MSMQ and in-memory counts. The two counters are read one
     /// after the other, so the value is a snapshot, not an atomic total.
     /// </summary>
     public long TotalCount
     {
          get
          {
               return Interlocked.Read(ref currentMSMQSize) +
                 Interlocked.Read(ref currentQueueSize);
          }
     }

     /// <summary>
     /// Creates (or opens) the backing MSMQ queue on the local machine and
     /// optionally starts the background pull thread.
     /// </summary>
     /// <param name="queueName">Name of the backing MSMQ queue.</param>
     /// <param name="purgeQueue">When true, the MSMQ queue is purged on start-up.</param>
     /// <param name="useBackgroundMsmqPull">
     /// When true, a background thread moves items from MSMQ back into memory
     /// roughly once per second; otherwise the pull happens on each <see cref="Dequeue"/>.
     /// </param>
     public OverflowQueue(string queueName, bool purgeQueue, bool useBackgroundMsmqPull)
     {
          queuePath = new QueuePath(queueName, Environment.MachineName, false);
          overflowMSMQ = QueueCreator.CreateAndReturnQueue(queuePath);
          overflowMSMQ.Formatter = new BinaryMessageFormatter();

          if (purgeQueue)
          {
               overflowMSMQ.Purge();
          }

          this.useBackgroundMsmqPull = useBackgroundMsmqPull;

          if (useBackgroundMsmqPull)
          {
               // start a thread that pulls msmq messages back into the queue
               backgroundMsmqPullThread = new Thread(BackgroundMsmqPull);
               backgroundMsmqPullThread.IsBackground = true;
               backgroundMsmqPullThread.Name = "BackgroundMsmqPullThread";
               backgroundMsmqPullThread.Start();
          }
     }

     /// <summary>Background loop: pull from MSMQ, sleep one second, repeat until disposed.</summary>
     void BackgroundMsmqPull()
     {
          while (!disposed)
          {
               PullFromMSMQ();
               Thread.Sleep(1000);
          }
     }

     /// <summary>
     /// Adds an item to the queue. The item goes to memory while the in-memory
     /// queue is below <c>maxInternalQueueSize</c> and no overflow is in
     /// progress; otherwise it is sent to MSMQ.
     /// </summary>
     /// <param name="item">The item to add; must not be null.</param>
     /// <exception cref="ArgumentNullException"><paramref name="item"/> is null.</exception>
     public void Enqueue(T item)
     {
          // A null item could never be handed back by Dequeue and would leave
          // the size counter permanently off by one.
          if (item == null)
          {
               throw new ArgumentNullException("item");
          }

          if (!pushingToMSMQ)
          {
               if (Interlocked.Read(ref currentQueueSize) >= maxInternalQueueSize)
               {
                    // We've busted the queue size...
                    // start pushing to MSMQ until the current
                    // queue zeros out,
                    // then pop all entries (up to max) back into memory.
                    lock (msmqLock)
                    {
                         pushingToMSMQ = true;
                         PushToMSMQ(item);
                    }
               }
               else
               {
                    // We're still pushing into the memory queue
                    PushToMemoryQueue(item);
               }
          }
          else
          {
               // This lock looks like this (split if) because
               // I don't want to hold onto the lock for a push to the memory queue
               lock (msmqLock)
               {
                    if (pushingToMSMQ) // verify we're still pushing to MSMQ
                    {
                         PushToMSMQ(item);
                         return; // Skip the push to memory queue
                    }
               }

               // This will only get hit if we aren't still pushing to MSMQ
               PushToMemoryQueue(item);
          }
     }

     /// <summary>Appends an item to the in-memory queue and bumps its counter.</summary>
     void PushToMemoryQueue(T item)
     {
          lock (internalQueue)
          {
               internalQueue.Enqueue(item);

               // Counted inside the lock so the counter never disagrees with
               // internalQueue.Count as seen by another lock holder.
               Interlocked.Increment(ref currentQueueSize);
          }
     }

     /// <summary>Sends an item to the MSMQ queue and bumps the MSMQ counter.</summary>
     void PushToMSMQ(T item)
     {
          using (Message message = new Message(item, messageFormatter))
          {
               overflowMSMQ.Send(message);
          }

          // Incremented only after Send returns, so a positive counter means
          // there is a message for PullFromMSMQ to receive.
          Interlocked.Increment(ref currentMSMQSize);
     }

     /// <summary>
     /// Removes and returns the oldest item held in memory, first pulling
     /// items back from MSMQ when needed.
     /// </summary>
     /// <returns>The dequeued item; never null.</returns>
     /// <exception cref="InvalidOperationException">The queue is empty.</exception>
     public T Dequeue()
     {
          if (!useBackgroundMsmqPull)
          {
               PullFromMSMQ();
          }
          else if (Interlocked.Read(ref currentQueueSize) == 0
                   && Interlocked.Read(ref currentMSMQSize) > 0)
          {
               // This is here because if the background pull
               // is on and a user tries to dequeue too quickly,
               // they can run up against an empty memory queue,
               // but there could still be something in the MSMQ.
               // So, we need to double check for them ;)
               PullFromMSMQ();
          }

          lock (internalQueue)
          {
               // Emptiness is decided under the lock: testing the counter
               // outside it would let two consumers race for the last item.
               if (internalQueue.Count == 0)
               {
                    throw new InvalidOperationException("Nothing to dequeue!");
               }

               T item = internalQueue.Dequeue();
               Interlocked.Decrement(ref currentQueueSize);

               itemDequeued++;
               if (itemDequeued >= compactOverflowQueue)
               {
                    // Compact the internal queue to save space
                    internalQueue.TrimExcess();
                    itemDequeued = 0;
               }

               return item;
          }
     }

     /// <summary>
     /// Moves items from MSMQ back into memory until MSMQ is drained or the
     /// in-memory queue is full, then switches overflow mode off once MSMQ is empty.
     /// </summary>
     void PullFromMSMQ()
     {
          // Only one puller at a time. Receive() blocks until a message is
          // available, so two threads acting on the same counter value could
          // leave one of them waiting for a message that is not there.
          lock (msmqPullLock)
          {
               if (disposed)
               {
                    return;
               }

               // We've been putting all new messages into MSMQ...
               // now is the time to get them out and put them back into memory.
               while (Interlocked.Read(ref currentMSMQSize) > 0 &&
                      Interlocked.Read(ref currentQueueSize) < maxInternalQueueSize)
                      // currentQueueSize should be low here
               {
                    using (Message message = overflowMSMQ.Receive())
                    {
                         // decrement the MSMQ size
                         Interlocked.Decrement(ref currentMSMQSize);

                         T item = message.Body as T;

                         // A body that is not a T (e.g. a message left in a
                         // non-purged queue by another producer) is dropped
                         // instead of being stored as a null entry.
                         if (item != null)
                         {
                              PushToMemoryQueue(item);
                         }
                    }
               }
          }

          if (Interlocked.Read(ref currentMSMQSize) <= 0)
          {
               // lock around this to prevent the count
               // from being wrong when turning off msmq pushing
               lock (msmqLock)
               {
                    if (Interlocked.Read(ref currentMSMQSize) <= 0)
                    {
                         pushingToMSMQ = false;
                    }
               }
          }
     }

     /// <summary>
     /// Signals the background pull thread (if any) to stop and releases the
     /// MSMQ queue handle. Items still stored in MSMQ are left there.
     /// </summary>
     public void Dispose()
     {
          Dispose(true);
          GC.SuppressFinalize(this);
     }

     /// <summary>Releases resources held by the queue.</summary>
     /// <param name="disposing">True when called from <see cref="Dispose()"/>.</param>
     protected virtual void Dispose(bool disposing)
     {
          if (!disposing || disposed)
          {
               return;
          }

          // The background loop checks this flag after each sleep.
          disposed = true;

          // Wait for any in-flight pull to finish before releasing the queue.
          lock (msmqPullLock)
          {
               overflowMSMQ.Dispose();
          }
     }
}

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)