Last year, I was working on a cloud-hosted Windows service for a client that contained an application-specific logging implementation. The existing architecture posted log entries at various points in the process, e.g., file discovery, pickup, dropoff, and download. The log code would post a message to Microsoft Message Queuing (MSMQ), and a separate database writer service would dequeue those messages and post them to a series of tables in SQL Server.
Lagging The Play
While this setup worked perfectly well, it had one minor issue: the queueing of a log message to MSMQ happened sequentially. That means that while the service was attempting to post a log message to the queue, all other file processing was temporarily suspended. Since posting a log message to MSMQ means you're performing an inter-process communication, there will be a noticeable lag imposed on the calling thread. Add to that the possibility that the MSMQ service could be located on another server, and you've now imposed network lag time on the calling process as well. That's potentially alotta-lag! In the worst case, if MSMQ cannot be reached for some reason, file processing could be suspended for a very long time. For a platform that expects to process thousands of messages a day, this was clearly not going to work as a long-term solution. However, the client wanted to retain MSMQ as a persistent message-forwarding mechanism so that if the writer service was unavailable, log messages would not end up getting lost.
Block For Me
It seemed clear that what was needed was some way for the service to save log messages internally for near-term posting to MSMQ in a way that would minimally impact file processing. What came to mind initially was an internal Queue object on which the service could store log messages, to be dequeued and posted to MSMQ by another thread. It's a classic Producer-Consumer pattern. While this is a threading implementation that is not of surpassing difficulty to implement, it has some subtleties that make it non-trivial. First, all access to the Queue object has to be thread-safe. Second, the MSMQ posting thread needs to enter a low-CPU-load no-operation loop while it's waiting for a log message to be queued. Wouldn't it be nice if there was something built into the .NET Framework to do all this?
Well, sometimes Microsoft gets it right. In the .NET Framework 4 release, Microsoft added something called BlockingCollection that does exactly what we needed. It allows for thread-safe Producer-Consumer patterns that do not consume CPU resources when there is nothing on the queue.
Here's an example of how to implement it in a simple console application.
First, we'll need a message class. In the service for the client, the log information message was more complex, but this should give you the general idea.
namespace BlockingCollectionExample
{
    class MyMessage
    {
        public int MessageId { get; set; }
        public string Message { get; set; }

        public override string ToString()
        {
            return string.Format("Message with ID {0:#,##0} and value {1}.", MessageId, Message);
        }
    }
}
The real "meat" of the operation is in the class that encapsulates the blocking collection. Here's the first portion of the class definition.
using System;
using System.Collections.Concurrent;
using System.Threading;

namespace BlockingCollectionExample
{
    class MyQueue : IDisposable
    {
        private BlockingCollection<MyMessage> messageQueue;
        private Thread dequeueThread;
        bool stopped = false;
        bool isStopping = false;

        public MyQueue()
        {
            messageQueue = new BlockingCollection<MyMessage>(new ConcurrentQueue<MyMessage>());
            dequeueThread = new Thread(new ThreadStart(DequeueMessageThread));
            dequeueThread.Name = "TransactionPostThread";
            dequeueThread.Start();
            stopped = false;
        }

        ~MyQueue()
        {
            Dispose(true);
        }
...
You'll notice that the class implements the IDisposable interface. This is so that the thread that dequeues the messages from the blocking collection can clean up after itself. This will be seen in another section of the code for this class.
You'll also notice that when the BlockingCollection is defined, we specify the class of objects that will be placed on the collection. However, when we instantiate the collection, we signify that it should use a ConcurrentQueue object as the backing data store for the blocking collection. This ensures that the items placed in the collection will be handled in a thread-safe manner on a first-in, first-out (FIFO) basis.
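As an aside, ConcurrentQueue happens to be the default backing store for BlockingCollection, so passing it explicitly here mainly documents the FIFO intent. If last-in, first-out ordering were ever preferable, a ConcurrentStack could be substituted; a sketch (not from the original service):

// LIFO variant: back the blocking collection with a ConcurrentStack instead.
// Take() will then return the most recently added message first.
var lifoQueue = new BlockingCollection<MyMessage>(new ConcurrentStack<MyMessage>());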
The finalizer method merely calls our Dispose method with a parameter indicating that this was called from the class' destructor, a common pattern for IDisposable implementations. The Dispose methods will be shown in their entirety later in this post.
public void AddLog(MyMessage message)
{
    Console.WriteLine("Enqueueing: " + message.ToString());
    messageQueue.Add(message);
}

private void DequeueMessageThread()
{
    try
    {
        while (true)
        {
            MyMessage message = messageQueue.Take();
            Console.WriteLine("Dequeueing: " + message.ToString());
            if (messageQueue.IsCompleted)
            {
                break;
            }
        }
    }
    catch (InvalidOperationException)
    {
        // Thrown by Take() when the collection is marked complete and empty.
    }
    catch (ThreadAbortException)
    {
        // Thrown if Dispose() had to abort this thread.
    }
    catch (Exception)
    {
        throw;
    }
}
...
The AddLog method is very simple; it invokes the blocking collection's Add method to enqueue the message in a thread-safe manner. The DequeueMessageThread method appears to be an endless loop that keeps attempting to dequeue a message, which would normally cause a CPU spike from the tight looping. But here's where the magic of the blocking collection comes into play. The Take method will enter a low-CPU wait state if nothing is found on the queue, blocking the loop from proceeding. As soon as a message is enqueued, Take will return from the wait state and the loop will proceed. Note that once the collection has been marked as complete, a call to Take on an empty collection throws an InvalidOperationException rather than blocking; the IsCompleted check right after the call handles the case where the message just taken was the last one.
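As a side note, BlockingCollection also exposes a GetConsumingEnumerable method that wraps this Take/IsCompleted/exception dance in a simple foreach. An equivalent consumer loop, sketched against the same messageQueue field as above, could look like this:

// Consumes messages until CompleteAdding() is called and the queue drains.
// GetConsumingEnumerable blocks while the collection is empty and ends the
// enumeration cleanly on completion -- no InvalidOperationException to catch.
private void DequeueMessageThread()
{
    foreach (MyMessage message in messageQueue.GetConsumingEnumerable())
    {
        Console.WriteLine("Dequeueing: " + message.ToString());
    }
}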
The exception handler in the method captures two specific exceptions:
- The InvalidOperationException will be signaled if the blocking collection is stopped. We'll see this in the Dispose method.
- The ThreadAbortException will be signaled if the thread had to be killed because the Dispose method timed out waiting for the thread to finish.
public void Dispose()
{
    Dispose(false);
}

private void Dispose(bool fromDestructor)
{
    isStopping = true;
    int logShutdownTimeout = 30000;
    Console.WriteLine("Shutting down queue. Waiting for dequeue thread completion.");
    messageQueue.CompleteAdding();
    do
    {
        if (!dequeueThread.Join(logShutdownTimeout))
        {
            if (messageQueue.Count == 0)
            {
                System.Diagnostics.Debug.Print("Aborting thread");
                dequeueThread.Abort();
                break;
            }
        }
    } while (dequeueThread.IsAlive);
    Console.WriteLine("Dequeue thread complete.");
    if (!fromDestructor)
    {
        GC.SuppressFinalize(this);
    }
    stopped = true;
    isStopping = false;
}
In this code snippet, the first Dispose method is our public interface that satisfies the requirement for IDisposable implementation. It simply calls our private Dispose method with a parameter indicating whether it was called from the class destructor method.
The second, private Dispose method is where some housekeeping for the blocking collection and dequeue thread happens. First, we call the blocking collection's CompleteAdding method. This disallows any further additions to the queue, minimizing the chance that the dequeue thread will never end because messages continue to be added. We then wait for the thread to complete by calling the thread's Join method, specifying a timeout value. If the thread has not completed within the timeout even though the queue has been drained, we forcibly abort it and exit. Finally, if we were not called from the class' destructor, we suppress garbage-collector finalization, since cleanup has already been done.
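One caveat worth noting: forcibly aborting a thread is risky, and Thread.Abort is unsupported on newer .NET runtimes. An alternative is cooperative cancellation via the Take overload that accepts a CancellationToken. A sketch, assuming a hypothetical cancelSource field (a CancellationTokenSource) added to MyQueue and cancelled during shutdown:

// Hypothetical alternative: cooperative cancellation instead of Thread.Abort.
// cancelSource would be a CancellationTokenSource field on MyQueue.
private void DequeueMessageThread()
{
    try
    {
        while (!messageQueue.IsCompleted)
        {
            // Take throws OperationCanceledException when cancelSource.Cancel()
            // is called, letting the thread exit without being aborted.
            MyMessage message = messageQueue.Take(cancelSource.Token);
            Console.WriteLine("Dequeueing: " + message.ToString());
        }
    }
    catch (OperationCanceledException) { }
    catch (InvalidOperationException) { }
}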
To utilize a producer-consumer queue like this one is quite simple:
class Program
{
    static void Main(string[] args)
    {
        using (MyQueue queue = new MyQueue())
        {
            for (int msgIdx = 1; msgIdx < 101; msgIdx++)
            {
                queue.AddLog(new MyMessage
                {
                    MessageId = msgIdx,
                    Message = string.Format("Message text # {0:#,##0}", msgIdx)
                });
            }
        }
    }
}
The using statement ensures that the queue's Dispose method is invoked upon completion, thereby stopping the dequeuing thread. When executed in a loop like this one that enqueues 100 messages, the tail end of the output looks like this:
Enqueueing: Message with ID 92 and value Message text # 92.
Enqueueing: Message with ID 93 and value Message text # 93.
Enqueueing: Message with ID 94 and value Message text # 94.
Dequeueing: Message with ID 88 and value Message text # 88.
Dequeueing: Message with ID 89 and value Message text # 89.
Dequeueing: Message with ID 90 and value Message text # 90.
Dequeueing: Message with ID 91 and value Message text # 91.
Enqueueing: Message with ID 95 and value Message text # 95.
Enqueueing: Message with ID 96 and value Message text # 96.
Enqueueing: Message with ID 97 and value Message text # 97.
Enqueueing: Message with ID 98 and value Message text # 98.
Dequeueing: Message with ID 92 and value Message text # 92.
Dequeueing: Message with ID 93 and value Message text # 93.
Dequeueing: Message with ID 94 and value Message text # 94.
Dequeueing: Message with ID 95 and value Message text # 95.
Enqueueing: Message with ID 99 and value Message text # 99.
Enqueueing: Message with ID 100 and value Message text # 100.
Dequeueing: Message with ID 96 and value Message text # 96.
Dequeueing: Message with ID 97 and value Message text # 97.
Dequeueing: Message with ID 98 and value Message text # 98.
Dequeueing: Message with ID 99 and value Message text # 99.
Dequeueing: Message with ID 100 and value Message text # 100.
Shutting down queue. Waiting for dequeue thread completion.
Dequeue thread complete.
As you can see, the dequeue process slightly lags the enqueue process, as you would expect for processes running in separate threads. The messages are interspersed as the threads compete for the shared resource.
Finishing It Off
So what we've demonstrated is a way to implement a producer-consumer pattern without writing a lot of thread-management code. While this pattern won't fit every situation, it certainly has its uses. Any time you need to queue up items for processing but don't want to slow down the primary process, give this pattern a try.