
Producer/Consumer Queues in C++

4 Oct 2020 | MIT | 5 min read
A simple implementation for producer/consumer mechanism
This article describes a 'plain' C++ implementation of producer/consumer queues and shows the inner workings of this synchronization mechanism.

Background

The producer-consumer problem has been studied since the '70s, even before multi-threading became important. That's because these processes are an integral part of any interaction between a computer and the outside world. Even when data was read from punched cards, a program needed to 'consume' a card before the reading process could 'produce' another one. Similarly, a print line needed to be 'consumed' by the printer before the next one could be produced.

While for some time these problems have stayed in the realm of operating systems, once user processes got access to multi-threading facilities, they became a concern for everyday users. For a short background on different implementations, one can check the Wikipedia page.

If you are looking for a C# implementation, you can check Mark Clifton's article on this subject. Here, we are going to stay with the good old C++.

A Producer/Consumer Example

Our example is taken directly from Mark's article mentioned above. We will write a program that finds all the prime numbers less than a certain limit, and we want to take advantage of all the CPU cores in order to 'speed up' the process. I put speed-up in quotation marks because here we are not interested in real speed: following Mark's example, we will use the same very inefficient function to determine if a number is prime or not:

C++
bool IsPrime (int n)
{
  bool ret = true;
  for (int i = 2; i <= n / 2 && ret; ret = n % i++ != 0)
    ;
  return ret;
}
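
The compact loop above does all its work in the for-statement's condition and increment expressions, which can be hard to read. As a rough sketch, here is the same test written out step by step. The names is_prime_expanded and same_results are made up for this illustration and are not part of the article's code.

```cpp
// Original form from the article: the loop body is empty and the divisor
// test happens in the increment expression of the for statement.
bool IsPrime (int n)
{
  bool ret = true;
  for (int i = 2; i <= n / 2 && ret; ret = n % i++ != 0)
    ;
  return ret;
}

// Hypothetical expanded form with the same behaviour: try every candidate
// divisor from 2 up to n/2 and stop at the first one that divides n.
bool is_prime_expanded (int n)
{
  for (int i = 2; i <= n / 2; ++i)
  {
    if (n % i == 0)
      return false;   // found a divisor, so n is composite
  }
  return true;        // no divisor found (also reached for every n < 4)
}

// Hypothetical helper: true when both forms agree for every n in [0, limit).
bool same_results (int limit)
{
  for (int n = 0; n < limit; ++n)
  {
    if (IsPrime (n) != is_prime_expanded (n))
      return false;
  }
  return true;
}
```

Note that both forms report 0 and 1 as prime because the loop never runs for them. That does not matter in this example: the producer starts at 2, and 0 is only used as an end marker.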

The general strategy is to have one producer thread adding to a queue the numbers to be checked for "prime-ness" while a number of consumer threads will pick up those numbers and place all positive results in another queue. For added fanciness, the output queue will keep both the prime number and an ID of the consumer thread that computed it.

Here are the data structures used for this setup:

C++
struct result
{
  int prime;
  int worker;
};
...
sync_queue<int> nums;
sync_queue<result> primes;

The nums queue keeps all the numbers to be tested and the primes queue keeps the positive results.

sync_queue is a producer/consumer queue structure that provides orderly access for all threads; we will see its inner workings in a moment.

Producer Thread

This is the simplest. It just fills the nums queue with all the numbers up to a certain limit. At the end, it adds one zero per consumer thread as a signal that the job is done. A consumer terminates when it extracts a 0 from the queue:

C++
thread producer ([&nums]()->int {
  for (int i = 2; i < 500000; i++)
    nums.produce (i);

  for (int i = 0; i < NTHREADS; i++)
    nums.produce (0);
  return 0;
});

Consumer Threads

The code for a consumer thread is not much more complicated. It extracts a number from the nums queue, checks if it is prime using the IsPrime function and, if so, posts a new result in the primes queue, adding its own consumer number to record who calculated it:

C++
auto checker = [&nums, &primes, thnum]()->int {
  int n = 1;
  while (n = nums.consume ())
  {
    if (IsPrime (n))
      primes.produce ({ n,thnum });
  }
  return 0;
};

When the number retrieved is 0, the function returns and the thread terminates.

The main application thread creates a number of consumer threads and starts them all before starting the producer thread:

C++
thread* consumers[NTHREADS];
for (int thnum = 0; thnum < NTHREADS; thnum++)
{
  auto checker = [&nums, &primes, thnum]()->int {
    int n = 1;
    while (n = nums.consume ())
    {
      if (IsPrime (n))
        primes.produce ({ n,thnum });
    }
    return 0;
  };

  consumers[thnum] = new thread (checker);
  consumers[thnum]->start ();
}

Running the Rodeo

Now that everything is set up, we can start the show:

C++
stopwatch t_prod, t_cons;
t_prod.start ();
t_cons.start ();

producer.start ();    //Start producer
producer.wait ();     //Wait to finish producing
t_prod.stop ();
  
//Show producer statistics
cout << "sync_queue finished producing" << " in " << fixed
  << setprecision (2) << t_prod.msecEnd ()/1000. << "sec" << endl;

//Wait for consumers to finish
for (int i = 0; i < NTHREADS; i++)
  consumers[i]->wait ();
t_cons.stop ();
cout << "finished consuming" << " in " << fixed
  << setprecision (2) << t_cons.msecEnd () / 1000. << "sec" << endl;

//Did we find all the primes?
cout << "Expecting 41538 primes, found " 
  << primes.size () <<endl;

//Check who did what
vector<int> found_by(NTHREADS);
while (!primes.empty ())
{
  result r = primes.consume ();
  found_by[r.worker]++;
}

//Show consumers statistics
for (int i = 0; i < NTHREADS; i++)
  cout << "Consumer " << i << " found " << found_by[i]
    << " primes." << endl;

On my machine (no speed monster this one), I get something like:

sync_queue finished producing in 1.67sec
finished consuming in 4.34sec
Expecting 41538 primes, found 41538
Consumer 0 found 4869 primes.
Consumer 1 found 5530 primes.
Consumer 2 found 5467 primes.
Consumer 3 found 4844 primes.
Consumer 4 found 4863 primes.
Consumer 5 found 5529 primes.
Consumer 6 found 5596 primes.
Consumer 7 found 4840 primes.

Instead of having some central control distribute the work among consumers, the sync_queue allowed each consumer to pick its work unit and generate results. Some of the threads got a bit more, some a bit less but, all in all, the work was distributed fairly.
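
For readers who do not have the MLIB primitives at hand, the whole experiment can be approximated with the standard library alone. The sketch below is an assumption-laden stand-in, not the article's code: simple_queue replaces sync_queue and uses std::mutex with std::condition_variable, and count_primes_per_worker is a hypothetical helper that wraps the producer, the consumers and the final tally.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Stand-in for sync_queue built on the standard library (an assumption,
// not the MLIB class).
template <class T>
class simple_queue
{
public:
  void produce (const T& obj)
  {
    {
      std::lock_guard<std::mutex> guard (m);
      q.push (obj);
    }
    ready.notify_one ();                // wake one waiting consumer
  }

  T consume ()
  {
    std::unique_lock<std::mutex> guard (m);
    ready.wait (guard, [this] { return !q.empty (); });
    T obj = q.front ();
    q.pop ();
    return obj;
  }

  bool empty ()
  {
    std::lock_guard<std::mutex> guard (m);
    return q.empty ();
  }

private:
  std::mutex m;
  std::condition_variable ready;
  std::queue<T> q;
};

struct result
{
  int prime;
  int worker;
};

bool IsPrime (int n)
{
  bool ret = true;
  for (int i = 2; i <= n / 2 && ret; ret = n % i++ != 0)
    ;
  return ret;
}

// Hypothetical helper: checks the numbers in [2, limit) with nthreads
// consumers and returns how many primes each consumer found.
std::vector<int> count_primes_per_worker (int limit, int nthreads)
{
  simple_queue<int> nums;
  simple_queue<result> primes;

  std::vector<std::thread> consumers;
  for (int thnum = 0; thnum < nthreads; thnum++)
  {
    consumers.emplace_back ([&nums, &primes, thnum] {
      while (int n = nums.consume ())   // a 0 means "no more work"
      {
        if (IsPrime (n))
          primes.produce ({ n, thnum });
      }
    });
  }

  std::thread producer ([&nums, limit, nthreads] {
    for (int i = 2; i < limit; i++)
      nums.produce (i);
    for (int i = 0; i < nthreads; i++)
      nums.produce (0);                 // one end marker per consumer
  });

  producer.join ();
  for (auto& c : consumers)
    c.join ();

  // All threads are done, so empty() is reliable here
  std::vector<int> found_by (nthreads);
  while (!primes.empty ())
    found_by[primes.consume ().worker]++;
  return found_by;
}
```

Calling count_primes_per_worker (500000, 8) from main would mirror the run above. The per-consumer counts will differ from run to run, but their sum should always equal the number of primes below the limit.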

The Inner Workings of a Producer/Consumer Queue

sync_queue is a template class derived from std::queue. It provides two main methods: produce and consume. To synchronize access, it uses a semaphore as well as a critical section object to keep everything consistent. The produce method is very simple:

C++
template <class M, class C=std::deque<M>> 
class sync_queue : protected std::queue<M, C>
{
public:
...
  /// Append an element to queue
  virtual void produce (const M& obj)
  {
    lock l (update);        //take control of the queue
    this->push (obj);       //put a copy of the object at the end
    con_sema.signal ();     //and signal the semaphore
  }
...
protected:
  semaphore con_sema;       ///< consumers' semaphore counts down until queue is empty
  criticalsection update;   ///< critical section protects queue's integrity

A lock object acquires the critical section to prevent simultaneous access. The object to be produced is pushed in the queue and the semaphore is signaled. When the function returns, the lock object goes out of scope and the critical section is released.

Consuming is slightly more complicated:

C++
/// Extract and return first element in queue
virtual M consume ()
{
  M result;
  update.enter ();
  while (std::queue<M, C>::empty ())
  {
    update.leave ();
    con_sema.wait ();        //wait for a producer
    update.enter ();
  }
  result = this->front ();  //get the message
  this->pop ();
  update.leave ();
  return result;
}

Again, we enter the critical section and check if the queue is empty. If so, we leave the critical section and start waiting for the consumers' semaphore to be signaled by a producer. When awoken by a signal, again we enter the critical section and loop again.

At this point, two things might have happened:

  • No one else got the object and we find that the queue is not empty. In this case, we exit the while loop, pick up the object and leave the critical section.
  • Another hungry consumer got the object and we find the queue empty. In this case, we leave the critical section and wait for another signal at the consumers' semaphore.

In addition to these main methods, there is another method to check if the queue is empty and another one to return the size of the queue. Note that both of them are only indicative because the result might change before the caller has a chance to check it.
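
The same structure can be tried out without MLIB. The following is only a sketch under stated assumptions: simple_semaphore is a hypothetical counting semaphore built from std::mutex and std::condition_variable, std::mutex plays the role of the critical section, and sem_queue and sum_through_queue are names invented for this illustration. The consume loop follows the leave, wait, enter, re-check sequence described above.

```cpp
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical counting semaphore (an assumption, not the MLIB class)
class simple_semaphore
{
public:
  void signal ()
  {
    {
      std::lock_guard<std::mutex> g (m);
      ++count;
    }
    cv.notify_one ();
  }

  void wait ()
  {
    std::unique_lock<std::mutex> g (m);
    cv.wait (g, [this] { return count > 0; });
    --count;
  }

private:
  std::mutex m;
  std::condition_variable cv;
  int count = 0;
};

// Sketch of a queue with the same produce/consume structure as sync_queue
template <class M>
class sem_queue
{
public:
  void produce (const M& obj)
  {
    std::lock_guard<std::mutex> l (update);  // take control of the queue
    items.push (obj);
    con_sema.signal ();
  }

  M consume ()
  {
    std::unique_lock<std::mutex> l (update);
    while (items.empty ())
    {
      l.unlock ();             // let producers in while we sleep
      con_sema.wait ();        // wait for a producer
      l.lock ();               // re-check: another consumer may have won
    }
    M result = items.front ();
    items.pop ();
    return result;             // l releases the mutex on scope exit
  }

private:
  std::queue<M> items;
  simple_semaphore con_sema;
  std::mutex update;
};

// Hypothetical helper: sends 1..count through the queue to nconsumers
// threads and returns the sum of everything they received.
long long sum_through_queue (int count, int nconsumers)
{
  sem_queue<int> q;
  std::atomic<long long> total{ 0 };
  std::vector<std::thread> consumers;
  for (int i = 0; i < nconsumers; ++i)
  {
    consumers.emplace_back ([&q, &total] {
      while (int n = q.consume ())   // 0 is the end marker
        total += n;
    });
  }
  for (int n = 1; n <= count; ++n)
    q.produce (n);
  for (int i = 0; i < nconsumers; ++i)
    q.produce (0);
  for (auto& c : consumers)
    c.join ();
  return total;
}
```

If every number is consumed exactly once, sum_through_queue (1000, 4) returns 500500. A consumer that finds the queue non-empty never waits on the semaphore, so in this sketch signals can pile up and cause extra wake-ups later; the re-check inside the while loop keeps those harmless.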

Bounded Producer/Consumer Queue

A sharp-eyed reader might have noticed that the sync_queue::produce method has no error checking. It blissfully calls std::queue::push and assumes there is enough memory for the new object, so nothing stops the queue from growing when the producer is faster than the consumers. This is what happened in the example above: the producer needed only about 1.7 seconds to fill the queue of numbers to be checked, while the consumers needed about 4.3 seconds to empty it.

The bounded_queue class allows you to limit the number of objects that can be queued. If a producer finds the bounded queue full, it has to wait until a consumer removes some of the objects. To do that, we need one more semaphore, pro_sema, which is initialized with the maximum size of the queue. The produce method becomes:

C++
template< class M, class C = std::deque<M> >
class bounded_queue : public sync_queue<M, C>
{
public:
  bounded_queue (size_t limit_) : limit (limit_)
  {
    pro_sema.signal ((int)limit);
  }

  /// Append an element to queue. If queue is full, waits until space
  /// becomes available.
  void produce (const M& obj)
  {
    this->update.enter ();
    while (std::queue<M, C>::size () >= limit)  //full when it holds 'limit' objects
    {
      this->update.leave ();
      pro_sema.wait ();
      this->update.enter ();
    }
    this->push (obj);
    this->con_sema.signal ();
    this->update.leave ();
  }
...
protected:
  size_t limit;
  semaphore pro_sema;   ///< producers' semaphore counts down until queue is full

You can see that it is now much closer to the consume method shown before. It enters the critical section and, as long as the queue is full, it leaves the critical section, waits on pro_sema and checks again.

If we change the prime numbers example to use the bounded_queue structure with 20 entries, the results look like this:

bounded_queue finished producing in 4.32sec
finished consuming in 4.32sec
Expecting 41538 primes, found 41538
Consumer 0 found 5103 primes.
Consumer 1 found 5156 primes.
Consumer 2 found 5192 primes.
Consumer 3 found 5240 primes.
Consumer 4 found 5227 primes.
Consumer 5 found 5091 primes.
Consumer 6 found 5267 primes.
Consumer 7 found 5262 primes.

The time required by the producer thread is the same as the time required by the consumers. That is because the producer is held up by the limited queue size.
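
To experiment with this blocking behaviour using only the standard library, a bounded queue can be sketched with two condition variables instead of two semaphores. This is a swapped-in technique, not the MLIB implementation, and simple_bounded_queue, max_size_seen and max_queue_depth are hypothetical names. The sketch assumes a limit greater than zero.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>

// Stand-in for bounded_queue (an assumption, not the MLIB class)
template <class T>
class simple_bounded_queue
{
public:
  explicit simple_bounded_queue (std::size_t limit_) : limit (limit_) {}

  // Append an element; blocks while the queue already holds 'limit' items
  void produce (const T& obj)
  {
    std::unique_lock<std::mutex> guard (m);
    not_full.wait (guard, [this] { return q.size () < limit; });
    q.push (obj);
    if (q.size () > high_water)
      high_water = q.size ();     // remember the deepest the queue got
    guard.unlock ();
    not_empty.notify_one ();      // a consumer may proceed
  }

  // Extract the first element; blocks while the queue is empty
  T consume ()
  {
    std::unique_lock<std::mutex> guard (m);
    not_empty.wait (guard, [this] { return !q.empty (); });
    T obj = q.front ();
    q.pop ();
    guard.unlock ();
    not_full.notify_one ();       // a producer may proceed
    return obj;
  }

  std::size_t max_size_seen ()
  {
    std::lock_guard<std::mutex> guard (m);
    return high_water;
  }

private:
  std::size_t limit;
  std::size_t high_water = 0;
  std::mutex m;
  std::condition_variable not_full;
  std::condition_variable not_empty;
  std::queue<T> q;
};

// Hypothetical helper: pushes 'count' items through a queue of the given
// limit and returns the largest queue size observed along the way.
std::size_t max_queue_depth (int count, std::size_t limit)
{
  simple_bounded_queue<int> q (limit);
  std::thread consumer ([&q, count] {
    for (int i = 0; i < count; ++i)
      q.consume ();
  });
  for (int i = 0; i < count; ++i)
    q.produce (i);
  consumer.join ();
  return q.max_size_seen ();
}
```

With a limit of 20, max_queue_depth (10000, 20) should never return more than 20, however far ahead the producer tries to run.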

Conclusion

The producer/consumer queues presented in this article provide an easy-to-use mechanism for inter-thread communication. The threading primitives shown here (thread, criticalsection, semaphore, etc.) are part of the MLIB project. You can download the complete project from the GitHub project page.

History

  • 4th October, 2020: Initial version

License

This article, along with any associated source code and files, is licensed under The MIT License