Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles / Languages / C#

A Worker Thread Class For Processing Work Units

4.82/5 (25 votes)
13 Sep 20064 min read 1   735  
Processing work units in a single thread instead of using a thread pool.

Introduction

There are many examples on CodeProject illustrating using thread pooling to manage work or job queues. These all assign the work item to a free thread, and thus the work is done concurrently. I, instead, needed the ability to queue work items that would be processed in a single thread. The typical scenario that I'm using this class for is to process activities that must occur on the WinForm main application thread, such as updating UI elements. Concurrent processing serves no purpose since using Invoke blocks until the main application thread becomes idle.

Architecture

The following sections describe some of the architectural decisions I made in the implementation of this class.

Generics

The class, ProcessingQueue<T>, is a generic class, allowing you to specify the work type as either a value or reference type, for example:

C#
processQueue = new ProcessingQueue<int>();

Of course, this means that each ProcessingQueue<T> instance is restricted to one type of work item, which suits me.

Events and Overrides

There are two events that are called from overridable methods to do the work and handle work exceptions.

Doing the Work

To do the actual work, I implemented the event DoWork, which is called for each work item in the queue. If you would prefer not to use events, you can derive a class from ProcessingQueue<T> and override the OnDoWork method. The work item is contained in the ProcessingQueueEventArgs<T> class.

Handling Exceptions

Exceptions that occur in the worker code (your application) are usually silently caught by a worker thread. To help expose exceptions to the application, you can use the WorkException event, or you can override the OnWorkException method in your own derived class. Both of these pass a ProcessingQueueExceptionEventArgs instance that wraps the Exception instance.

EventWaitHandle vs. Semaphores

I considered using semaphores for this implementation because you can release the semaphore for each work item in the queue, and the thread, which implements WaitOne, will release for the total release counts in the semaphore. However, a semaphore requires a maximum release count, because it's actually designed to release multiple threads, not a single thread. And if you release more than the maximum release count, the semaphore throws an exception, which isn't what I wanted. Since the queue depth is unknown, I didn't want to hardcode some arbitrary upper limit to the semaphore's maximum release count. It's the wrong tool for the job, basically.

So, I chose the simpler EventWaitHandle using automatic reset. The concern here is that work may be queued while a work item is being processed. While this signals the wait event, it does so only once (there's no release count like in a Semaphore), so the worker thread has to process all the work currently in the queue, which also means it needs to check if it was signaled by having work put into the queue, which it processed, and therefore the queue is now empty.

So, the code got complicated enough that I figured a nice generic class to support this feature would be useful, and hence this article. I still can't believe there isn't something similar already here on Code Project. Maybe it's too simple!

Usage

The following sections describe the usage.

Constructor

To create a ProcessingQueue<T> instance for a specific work type, instantiate the class:

C#
processQueue = new ProcessingQueue<int>();

Events

Wire up the work event to a method that will perform the work on the work item, and wire up the exception event if you so desire:

C#
processQueue.DoWork += new ProcessingQueue<int>.DoWorkDlgt(OnDoWork);
processQueue.WorkException += 
     new ProcessingQueue<int>.WorkExceptionDlgt(OnWorkException);

Queuing Work

Queuing work is straightforward--call the QueueForWork method:

C#
processQueue.QueueForWork(1);

Stopping the Thread

To exit the thread waiting for work, call the Stop method:

C#
processQueue.Stop();

This is a non-blocking call, and it will also finish any remaining work in the queue before the work thread terminates.

Implementation

Here's the worker thread. It should be pretty straightforward from the comments and my description above as to what's going on.

C#
protected void ProcessQueueWork()
{
  while (!stop)
  {
    // Wait for some work.
    waitProcess.WaitOne();
    bool haveWork;

    // Finish remaining work before stopping.
    do
    {
      // Initialize to the default work value.
      T work = default(T);
      // Assume no work.
      haveWork = false;<BR>
      // Prevent enqueing from a different thread.
      lock (workQueue)
      {
        // Do we have work? This might be 0 if stopping or if all 
        // work is processed.
        if (workQueue.Count > 0)
        {
          // Get the work.
          work = workQueue.Dequeue();
          // Yes, we have work.
          haveWork = true;
        }
      }

      // If we have work...
      if (haveWork)
      {
        try
        {
          // Try processing it.
          OnDoWork(new ProcessingQueueEventArgs<T>(work));
        }
        catch (Exception e)
        {
          // Oops, inform application of a work error.
          OnWorkException(new ProcessingQueueExceptionEventArgs(e));
        }
      }

    } while (haveWork); // continue processing if there was work.
  }
}

Unit Tests

I wrote a couple really simple unit tests to verify the functionality, but certainly isn't a rigorous test:

C#
[TestFixture]
public class ProcessThreadTests
{
  protected ProcessingQueue<int> processQueue;
  protected bool exceptionRaised;
  protected bool workRaised;

  [TestFixtureSetUp]
  public void FixtureSetup()
  {
    processQueue = new ProcessingQueue<int>();
    processQueue.DoWork += new ProcessingQueue<int>.DoWorkDlgt(OnDoWork);
    processQueue.WorkException += 
         new ProcessingQueue<int>.WorkExceptionDlgt(OnWorkException);
  }

  [Test]
  public void QueueWorkTest()
  {
    processQueue.QueueForWork(1);
    while (!workRaised) { }
  }

  [Test]
  public void WorkExceptionTest()
  {
    processQueue.QueueForWork(2);
    while (!exceptionRaised) { }
  }

  void OnDoWork(object sender, ProcessingQueueEventArgs<int> args)
  {
    switch (args.Work)
    {
      case 1:
        workRaised = true;
        break;
      case 2:
        throw new ApplicationException("Exception");
    }
  }

  void OnWorkException(object sender, ProcessingQueueExceptionEventArgs args)
  {
    exceptionRaised = true;
  }
}

Conclusion

Hopefully, you will find this class useful and my implementation without error!

History

Updated on 9/13 as I discovered I wasn't clearing a flag, and the code never returned to the WaitOne instruction!  Fixing this also eliminated one of the flags.

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.

A list of licenses authors might use can be found here