I needed a queue that is fast when volume is low but can also absorb a huge number of entries without blowing my system's memory out of the water, so I wrote the following utility queue. In short, it is a standard in-memory generic .NET queue that, once a configurable overflow limit is reached, starts spilling entries to MSMQ to keep memory usage under control. In a low-volume scenario, it behaves exactly like a normal generic queue.
If you have any questions or comments, feel free to email me.
Also, see my article on CodeProject here.
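The class below references a few small helpers from my utility library (AppConfigHelper, QueuePath, QueueCreator) that aren't reproduced here. If you want to compile it as-is, something along the following lines should do; treat these as rough stand-ins I'm sketching for this post, not the real implementations:

using System.Configuration;
using System.Messaging;

// Rough stand-ins for the helpers used by OverflowQueue<T> -- assumptions, not the originals.
static class AppConfigHelper
{
    // Reads an int from <appSettings>, falling back to the default when the key is missing or invalid.
    public static int GetValue(string key, int defaultValue)
    {
        int value;
        return int.TryParse(ConfigurationManager.AppSettings[key], out value) ? value : defaultValue;
    }
}

class QueuePath
{
    public string Path { get; private set; }

    public QueuePath(string queueName, string machineName, bool isTransactional)
    {
        // A private queue on the given machine, e.g. @"MYBOX\private$\myQueue".
        // (isTransactional is ignored in this sketch.)
        Path = machineName + @"\private$\" + queueName;
    }
}

static class QueueCreator
{
    // Returns the queue at the given path, creating it first if it doesn't exist.
    public static MessageQueue CreateAndReturnQueue(QueuePath queuePath)
    {
        if (!MessageQueue.Exists(queuePath.Path))
            return MessageQueue.Create(queuePath.Path);
        return new MessageQueue(queuePath.Path);
    }
}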
using System;
using System.Collections.Generic;
using System.Messaging;
using System.Threading;

public class OverflowQueue<T> where T : class
{
    Queue<T> internalQueue = new Queue<T>();
    BinaryMessageFormatter messageFormatter = new BinaryMessageFormatter();

    // Number of dequeues after which the in-memory queue is compacted.
    internal static int compactOverflowQueue =
        AppConfigHelper.GetValue("CompactOverflowQueueAfterMessagesDequeued", 10000);

    // Maximum number of items the in-memory queue may hold before spilling to MSMQ.
    internal static int maxInternalQueueSize =
        AppConfigHelper.GetValue("MaxInternalOverflowQueueSize", 10000);

    QueuePath queuePath;
    MessageQueue overflowMSMQ;
    long currentQueueSize;              // items currently in the in-memory queue
    long currentMSMQSize;               // items currently parked in MSMQ
    bool useBackgroundMsmqPull;
    Thread backgroundMsmqPullThread;
    volatile bool pushingToMSMQ;        // true while new items are being diverted to MSMQ
    object msmqPullLock = new object(); // reserved; unused in this listing
    object msmqLock = new object();     // guards the switch between memory and MSMQ modes
    int itemDequeued;                   // dequeue counter that triggers compaction
    public long CurrentMemoryQueueSize
    {
        get
        {
            // Interlocked.Read makes the 64-bit read atomic on 32-bit systems,
            // matching how the other counters are read.
            return Interlocked.Read(ref currentQueueSize);
        }
    }

    public long CurrentMSMQSize
    {
        get
        {
            return Interlocked.Read(ref currentMSMQSize);
        }
    }

    public long TotalCount
    {
        get
        {
            return Interlocked.Read(ref currentMSMQSize) +
                   Interlocked.Read(ref currentQueueSize);
        }
    }
    public OverflowQueue(string queueName, bool purgeQueue, bool useBackgroundMsmqPull)
    {
        queuePath = new QueuePath(queueName, Environment.MachineName, false);
        overflowMSMQ = QueueCreator.CreateAndReturnQueue(queuePath);
        overflowMSMQ.Formatter = messageFormatter;

        if (purgeQueue)
            overflowMSMQ.Purge();

        this.useBackgroundMsmqPull = useBackgroundMsmqPull;
        if (useBackgroundMsmqPull)
        {
            // Optionally drain MSMQ back into memory on a background thread
            // instead of doing it inline on every Dequeue call.
            backgroundMsmqPullThread = new Thread(BackgroundMsmqPull);
            backgroundMsmqPullThread.IsBackground = true;
            backgroundMsmqPullThread.Name = "BackgroundMsmqPullThread";
            backgroundMsmqPullThread.Start();
        }
    }
    void BackgroundMsmqPull()
    {
        // Periodically move overflowed items from MSMQ back into memory.
        while (true)
        {
            PullFromMSMQ();
            Thread.Sleep(1000);
        }
    }
    public void Enqueue(T item)
    {
        if (!pushingToMSMQ)
        {
            // Once the in-memory queue is full, switch into MSMQ mode.
            if (Interlocked.Read(ref currentQueueSize) >= maxInternalQueueSize)
            {
                lock (msmqLock)
                {
                    pushingToMSMQ = true;
                    PushToMSMQ(item);
                }
            }
            else
            {
                PushToMemoryQueue(item);
            }
        }
        else
        {
            lock (msmqLock)
            {
                // Re-check under the lock: PullFromMSMQ may have just
                // switched the queue back to in-memory mode.
                if (pushingToMSMQ)
                {
                    PushToMSMQ(item);
                    return;
                }
            }
            PushToMemoryQueue(item);
        }
    }
    void PushToMemoryQueue(T item)
    {
        lock (internalQueue)
            internalQueue.Enqueue(item);
        Interlocked.Increment(ref currentQueueSize);
    }

    void PushToMSMQ(T item)
    {
        Message message = new Message(item, messageFormatter);
        overflowMSMQ.Send(message);
        Interlocked.Increment(ref currentMSMQSize);
    }
    public T Dequeue()
    {
        try
        {
            if (!useBackgroundMsmqPull)
            {
                // Inline mode: top the in-memory queue up from MSMQ on every call.
                PullFromMSMQ();
            }
            else
            {
                // Background mode: only pull inline when memory is empty but
                // MSMQ still holds items, so the caller is never starved.
                if (Interlocked.Read(ref currentQueueSize) == 0
                    && Interlocked.Read(ref currentMSMQSize) > 0)
                    PullFromMSMQ();
            }

            T item = null;
            lock (internalQueue)
            {
                // Check the count under the lock so a racing consumer
                // can't empty the queue between the test and the Dequeue.
                if (internalQueue.Count > 0)
                    item = internalQueue.Dequeue();
            }

            if (item != null)
            {
                Interlocked.Increment(ref itemDequeued);
                Interlocked.Decrement(ref currentQueueSize);
                return item;
            }
            throw new InvalidOperationException("Nothing to dequeue!");
        }
        finally
        {
            // Every compactOverflowQueue dequeues, trim the queue's backing
            // array so a traffic burst doesn't pin a huge allocation forever.
            if (itemDequeued >= compactOverflowQueue)
            {
                lock (internalQueue)
                {
                    if (itemDequeued >= compactOverflowQueue)
                    {
                        internalQueue.TrimExcess();
                        itemDequeued = 0;
                    }
                }
            }
        }
    }
    void PullFromMSMQ()
    {
        // Move items from MSMQ back into memory until MSMQ is empty
        // or the in-memory queue is full again.
        while (Interlocked.Read(ref currentMSMQSize) > 0 &&
               Interlocked.Read(ref currentQueueSize) < maxInternalQueueSize)
        {
            Message message = overflowMSMQ.Receive();
            Interlocked.Decrement(ref currentMSMQSize);
            T item = message.Body as T;
            PushToMemoryQueue(item);
        }

        // Once MSMQ is drained, flip back to pure in-memory mode.
        if (Interlocked.Read(ref currentMSMQSize) <= 0)
        {
            lock (msmqLock)
            {
                if (Interlocked.Read(ref currentMSMQSize) <= 0)
                    pushingToMSMQ = false;
            }
        }
    }
}
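Basic usage looks just like a normal Queue<T>. Here's a quick sketch (the queue name "testOverflowQueue" and the Work type are invented for this example; note that whatever type you enqueue must be marked [Serializable], since the overflow path round-trips it through BinaryMessageFormatter):

using System;

// Example message type -- made up for this demo. BinaryMessageFormatter
// requires it to be [Serializable] so it can be stored in MSMQ.
[Serializable]
public class Work
{
    public string Payload { get; set; }
}

class Program
{
    static void Main()
    {
        // Purge leftovers from a previous run; pull from MSMQ inline on Dequeue.
        OverflowQueue<Work> queue = new OverflowQueue<Work>("testOverflowQueue", true, false);

        for (int i = 0; i < 25000; i++)
            queue.Enqueue(new Work { Payload = "item " + i });

        // With the default limit of 10000, the first 10000 items sit in memory
        // and the remaining 15000 have spilled to MSMQ.
        Console.WriteLine("In memory: {0}, in MSMQ: {1}",
            queue.CurrentMemoryQueueSize, queue.CurrentMSMQSize);

        // Dequeue transparently refills the in-memory queue from MSMQ as it drains.
        while (queue.TotalCount > 0)
            queue.Dequeue();
    }
}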