Introduction
There are many examples on CodeProject illustrating using thread pooling to manage work or job queues. These all assign the work item to a free thread, and thus the work is done concurrently. I, instead, needed the ability to queue work items that would be processed in a single thread. The typical scenario that I'm using this class for is to process activities that must occur on the WinForm main application thread, such as updating UI elements. Concurrent processing serves no purpose since using Invoke blocks until the main application thread becomes idle.
Architecture
The following sections describe some of the architectural decisions I made in the implementation of this class.
Generics
The class, ProcessingQueue<T>, is a generic class, allowing you to specify the work type as either a value or reference type, for example:
processQueue = new ProcessingQueue<int>();
Of course, this means that each ProcessingQueue<T> instance is restricted to one type of work item, which suits me.
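The same pattern works for reference types. As a minimal sketch, here's a queue of a hypothetical LogEntry class (LogEntry is only for illustration; it isn't part of the class):
// LogEntry is a hypothetical work item type, used here only for illustration.
public class LogEntry
{
    public string Message;
}

// A queue whose work items are reference types:
ProcessingQueue<LogEntry> logQueue = new ProcessingQueue<LogEntry>();

LogEntry entry = new LogEntry();
entry.Message = "Application started.";
logQueue.QueueForWork(entry);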
Events and Overrides
There are two events, raised from overridable methods: one to do the work and one to handle work exceptions.
Doing the Work
To do the actual work, I implemented the event DoWork, which is raised for each work item in the queue. If you would prefer not to use events, you can derive a class from ProcessingQueue<T> and override the OnDoWork method. The work item is contained in the ProcessingQueueEventArgs<T> class.
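If you go the derived-class route, the override looks roughly like this. This is a minimal sketch; I'm assuming OnDoWork is virtual, takes the same ProcessingQueueEventArgs<T> it is raised with, and that calling the base method still raises the DoWork event:
// A sketch of a derived queue that overrides OnDoWork instead of wiring the DoWork event.
public class StringProcessingQueue : ProcessingQueue<string>
{
    protected override void OnDoWork(ProcessingQueueEventArgs<string> args)
    {
        // Do something with the work item exposed by the event args.
        Console.WriteLine("Processing: " + args.Work);

        // Optionally still raise the DoWork event for any attached handlers.
        base.OnDoWork(args);
    }
}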
Handling Exceptions
Exceptions that occur in the worker code (your application) are silently caught by the worker thread. To expose those exceptions to the application, you can use the WorkException event, or you can override the OnWorkException method in your own derived class. Both of these pass a ProcessingQueueExceptionEventArgs instance that wraps the Exception instance.
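For example, an event handler might simply log the problem. A sketch, assuming the wrapped exception is exposed as a property on the event args:
void OnWorkException(object sender, ProcessingQueueExceptionEventArgs args)
{
    // The Exception property name is an assumption; the args class wraps the original exception.
    Console.WriteLine("Work item failed: " + args.Exception.Message);
}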
EventWaitHandle vs. Semaphores
I considered using a semaphore for this implementation, because you can release the semaphore once for each work item queued, and the thread blocked in WaitOne will be released once per accumulated count. However, a semaphore requires a maximum count, because it's really designed to admit multiple threads, not to buffer signals for a single thread. If you release past that maximum, the semaphore throws an exception, which isn't what I wanted. Since the queue depth is unknown, I didn't want to hardcode some arbitrary upper limit as the semaphore's maximum count. It's the wrong tool for the job, basically.
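To illustrate the problem, here's a standalone sketch (not part of the class) using System.Threading.Semaphore:
// A Semaphore with a hardcoded maximum count of 2.
Semaphore semaphore = new Semaphore(0, 2);

semaphore.Release();     // count = 1
semaphore.Release();     // count = 2 (at the maximum)

try
{
    semaphore.Release(); // throws SemaphoreFullException
}
catch (SemaphoreFullException)
{
    Console.WriteLine("Queued more work than the semaphore's maximum count allows.");
}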
So, I chose the simpler EventWaitHandle using automatic reset. The concern here is that work may be queued while a work item is being processed. That signals the wait event, but only once (there's no release count like in a Semaphore), so after waking up, the worker thread has to process everything currently in the queue. It also means the thread may be signaled for work it has already processed, so it has to handle finding the queue empty.
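A quick way to see why draining the queue is necessary: an auto-reset event doesn't count how many times it was set. A standalone sketch, again using System.Threading:
// Setting an already-signaled auto-reset event has no additional effect.
EventWaitHandle signal = new EventWaitHandle(false, EventResetMode.AutoReset);

signal.Set();   // signaled
signal.Set();   // still just signaled -- the second Set is not accumulated

Console.WriteLine(signal.WaitOne(0, false));  // True: consumes the one and only signal
Console.WriteLine(signal.WaitOne(0, false));  // False: the second Set was lost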
So, the code got complicated enough that I figured a nice generic class to support this feature would be useful, and hence this article. I still can't believe there isn't something similar already here on CodeProject. Maybe it's too simple!
Usage
The following sections describe the usage.
Constructor
To create a ProcessingQueue<T> instance for a specific work type, instantiate the class:
processQueue = new ProcessingQueue<int>();
Events
Wire up the work event to a method that will perform the work on the work item, and wire up the exception event if you so desire:
processQueue.DoWork += new ProcessingQueue<int>.DoWorkDlgt(OnDoWork);
processQueue.WorkException +=
new ProcessingQueue<int>.WorkExceptionDlgt(OnWorkException);
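The handler methods themselves just need to match the delegate signatures; for example (the bodies here are placeholders):
void OnDoWork(object sender, ProcessingQueueEventArgs<int> args)
{
    // args.Work holds the queued work item.
    Console.WriteLine("Working on: " + args.Work);
}

void OnWorkException(object sender, ProcessingQueueExceptionEventArgs args)
{
    // React to the wrapped exception here (log it, display it, etc.).
}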
Queuing Work
Queuing work is straightforward; just call the QueueForWork method:
processQueue.QueueForWork(1);
Stopping the Thread
To exit the thread waiting for work, call the Stop method:
processQueue.Stop();
This call does not block, and the worker thread finishes any work remaining in the queue before it terminates.
Implementation
Here's the worker thread. It should be pretty straightforward from the comments and my description above as to what's going on.
protected void ProcessQueueWork()
{
    while (!stop)
    {
        // Block until work is queued (or Stop signals the event).
        waitProcess.WaitOne();
        bool haveWork;

        do
        {
            T work = default(T);
            haveWork = false;

            // Take the next work item, if any, under the lock.
            lock (workQueue)
            {
                if (workQueue.Count > 0)
                {
                    work = workQueue.Dequeue();
                    haveWork = true;
                }
            }

            // Process the item outside the lock so new work can still be queued.
            if (haveWork)
            {
                try
                {
                    OnDoWork(new ProcessingQueueEventArgs<T>(work));
                }
                catch (Exception e)
                {
                    OnWorkException(new ProcessingQueueExceptionEventArgs(e));
                }
            }
        // Keep draining: the event may have been set for items already processed.
        } while (haveWork);
    }
}
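The queuing and stopping sides are essentially the mirror image of this loop. A minimal sketch, simplified and assuming the fields used above (workQueue, waitProcess, and stop):
public void QueueForWork(T work)
{
    // Add the work item under the lock, then wake the worker thread.
    lock (workQueue)
    {
        workQueue.Enqueue(work);
    }

    waitProcess.Set();
}

public void Stop()
{
    // Tell the worker loop to exit, and wake it in case it is waiting for work.
    stop = true;
    waitProcess.Set();
}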
Unit Tests
I wrote a couple of really simple unit tests to verify the functionality, but this certainly isn't rigorous testing:
[TestFixture]
public class ProcessThreadTests
{
    protected ProcessingQueue<int> processQueue;
    protected bool exceptionRaised;
    protected bool workRaised;

    [TestFixtureSetUp]
    public void FixtureSetup()
    {
        processQueue = new ProcessingQueue<int>();
        processQueue.DoWork += new ProcessingQueue<int>.DoWorkDlgt(OnDoWork);
        processQueue.WorkException +=
            new ProcessingQueue<int>.WorkExceptionDlgt(OnWorkException);
    }

    [Test]
    public void QueueWorkTest()
    {
        processQueue.QueueForWork(1);
        while (!workRaised) { }
    }

    [Test]
    public void WorkExceptionTest()
    {
        processQueue.QueueForWork(2);
        while (!exceptionRaised) { }
    }

    void OnDoWork(object sender, ProcessingQueueEventArgs<int> args)
    {
        switch (args.Work)
        {
            case 1:
                workRaised = true;
                break;

            case 2:
                throw new ApplicationException("Exception");
        }
    }

    void OnWorkException(object sender, ProcessingQueueExceptionEventArgs args)
    {
        exceptionRaised = true;
    }
}
Conclusion
Hopefully, you will find this class useful and my implementation without error!
History
Updated on 9/13 as I discovered I wasn't clearing a flag, and the code never returned to the WaitOne instruction! Fixing this also eliminated one of the flags.