
Producer/Consumer Implementation Using Thread, Semaphore and Event

6 Jul 2004
Using thread, semaphore and event classes to implement a specific version of the Producer/Consumer model

0. Introduction

Producer/Consumer is a well-known model in computer science. Implementing it involves important and fundamental techniques such as multithreading and synchronization. This article presents 3 generic classes, namely mySemaphore, myThread and myEvent, and then uses these classes to implement one version of the producer/consumer model.

Compared to other articles on the same topic, these classes are not built from or based on any classes from MFC or C#, so they are quite lightweight. They are also generic enough to be used when building other applications; the producer/consumer implementation can be viewed as one example of these classes in use.

Another point worth mentioning is that whenever multithreading is involved, terminating a thread safely always seems to be a problem. This article also presents a method for doing so. Among the 3 classes, myThread is responsible for creating and running the thread, mySemaphore is in charge of synchronization, and myEvent provides a mechanism for telling the thread to terminate. The main thread controls an event that the thread can also access. When it is time to terminate the thread, the main thread signals the event; the thread detects the signal and understands that it must terminate, but before doing so it finishes all the necessary memory clean-up and other housekeeping.

The next section will describe the particular version of the Producer/Consumer model that is to be implemented. Then we discuss how to build/compile the project if you want to try it out yourself or if you want to use these 3 classes in your other projects. The next several sections present these 3 classes in detail and describe how to implement the Producer/Consumer model; this also serves as an example of using the 3 generic classes in real applications.

1. Producer/Consumer Scenario

The Producer/Consumer model we are interested in is described as follows. We have one producer and several consumers (say, 5 consumers). The producer produces a message and saves it in a file. The 5 consumers all look at the file, and if there is a message in it, they read the message. The head of the message specifies which consumer it is for, so only the correct consumer can actually “consume” the message: it gets the message and sets the file to be empty, indicating that the message is consumed and gone. Any other consumer will find either that the message is not for itself (so it will not change the message file) or that the message file is empty. The file is empty at the beginning, so all the consumers just wait until there is a message in the file.

The producer keeps checking the file. If it is empty, either nothing has been produced yet or the last message has been consumed; in both cases the producer writes a new message to the file. If the file is not empty, the producer does nothing but wait for the message to be consumed by the right consumer.

The producer can produce a message like this: “consumerName QUIT”, indicating that the consumer named consumerName should be terminated. Once a consumer gets a message that reads “QUIT”, it terminates itself safely. Once all the consumers are terminated, the consumer main loop terminates too.

2. How To Run the Project

You can download the source code. The following .cpp and .h files should be included in the zip file:

For the consumer:

multiConsumer.cpp
myEvent.cpp
myException.cpp
myLog.cpp
mySemaphore.cpp
myThread.cpp
myEvent.h
myException.h
myLog.h
mySemaphore.h
myThread.h

For the producer:

myException.cpp
myLog.cpp
mySemaphore.cpp
producer.cpp
myException.h
myLog.h
mySemaphore.h

Once you have the source code, the only change you need to make is the path of message.txt. This path appears in both the producer and the consumer code, so you need to change it in two places. You can then build two projects: one for the producer and one for the consumers. After compiling, start the producer first and then start the consumer, type in messages, and observe the reaction from the consumers. Since this is only a small toy project, not much protection or flexibility is provided. For instance, you need to type each message in a specific format (see Section 7). You can type "BYE" to stop the producer.

In the next few sections, we will discuss the classes in detail and show how to use these classes to implement the Producer/Consumer model.

3. myEvent class

In this section, the myEvent class is described. The following questions are to be answered:

  1. One can find CEvent class in MFC, so why do we have to reinvent the wheel?
  2. What are the basic functionalities that are provided in this class?
  3. Why do we need this class in the Producer/Consumer implementation?

Let us start with the first question. Basically, the myEvent class provides a mechanism for one process to signal another process that a specific event has happened. It simply encapsulates WIN32’s event-related APIs. MFC has a CEvent class, but the two classes are not the same: myEvent offers the ability to call a user-defined callback function when the event has occurred. This turns out to be a fairly interesting feature that provides at least two potential benefits. To see this, let us take a look at its constructor:

myEvent(string eventName=NULL,PAPCFUNC userAPC=NULL,BOOL queueUserAPC=FALSE)

userAPC is a pointer to a callback function that is provided by the user in the form of an asynchronous procedure call (APC). If this pointer is not NULL and queueUserAPC is TRUE, then besides creating the event with the given eventName, myEvent’s constructor starts a thread whose job is to wait for the event (or another condition, such as an APC being queued) to occur. When the event does occur, the thread queues the callback for execution.

With this in mind, the first benefit of this feature is easy to see: it provides a smarter way to wait. Imagine a main program that has to wait for some event to happen and, once it happens, perform some specific handling. An easy solution is to make the main program wait on the event, but this blocks its processing, and we might want the main program to do other work while waiting. The myEvent class provides a better solution: the handling to perform when the event occurs is written as a user-defined callback, and this callback is given to the event constructor. When the main program creates the event, the constructor (without you knowing it) creates a thread whose sole task is to wait on the event. When the event does occur, this thread queues the callback function for execution. The main program, unaware of this waiting thread, can continue whatever processing it wants to do.
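The "hidden waiting thread" idea can be sketched in portable C++. This is only an illustration under stated assumptions: the class name CallbackEvent and its methods are hypothetical, it uses std::thread and std::condition_variable instead of the WIN32 event and APC functions that myEvent wraps, and it runs the callback directly on the watcher thread rather than queuing it.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>

// Hypothetical portable stand-in for the idea behind myEvent; it is NOT the
// article's Win32-based class.
class CallbackEvent {
public:
    // If a callback is given, a hidden watcher thread waits for the event
    // and runs the callback once the event is signaled.
    explicit CallbackEvent(std::function<void()> callback = nullptr) {
        if (callback) {
            watcher_ = std::thread([this, callback] {
                wait();
                callback();
            });
        }
    }

    // Signals on destruction so the watcher is never left blocked. As a
    // consequence, the callback also runs if the event was never signaled.
    ~CallbackEvent() {
        set();
        if (watcher_.joinable()) watcher_.join();
    }

    // Mark the event as signaled and wake every waiter.
    void set() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            signaled_ = true;
        }
        cv_.notify_all();
    }

    // Block until the event is signaled.
    void wait() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return signaled_; });
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    bool signaled_ = false;
    std::thread watcher_;
};
```

The creator of a CallbackEvent never waits itself: it constructs the object, carries on with other work, and calls set() (or lets the object go out of scope) when the handling should run.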

Another benefit of this class is that it provides a safe way to terminate a thread. A thread can create an object of such an event; when it is time to terminate the thread, the main thread signals the event, and the callback performs the memory/resource clean-up for the thread.

The myEvent class is generic enough to be used in other applications developed on the Windows platform. Besides the potential benefits mentioned above, it also provides an easy way to use the WIN32 event-handling APIs. Its methods include setEvent(), resetEvent(), waitForEvent(DWORD waitTime=INFINITE), pulseEvent(), etc. To see all the details, check the class definition and implementation that you can download here.

Why do we need this class in this Producer/Consumer implementation? Again, we need it here since it is used as a safe way to terminate the consumer threads. In this particular application, the clean up work is not complex so we do not need to write a callback, but the idea is the same and it is shown in the coming sections.

4. mySemaphore class

In this section, a brief description of the mySemaphore class is presented, and the same questions as in Section 3 are answered. To start, this class encapsulates the WIN32 semaphore APIs. MFC has a CSemaphore class that provides wrappers to some of these APIs as well. However, believe it or not, CSemaphore lacks a method to wait for the semaphore to be released, which is arguably one of the key functions of such a class.

The basic function set provided by this class is quite intuitive. You can create a semaphore object either by creating a new semaphore or by opening an existing one. Once you have the semaphore object, you can lock it (either wait on it until you can successfully lock it, or simply try and return if you cannot lock it at the moment), unlock it, change its settings (initial count, maximum count, etc.), and so on. See the class definition/implementation in the downloaded code. This class is also generic enough to be used in other applications on the Windows platform.
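To make the lock / try-lock / unlock behavior concrete, here is a minimal counting-semaphore sketch in portable C++. It is an assumption-laden illustration, not the article's class: SimpleSemaphore and its method names are hypothetical, it works only within one process, and it is built on std::mutex and std::condition_variable rather than the WIN32 semaphore APIs.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical portable counting semaphore; the article's mySemaphore wraps
// the Win32 semaphore APIs instead.
class SimpleSemaphore {
public:
    SimpleSemaphore(long initialCount, long maxCount)
        : count_(initialCount), max_(maxCount) {}

    // Wait until the count is positive, then take one unit.
    void lock() {
        std::unique_lock<std::mutex> guard(mutex_);
        cv_.wait(guard, [this] { return count_ > 0; });
        --count_;
    }

    // Take one unit only if one is available right now; never blocks.
    bool tryLock() {
        std::lock_guard<std::mutex> guard(mutex_);
        if (count_ <= 0) return false;
        --count_;
        return true;
    }

    // Give one unit back; fails if that would exceed the maximum count.
    bool unlock() {
        {
            std::lock_guard<std::mutex> guard(mutex_);
            if (count_ >= max_) return false;
            ++count_;
        }
        cv_.notify_one();
        return true;
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    long count_;
    long max_;
};
```

With an initial and maximum count of 1, such a semaphore behaves like a lock that any thread may release, which is the role the producer semaphore plays for the message file.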

The mySemaphore class is used in the Producer/Consumer model for synchronization control. The producer will not produce anything while one of the consumers is reading the message file. By the same token, a consumer cannot access the message file while the producer is in the middle of updating it. Also, at any given moment, the message file can be read and/or updated by only one consumer. Even access to the log file (used for debugging) is synchronized with a semaphore.

5. myThread class

To implement a Producer/Consumer model with multiple consumers, multithreading is perhaps the best solution: it can be used to "simulate" the existence of multiple consumers, since threads enable a process to do more than one thing at a time. Let us again mention one of the key benefits of thread programming (already mentioned in Section 3): by creating a thread, you can call a blocking function in that thread and let the main thread continue its processing without waiting.

Like the other two classes, myThread is a generic class that can easily be used in the development of other applications. Its main functionalities include the following: create a thread, start executing a thread, suspend/resume a thread, wait for a thread to finish and get its exit code, access the settings of a thread (for instance, its priority), and report time statistics of a thread. Again, for details, see the definition/implementation of the myThread class.

Let us now concentrate on how to terminate a thread safely. To start, let us study the constructor of myThread:

myThread(LPTHREAD_START_ROUTINE threadFunction,LPVOID pThreadFuncParameter=NULL,
         DWORD exeFlags=0,DWORD sSize=0,BOOL inheritable=FALSE)

In this constructor, threadFunction is the address of the callback function, pThreadFuncParameter is the address of the callback’s parameter. If more than one parameter is needed in the callback, a good solution is to build a class that encapsulates all the parameters to the callback and pass a pointer to an object of this class. The callback function, after receiving this pointer, will be able to access all the parameters.

This leads us to build another class, which we will call myThreadArgument. It is certainly impossible to figure out all the data members this class should have to fit the needs of every thread that might be created in the real world. However, one data member is always needed: a pointer to an object of the myEvent class.

As mentioned above, whenever thread programming is involved, we need a safe way to terminate a thread. WIN32 does provide TerminateThread(), but this call does not allow the thread to clean up its resources before exiting, so its use is discouraged. In this design, the myEvent class is used to terminate a thread: when it is time to terminate a thread, instead of calling TerminateThread(), the main loop accesses the thread’s argument object and signals the event in this object. The thread, for its part, checks the event in its argument object; once it sees that the event is signaled, it cleans up everything and exits. This basic flow is shown below:

myThreadArgument’s constructor will build an event object:

myThreadArgument::myThreadArgument(...)    
{
   // add data members you want here

   // this event is always necessary
   exitEvent = new myEvent("exitEvent");
}

In the main loop:

int main (int argc, char* argv[])
{
   myThreadArgument* threadArgument = new myThreadArgument();
   myThread* thread = new myThread(threadFunc,(void*)threadArgument);
   thread->execute();

   while (1)
   {

      // do stuff here ...

      // time to stop the thread: signal the thread to stop
      threadArgument->getExitEvent()->setEvent();
      Sleep(1);

      // the exit has been requested, so leave the main loop
      break;
   }
   return 0;
}

In the thread function:

DWORD WINAPI threadFunc(LPVOID threadInfo)
{
   // get the parameters to this callback
   myThreadArgument* threadArgument = (myThreadArgument*)threadInfo;

   // do other stuff ...

   while (1)
   {

      // do stuff here ...

      // check (without blocking) to see if this thread should terminate
      if (threadArgument->getExitEvent()->waitForEvent(0))
      {
          // clean up everything here !!! then get out
          break; 
      } 
   }
    
    // do stuff here ...

    // the thread's exit code
    return 0;
}
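The same cooperative-exit flow can be tried out without the WIN32 classes. The sketch below is a simplified assumption, not the article's code: WorkerArgument, workerFunc and runAndStop are hypothetical names, and a std::atomic<bool> flag stands in for the exit event, polled in the same non-blocking way as waitForEvent(0).

```cpp
#include <atomic>
#include <cassert>
#include <thread>

// Hypothetical argument object, loosely modeled on myThreadArgument.
struct WorkerArgument {
    std::atomic<bool> exitRequested{false};  // plays the role of exitEvent
    int iterations = 0;                      // written only by the worker
    bool cleanedUp = false;                  // written only by the worker
};

// Worker loop: do a unit of work, then poll the exit flag without blocking.
void workerFunc(WorkerArgument* arg) {
    while (true) {
        ++arg->iterations;  // "do stuff here ..."
        if (arg->exitRequested.load()) {
            arg->cleanedUp = true;  // release resources here before leaving
            break;
        }
        std::this_thread::yield();
    }
}

// Start the worker, ask it to stop, and wait until it has really finished.
int runAndStop(WorkerArgument& arg) {
    std::thread worker(workerFunc, &arg);
    arg.exitRequested.store(true);  // "signal the event"
    worker.join();                  // plain fields are safe to read after this
    return arg.iterations;
}
```

The key property is that the thread itself decides when to leave its loop, so its clean-up code always runs, which is not the case when a thread is killed from outside.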

6. Two Helper Classes: myLog and myException

Before we start implementing the Producer/Consumer model, we need two more classes: myLog and myException.

Debugging a multithreaded system can be difficult. To understand the actions taken by each thread (including the main thread), we want all the threads to write their major actions to a log file, and the myLog class is designed for this purpose. The log is mainly for understanding what is going on in the system. If you don’t need it, you can search for winLog in all the .cpp files and comment out those lines. Also, remember to take the following line out of producer.cpp:

myLog winLog("producer.txt");

and take out the following line from multiConsumer.cpp:

myLog winLog("consumer.txt");

To capture possible errors, a simple myException class is provided. We recommend that you keep this class; it is quite simple and easy to understand anyway.

7. Implementation of the Producer/Consumer Model

Now that we have finished all the necessary classes, we can start to implement the Producer/Consumer model. Let us first take a look at the producer. Again, our model has only one producer, and it is quite straightforward to build. The only point to note is that, to protect access to the message file, the producer builds a semaphore as follows:

mySemaphore producerSemaphore(string("producerSem"));

In the main loop, the producer tries to lock this semaphore so it can start to "produce" messages into the message file. Once the producer successfully locks the semaphore, it checks the message file (named message.txt). If the file is empty, it produces a new message; otherwise, it unlocks the semaphore and waits for the consumers to consume the message. This is the main flow in producer.cpp; you can find all the details in the downloaded source code.
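One pass of this producer flow might look like the following sketch. It is not taken from producer.cpp: fileIsEmpty and produceIfEmpty are hypothetical helpers, and a std::mutex stands in for the named semaphore, so unlike the real project it can only coordinate threads inside a single process.

```cpp
#include <cassert>
#include <fstream>
#include <mutex>
#include <string>

// Returns true if the file is missing or has no content.
bool fileIsEmpty(const std::string& path) {
    std::ifstream in(path);
    return !in || in.peek() == std::ifstream::traits_type::eof();
}

// One pass of the producer loop: take the lock, and write the message only
// if the previous one has been consumed (i.e. the file is empty).
// Returns true if the message was written.
bool produceIfEmpty(std::mutex& fileLock, const std::string& path,
                    const std::string& message) {
    std::lock_guard<std::mutex> guard(fileLock);  // released on return
    if (!fileIsEmpty(path)) return false;         // a message is still pending
    std::ofstream out(path, std::ios::trunc);
    out << message;
    out.flush();
    return out.good();
}
```

A caller would invoke produceIfEmpty repeatedly, pausing between attempts whenever it returns false, until the pending message has been consumed.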

The main consumer logic is not difficult either. It will first build two semaphores: one is for the access to the log file, the other is called mainSemaphore:

// semaphore that protects the log file
mySemaphore logSemaphore(string(""),1);
// main semaphore (see the main loop below)
mySemaphore mainSemaphore(string("main"),0,10);

It will then create all 5 consumer threads and start them before it goes into the main loop:

// create all the consumers and their arguments
for ( int i = 0; i < numOfConsumers; i++ )
{
   // first create the consumer's name: the names are "0","1","2","3" and "4"
   char tmp[64];
   memset(tmp,0,sizeof(tmp));
   sprintf(tmp,"%d",i);
   string consumerName = string(tmp); 

   // build the argument for each consumer thread
   consumerArgument[i] = 
    new myThreadArgument(consumerName,&logSemaphore,&mainSemaphore,
    string("producerSem"),string("message.txt"));

   // build each consumer thread using the above argument
   consumer[i] = new myThread(consumerThread,(void*)consumerArgument[i]);

   // start it!
   consumer[i]->execute();
}

After all the above setup work, the main consumer loop starts. It first tries to lock mainSemaphore. Successfully locking this semaphore means that a consumer has just consumed a message, so it is the main loop’s turn to act: if the message is “QUIT”, it signals the consumer that received the message to terminate. Once all the consumers are terminated, the main loop terminates.

Each consumer thread has the following function signature:

DWORD WINAPI consumerThread(LPVOID threadInfo)

This function represents a single consumer and is implemented as follows. It first gets the necessary information from the thread argument object threadInfo: the log semaphore, so each consumer has synchronized access to the log file; this consumer's name (a character from the set {“0”, “1”, “2”, “3”, “4”}); the mainSemaphore, used to inform the main loop that a message has been consumed; and the producer semaphore’s name, used to get access to the message.txt file.

Each consumer then enters its main loop, in which it first tries to lock the producer semaphore. Once it locks the producer semaphore, it reads the message file. If a message is in the file and it is for this consumer (the first part of the message has to be a consumer's name, i.e., a character from the set above), the consumer sets the file length to 0, indicating that the message is consumed, and prints the message into the log file. If there is a message but it is not for this consumer, the consumer does not change the file but adds a line to the log showing that the message is for another consumer. If the file is empty, it writes a line to the log showing that another consumer has already consumed the message. It then unlocks the producer semaphore to give other consumers a chance to read the file, or to let the producer take the semaphore back. The last thing the consumer thread does in each iteration is check its argument to see whether it should terminate:

// notice that no memory/resource clean-up work needs to be done here,
// so simply break the main loop in the consumer thread
if (threadArgument->getExitEvent()->waitForEvent(0)) break;

If the exitEvent in the threadArgument is already signaled, the consumer breaks out of the loop and the consumer thread terminates. Otherwise, it continues the loop, i.e., it tries to lock the producer semaphore and read the message file as described above.
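The three outcomes a consumer can observe on each pass (message consumed, message for someone else, file empty) can be sketched as follows. This is an illustration rather than the code in multiConsumer.cpp: ConsumeResult and consumeIfMine are hypothetical names, and a std::mutex again stands in for the named producer semaphore.

```cpp
#include <cassert>
#include <fstream>
#include <mutex>
#include <string>

enum class ConsumeResult { Consumed, NotMine, Empty };

// One pass of a consumer loop: read the message file under the lock and
// consume the message only if it is addressed to consumerName. On success,
// the text after the consumer name is stored in body.
ConsumeResult consumeIfMine(std::mutex& fileLock, const std::string& path,
                            const std::string& consumerName,
                            std::string& body) {
    std::lock_guard<std::mutex> guard(fileLock);  // released on return

    std::string line;
    {
        std::ifstream in(path);
        std::getline(in, line);  // leaves line empty if nothing can be read
    }
    if (line.empty()) return ConsumeResult::Empty;

    // The text before the first space names the intended consumer.
    const std::string::size_type space = line.find(' ');
    if (line.substr(0, space) != consumerName) return ConsumeResult::NotMine;

    body = (space == std::string::npos) ? "" : line.substr(space + 1);

    // Truncating the file to length 0 marks the message as consumed.
    std::ofstream truncate(path, std::ios::trunc);
    return ConsumeResult::Consumed;
}
```

Each result maps onto one of the log lines shown later in this section: "got message", "it is for consumer N", or "it is already consumed by other consumer".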

Notice that all the messages that are produced by the producer have to be in the following format:

consumerName message

such as:

0 this message is for the first consumer

In this message, the first 0 is the consumer name, the rest of it is the message itself.
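Splitting such a line into its two parts is straightforward. The helper below is a hypothetical sketch (ParsedMessage and parseMessage are not names from the downloadable source); it also flags the special “QUIT” message described in Section 1.

```cpp
#include <cassert>
#include <string>

// Hypothetical result type for a line of the form "consumerName message".
struct ParsedMessage {
    std::string consumerName;  // text before the first space
    std::string body;          // everything after the first space
    bool isQuit = false;       // true when the body is exactly "QUIT"
};

// Split a producer line at its first space.
ParsedMessage parseMessage(const std::string& line) {
    ParsedMessage result;
    const std::string::size_type space = line.find(' ');
    result.consumerName = line.substr(0, space);
    result.body = (space == std::string::npos) ? "" : line.substr(space + 1);
    result.isQuit = (result.body == "QUIT");
    return result;
}
```

For the example line above, consumerName would be "0" and body would be "this message is for the first consumer".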

The following is a log file showing all the messages that the producer produced in one run (once you finish running the producer and consumer, you will find log files named producer.txt and consumer.txt in the directory from which you ran the executables):

DATE: 07/02/04 - 03:00:08                  producer.txt

0 this message is for consumer 0
1 this message is for consumer 1
0 for consumer 0 again
2 this message is for consumer 2
3 this message is for consumer 3
4 this message is for consumer 4
1 for consumer 1 again
0 QUIT
1 QUIT
2 this message is for consumer 2 again
2 QUIT
4 you are one of the two last consumers here!
3 QUIT
4 QUIT
BYE

DATE: 07/02/04 - 03:02:31                  producer.txt

Execution time: 2 minutes 23 seconds

and the following log file shows the trace of the consumers under the above producer messages:

DATE: 07/02/04 - 03:00:12                  consumer.txt

[consumer 0 ] has read the msg:  got message:  this message is for consumer 0
[consumer 1 ] has read the msg: it is already consumed by other consumer.
[consumer 2 ] has read the msg: it is already consumed by other consumer.
[consumer 3 ] has read the msg: it is already consumed by other consumer.
[consumer 4 ] has read the msg: it is already consumed by other consumer.
[consumer 0 ] has read the msg:  it is for consumer 1
[consumer 2 ] has read the msg: it is already consumed by other consumer.
[consumer 3 ] has read the msg: it is already consumed by other consumer.
[consumer 4 ] has read the msg: it is already consumed by other consumer.
[consumer 1 ] has read the msg:  got message:  this message is for consumer 1
[consumer 2 ] has read the msg: it is already consumed by other consumer.
[consumer 3 ] has read the msg: it is already consumed by other consumer.
[consumer 4 ] has read the msg: it is already consumed by other consumer.
[consumer 1 ] has read the msg: it is already consumed by other consumer.
[consumer 0 ] has read the msg:  got message:  for consumer 0 again
[consumer 3 ] has read the msg: it is already consumed by other consumer.
[consumer 4 ] has read the msg: it is already consumed by other consumer.
[consumer 1 ] has read the msg: it is already consumed by other consumer.
[consumer 0 ] has read the msg: it is already consumed by other consumer.
[consumer 2 ] has read the msg:  got message:  this message is for consumer 2
[consumer 4 ] has read the msg: it is already consumed by other consumer.
[consumer 1 ] has read the msg: it is already consumed by other consumer.
[consumer 2 ] has read the msg: it is already consumed by other consumer.
[consumer 0 ] has read the msg: it is already consumed by other consumer.
[consumer 3 ] has read the msg:  got message:  this message is for consumer 3
[consumer 1 ] has read the msg: it is already consumed by other consumer.
[consumer 2 ] has read the msg: it is already consumed by other consumer.
[consumer 3 ] has read the msg: it is already consumed by other consumer.
[consumer 0 ] has read the msg: it is already consumed by other consumer.
[consumer 4 ] has read the msg:  got message:  this message is for consumer 4
[consumer 2 ] has read the msg: it is already consumed by other consumer.
[consumer 3 ] has read the msg: it is already consumed by other consumer.
[consumer 0 ] has read the msg: it is already consumed by other consumer.
[consumer 4 ] has read the msg: it is already consumed by other consumer.
[consumer 1 ] has read the msg:  got message:  for consumer 1 again
[consumer 3 ] has read the msg:  it is for consumer 0
[main] will signal [0] to exit...
[consumer 4 ] has read the msg:  it is for consumer 0
[consumer 2 ] has read the msg:  it is for consumer 0
[consumer 0 ] has read the msg:  got message:  QUIT
[consumer 1 ] has read the msg:  it is for consumer 0
[consumer 0 ] will terminate now
[consumer 3 ] has read the msg:  it is for consumer 1
[consumer 4 ] has read the msg:  it is for consumer 1
[main] will signal [1] to exit...
[consumer 2 ] has read the msg:  it is for consumer 1
[consumer 1 ] has read the msg:  got message:  QUIT
[consumer 1 ] will terminate now
[consumer 3 ] has read the msg: it is already consumed by other consumer.
[consumer 4 ] has read the msg: it is already consumed by other consumer.
[consumer 2 ] has read the msg:  got message:  this message is for consumer 2 again
[consumer 3 ] has read the msg:  it is for consumer 2
[consumer 4 ] has read the msg: it is already consumed by other consumer.
[main] will signal [2] to exit...
[consumer 2 ] has read the msg:  got message:  QUIT
[consumer 2 ] will terminate now
[consumer 3 ] has read the msg:  it is for consumer 4
[consumer 4 ] has read the msg:  got message:  you are one of the two last consumers here!
[consumer 4 ] has read the msg: it is already consumed by other consumer.
[consumer 3 ] has read the msg:  got message:  QUIT
[main] will signal [3] to exit...
[consumer 3 ] will terminate now
[consumer 4 ] has read the msg:  got message:  QUIT
[main] will signal [4] to exit...
all the consumers are dead, the main thread will be terminated!
[consumer 4 ] will terminate now

DATE: 07/02/04 - 03:02:28                  consumer.txt

Execution time: 2 minutes 16 seconds

8. Conclusion

This article presents 3 generic classes and uses them to implement one version of the Producer/Consumer model. Again, these classes are generic enough to be used in other applications that need multithreading and synchronization control. You can also play with this specific example to study and understand the behavior of threads, semaphores and events. I hope this will be of some help in your development work, and I welcome any suggestions and comments.

License

This article has no explicit license attached to it, but may contain usage terms in the article text or the download files themselves. If in doubt, please contact the author via the discussion board below.

A list of licenses authors might use can be found here.
