Introduction
I wrote an article back in June 2004 describing a simple wrapper to the System.Threading.ThreadPool, which enabled sequential processing of queued items on the pool. I have only recently found time to write the second article based on it; however, while researching CodeProject.com, I found numerous newer articles documenting similar behaviour. Since I had already written this, I am publishing it in the hope it is of benefit to someone.
This solution includes a VSTS Test project and a .NET 2.0 assembly. My class supports the following functionality:
- Similar to System.Threading.ThreadPool: works in a very similar way (naming and conventions), enabling easy adoption of this code.
- Controllable ThreadPool: configurable number of concurrent threads per instance of the pool.
- Multiple ThreadPools: it is possible to instantiate multiple ThreadPools per application process.
- Prioritisation: prioritisation of WorkItems on the work queue.
- Removal of duplicates: if the same WorkItem is already queued [using keys], it will not be queued again.
- WaitAll(): a blocking operation to wait for all WorkItems to complete processing.
- Custom Windows Performance Counters: monitors the number of application ThreadedQueues and the number of concurrent worker threads.
- Six unit tests: these exercise 95% of the code, useful if you wish to adapt this code for your own use.
Background
I have been working in commodity trading for a number of years, and many of our applications require parallel processing in order to achieve the necessary throughput. The original problem was due to the constraints of the ThreadPool: a process space has only one ThreadPool, so different parts of your application contend for this resource. Often the applications had "very high" priority functions and "high" priority functions, with both code paths wanting the ability to use all free resources.
My first article was an attempt to create an abstraction class that could offer this functionality; however, as with most things, it got enhanced over the last two years. So, what I am presenting here is a richer class, capable of delivering more control than my first library.
Code Terminology
- WorkItem: an object that encapsulates the method to be executed (WaitCallback), the ThreadPriority, and a State object.
- WorkQueue: an ordered list of WorkItems, based on their priority.
- ThreadedQueue: owns a WorkQueue and manages the processing of the WorkItems on it. The number of concurrent threads is configured here.
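To make the terminology concrete, here is a minimal sketch of what a WorkItem might look like. The member names (Callback, Priority, State, Key, MRE) are assumptions inferred from how the class is used later in this article, not the exact shape of the library's type.

```csharp
using System.Threading;

// Hypothetical shape of a WorkItem, inferred from its usage in this article.
public class WorkItem
{
    public readonly WaitCallback Callback;     // the method to execute
    public readonly ThreadPriority Priority;   // used to order the WorkQueue
    public readonly object State;              // passed to the callback
    public readonly object Key;                // optional uniqueness key
    public readonly ManualResetEvent MRE;      // signalled on completion; used by WaitAll()

    public WorkItem(WaitCallback callback, object state,
                    ThreadPriority priority, object key)
    {
        Callback = callback;
        State = state;
        Priority = priority;
        Key = key;
        MRE = new ManualResetEvent(false);
    }
}
```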
Using the Code
I have created four simple examples that demonstrate how to use the class. I have not included all the output from the executed code, as it is verbose and would bloat the article; instead, I have trimmed the output to the lines that illustrate each example.
The code has lots of comments, and is worth reading and executing if you wish to investigate how it behaves.
Standard ThreadPool Behavior
This is an example of having a large number of tasks that need to be executed in parallel. It behaves like-for-like with the System.Threading.ThreadPool; for this example, my library is simply an abstraction layer.
100 work items (each work item is a 100ms sleep) are queued on the ThreadedQueue. At some point in time, all of these sleeps will have occurred.
It is worth noting that invoking a method on the ThreadedQueue is just as simple as on the System.Threading.ThreadPool.
public void Demo_SimpleAsync()
{
    ThreadedQueue oThreadedQueue = new ThreadedQueue(5);
    const int iNumberOfWorkItems = 100;

    for (int iIndex = 0; iIndex < iNumberOfWorkItems; iIndex++)
    {
        oThreadedQueue.QueueUserWorkItem(new WaitCallback(CallbackMethod), iIndex);
    }
}

private void CallbackMethod(object Param)
{
    Thread.Sleep(100);
    Debug.WriteLine(Param);
}
WaitAll() Behaviour
This example is similar to the one above, except that we want to know when all the sleeps have occurred. The WaitAll() method blocks until all of the work items have been processed asynchronously on the ThreadPool.
public void Demo_SimpleWaitAll()
{
    ThreadedQueue oThreadedQueue = new ThreadedQueue(5);
    const int iNumberOfWorkItems = 100;

    for (int iIndex = 0; iIndex < iNumberOfWorkItems; iIndex++)
    {
        oThreadedQueue.QueueUserWorkItem(new WaitCallback(CallbackMethod), iIndex);
    }

    oThreadedQueue.WaitAll();
}

private void CallbackMethod(object Param)
{
    Thread.Sleep(100);
    Debug.WriteLine(Param);
}
Prioritisation of WorkItems
This example queues 4 groups of 5 items onto the ThreadedQueue, 20 in total. Each group of 5 items is put onto the queue in ascending order of priority, so the last 5 WorkItems have the highest priority. The example casts an Int32 to a ThreadPriority enum.
The points of interest are in the output. The first 5 messages (1000-1004, the lowest priority) are processed immediately, because there are threads waiting for WorkItems. However, the 6th message to be processed is 4000 (the highest priority), showing that the ThreadedQueue keeps the pending WorkItems sorted in priority order.
public void Demo_Priority()
{
    ThreadedQueue oThreadedQueue = new ThreadedQueue(5);
    const int iNumberOfWorkItems = 5;

    for (int iPriority = 0; iPriority < 4; iPriority++)
    {
        for (int iIndex = 0; iIndex < iNumberOfWorkItems; iIndex++)
        {
            oThreadedQueue.QueueUserWorkItem(new WaitCallback(CallbackMethod),
                iIndex + (iPriority * 1000), (ThreadPriority)iPriority);
        }
    }
}

private void CallbackMethod(object Param)
{
    Thread.Sleep(100);
    Debug.WriteLine(Param);
}
Output:
PrioritisedWorkerQueue.Constr: Requested:5 Availible:5
1000 // The first 5 low-priority work items are pushed straight onto threads
1001
1002
1003
1004
4000 // The remaining items are processed in descending priority order
4001
4003
4002
4004
3000
3001
3002
3004
3003
2000
2001
2003
2004
2002
Unique WorkItems on Queue
This method sends 4 sets of 5 WorkItems to the queue; each set shares the same identifier. If the same identifier is already queued, the work item is discarded. The output of this program is interesting. The first item is queued and dispatched immediately, leaving an empty queue. The next item from the first batch is then placed on the queue. The third item from the first batch is rejected, because the second item is still there. The subsequent batches each only manage to place one item on the queue, because the processing thread is already in use.
I find this example interesting because the results don't initially make sense: why are there two items from the first batch and only one from each of the others? However, it becomes clear when you consider how the WorkItems are dispatched to processing threads. The output becomes less predictable when you add more threads to the ThreadedQueue; for this example, it was configured with 1.
public void Demo_UniqueWorkItem()
{
    ThreadedQueue oPWorkerQueue = new ThreadedQueue(1);
    const int iNumberOfWorkItems = 5;

    for (int iUniqueKey = 0; iUniqueKey < 4; iUniqueKey++)
    {
        for (int iIndex = 0; iIndex < iNumberOfWorkItems; iIndex++)
        {
            oPWorkerQueue.QueueUserWorkItem(new WaitCallback(CallbackMethod),
                iUniqueKey + ":" + iIndex, ThreadPriority.Normal, iUniqueKey);
        }
    }
}

private void CallbackMethod(object Param)
{
    Thread.Sleep(100);
    Debug.WriteLine(Param);
}
Output:
PrioritisedWorkerQueue.Constr: Requested:1 Availible:1
PrioritisedArrayList: Key Already Queued: Skipped Request Key:'0'
PrioritisedArrayList: Key Already Queued: Skipped Request Key:'0'
PrioritisedArrayList: Key Already Queued: Skipped Request Key:'0'
PrioritisedArrayList: Key Already Queued: Skipped Request Key:'1'
PrioritisedArrayList: Key Already Queued: Skipped Request Key:'1'
PrioritisedArrayList: Key Already Queued: Skipped Request Key:'1'
PrioritisedArrayList: Key Already Queued: Skipped Request Key:'1'
PrioritisedArrayList: Key Already Queued: Skipped Request Key:'2'
PrioritisedArrayList: Key Already Queued: Skipped Request Key:'2'
PrioritisedArrayList: Key Already Queued: Skipped Request Key:'2'
PrioritisedArrayList: Key Already Queued: Skipped Request Key:'2'
PrioritisedArrayList: Key Already Queued: Skipped Request Key:'3'
PrioritisedArrayList: Key Already Queued: Skipped Request Key:'3'
PrioritisedArrayList: Key Already Queued: Skipped Request Key:'3'
PrioritisedArrayList: Key Already Queued: Skipped Request Key:'3'
0:0 // First item processed immediately
0:1 // Only one further item from the first batch is queued
1:0 // Only one item from each subsequent batch is processed
3:0
2:0
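The key-rejection behaviour seen in the output above can be sketched as follows. This is not the library's PrioritisedArrayList, just a minimal illustration of a queue that discards an item when another item with the same key is still pending, and that frees the key on Dequeue().

```csharp
using System;
using System.Collections.Generic;

// Illustrative only: a queue that rejects items whose key is already pending.
public class KeyedQueue<T>
{
    private readonly Queue<KeyValuePair<object, T>> m_Items =
        new Queue<KeyValuePair<object, T>>();
    private readonly Dictionary<object, bool> m_PendingKeys =
        new Dictionary<object, bool>();

    // Returns false when an item with the same key is already queued.
    public bool Enqueue(object key, T item)
    {
        if (key != null && m_PendingKeys.ContainsKey(key))
        {
            Console.WriteLine("Key Already Queued: Skipped Request Key:'" + key + "'");
            return false;
        }
        if (key != null) m_PendingKeys[key] = true;
        m_Items.Enqueue(new KeyValuePair<object, T>(key, item));
        return true;
    }

    // Dequeuing frees the key, so the same key may be queued again later.
    public T Dequeue()
    {
        KeyValuePair<object, T> entry = m_Items.Dequeue();
        if (entry.Key != null) m_PendingKeys.Remove(entry.Key);
        return entry.Value;
    }

    public int Count { get { return m_Items.Count; } }
}
```

Because the key is released at Dequeue() time, i.e. when the item is handed to a worker thread, each later batch in the example gets exactly one item through while its predecessor is still sleeping on the single worker thread.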
Interesting Bit
This function contains the main logic of the ThreadedQueue class. Below is the code from the project, followed by an explanation of the main parts.
 1  private void SpawnWorkThread(IAsyncResult AsyncResult)
 2  {
 3      lock (m_Lock_WorkItem)
 4      {
 5          if (AsyncResult != null)
 6          {
 7              oWaitCallback.EndInvoke(AsyncResult);
 8
 9              m_ItemsProcessing--;
10
11              #region Removes the ManualReset from the collection, indicating it is processed.
12              WorkItem oPThreadWorkItem = AsyncResult.AsyncState as WorkItem;
13
14              if (oPThreadWorkItem != null)
15              {
16                  m_WorkQueue.ManualResets.Remove(oPThreadWorkItem.MRE);
17              }
18              #endregion
19
20              if (ThreadedQueueMonitor.Instance.ThreadPoolThreadCount != null)
21                  ThreadedQueueMonitor.Instance.ThreadPoolThreadCount.Decrement();
22          }
23
24
25          if (m_WorkQueue.Count > 0)
26          {
27              m_ItemsProcessing++;
28
29
30              WorkItem oWorkItem = m_WorkQueue.Dequeue();
31
32              if (oWorkItem != null)
33              {
34
35                  AsyncCallback oAsyncCallback = new AsyncCallback(this.SpawnWorkThread);
36
37
38                  oWaitCallback.BeginInvoke(oWorkItem, oAsyncCallback, oWorkItem);
39
40                  if (ThreadedQueueMonitor.Instance.ThreadPoolThreadCount != null)
41                      ThreadedQueueMonitor.Instance.ThreadPoolThreadCount.Increment();
42              }
43          }
44
45
46      }
47  }
How it is Called: Lines 1, 35
The SpawnWorkThread function is called recursively via callbacks. When a fresh thread is spawned by a new WorkItem being placed on the ThreadedQueue, the AsyncResult parameter is null. The only time it will not be null is when a callback invokes the method; the callback is triggered when a WorkItem completes processing.
Handling Callbacks: Lines 5-22
This code block ensures that all references to the callback objects are cleared down. The ManualResetEvent object must be removed once a WorkItem has completed processing, to ensure that it does not leak memory (line 16).
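The ManualResetEvent held per WorkItem is also what makes WaitAll() possible. The following is a minimal sketch, not the library's actual code: each work item signals its event when done, and the waiter blocks until every outstanding event is set.

```csharp
using System.Collections.Generic;
using System.Threading;

// Illustrative only: a tiny queue that tracks a ManualResetEvent per item
// and blocks in WaitAll() until every queued item has signalled.
public class MiniWaitAllQueue
{
    private readonly List<ManualResetEvent> m_ManualResets = new List<ManualResetEvent>();
    private readonly object m_Lock = new object();

    public void QueueUserWorkItem(WaitCallback callback, object state)
    {
        ManualResetEvent mre = new ManualResetEvent(false);
        lock (m_Lock) { m_ManualResets.Add(mre); }

        ThreadPool.QueueUserWorkItem(delegate(object s)
        {
            try { callback(s); }
            finally { mre.Set(); }   // mark this work item as processed
        }, state);
    }

    public void WaitAll()
    {
        ManualResetEvent[] pending;
        lock (m_Lock) { pending = m_ManualResets.ToArray(); }

        // Note: WaitHandle.WaitAll is limited to 64 handles, so this
        // sketch waits on each handle in turn instead.
        foreach (ManualResetEvent mre in pending)
            mre.WaitOne();

        lock (m_Lock) { m_ManualResets.Clear(); }
    }
}
```

The article's ThreadedQueue does the same bookkeeping from the completion side: each event is removed from WorkQueue.ManualResets as its WorkItem finishes (line 16), rather than being cleared by the waiter.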
Getting the Next WorkItem: Line 30
m_WorkQueue is an instance of WorkItemQueue, a custom class that maintains a sorted list of unique items waiting to be processed. The sorting and uniqueness logic is encapsulated within this class, so the SpawnWorkThread method stays very simple and only needs to call Dequeue() to get the next item.
Spawning a WorkItem on the ThreadPool: Lines 25-43
If there are items to be processed, the next item is removed from the queue (this is not transactional). The next lines of code create an AsyncCallback delegate referencing this same function, SpawnWorkThread, so that it is possible to determine when the WorkItem has completed processing. Finally, the WaitCallback delegate is invoked asynchronously, which internally spawns a new thread on the System.Threading.ThreadPool.
Performance Counters: Lines 20, 40
This code interacts with the ThreadedQueueMonitor, which handles the interaction with the Windows Performance Counters, incrementing and decrementing them as WorkItems are processed.
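A hedged sketch of what ThreadedQueueMonitor might look like: a singleton owning a System.Diagnostics.PerformanceCounter. The category and counter names here are invented for illustration; the real names live in the library and its installer.

```csharp
using System.Diagnostics;

// Hypothetical monitor singleton; the category and counter names are placeholders.
public class ThreadedQueueMonitor
{
    public static readonly ThreadedQueueMonitor Instance = new ThreadedQueueMonitor();

    // Stays null when the performance counter category is not installed on
    // the machine, hence the null checks before Increment()/Decrement().
    public readonly PerformanceCounter ThreadPoolThreadCount;

    private ThreadedQueueMonitor()
    {
        if (PerformanceCounterCategory.Exists("ThreadedQueue"))
        {
            // readOnly: false, so that the counter value can be modified.
            ThreadPoolThreadCount =
                new PerformanceCounter("ThreadedQueue", "Worker Threads", false);
        }
    }
}
```

This also explains the null checks on lines 20 and 40 of SpawnWorkThread: on a machine where the counter category has not been installed, the counters remain null and monitoring is silently skipped.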
Conclusion
I have used this class and variants of it in many production applications. If you have any comments regarding what I have done, I would be interested in hearing them. Thank you for reading my article. If you felt it was of use to you, please take the time to rank it.
History
- 1.0 - 20 February 2007 - First draft.
- 1.1 - 03 April 2007 - Uploaded to CodeProject.