Introduction
In real-time systems that must respond to and process different tasks as
quickly as possible, and where those tasks can be processed concurrently,
a number of threads need to be waiting, willing, and able to perform the tasks
at hand. This can be achieved with a purpose-built scheme for each
specific job, or with a generic mechanism called a thread pool. The work
queue is a simple and elegant type of thread pool: it creates the
requested number of threads when it is created and manages a queue of
work items that implement the specific tasks. Each work item,
in its turn, is handed to a free thread that processes it.
How to Use
First, a new type of work item needs to be defined in a format that the
work queue can process. To do this, implement a work item class that inherits
from the base work item class, WorkItemBase. WorkItemBase has two abstract
functions that need to be implemented. The first, DoWork, is the method that
a free thread executes when the work item gets its turn in the queue. The
second, Abort, is called when the queue is destroyed before the work item
gets its turn. A specific work item should be implemented as follows.
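class SpecificWorkItem : public WorkItemBase
{
    // Executed by a free thread when the item gets its turn in the queue.
    void DoWork(void* pThreadContext);
    // Called if the queue is destroyed before the item gets its turn.
    void Abort();
};

void SpecificWorkItem::DoWork(void* pThreadContext)
{
    // Task-specific processing goes here.
}

void SpecificWorkItem::Abort()
{
    // Cleanup for an item that will never be processed goes here.
}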
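To make the skeleton above more concrete, here is a minimal sketch of a work item that actually does something. The names SumWorkItem, Sum, Done, and Aborted are hypothetical and exist only for illustration, and the WorkItemBase declared here is a stand-in so the example compiles on its own; the real base class comes with the work queue source and may differ in detail.

```cpp
#include <cassert>
#include <numeric>
#include <vector>

// Stand-in for the article's base class, declared here only so the sketch
// is self-contained. Assumption: the real class has the same two functions.
class WorkItemBase
{
public:
    virtual ~WorkItemBase() {}
    virtual void DoWork(void* pThreadContext) = 0;
    virtual void Abort() = 0;
};

// Hypothetical work item: sums a list of integers.
class SumWorkItem : public WorkItemBase
{
public:
    explicit SumWorkItem(const std::vector<int>& values)
        : m_values(values), m_sum(0), m_done(false), m_aborted(false) {}

    // Runs on a worker thread when the item gets its turn in the queue.
    void DoWork(void* /*pThreadContext*/) override
    {
        m_sum = std::accumulate(m_values.begin(), m_values.end(), 0L);
        m_done = true;
    }

    // Runs instead of DoWork if the queue is destroyed first.
    void Abort() override { m_aborted = true; }

    long Sum() const { return m_sum; }
    bool Done() const { return m_done; }
    bool Aborted() const { return m_aborted; }

private:
    std::vector<int> m_values;
    long m_sum;
    bool m_done;
    bool m_aborted;
};
```

Each item ends up in exactly one of two states, processed or aborted, so a caller that keeps a pointer to the item can tell which path the queue took. The sketch deliberately says nothing about who deletes the item, because the article does not specify ownership.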
Note that different types of work items can be processed in the same
work queue. To set up the work queue, first call the Create
function, where the first parameter is the number of threads
and the second is an array of pointers to thread data structures.
These structures can be used as a working surface for each of the threads. This
reduces the number of local variables declared in the DoWork
function and the dynamic allocations made for each item being
processed, which speeds up processing.
CWorkQueue m_WorkQueue;    // declared as a member of the owning class
m_WorkQueue.Create(5);     // five worker threads, no per-thread data
As the code shows, the second parameter can be omitted; its default value
is NULL. Next, to insert a work item into the queue, call InsertWorkItem
with the newly created work item. This function can be called asynchronously,
that is, from any thread at any time.
SpecificWorkItem* pWorkItem = new SpecificWorkItem();
m_WorkQueue.InsertWorkItem(pWorkItem);
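Returning to the per-thread data structures that can be passed to Create: the following sketch shows one way a work item might use the pointer it receives in DoWork as a reusable working surface. ThreadData, UppercaseWorkItem, and Result are hypothetical names, the WorkItemBase shown is a stand-in so the example compiles on its own, and the sketch assumes the queue hands each thread the pointer that was registered for it.

```cpp
#include <cassert>
#include <cctype>
#include <cstddef>
#include <string>
#include <vector>

// Stand-in for the article's base class so the sketch is self-contained.
class WorkItemBase
{
public:
    virtual ~WorkItemBase() {}
    virtual void DoWork(void* pThreadContext) = 0;
    virtual void Abort() = 0;
};

// Hypothetical per-thread working surface: allocated once per worker thread
// and reused by every work item that thread processes.
struct ThreadData
{
    std::vector<char> scratch;
    unsigned long itemsProcessed;
    ThreadData() : scratch(256), itemsProcessed(0) {}
};

// Hypothetical work item that upper-cases a string in the thread's buffer.
class UppercaseWorkItem : public WorkItemBase
{
public:
    explicit UppercaseWorkItem(const std::string& text) : m_text(text) {}

    void DoWork(void* pThreadContext) override
    {
        // Assumption: this is the pointer registered for the current thread.
        ThreadData* pData = static_cast<ThreadData*>(pThreadContext);
        std::size_t n = m_text.size();
        if (n > pData->scratch.size())
            n = pData->scratch.size();   // truncate to the buffer size
        for (std::size_t i = 0; i < n; ++i)
            pData->scratch[i] = static_cast<char>(
                std::toupper(static_cast<unsigned char>(m_text[i])));
        m_result.assign(pData->scratch.begin(), pData->scratch.begin() + n);
        ++pData->itemsProcessed;
    }

    void Abort() override {}

    const std::string& Result() const { return m_result; }

private:
    std::string m_text;
    std::string m_result;
};
```

With the real queue, an array holding one such pointer per thread would be passed as the second argument to Create. The exact parameter type is not shown in this article, so check the work queue header before relying on it.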
After using the work queue you must call the Destroy
function. This function waits until all threads have finished processing the
work items that have already been taken out of the queue, then calls the
Abort function of each work item still waiting in the queue.
m_WorkQueue.Destroy();
Implementation
The work queue is implemented using an STL queue of work items, which is
made safe for use in an asynchronous system by a mutual exclusion lock.
It also uses a semaphore that is initialized with a count of zero, an abort
event, and an array of threads that are created to run a particular function.
This thread function waits on the two synchronization objects. The first is
the abort event: if it is set, the thread returns from the function. The
second is the semaphore: if it is released, the thread extracts the next item
from the queue and calls its DoWork function. The insert function releases
the semaphore (by 1) after inserting the work item, allowing one of the
waiting threads to pick the item up. When the Destroy function is called,
the abort event is set and the function waits for all threads to terminate.
It then calls the Abort function of each work item left in the queue.
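The mechanism described above can be sketched in portable C++11. This is not the article's implementation: the semaphore and the abort event are replaced here by a single std::condition_variable that watches the queue and an abort flag, the per-thread context is omitted, and SketchWorkQueue and CountingItem are hypothetical names. The WorkItemBase shown is a stand-in so the example compiles on its own.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Stand-in for the article's base class so the sketch is self-contained.
class WorkItemBase
{
public:
    virtual ~WorkItemBase() {}
    virtual void DoWork(void* pThreadContext) = 0;
    virtual void Abort() = 0;
};

class SketchWorkQueue
{
public:
    SketchWorkQueue() : m_abort(false) {}
    ~SketchWorkQueue() { Destroy(); }

    void Create(unsigned int nThreads)
    {
        for (unsigned int i = 0; i < nThreads; ++i)
            m_threads.emplace_back(&SketchWorkQueue::ThreadFunc, this);
    }

    void InsertWorkItem(WorkItemBase* pItem)
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_items.push(pItem);
        }
        m_wake.notify_one();   // stands in for releasing the semaphore by 1
    }

    void Destroy()
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_abort = true;    // stands in for setting the abort event
        }
        m_wake.notify_all();
        for (std::thread& t : m_threads)
            t.join();          // wait for items already being processed
        m_threads.clear();

        // Items still queued never got a thread, so they are aborted.
        std::lock_guard<std::mutex> lock(m_mutex);
        while (!m_items.empty())
        {
            m_items.front()->Abort();
            m_items.pop();
        }
    }

private:
    void ThreadFunc()
    {
        for (;;)
        {
            WorkItemBase* pItem = nullptr;
            {
                std::unique_lock<std::mutex> lock(m_mutex);
                m_wake.wait(lock, [this] { return m_abort || !m_items.empty(); });
                if (m_abort)
                    return;    // abort is checked before pending items
                pItem = m_items.front();
                m_items.pop();
            }
            pItem->DoWork(nullptr);   // run the item outside the lock
        }
    }

    std::queue<WorkItemBase*> m_items;
    std::mutex m_mutex;
    std::condition_variable m_wake;
    bool m_abort;
    std::vector<std::thread> m_threads;
};

// Hypothetical item used to exercise the sketch: counts how it finished.
class CountingItem : public WorkItemBase
{
public:
    CountingItem(std::atomic<int>* pDone, std::atomic<int>* pAborted)
        : m_pDone(pDone), m_pAborted(pAborted) {}
    void DoWork(void*) override { ++*m_pDone; }
    void Abort() override { ++*m_pAborted; }
private:
    std::atomic<int>* m_pDone;
    std::atomic<int>* m_pAborted;
};
```

If a batch of CountingItem objects is inserted and Destroy is called right away, the split between processed and aborted items depends on thread timing, but the two counters always add up to the number of items inserted. That is the guarantee the description above implies: every item gets either DoWork or Abort, never both and never neither.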
Demo project
The demo project is included merely to illustrate how the
work queue performs. To use it:
- determine the number of threads you want
- click the create button to create the work queue
- insert work items into the queue as you please by clicking the insert
item button
- to destroy the work queue, click the destroy button.
Enjoy.
Thanks to Hans Dietrich for the XListCtrl.