Processing a Queue of Objects using Multiple Threads with the ProcessQueue

3 Dec 2009 | CPOL | 7 min read
The ProcessQueue manages a pool of threads to process a strongly-typed queue of items.

Introduction

A user on StackOverflow recently asked a question about processing items from a Queue<T> using multiple threads. While a general solution to this problem is to enqueue these operations using the .NET ThreadPool mechanism, this sort of task can easily outgrow the ThreadPool: too many operations enqueued at once can starve the ThreadPool, and operations that are particularly long-running can monopolize a ThreadPool thread. For this reason, I set out to develop a thread-safe (through synchronization) class that operates in the same manner as the built-in Queue<T>, but allows the user to specify an operation to perform on each item and schedules these operations using its own user-adjustable pool of threads. Enter: the ProcessQueue<T>.

As with the FactoryDictionary, a healthy portion of the code contained in the ProcessQueue is basically a synchronized pass-thru to the underlying Queue<T> class, so I won't cover that here. I'll be focusing on the management of the threads that the ProcessQueue creates and how they relate to the user's action.

Pooling the Threads

By default, the ProcessQueue starts out with one worker thread. This is suitable for asynchronous processing of the items in the ProcessQueue when the time to complete is not as important as conserving resources, or when it’s essential that items in the ProcessQueue be processed in the same order as they were entered.

Note: I want to point out an important caveat that may have slipped in with that last sentence: while the ProcessQueue itself is a queue (and using it as such with the Enqueue and Dequeue functions will behave as expected), there is no guarantee that when item X is added before item Y that item X will complete processing before item Y when multiple worker threads are in use. If order of completion must reflect the order of the queue, then only a single worker thread must be used.
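
To make that caveat concrete, here is a small, contrived sketch that does not use the ProcessQueue at all. Two plain threads drain a two-item queue, and item 0 is made artificially "slow" by having it wait until item 1 has finished. The names OrderingDemo, CompletionOrder, and itemOneDone are my own and purely illustrative.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class OrderingDemo
{
    // Returns the order in which the two items finished processing.
    public static List<int> CompletionOrder()
    {
        var queue = new Queue<int>(new[] { 0, 1 });
        var sync = new object();
        var completed = new List<int>();
        var itemOneDone = new ManualResetEvent(false);

        ThreadStart worker = () =>
        {
            while (true)
            {
                int item;
                lock (sync)
                {
                    if (queue.Count == 0) return;
                    item = queue.Dequeue();
                }

                // Simulate a slow item: item 0 cannot finish until item 1 has.
                if (item == 0) itemOneDone.WaitOne();

                lock (sync) { completed.Add(item); }

                if (item == 1) itemOneDone.Set();
            }
        };

        var a = new Thread(worker);
        var b = new Thread(worker);
        a.Start(); b.Start();
        a.Join(); b.Join();

        return completed;
    }

    public static void Main()
    {
        // Item 0 was dequeued first, but item 1 completes first.
        Console.WriteLine(string.Join(",", CompletionOrder())); // prints "1,0"
    }
}
```

With a single worker thread, the same handler would never get past item 0, which is an exaggerated way of showing that completion order only follows queue order when one thread does all the work.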

Because the ProcessQueue needs to be able to suspend, resume, and terminate its worker threads, the WorkerThread class wraps the management of the WaitHandle objects that coordinate this functionality. The WorkerThread maintains a ManualResetEvent that signals whether or not it should actively check the ProcessQueue for an item, and another ManualResetEvent that signals the thread to abort. The relevant code appears below:

C#
private ManualResetEvent abortEvent;
private ManualResetEvent signalEvent; 
private Thread thread;

public void Start()
{
    thread.Start();
}

public void Abort()
{
    abortEvent.Set();

    thread.Join();
}

public void Pause()
{
    signalEvent.Reset();
}

public void Signal()
{
    signalEvent.Set();
}

The ProcessQueue class uses the functions listed here to manage the threads:

  • Start is called when the WorkerThread is created
  • Signal is called when the thread needs to "wake up", such as when items are added to the queue or the number of threads is increased and the thread needs to start processing immediately.
  • Pause is called only if the user manually signals the ProcessQueue to pause all processing (the function is called on all WorkerThread objects at that time).
  • Abort is called when the ProcessQueue is disposed or when the number of worker threads is reduced by calling SetThreadCount.

The body of the thread's procedure appears below:

C#
private void ThreadProc()
{
    WaitHandle[] handles = new WaitHandle[] { signalEvent, abortEvent };

    while (true)
    {
        switch (WaitHandle.WaitAny(handles))
        {
            case 0: // signal
                {
                    ProcessItems();
                } break;
            case 1: // abort
                {
                    return;
                }
        }
    }
}

This is a fairly typical infinite-loop thread body, which waits for either of the two WaitHandles to be set before it proceeds. If the signal handle is set, it processes the current items in the queue, then resets the signal handle and waits for it to be set again. If the abort handle is set, it returns from the thread body and the thread terminates.
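
If you want to experiment with this pattern in isolation, the following is a minimal, self-contained sketch of a worker driven by a signal event and an abort event. It is not the WorkerThread class from the download; DemoWorker and WakeCount are hypothetical names, and the "work" is just a counter increment.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for the article's WorkerThread (illustration only).
public sealed class DemoWorker
{
    private readonly ManualResetEvent signalEvent = new ManualResetEvent(false);
    private readonly ManualResetEvent abortEvent = new ManualResetEvent(false);
    private readonly Thread thread;
    private int wakeCount;

    public DemoWorker()
    {
        thread = new Thread(ThreadProc);
    }

    // Number of times the worker has woken up to do "work".
    public int WakeCount { get { return Volatile.Read(ref wakeCount); } }

    public void Start() { thread.Start(); }

    public void Signal() { signalEvent.Set(); }

    public void Abort()
    {
        abortEvent.Set();
        thread.Join(); // block until the thread body returns
    }

    private void ThreadProc()
    {
        WaitHandle[] handles = { signalEvent, abortEvent };

        while (true)
        {
            // WaitAny blocks until a handle is set and returns its index.
            if (WaitHandle.WaitAny(handles) == 1) return; // abort

            Interlocked.Increment(ref wakeCount); // stand-in for real work
            signalEvent.Reset();                  // sleep until signaled again
        }
    }
}

public static class DemoWorkerProgram
{
    public static void Main()
    {
        var worker = new DemoWorker();
        worker.Start();
        worker.Signal();

        while (worker.WakeCount == 0) Thread.Sleep(1); // wait for the wake-up

        worker.Abort();
        Console.WriteLine(worker.WakeCount); // prints 1
    }
}
```

Because both events are ManualResetEvents, the worker has to reset the signal itself; otherwise WaitAny would return immediately on every pass through the loop.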

C#
private void ProcessItems()
{
    T item;

    while (queue.TryDequeue(out item))
    {
        queue.ProcessItem(item);

        if (!signalEvent.WaitOne(0) || abortEvent.WaitOne(0)) return;
    }

    signalEvent.Reset();
}

This introduces something from the ProcessQueue class: the TryDequeue function. If you're familiar with the various TryX functions in the .NET Framework (TryParse on various value types, TryGetValue, etc.) then its purpose should be fairly clear, but I will cover it in the upcoming section.

One advantage to using the WorkerThread class is that the thread can put itself to sleep once processing is finished, rather than requiring the ProcessQueue to monitor calls to Dequeue to determine when to suspend a thread. In fact, the ProcessQueue class simply maintains a List<WorkerThread> internally and activates a thread when needed.
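
The check at the bottom of the ProcessItems loop relies on WaitOne(0), which returns the current state of an event without blocking. The sketch below isolates that check; PollDemo and ShouldStop are names I made up for the illustration and are not part of the ProcessQueue.

```csharp
using System;
using System.Threading;

public static class PollDemo
{
    // Mirrors the check in ProcessItems: stop when the signal has been
    // cleared (paused) or when the abort event has been set.
    public static bool ShouldStop(ManualResetEvent signalEvent, ManualResetEvent abortEvent)
    {
        return !signalEvent.WaitOne(0) || abortEvent.WaitOne(0);
    }

    public static void Main()
    {
        using (var signal = new ManualResetEvent(true))
        using (var abort = new ManualResetEvent(false))
        {
            Console.WriteLine(ShouldStop(signal, abort)); // False: running, not aborted

            signal.Reset();
            Console.WriteLine(ShouldStop(signal, abort)); // True: paused

            signal.Set();
            abort.Set();
            Console.WriteLine(ShouldStop(signal, abort)); // True: aborted
        }
    }
}
```

Since the check only runs between items, a pause or abort request never interrupts an item that is already being processed.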

Process Management

From the end-user's perspective, Start, Stop, and SetThreadCount are the only calls necessary to control the processing of items in the ProcessQueue.

  • Start triggers the processing of all items currently in the queue and any that are added afterwards.
  • Stop pauses the processing of any unprocessed items. Calling Stop will not suspend any active processes, but it will prevent any completely unprocessed items (or items added after the call) from being processed.
  • SetThreadCount does exactly what it sounds like; it sets the number of threads that the ProcessQueue maintains for performing concurrent processes. Note that at least one thread must be present in order for any work to happen, so values that are less than 1 will throw an ArgumentOutOfRangeException.

From the ProcessQueue's perspective, all it ever does is activate individual threads or pause all threads. It never has a need to pause an individual thread. It will activate threads when:

  • A new item is added to the queue (and there are inactive threads)
  • Processing is resumed after being paused (or started for the first time) and there are unprocessed items
  • SetThreadCount was called to increase the size of the thread pool and there are unprocessed items

C#
public void Enqueue(T item)
{
    lock (syncRoot)
    {
        queue.Enqueue(item);

        if (isRunning)
        {
            RegenerateIfDisposed();

            WorkerThread firstThread = threads.Where(t => !t.IsSignaled).FirstOrDefault();

            if (firstThread != null) firstThread.Signal();
        }
    }
}

public void Start()
{
    lock (syncRoot)
    {
        RegenerateIfDisposed();

        for (int i = 0; i < Math.Min(threads.Count, queue.Count); i++)
        {
            threads[i].Signal();
        }

        isRunning = true;
    }
}

public void SetThreadCount(int threadCount)
{
    if (threadCount < 1) 
        throw new ArgumentOutOfRangeException("threadCount", 
            "The ProcessQueue class requires at least one worker thread.");

    lock (syncRoot)
    {
        int pending = queue.Count;

        for (int i = threads.Count; i < threadCount; i++) // add additional threads
        {
            WorkerThread thread = new ProcessQueue<T>.WorkerThread(this);

            threads.Add(thread);

            thread.Start();

            if (pending > 0) // wake the new thread only while unprocessed items remain
            {
                thread.Signal();
            }

            pending--;
        }

        int toRemove = threads.Count - threadCount;

        if (toRemove > 0)
        {
            // Remove idle threads first, but never more than toRemove of them.
            foreach (WorkerThread thread in threads.Where(t => !t.IsSignaled).Take(toRemove).ToList())
            {
                thread.Abort();
                threads.Remove(thread);

                toRemove--;
            }

            while (toRemove > 0)
            {
                WorkerThread thread = threads[threads.Count - 1];

                thread.Abort();

                threads.Remove(thread);

                toRemove--;
            }
        }
    }
}

Note here that the SetThreadCount function will add or remove threads as necessary, and if any unprocessed items exist while threads are being added, then the appropriate number of threads will be signaled. If the number of threads is being decreased, then unsignaled threads are removed first, and signaled threads are removed only if that is still not enough to reach the desired thread count.
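
The removal order can be looked at on its own, without any real threads. The following sketch is a pure function over a list of "is signaled" flags that returns which positions would be removed: idle workers first, then busy ones taken from the end of the list. RemovalOrderDemo and PickForRemoval are hypothetical names used only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class RemovalOrderDemo
{
    // Returns the indices of the workers to remove so that targetCount remain:
    // idle (unsignaled) workers first, then busy ones from the end of the list.
    public static List<int> PickForRemoval(IList<bool> isSignaled, int targetCount)
    {
        int toRemove = Math.Max(0, isSignaled.Count - targetCount);

        List<int> idle = Enumerable.Range(0, isSignaled.Count)
            .Where(i => !isSignaled[i])
            .Take(toRemove)
            .ToList();

        List<int> busy = Enumerable.Range(0, isSignaled.Count)
            .Where(i => isSignaled[i])
            .Reverse()
            .Take(toRemove - idle.Count)
            .ToList();

        return idle.Concat(busy).ToList();
    }

    public static void Main()
    {
        // Four workers, one idle (index 1); shrink the pool to a single worker.
        List<int> picked = PickForRemoval(new[] { true, false, true, true }, 1);
        Console.WriteLine(string.Join(",", picked)); // prints "1,3,2"
    }
}
```

Capping the idle selection at the number of threads to remove matters: going from four workers to three should remove one idle worker, not every idle worker.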

As shown before, the WorkerThread class uses a TryDequeue method that has been added to the ProcessQueue class. This function attempts to retrieve the next item from the ProcessQueue and assign its value to the out variable that is supplied by the caller. The function returns true if there was an item to obtain, and false if there was not.

C#
public bool TryDequeue(out T value)
{
    lock (syncRoot)
    {
        if (queue.Count > 0)
        {
            value = queue.Dequeue();

            return true;
        }
        else
        {
            value = default(T);

            return false;
        }
    }
}

Taking this approach keeps all of the locking within the ProcessQueue, and it prevents a race in which Thread A examines the Count property and sees an item, Thread B dequeues that item before Thread A can, and Thread A then tries to dequeue from an empty queue (leading to an exception). With this approach, the first thread to call the function will get the item (with a return value of true) and the second will get a return value of false.
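
To see the effect outside of the ProcessQueue, here is a minimal sketch in which several threads drain a small locked queue using the same TryDequeue shape. LockedQueue<T> and TryDequeueDemo are stand-ins I wrote for this example, not classes from the download.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Minimal stand-in: a Queue<T> whose check and dequeue happen under one lock.
public sealed class LockedQueue<T>
{
    private readonly object syncRoot = new object();
    private readonly Queue<T> queue = new Queue<T>();

    public void Enqueue(T item)
    {
        lock (syncRoot) { queue.Enqueue(item); }
    }

    public bool TryDequeue(out T value)
    {
        lock (syncRoot)
        {
            if (queue.Count > 0)
            {
                value = queue.Dequeue();
                return true;
            }

            value = default(T);
            return false;
        }
    }
}

public static class TryDequeueDemo
{
    // Drains itemCount items using threadCount threads and returns the
    // total number of successful dequeues across all threads.
    public static int Drain(int itemCount, int threadCount)
    {
        var queue = new LockedQueue<int>();
        for (int i = 0; i < itemCount; i++) queue.Enqueue(i);

        int total = 0;
        var threads = new List<Thread>();

        for (int t = 0; t < threadCount; t++)
        {
            var thread = new Thread(() =>
            {
                int item;
                while (queue.TryDequeue(out item)) Interlocked.Increment(ref total);
            });
            threads.Add(thread);
            thread.Start();
        }

        foreach (Thread thread in threads) thread.Join();
        return total;
    }

    public static void Main()
    {
        // Each item is handed to exactly one thread, and no thread ever
        // dequeues from an empty queue.
        Console.WriteLine(Drain(1000, 4)); // prints 1000
    }
}
```

On .NET 4 and later, ConcurrentQueue<T> offers a TryDequeue method with the same shape, so a hand-rolled lock like this one is mainly useful when the queue also needs to coordinate with other state, as the ProcessQueue does.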

Using the Class

At a minimum, using the class requires that you provide the ProcessQueue with an action delegate that takes a single parameter: namely, an instance of the type that the queue is designed to hold onto. For example, if I needed a queue that would maintain a list of file names and a procedure that would open the file, read the contents, and then do something with that data, I would have something like this:

C#
private void FileHandler(string fileName)
{
    // open the file and do whatever processing is necessary
}

ProcessQueue<string> queue = new ProcessQueue<string>(FileHandler);

This would create a ProcessQueue to hold string objects, with one worker thread that would process any enqueued items through my FileHandler function. This, however, won't do anything until I call Start:

C#
queue.Start();

This unleashes the worker threads to go to work on any items I have in my queue, as well as any that might get added later.

Once processing has been started, it can be stopped (or, more accurately, suspended) by calling Stop on the queue.

C#
queue.Stop();

This will reset the WaitHandle on all threads, causing them to pause execution once the current item completes processing.

The number of threads in the pool can be adjusted at any time by calling SetThreadCount, specifying the desired maximum number of worker threads in the threadCount parameter. While SetThreadCount will remove inactive (unsignaled) threads first, if active threads must be removed, then SetThreadCount will block until the thread completes processing its current item.

Once the lifetime of the queue is over (or, at the very least, when the application or service terminates), Dispose must be called on the queue in order to terminate the worker threads. Dispose will block until each active thread finishes processing its current item.

Note: It is vitally important that Dispose be called on the ProcessQueue before the application terminates; otherwise, the worker threads will continue to run indefinitely, even those that are not currently processing an item.

Summary

The .NET ThreadPool provides a convenient way to schedule small "chunks" of code to operate asynchronously, but overuse of the ThreadPool can lead to undesirable results. The ProcessQueue provides a fairly simple way to handle a large number of items, even if an individual item takes a substantial amount of time to process.

History

  • 3rd December, 2009: Initial post

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)