Introduction
As usual, this essay was prompted by seeing the same question several times in one week on the newsgroup. I've been using semaphores since 1969, shortly after Edsger Dijkstra first proposed them in his classic paper [E. W. Dijkstra. Cooperating Sequential Processes. Programming Languages (F. Genuys, ed.), Academic Press, New York, 1968]. I used them a lot in the C.mmp/Hydra system we did at CMU in the mid-1970s, and have used them extensively in Windows. They are actually easy to use.
I have discovered that far too many people use an Event instead of a Semaphore to do synchronization. They are intended to solve different problems, and one does not address the problem domain of the other. You will see in the example below that I use both, for different purposes.
You may also want to read my essay on worker threads.
The solution here is fully general; it can work with multiple producers and multiple consumers. In the example code that accompanies it, I show one producer and two consumers, but the solution will generalize to multiple producers.
What I did was create a Queue class in C++. Rather than do a type-specific class or a template class (template classes have some problems in MFC), I just created one that takes void * pointers and enqueues them. Thus you should be able to use this code directly in many projects.
You may ask: why am I using the API calls for synchronization and not the MFC synchronization classes like CMutex, CSemaphore, and CEvent? There are two reasons. One is pedagogical: I want you to see the primitives in action. The other is pragmatic. In an earlier release of MFC, the synchronization primitives were done so badly that they were actually wrong, and totally unusable. For example, I was using CEvent::SetEvent to synchronize two separate processes, but some overly-clever programmer at Microsoft decided it was cool to test a Boolean flag to see if the Event had already been set and not "waste time" calling the kernel if it was. Stupid programmer; the code to call CEvent::ResetEvent was in a totally separate process, which is exactly one of the things you want to do with an Event! And just to add to the incompetence, the programmer actually did an unsynchronized test of the Boolean flag to see if the Event should be set. Code this badly done is beneath contempt. The programmer was truly, utterly clueless about the basic operations. Perhaps Microsoft has fixed this code. I've never bothered to look; I don't trust it. Nor do a lot of other people in the MFC newsgroup.
The code and a demo program are all available in a source download.
The following section describes the user interface to the class.
BOOL Queue::AddTail(LPVOID p)
This adds an item to the tail of the queue and notifies the waiting thread(s) that the queue is nonempty. Returns TRUE if it successfully enqueued the request; FALSE if the semaphore limit was hit.
Note: The test program ignores the Boolean value in terms of recovery and simply discards the item it was enqueuing. You may wish to do otherwise.
LPVOID Queue::RemoveHead()
This removes an item from the head of the queue. If the queue is empty, this call blocks. A thread that is blocked in a RemoveHead operation will terminate if the shutdown method is called.
void Queue::shutdown()
Causes all threads that are blocked on RemoveHead to terminate. Note that there is no effort to dequeue any pending items; it is the responsibility of the caller not to call this method if there is anything in the queue.
Note: I do not stop threads from putting things into the queue during a shutdown. That would involve a Boolean flag, set to indicate that subsequent AddTail calls should return FALSE. Note that in such a case, AddTail should call ::SetLastError to record that a shutdown is in progress, so the caller can use ::GetLastError to determine the reason FALSE was returned. There is already an error code, ERROR_SHUTDOWN_IN_PROGRESS, that could be co-opted for this purpose.
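That suggestion can be sketched in portable modern C++ (my hypothetical analogue, not the Win32 code from this essay): a std::mutex stands in for the CRITICAL_SECTION, and an error-code member stands in for the value ::SetLastError would record. All names here are illustrative.

```cpp
#include <cassert>
#include <list>
#include <mutex>

// Hypothetical sketch of the shutdown-aware AddTail described above.
// Simplification: the real ::SetLastError value is per-thread; here a
// single member variable is used for brevity.
class SketchQueue {
public:
    enum class Error { None, ShutdownInProgress };

    bool AddTail(void* p) {
        std::lock_guard<std::mutex> guard(lock);
        if (shuttingDown) {
            lastError = Error::ShutdownInProgress; // analogue of ::SetLastError
            return false;                          // caller can ask why via LastError()
        }
        items.push_back(p);
        return true;
    }
    void shutdown() {
        std::lock_guard<std::mutex> guard(lock);
        shuttingDown = true;                       // refuse all further enqueues
    }
    Error LastError() const { return lastError; }

private:
    std::mutex lock;
    std::list<void*> items;
    bool shuttingDown = false;
    Error lastError = Error::None;
};
```

The point is only the protocol: after shutdown, AddTail fails fast and the caller can distinguish "shutting down" from other failures.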
The code for Queue.h contains the entire queue class. There is no .cpp file. Here it is in its entirety:
class Queue {
public:
    Queue(UINT limit)
    {
        handles[SemaphoreIndex] = ::CreateSemaphore(NULL, 0, limit, NULL);
        handles[StopperIndex] = ::CreateEvent(NULL, TRUE, FALSE, NULL);
        ::InitializeCriticalSection(&lock);
    }
    ~Queue()
    {
        ::CloseHandle(handles[SemaphoreIndex]);
        ::CloseHandle(handles[StopperIndex]);
        ::DeleteCriticalSection(&lock);
    }
    BOOL AddTail(LPVOID p)
    {
        BOOL result;
        ::EnterCriticalSection(&lock);
        queue.AddTail(p);
        result = ::ReleaseSemaphore(handles[SemaphoreIndex], 1, NULL);
        if(!result)
        { // semaphore limit hit: roll back the enqueue
            queue.RemoveTail();
        }
        ::LeaveCriticalSection(&lock);
        return result;
    }
    LPVOID RemoveHead()
    {
        LPVOID result;
        switch(::WaitForMultipleObjects(2, handles, FALSE, INFINITE))
        {
        case StopperIndex:   // shutdown requested
            ::ExitThread(0);
            return NULL;     // never executed; silences the compiler
        case SemaphoreIndex: // at least one item is in the queue
            ::EnterCriticalSection(&lock);
            result = queue.RemoveHead();
            ::LeaveCriticalSection(&lock);
            return result;
        case WAIT_TIMEOUT:   // cannot happen with INFINITE
        default:
            ASSERT(FALSE);
            return NULL;
        }
    }
    void shutdown()
    {
        ::SetEvent(handles[StopperIndex]);
    }
protected:
    enum {StopperIndex, SemaphoreIndex};
    HANDLE handles[2];
    CRITICAL_SECTION lock;
    CList<LPVOID, LPVOID> queue;
};
The basic idea is that the CList called queue is manipulated only from the two methods supplied, AddTail and RemoveHead. Since it is a protected member (very important!), there is no other access to it, and thus all accesses can be protected.
Now let's look at the code in more detail:
Constructor
Queue(UINT limit)
{
    handles[SemaphoreIndex] = ::CreateSemaphore(NULL,  // default security
                                                0,     // initially empty
                                                limit, // maximum count
                                                NULL); // anonymous
    handles[StopperIndex] = ::CreateEvent(NULL,  // default security
                                          TRUE,  // manual-reset
                                          FALSE, // initially non-signaled
                                          NULL); // anonymous
    ::InitializeCriticalSection(&lock);
}
We will need an array of two handles for a later ::WaitForMultipleObjects, so I just create the handles directly in the array. To give mnemonic names to the two indices, I use an enum. The choice of the enumeration constants is very important. When ::WaitForMultipleObjects leaves a wait, it reports the index of the lowest-numbered array element that became signaled. Therefore, the order in which you select these indices determines whether preference is given to shutdown or to queue-element processing. In this example, I have given preference to shutting down, so I chose the index of the Event used for shutdown to be 0, and the index of the semaphore to be 1. This is encoded in the protected declaration:
enum {StopperIndex, SemaphoreIndex};
Note that an enum by definition starts assigning at 0 unless otherwise specified.
While the semaphore gives us protection against trying to execute on an empty queue, and blocks the waiting thread, it does not provide thread safety in manipulating the queue. In this case, I also have to provide a mutual-exclusion primitive to keep multiple threads from concurrently trying to enqueue or dequeue data. To do this, I create a CRITICAL_SECTION object, which must be initialized by ::InitializeCriticalSection.
The Event is a manual-reset Event, initially non-signaled. When it comes time to shut the threads down (all the threads), all that is necessary is that I call ::SetEvent to set the Event to the signaled state. Note that the Event provides no synchronization protection; it provides only notification.
Destructor
Like any good destructor, this one deallocates resources allocated by the constructor:
~Queue()
{
    ::CloseHandle(handles[SemaphoreIndex]);
    ::CloseHandle(handles[StopperIndex]);
    ::DeleteCriticalSection(&lock);
}
AddTail
The only trick in adding an element is that a semaphore always has an upper bound; attempting to add an item above that bound will always fail. We deal with this by returning FALSE if the item cannot be enqueued.
Otherwise, we must make sure that no more than one thread at a time is manipulating the queue. We do this with an ::EnterCriticalSection to lock out other threads. Note that you must not do a return in the scope of a critical section; you must always do a ::LeaveCriticalSection, and a return would bypass it. That would mean the next thread that tried to enter the critical section would block, and nothing would ever release it.
What we do is add the item to the queue and then attempt to release the semaphore. If we are within the limits of the semaphore, the ::ReleaseSemaphore will return TRUE. However, if we overflowed the semaphore, we now have one more item in the queue than the semaphore can account for, so we have to roll back the transaction, which we do with a RemoveTail operation on the queue. Thus, when we finally return FALSE, there has been no net change in the queue.
This technique of doing the return outside the scope of a synchronization primitive is fundamental. I also find it easier to write code of this nature when the wait/release operations are balanced, that is, there is exactly one of each.
BOOL AddTail(LPVOID p)
{
    BOOL result;
    ::EnterCriticalSection(&lock);
    queue.AddTail(p);
    result = ::ReleaseSemaphore(handles[SemaphoreIndex], 1, NULL);
    if(!result)
    { // semaphore limit hit: roll back the enqueue
        queue.RemoveTail();
    }
    ::LeaveCriticalSection(&lock);
    return result;
}
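As an aside, the "never return inside the critical section" discipline is one the raw Win32 API forces on you. In modern C++ (this is a sketch of mine, not part of the essay's code), an RAII guard such as std::lock_guard releases the lock on every exit path, so an early return becomes safe:

```cpp
#include <cassert>
#include <mutex>

// Sketch: with an RAII guard, the lock is released on *every* exit path,
// so an early return no longer strands the lock the way a skipped
// ::LeaveCriticalSection would. The author's balanced enter/leave style
// achieves the same guarantee by hand.
static std::mutex m;
static int shared_value = 0;

int readValueEarlyReturn(bool wantDouble) {
    std::lock_guard<std::mutex> guard(m); // lock acquired here
    if (!wantDouble)
        return shared_value;              // guard releases m here...
    return shared_value * 2;              // ...or here
}
```

Either way, the invariant is the same: the lock must be released exactly once on every path out of the protected region.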
RemoveHead
This function is perhaps overkill; I chose to actually shut down the thread rather than return NULL, although you might choose to do otherwise.
What makes this work is the ::WaitForMultipleObjects, which allows us to block waiting either for a queue item or for a shutdown event. The Event created in the constructor is used to signal the shutdown. As discussed above, the indices of the items were chosen to favor shutdown over processing, but should you switch the two handles, this code would still work, since it uses the symbolic index values rather than hard-coded numbers.
Note that even though I call ::ExitThread (and don't ever call ::TerminateThread in a situation like this!), the compiler can complain that there is no return result. For sanity, I added a return NULL statement, which keeps me from getting warnings from the compiler.
Note that the only manipulation of the queue variable is done in the scope of the CRITICAL_SECTION, so that no other thread can be trying to add to the queue, or remove from it, concurrently. Again, note how the value must be stored and then returned only after the ::LeaveCriticalSection has released the lock.
I do not support timeout, but I added the case anyway; it falls through to the default case and does an ASSERT(FALSE) to trap anything bad that might happen during development.
LPVOID RemoveHead()
{
    LPVOID result;
    // WAIT_OBJECT_0 is 0, so the return value can be compared
    // directly against the enum indices
    switch(::WaitForMultipleObjects(2, handles, FALSE, INFINITE))
    {
    case StopperIndex:   // shutdown requested
        ::ExitThread(0);
        return NULL;     // never executed; silences the compiler
    case SemaphoreIndex: // at least one item is in the queue
        ::EnterCriticalSection(&lock);
        result = queue.RemoveHead();
        ::LeaveCriticalSection(&lock);
        return result;
    case WAIT_TIMEOUT:   // cannot happen with INFINITE
    default:
        ASSERT(FALSE);
        return NULL;
    }
}
shutdown
This is actually a trivial routine. All it does is a ::SetEvent on the "stopper" Event, which releases the ::WaitForMultipleObjects with the result StopperIndex, and that terminates every thread that gets back to the wait.
This does not handle the case where there might be a long compute cycle you wish to abort. In that case, you should also set a Boolean variable as I have described in my essay on worker threads.
void shutdown()
{
    ::SetEvent(handles[StopperIndex]);
}
That's it! You now have a thread-safe queue that supports multiple threads.
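For readers outside Windows, here is a hypothetical portable analogue of the Queue above, using C++11 std::mutex and std::condition_variable in place of the CRITICAL_SECTION, the semaphore, and the stopper Event. This is my sketch, not the essay's code, with two deliberate simplifications: there is no upper bound on the queue, and since ::ExitThread has no portable equivalent, RemoveHead returns nullptr on shutdown instead of terminating the thread.

```cpp
#include <cassert>
#include <condition_variable>
#include <list>
#include <mutex>

// Portable sketch of the Queue class: one condition variable stands in
// for the semaphore + stopper Event pair of the Win32 version.
class PortableQueue {
public:
    bool AddTail(void* p) {
        std::lock_guard<std::mutex> guard(lock);
        if (stopped) return false;   // refuse enqueues during shutdown
        items.push_back(p);
        ready.notify_one();          // analogue of ::ReleaseSemaphore
        return true;
    }
    void* RemoveHead() {
        std::unique_lock<std::mutex> guard(lock);
        ready.wait(guard, [this] { return stopped || !items.empty(); });
        if (stopped) return nullptr; // shutdown wins, as in the essay's index ordering
        void* p = items.front();
        items.pop_front();
        return p;
    }
    void shutdown() {
        std::lock_guard<std::mutex> guard(lock);
        stopped = true;
        ready.notify_all();          // analogue of ::SetEvent on the stopper
    }
private:
    std::mutex lock;
    std::condition_variable ready;
    std::list<void*> items;
    bool stopped = false;
};
```

The predicate form of wait handles spurious wakeups, and returning nullptr gives the worker loop a sentinel to exit on; the essay's version gets the same "shutdown beats pending work" preference from the ordering of its handle array.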
To test this, I wrote a little test program that you get when you download the source (or you can simply copy-and-paste from this page, but you don't get the test program). To make this more realistic, I added in an artificial delay of 200-700ms, randomly selected, so as you push the button to queue up entries you actually have a chance of getting ahead of the dequeuing.
Here's the thread routine from the test program:
void CQueuetestDlg::run()
{
    CString * s = new CString;
    s->Format(_T("Thread %08x started"), ::GetCurrentThreadId());
    PostMessage(UWM_MESSAGE, (WPARAM)::GetCurrentThreadId(), (LPARAM)s);
    while(TRUE)
    { // loops until shutdown terminates the thread inside RemoveHead
        LPVOID p = q.RemoveHead();
        long n = ::InterlockedDecrement(&count);
        showCount(n);
        PostMessage(UWM_MESSAGE, (WPARAM)::GetCurrentThreadId(), (LPARAM)p);
        ::Sleep(200 + rand() % 500);
    }
}
I post a message to the main GUI thread (this is executing in the context of the main CDialog, so PostMessage posts to that object) telling it that the thread has started. The list box into which this is placed is determined by the thread ID, which I obtain via ::GetCurrentThreadId(). Each time I add something to the queue, I call InterlockedIncrement to add one to the count of items in the queue, and on removal I call InterlockedDecrement. This value is displayed by the showCount routine.
The main loop retrieves an item from the queue and posts it back to the main thread. The items in the queue are all CString * objects, and will be deleted by the recipient of the PostMessage, as described in my essay on worker threads. I then introduce the random ::Sleep to slow this down to human-interaction speeds. If shutdown is called, the thread executing this code will terminate (see the RemoveHead code).
Here's an example of the program running:
Note that the items do not strictly alternate; otherwise we would see all of the even-numbered messages in the thread 1 window and all the odd-numbered messages in the thread 2 window.
What do those phrases mean? Well, I found this table of buzzwords and wrote a little buzzword generator. I carry it around from project to project when I need a way of generating sentences for various input contexts. Note that after you are done with this example, you can use them in your business plan.
Comparison to Other Mechanisms
I have seen a number of really bad implementations of synchronization that do not use semaphores. I will discuss them here and show why they are not sufficient.
Polling
I've seen a number of attempts to work with polling. Sometimes, it is something of the form:
while(counter <= 0) ;
Then someplace else the programmer does:
counter++;
and to remove something, the programmer does:
counter--;
Now, the first thing to understand is that counter++, where counter is a variable accessible from multiple threads, does not, cannot, and most likely will not, work correctly in a multithreaded environment. Period. If you think it will work correctly, you're wrong. Adding volatile to the declaration will not help, either. All that does is tell the compiler that it must not cache a copy of the value in a register; it does not protect the value itself from being incorrectly updated. It will fail particularly miserably if you run on a multiprocessor.
Aha, you say: you should have used ::InterlockedIncrement and ::InterlockedDecrement. You're right. These would guarantee that the value was incremented and decremented safely, and in fact would work correctly. But we're still left with the polling problem. A polling loop eats up a whole timeslice (200ms, more or less, on most NT/2000/XP boxes) doing absolutely nothing. I have a wonderful example I use in a class where there is a control panel with about 40 controls on it. The program goes into a polling loop. When the control panel pops up while the loop is running, you get to see every... single... control... being... redrawn... one... at... a... time... very... very... slowly... because that polling loop is eating up CPU cycles. In the next lab, we show how to add interrupts and get rid of the polling.
So if you believe that you shouldn't use a Semaphore because it is "inefficient", I can guarantee that no matter what, using a Semaphore will always be more efficient than polling. In fact, the polling usually happens at the worst possible time in your program, the time when it should be working hard creating the next object to be queued up. Yes, you can poll. But do not ever believe that it is "more efficient". It almost certainly is not.
And, I should point out, you still need a CRITICAL_SECTION or Mutex to protect your data object, so you probably haven't gained all that much by using polling.
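The ::InterlockedIncrement point can be demonstrated portably (this is my sketch; std::atomic is the standard-C++ analogue of the Interlocked family, not something from this essay):

```cpp
#include <atomic>
#include <cassert>
#include <thread>

// Two threads hammer one counter. A plain `counter++` here could lose
// updates, but since a lost update is timing-dependent it cannot be
// asserted deterministically; only the atomic version is exercised.
// fetch_add is an indivisible read-modify-write, like ::InterlockedIncrement.
long atomicCount(int perThread) {
    std::atomic<long> counter{0};
    auto work = [&] {
        for (int i = 0; i < perThread; ++i)
            counter.fetch_add(1);    // safe concurrent increment
    };
    std::thread t1(work), t2(work);
    t1.join();
    t2.join();
    return counter.load();           // exactly 2 * perThread
}
```

Note that this fixes only the increment, not the polling: a loop spinning on the counter still burns its whole timeslice, which is exactly the problem the semaphore's blocking wait avoids.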
CRITICAL_SECTION Blocking
I've seen at least one implementation that did something of the following:
if(count == 0)
    ::EnterCriticalSection(&wait);
This will not work. For one thing, between the time the test (count == 0) is made and the critical section is entered, the count could change. And since a CRITICAL_SECTION is not a counted object, doing a release before a wait will not work, so there is a race condition that will end up blocking forever.
Event Blocking
This has exactly the same problem as CRITICAL_SECTION blocking. While you could argue that it is possible to set an Event successfully before actually trying to block, because an Event is not counted you have to do a synchronized update of the count variable before blocking. Because one thread could have decremented the count to zero and done a ::ResetEvent at about the same time another thread was incrementing the count and doing a ::SetEvent, you have no guarantee that the ::SetEvent will not happen a few hundred nanoseconds before the ::ResetEvent, thus getting out of order and blocking when the Event should be passable. Work it out. Remember that the processor can be preempted between any two instructions, even while executing in the kernel, if it is in a preemptible context (which it nearly always is: the mantra of Windows developers is "Always preemptible, always interruptible"). On a multiprocessor with true concurrency this is even more dangerous, but it will eventually fail on a uniprocessor as well. It just takes a while--as in, it has been out in the field for months, and then it happens to your most important customer.
Semaphores Are Not Mutual Exclusion!
I've seen code of the form:
::WaitForSingleObject(semaphore, INFINITE);
queue.RemoveHead(p);
where the argument is that "the semaphore protects the queue manipulation". No: a semaphore does not protect the queue unless the maximum semaphore value is 1. In that case, you have a special case of the semaphore, the binary semaphore, which is also known as a "mutex". Note that a true Mutex object (as in ::CreateMutex) actually has slightly different semantics from a semaphore: a thread is said to own a Mutex, and if a thread attempts to reacquire a Mutex it already owns, it is permitted to pass without blocking. You must then execute as many ::ReleaseMutex operations as you did ::WaitFor... operations on it. Semaphores do not work this way; the second time a thread attempted to acquire a binary semaphore it already "owned", it would block indefinitely. Also, any thread can release a Semaphore, but only the thread that owns a Mutex is permitted to release it. A semaphore merely provides flow control for countable resources (such as the number of items in a queue); it does not protect other objects from concurrent execution. Therefore, the RemoveHead operation above must be protected by either a Mutex or a CRITICAL_SECTION.
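The ownership distinction can be seen in portable form (my sketch, not the essay's code): std::recursive_mutex behaves like a Win32 Mutex in that the owning thread may reacquire it, and must release it once for each acquisition. A semaphore has no owner; a second acquire of a binary semaphore by the same thread would simply block.

```cpp
#include <cassert>
#include <mutex>

// A recursive mutex, like a Win32 Mutex, lets its owning thread
// reacquire it without deadlocking -- something a binary semaphore
// cannot do, since a semaphore has no notion of an owner.
int reenter() {
    std::recursive_mutex m;
    m.lock();      // first acquisition
    m.lock();      // same thread may reacquire without blocking
    int depth = 2; // work done while held twice
    m.unlock();    // must balance every lock...
    m.unlock();    // ...before any other thread can acquire
    return depth;
}
```

With a binary semaphore in place of the recursive mutex, the second acquire above would block forever, which is exactly the behavioral difference described in the text.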
There's No Substitute for a Semaphore
If you think you have invented a clever, faster, more efficient, easier, or whatever way of doing a semaphore without actually using a Semaphore, the chances approach unity that you have simply fooled yourself. Read Dijkstra's early papers, where he was developing the notion of synchronization primitives that were safe under preemptive threading, with no InterlockedIncrement operation to help him. These are complex papers; the techniques are subtle. Only if you fully understand the issues of synchronization should you even consider trying something like this. The rest of the time, particularly if you are new to parallelism and synchronization, take this as a rule: you haven't a clue as to how to create a semaphore effect without using semaphores. I've been doing this professionally for a quarter century, and I don't feel confident trying to fake out a semaphore's functionality with some other mechanism. Trust me on this: You Don't Want To Go There.
The views expressed in these essays are those of the author, and in no way represent, nor are they endorsed by, Microsoft.
Copyright © 1999 All Rights Reserved
www.flounder.com/mvp_tips.htm