Introduction
A user on StackOverflow recently asked a question about processing items from a Queue<T> using multiple threads. The general solution to this problem is to enqueue the operations on the .NET ThreadPool, but this sort of task can easily outgrow the ThreadPool: too many operations enqueued at once can starve it, and particularly long-running operations can monopolize its threads. For this reason, I set out to develop a thread-safe (through synchronization) class that operates in the same manner as the built-in Queue<T>, but lets the user specify an operation to run on each item and schedules these operations on its own user-adjustable pool of threads. Enter: the ProcessQueue<T>.
As with the FactoryDictionary, a healthy portion of the code contained in the ProcessQueue is basically a synchronized pass-through to the underlying Queue<T> class, so I won't cover that here. I'll be focusing on the management of the threads that the ProcessQueue creates and how they relate to the user's action.
Pooling the Threads
By default, the ProcessQueue starts out with one worker thread. This is suitable for asynchronous processing of the items in the ProcessQueue when the time to complete is not as important as conserving resources, or when it’s essential that items in the ProcessQueue be processed in the same order as they were entered.
Note: I want to point out an important caveat that may have slipped in with that last sentence: while the ProcessQueue itself is a queue (and using it as such with the Enqueue and Dequeue functions will behave as expected), there is no guarantee that an item X added before an item Y will complete processing before item Y when multiple worker threads are in use. If the order of completion must reflect the order of the queue, then a single worker thread must be used.
Because the ProcessQueue needs to be able to suspend, resume, and terminate its worker threads, the WorkerThread class wraps the management of the WaitHandle objects that coordinate this functionality. The WorkerThread maintains a ManualResetEvent that signals whether or not it should actively check the ProcessQueue for an item, and another ManualResetEvent that signals the thread to abort. The relevant code appears below:
private ManualResetEvent abortEvent;
private ManualResetEvent signalEvent;
private Thread thread;

public void Start()
{
    thread.Start();
}

public void Abort()
{
    // Ask the thread to exit, then wait for it to finish.
    abortEvent.Set();
    thread.Join();
}

public void Pause()
{
    signalEvent.Reset();
}

public void Signal()
{
    signalEvent.Set();
}
The ProcessQueue class uses the functions listed here to manage the threads:
- Start is called when the WorkerThread is created.
- Signal is called when the thread needs to "wake up", such as when items are added to the queue or the number of threads is increased and the thread needs to start processing immediately.
- Pause is called only if the user manually signals the ProcessQueue to pause all processing (the function is called on all WorkerThread objects at that time).
- Abort is called when the ProcessQueue is disposed or when the number of worker threads is reduced by calling SetThreadCount.
The body of the thread's procedure appears below:
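private void ThreadProc()
{
    // abortEvent is listed first: WaitAny returns the lowest index among the
    // signaled handles, so an abort is honored even while signalEvent is still set.
    WaitHandle[] handles = new WaitHandle[] { abortEvent, signalEvent };
    while (true)
    {
        switch (WaitHandle.WaitAny(handles))
        {
            case 0: // abort requested
                return;
            case 1: // signaled: process whatever is in the queue
                ProcessItems();
                break;
        }
    }
}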
This is a fairly typical infinite-loop thread body that waits for either of the two WaitHandles to be set before it proceeds. If the signal handle is set, it processes the items currently in the queue, resets the signal handle, and waits for it to be set again. If the abort handle is set, it returns from the thread body and the thread terminates.
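private void ProcessItems()
{
    T item;
    while (queue.TryDequeue(out item))
    {
        queue.ProcessItem(item);
        // Stop early if the queue was paused or an abort was requested.
        if (!signalEvent.WaitOne(0) || abortEvent.WaitOne(0)) return;
    }
    // Queue is empty: go back to sleep until signaled again.
    signalEvent.Reset();
}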
This introduces something from the ProcessQueue class: the TryDequeue function. If you're familiar with the various TryX functions in the .NET Framework (TryParse on various value types, TryGetValue, etc.) then its purpose should be fairly clear, but I will cover it in the next section.
One advantage to using the WorkerThread class is that the thread can put itself to sleep once processing is finished, rather than requiring the ProcessQueue to monitor calls to Dequeue to determine when to suspend a thread. In fact, the ProcessQueue class simply maintains a List<WorkerThread> internally and activates a thread when needed.
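To make the signal/abort pattern concrete, here is a minimal, self-contained sketch of a worker that sleeps on two ManualResetEvent objects and puts itself back to sleep when its queue is empty. It is not the article's WorkerThread: SketchWorker, WorkerDemo, and the int item type are hypothetical names chosen for illustration, and the worker owns its own queue to keep the example short. Two choices are assumptions of this sketch: the abort handle is placed first in the array so that it wins when both events are set, and the signal is reset while holding the lock so that an Enqueue cannot slip in between the empty check and the reset.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical, simplified worker used only to illustrate the pattern.
public class SketchWorker
{
    private readonly ManualResetEvent signalEvent = new ManualResetEvent(false);
    private readonly ManualResetEvent abortEvent = new ManualResetEvent(false);
    private readonly Queue<int> items = new Queue<int>();
    private readonly object syncRoot = new object();
    private readonly Thread thread;
    private int processed;

    public SketchWorker()
    {
        thread = new Thread(ThreadProc);
        thread.Start();
    }

    // Number of items handled so far (read atomically).
    public int Processed
    {
        get { return Interlocked.CompareExchange(ref processed, 0, 0); }
    }

    public void Enqueue(int item)
    {
        lock (syncRoot) { items.Enqueue(item); }
        signalEvent.Set(); // wake the worker if it is sleeping
    }

    public void Abort()
    {
        abortEvent.Set();
        thread.Join(); // blocks until the current item is finished
        signalEvent.Close();
        abortEvent.Close();
    }

    private void ThreadProc()
    {
        // WaitAny returns the lowest signaled index, so abort (index 0) wins.
        WaitHandle[] handles = { abortEvent, signalEvent };
        while (WaitHandle.WaitAny(handles) != 0)
        {
            Drain();
        }
    }

    private void Drain()
    {
        while (true)
        {
            lock (syncRoot)
            {
                if (items.Count == 0)
                {
                    signalEvent.Reset(); // nothing left: go back to sleep
                    return;
                }
                items.Dequeue();
            }
            Interlocked.Increment(ref processed); // stand-in for real work
            if (abortEvent.WaitOne(0)) return;    // honor a pending abort
        }
    }
}

public static class WorkerDemo
{
    // Enqueues itemCount items, waits for them to be handled, then aborts.
    public static int Run(int itemCount)
    {
        SketchWorker worker = new SketchWorker();
        for (int i = 0; i < itemCount; i++) worker.Enqueue(i);
        while (worker.Processed < itemCount) Thread.Sleep(1); // demo-only polling
        worker.Abort();
        return worker.Processed;
    }

    public static void Main()
    {
        Console.WriteLine(WorkerDemo.Run(100)); // prints 100
    }
}
```

The polling loop in WorkerDemo.Run exists only so the demo knows when to stop; the worker itself never polls, it blocks in WaitAny until one of the events is set.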
Process Management
From the end-user's perspective, Start, Stop, and SetThreadCount are the only calls necessary to control the processing of items in the ProcessQueue.
Start triggers the processing of all items currently in the queue and any that are added afterwards. Stop pauses the processing of any unprocessed items. Calling Stop will not interrupt items that are already being processed, but it will prevent any completely unprocessed items (or items added after the call) from being processed. SetThreadCount does exactly what it sounds like: it sets the number of threads that the ProcessQueue maintains for performing concurrent processes. Note that at least one thread must be present in order for any work to happen, so values less than 1 will throw an ArgumentOutOfRangeException.
From the ProcessQueue's perspective, all it ever does is activate individual threads or pause all threads. It never needs to pause an individual thread. It will activate threads when:
- A new item is added to the queue (and there are inactive threads)
- Processing is resumed after being paused (or started for the first time) and there are unprocessed items
- SetThreadCount was called to increase the size of the thread pool and there are unprocessed items
public void Enqueue(T item)
{
    lock (syncRoot)
    {
        queue.Enqueue(item);
        if (isRunning)
        {
            RegenerateIfDisposed();
            // Wake the first idle worker, if there is one.
            WorkerThread firstThread = threads.FirstOrDefault(t => !t.IsSignaled);
            if (firstThread != null) firstThread.Signal();
        }
    }
}

public void Start()
{
    lock (syncRoot)
    {
        RegenerateIfDisposed();
        // Wake at most one worker per queued item.
        for (int i = 0; i < Math.Min(threads.Count, queue.Count); i++)
        {
            threads[i].Signal();
        }
        isRunning = true;
    }
}
public void SetThreadCount(int threadCount)
{
    if (threadCount < 1)
        throw new ArgumentOutOfRangeException("threadCount",
            "The ProcessQueue class requires at least one worker thread.");
    lock (syncRoot)
    {
        // Add threads as needed; wake a new thread only while more than one item is pending.
        int pending = queue.Count;
        for (int i = threads.Count; i < threadCount; i++)
        {
            WorkerThread thread = new ProcessQueue<T>.WorkerThread(this);
            threads.Add(thread);
            thread.Start();
            if (pending > 1)
            {
                thread.Signal();
            }
            pending--;
        }
        int toRemove = threads.Count - threadCount;
        if (toRemove > 0)
        {
            // Remove idle (unsignaled) threads first, but never more than requested.
            foreach (WorkerThread thread in threads.Where(t => !t.IsSignaled).Take(toRemove).ToList())
            {
                thread.Abort();
                threads.Remove(thread);
                toRemove--;
            }
            // Then remove signaled threads; Abort blocks until the current item completes.
            while (toRemove > 0)
            {
                WorkerThread thread = threads[threads.Count - 1];
                thread.Abort();
                threads.Remove(thread);
                toRemove--;
            }
        }
    }
}
Note here that the SetThreadCount function will add or remove threads as necessary, and if any unprocessed items exist while threads are being added, then the appropriate number of threads will be signaled. If the number of threads is being decreased, then unsignaled threads are removed first, and signaled threads are removed only until the desired number of threads remains.
As shown before, the WorkerThread class uses a TryDequeue method that has been added to the ProcessQueue class. This function attempts to retrieve the next item from the ProcessQueue and assign its value to the out variable that is supplied by the caller. The function returns true if there was an item to obtain, and false if there was not.
public bool TryDequeue(out T value)
{
    // The count check and the dequeue happen under the same lock.
    lock (syncRoot)
    {
        if (queue.Count > 0)
        {
            value = queue.Dequeue();
            return true;
        }
        else
        {
            value = default(T);
            return false;
        }
    }
}
Taking this approach keeps all of the locking within the ProcessQueue, and it rules out a race in which Thread A examines the Count property and sees an item, Thread B dequeues that item first, and Thread A then tries to dequeue it anyway (leading to an exception). With this approach, the first thread to call the function will get the item (with a return value of true) and the second will get a return value of false.
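The effect of an atomic check-and-dequeue can be seen in a small, self-contained sketch. LockedQueue<T> and TryDequeueDemo below are hypothetical stand-ins, not the article's ProcessQueue<T>; the sketch only assumes the same idea of checking the count and dequeuing under a single lock. Several threads drain the queue at once, and because no thread can observe an item that another thread then takes, every item is dequeued exactly once and no thread throws.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical minimal queue used only to illustrate TryDequeue.
public class LockedQueue<T>
{
    private readonly object syncRoot = new object();
    private readonly Queue<T> queue = new Queue<T>();

    public void Enqueue(T item)
    {
        lock (syncRoot) { queue.Enqueue(item); }
    }

    // Check and dequeue under one lock so the two steps cannot be interleaved.
    public bool TryDequeue(out T value)
    {
        lock (syncRoot)
        {
            if (queue.Count > 0)
            {
                value = queue.Dequeue();
                return true;
            }
            value = default(T);
            return false;
        }
    }
}

public static class TryDequeueDemo
{
    // Drains itemCount items using workerCount threads and returns the
    // total number of successful dequeues across all threads.
    public static int Drain(int itemCount, int workerCount)
    {
        LockedQueue<int> queue = new LockedQueue<int>();
        for (int i = 0; i < itemCount; i++) queue.Enqueue(i);

        int dequeued = 0;
        Thread[] workers = new Thread[workerCount];
        for (int i = 0; i < workerCount; i++)
        {
            workers[i] = new Thread(() =>
            {
                int item;
                while (queue.TryDequeue(out item))
                {
                    Interlocked.Increment(ref dequeued);
                }
            });
            workers[i].Start();
        }
        foreach (Thread worker in workers) worker.Join();
        return dequeued;
    }

    public static void Main()
    {
        Console.WriteLine(TryDequeueDemo.Drain(10000, 4)); // prints 10000
    }
}
```

Had each thread instead checked a Count property and then called Dequeue as two separate locked operations, a thread could pass the check and still find the queue empty, and Queue<T>.Dequeue would throw an InvalidOperationException.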
Using the Class
At a minimum, using the class requires that you provide the ProcessQueue with an action delegate that takes a single parameter: namely, an instance of the type that the queue is designed to hold. For example, if I needed a queue that would maintain a list of file names and a procedure that would open the file, read the contents, and then do something with that data, I would have something like this:
private void FileHandler(string fileName)
{
    // Open the file, read its contents, and do something with the data.
}

ProcessQueue<string> queue = new ProcessQueue<string>(FileHandler);
This would create a ProcessQueue to hold string objects, with one worker thread that would process any enqueued items through my FileHandler function. This, however, won't do anything until I call Start:
queue.Start();
This unleashes the worker threads to go to work on any items I have in my queue, as well as any that might get added later.
Once processing has been started, it can be stopped (or, more accurately, suspended) by calling Stop on the queue.
queue.Stop();
This will reset the WaitHandle on all threads, causing them to pause execution once the current item completes processing.
The number of threads in the pool can be adjusted at any time by calling SetThreadCount, specifying the desired maximum number of worker threads in the threadCount parameter. While SetThreadCount will remove inactive (unsignaled) threads first, if active threads must be removed, then SetThreadCount will block until each of those threads completes processing its current item.
Once the lifetime of the queue is over (or, at the very least, when the application or service terminates), Dispose must be called on the queue in order to terminate the worker threads. Dispose will block until each active thread finishes processing its current item.
Note: It is vitally important that Dispose be called on the ProcessQueue before the application terminates; otherwise, the worker threads will continue to run indefinitely, even those that are not currently processing an item.
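One way to make sure Dispose always runs is C#'s using statement, which calls Dispose even if the enclosed code throws. The sketch below does not use the article's ProcessQueue<T>; OwnedWorker and DisposeDemo are hypothetical stand-ins that only assume the same shape: an object that owns a thread blocked on an abort event, and a Dispose method that sets the event and joins the thread.

```csharp
using System;
using System.Threading;

// Hypothetical owner of a single worker thread, for illustration only.
public sealed class OwnedWorker : IDisposable
{
    private readonly ManualResetEvent abortEvent = new ManualResetEvent(false);
    private readonly Thread thread;
    private bool disposed;

    public OwnedWorker()
    {
        // The thread idles until it is told to abort.
        thread = new Thread(() => abortEvent.WaitOne());
        thread.Start();
    }

    public bool IsAlive
    {
        get { return thread.IsAlive; }
    }

    public void Dispose()
    {
        if (disposed) return;
        disposed = true;
        abortEvent.Set();  // ask the thread to exit
        thread.Join();     // block until it has exited
        abortEvent.Close();
    }
}

public static class DisposeDemo
{
    // Returns true if the worker thread has stopped once the using block ends.
    public static bool StopsAfterUsing()
    {
        OwnedWorker worker = new OwnedWorker();
        using (worker)
        {
            // Work with the object here; Dispose runs when the block exits.
        }
        return !worker.IsAlive;
    }

    public static void Main()
    {
        Console.WriteLine(DisposeDemo.StopsAfterUsing()); // prints True
    }
}
```

Without the using block (or an explicit Dispose call), the thread in this sketch would stay blocked on the event and, being a foreground thread by default, would keep the process alive.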
Summary
The .NET ThreadPool provides a convenient way to schedule small "chunks" of code to operate asynchronously, but overuse of the ThreadPool can lead to undesirable results. The ProcessQueue provides a fairly simple way to handle a large number of items, even if an individual item takes a substantial amount of time to process.
History
- 3rd December, 2009: Initial post