This article describes a 'plain' C++ implementation of producer/consumer queues and shows the inner workings of this synchronization mechanism.
Background
The producer-consumer problem has been studied since the '70s, even before multi-threading became important. That's because these processes are an integral part of any interaction between a computer and the outside world. Even when data was read from punched cards, a program needed to 'consume' a card before the reader could 'produce' another one. Similarly, a print line needed to be 'consumed' by the printer before the next one could be produced.
For a long time these problems stayed in the realm of operating systems, but once user processes got access to multi-threading facilities, they became a concern for everyday programmers. For a short background on different implementations, see the Wikipedia page on the producer-consumer problem.
If you are looking for a C# implementation, you can check Mark Clifton's article on this subject. Here, we are going to stay with the good old C++.
A Producer/Consumer Example
Our example is taken directly from Mark's article mentioned above. We will write a program that finds all the prime numbers below a certain limit. However, we want to take advantage of all the CPU cores in order to 'speed up' the process. I put speed-up in quotation marks because here we are not interested in real speed: following Mark's example, we will use the same very inefficient function to determine whether a number is prime:
bool IsPrime (int n)
{
  bool ret = true;
  for (int i = 2; i <= n / 2 && ret; ret = n % i++ != 0)
    ;
  return ret;
}
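The compact loop above hides both the divisibility test and the increment inside the third expression of the for statement. The sketch below pairs it with an equivalent, more explicit version; IsPrimeExplicit is a hypothetical name used only for this illustration, and it keeps the same deliberately slow trial division up to n / 2.

```cpp
#include <cassert>

// The article's compact version: the test and the increment both
// happen in the third expression of the for statement.
bool IsPrime (int n)
{
  bool ret = true;
  for (int i = 2; i <= n / 2 && ret; ret = n % i++ != 0)
    ;
  return ret;
}

// Hypothetical explicit equivalent: try every divisor from 2 to n / 2
// and stop at the first one that divides n evenly.
bool IsPrimeExplicit (int n)
{
  for (int i = 2; i <= n / 2; i++)
  {
    if (n % i == 0)
      return false;   // found a divisor, so n is composite
  }
  return true;        // no divisor found (like the original, 0 and 1 end up here too)
}
```

Like the original, the explicit version reports 0 and 1 as prime. That does not matter in this program: the producer starts at 2, and a consumer stops on 0 before calling IsPrime.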
The general strategy is to have one producer thread add the numbers to be checked for "prime-ness" to a queue, while several consumer threads pick up those numbers and place all positive results in another queue. For added fanciness, the output queue keeps both the prime number and the ID of the consumer thread that found it.
Here are the data structures used for this setup:
struct result
{
  int prime;    // the prime number found
  int worker;   // ID of the consumer thread that found it
};
...
sync_queue<int> nums;
sync_queue<result> primes;
The nums queue keeps all the numbers to be tested and the primes queue keeps the positive results. sync_queue is a producer/consumer queue structure that provides orderly access for all threads; we will see its inner workings in a moment.
Producer Thread
This is the simplest. It just fills the nums queue with all the numbers below the limit (500000 here). At the end, it places NTHREADS zeroes in the queue, one for each consumer, as a signal that the job is done. A consumer terminates when it extracts a 0 from the queue:
thread producer ([&nums]()->int {
  for (int i = 2; i < 500000; i++)
    nums.produce (i);     // numbers to be tested
  for (int i = 0; i < NTHREADS; i++)
    nums.produce (0);     // one "job done" marker per consumer
  return 0;
});
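The zeroes act as sentinels (often called "poison pills"): each consumer stops after taking exactly one of them, so the producer must enqueue one per consumer. The sketch below shows the same shutdown scheme using only the standard library. The mini_queue class and the run_with_sentinels function are hypothetical stand-ins, not MLIB code, and the queue is built on std::mutex and std::condition_variable rather than on MLIB's semaphore.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical stand-in for sync_queue (not the MLIB class).
template <class T>
class mini_queue
{
public:
  void produce (const T& v)
  {
    {
      std::lock_guard<std::mutex> l (m);
      q.push (v);
    }
    cv.notify_one ();                               // wake one waiting consumer
  }
  T consume ()
  {
    std::unique_lock<std::mutex> l (m);
    cv.wait (l, [this] { return !q.empty (); });    // sleep until there is data
    T v = q.front ();
    q.pop ();
    return v;
  }
private:
  std::mutex m;
  std::condition_variable cv;
  std::queue<T> q;
};

// Runs one producer and nthreads consumers; returns the sum of all
// numbers consumed. Every consumer stops at the first 0 it takes.
long long run_with_sentinels (int limit, int nthreads)
{
  mini_queue<int> nums;
  std::vector<long long> sums (nthreads, 0);        // one slot per consumer, no sharing
  std::vector<std::thread> consumers;
  for (int t = 0; t < nthreads; t++)
    consumers.emplace_back ([&nums, &sums, t] {
      int n;
      while ((n = nums.consume ()) != 0)            // 0 means "no more work"
        sums[t] += n;
    });
  for (int i = 2; i < limit; i++)
    nums.produce (i);
  for (int i = 0; i < nthreads; i++)                // one sentinel per consumer
    nums.produce (0);
  for (auto& c : consumers)
    c.join ();
  long long total = 0;
  for (long long s : sums)
    total += s;
  return total;
}
```

Because the zeroes are queued after all the real numbers, a consumer that reads a 0 knows every number has already been taken. The scheme only works because 0 can never be a real work item.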
Consumer Threads
The code for a consumer thread is not much more complicated. It extracts a number from the nums queue and checks whether it is prime using the IsPrime function. If it is, it posts a new result in the primes queue, adding its own consumer number so we know who found it:
auto checker = [&nums, &primes, thnum]()->int {
  int n = 1;
  while ((n = nums.consume ()) != 0)   // 0 means no more work
  {
    if (IsPrime (n))
      primes.produce ({ n, thnum });
  }
  return 0;
};
When the number retrieved is 0, the function returns and the thread terminates.
The main application thread creates NTHREADS consumer threads and starts them all before starting the producer thread:
thread* consumers[NTHREADS];
for (int thnum = 0; thnum < NTHREADS; thnum++)
{
  // thnum is captured by value, so each consumer keeps its own ID
  auto checker = [&nums, &primes, thnum]()->int {
    int n = 1;
    while ((n = nums.consume ()) != 0)
    {
      if (IsPrime (n))
        primes.produce ({ n, thnum });
    }
    return 0;
  };
  consumers[thnum] = new thread (checker);
  consumers[thnum]->start ();
}
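The loop above allocates each thread with new, and the listing never deletes them. Where the MLIB thread class is not required, the same structure can be sketched with standard std::thread objects kept in a std::vector, which destroys them automatically. To keep the sketch short, it hands out work with a std::atomic counter instead of a sync_queue, so it illustrates thread creation and per-thread IDs, not the queue itself; count_primes_by_worker and is_prime are hypothetical names.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Same slow trial division as the article's IsPrime.
static bool is_prime (int n)
{
  for (int i = 2; i <= n / 2; i++)
    if (n % i == 0)
      return false;
  return true;
}

// Hypothetical helper: counts the primes in [2, limit) with nthreads workers
// and reports how many each worker found. Work is handed out with an atomic
// counter (a swapped-in technique), not with a producer/consumer queue.
std::vector<int> count_primes_by_worker (int limit, int nthreads)
{
  std::atomic<int> next (2);
  std::vector<int> found_by (nthreads, 0);
  std::vector<std::thread> consumers;           // owns the threads: no new/delete
  for (int thnum = 0; thnum < nthreads; thnum++)
  {
    // thnum is captured by value, so each thread keeps its own ID
    consumers.emplace_back ([&next, &found_by, limit, thnum] {
      int n;
      while ((n = next.fetch_add (1)) < limit)  // each n is taken exactly once
        if (is_prime (n))
          found_by[thnum]++;
    });
  }
  for (auto& t : consumers)
    t.join ();                                  // plays the role of consumers[i]->wait ()
  return found_by;
}
```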
Running the Rodeo
Now that everything is set up, we can start the show:
stopwatch t_prod, t_cons;
t_prod.start ();
t_cons.start ();
producer.start ();
producer.wait ();
t_prod.stop ();
cout << "sync_queue finished producing" << " in " << fixed
     << setprecision (2) << t_prod.msecEnd () / 1000. << "sec" << endl;
for (int i = 0; i < NTHREADS; i++)
  consumers[i]->wait ();
t_cons.stop ();
cout << "finished consuming" << " in " << fixed
     << setprecision (2) << t_cons.msecEnd () / 1000. << "sec" << endl;
cout << "Expecting 41538 primes, found "
     << primes.size () << endl;
// all consumers have finished, so nothing else touches primes now
vector<int> found_by (NTHREADS);
while (!primes.empty ())
{
  result r = primes.consume ();
  found_by[r.worker]++;
}
for (int i = 0; i < NTHREADS; i++)
  cout << "Consumer " << i << " found " << found_by[i]
       << " primes." << endl;
On my machine (no speed monster this one), I get something like:
sync_queue finished producing in 1.67sec
finished consuming in 4.34sec
Expecting 41538 primes, found 41538
Consumer 0 found 4869 primes.
Consumer 1 found 5530 primes.
Consumer 2 found 5467 primes.
Consumer 3 found 4844 primes.
Consumer 4 found 4863 primes.
Consumer 5 found 5529 primes.
Consumer 6 found 5596 primes.
Consumer 7 found 4840 primes.
Instead of having some central control distribute the work among consumers, the sync_queue let each consumer pick its own work units and generate results. Some threads got a bit more, some a bit less but, all in all, the work was distributed fairly.
The Inner Workings of a Producer/Consumer Queue
sync_queue is a template class derived (with protected inheritance) from std::queue. It provides two main methods: produce and consume. To synchronize access, it uses a semaphore to wake up waiting consumers and a critical section object to keep the underlying queue consistent. The produce method is very simple:
template <class M, class C = std::deque<M>>
class sync_queue : protected std::queue<M, C>
{
public:
  ...
  virtual void produce (const M& obj)
  {
    lock l (update);      // enter the critical section
    this->push (obj);
    con_sema.signal ();   // wake up a waiting consumer
  }                       // l goes out of scope: critical section released
  ...
protected:
  semaphore con_sema;
A lock object acquires the critical section to prevent simultaneous access. The object to be produced is pushed into the queue and the semaphore is signaled. When the function returns, the lock object goes out of scope and the critical section is released.
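The lock object follows the RAII idiom: its constructor enters the critical section and its destructor leaves it, so the section is released on every path out of the function, including an exception thrown while pushing. Below is a minimal sketch of such a guard over std::mutex; the scoped_section name and the global produce function are hypothetical, MLIB's own lock class may differ in detail, and the throw on negative values is an artificial error path added only to show the unlock on exception.

```cpp
#include <cassert>
#include <mutex>
#include <queue>
#include <stdexcept>

// Hypothetical RAII guard in the spirit of the article's lock class.
class scoped_section
{
public:
  explicit scoped_section (std::mutex& m) : mtx (m) { mtx.lock (); }  // enter
  ~scoped_section () { mtx.unlock (); }                                // leave
  scoped_section (const scoped_section&) = delete;
  scoped_section& operator= (const scoped_section&) = delete;
private:
  std::mutex& mtx;
};

std::mutex update;        // plays the role of the critical section
std::queue<int> items;

void produce (int v)
{
  scoped_section l (update);                         // held until produce exits
  if (v < 0)
    throw std::invalid_argument ("negative value");  // artificial error: still unlocks
  items.push (v);
}
```

In the standard library, std::lock_guard and std::scoped_lock provide this behaviour ready-made.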
Consuming is slightly more complicated:
virtual M consume ()
{
  M result;
  update.enter ();
  while (std::queue<M, C>::empty ())
  {
    update.leave ();
    con_sema.wait ();
    update.enter ();
  }
  result = this->front ();
  this->pop ();
  update.leave ();
  return result;
}
We first enter the critical section and check whether the queue is empty. If it is, we leave the critical section and wait for a producer to signal the consumers' semaphore. When woken up by a signal, we enter the critical section again and repeat the test.
At this point, two things might have happened:
- No other consumer got the object first, so we find the queue not empty. In this case, we exit the while loop, pick up the object and leave the critical section.
- Another hungry consumer got the object first, so we find the queue empty again. In this case, we leave the critical section and wait for another signal on the consumers' semaphore.
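The enter/leave/wait sequence above can be reproduced with standard-library primitives. In the sketch below, std::mutex plays the role of the critical section and a small counting semaphore built from std::mutex and std::condition_variable stands in for MLIB's semaphore (C++20's std::counting_semaphore could serve the same purpose). The class names count_sema and sync_queue_sketch are hypothetical.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical counting semaphore, standing in for MLIB's semaphore.
class count_sema
{
public:
  void signal ()
  {
    { std::lock_guard<std::mutex> l (m); count++; }
    cv.notify_one ();
  }
  void wait ()
  {
    std::unique_lock<std::mutex> l (m);
    cv.wait (l, [this] { return count > 0; });
    count--;
  }
private:
  std::mutex m;
  std::condition_variable cv;
  int count = 0;
};

// Hypothetical queue mirroring the article's produce/consume logic.
template <class M>
class sync_queue_sketch
{
public:
  void produce (const M& obj)
  {
    std::lock_guard<std::mutex> l (update);
    q.push (obj);
    con_sema.signal ();
  }
  M consume ()
  {
    update.lock ();
    while (q.empty ())
    {
      update.unlock ();     // leave the critical section...
      con_sema.wait ();     // ...sleep until some producer signals...
      update.lock ();       // ...then re-enter and test again
    }
    M result = q.front ();
    q.pop ();
    update.unlock ();
    return result;
  }
private:
  std::mutex update;        // the "critical section"
  count_sema con_sema;      // one signal per produced object
  std::queue<M> q;
};

// Usage: the consumer thread may start before any data arrives;
// consume () then waits until the main thread produces a value.
int consume_before_produce (int value)
{
  sync_queue_sketch<int> q;
  int got = 0;
  std::thread consumer ([&q, &got] { got = q.consume (); });
  q.produce (value);
  consumer.join ();
  return got;
}
```

The semaphore count can end up larger than the number of queued objects (a consumer that finds the queue non-empty never waits on it), so a waiting consumer may wake up to an empty queue. The while loop is what makes that harmless.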
In addition to these main methods, there is an empty method to check whether the queue is empty and a size method that returns the number of queued objects. Note that both results are only indicative, because the queue might change before the caller has a chance to act on them.
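Because the answer from empty or size can be stale by the time the caller acts on it, a pattern like "if not empty, then consume" can still block when another consumer wins the race. One common remedy is to test and remove under the same lock. The sketch shows this as a hypothetical try_consume method on a hypothetical checked_queue class; neither is part of the article's sync_queue.

```cpp
#include <cassert>
#include <mutex>
#include <queue>

// Hypothetical queue, for illustration only.
template <class M>
class checked_queue
{
public:
  void produce (const M& obj)
  {
    std::lock_guard<std::mutex> l (update);
    q.push (obj);
  }
  // Test and remove in one critical section: returns false instead of
  // blocking when the queue is empty, so the answer cannot go stale.
  bool try_consume (M& out)
  {
    std::lock_guard<std::mutex> l (update);
    if (q.empty ())
      return false;
    out = q.front ();
    q.pop ();
    return true;
  }
private:
  std::mutex update;
  std::queue<M> q;
};
```

In the prime example, the final while (!primes.empty ()) loop is safe only because all consumer threads have already finished, so no other thread can touch primes between the check and the consume.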
Bounded Producer/Consumer Queue
A sharp-eyed reader might have noticed that the sync_queue::produce method has no error checking. It blissfully calls std::queue::push and assumes there is enough memory for the new object. The run times in the example above show why this matters: it took the producer only 1.7 seconds to fill the queue of numbers to be checked, while the consumers needed 4.3 seconds to empty it. The producer ran far ahead of the consumers, so many numbers had to sit in the queue waiting for a consumer.
The bounded_queue class lets you limit the number of objects that can be queued. If a producer finds the bounded queue full, it has to wait until a consumer removes some of the objects. To do that, we need one more semaphore, pro_sema, which is initialized with the maximum size of the queue. The produce method becomes:
template <class M, class C = std::deque<M>>
class bounded_queue : public sync_queue<M, C>
{
public:
  bounded_queue (size_t limit_) : limit (limit_)
  {
    pro_sema.signal ((int)limit);
  }
  void produce (const M& obj)
  {
    this->update.enter ();
    while (std::queue<M, C>::size () >= limit)   // queue is full
    {
      this->update.leave ();
      pro_sema.wait ();            // wait for a consumer to free a slot
      this->update.enter ();
    }
    this->push (obj);
    this->con_sema.signal ();      // wake up a waiting consumer
    this->update.leave ();
  }
  ...
protected:
  size_t limit;
  semaphore pro_sema;
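You can see that it closely mirrors the consume method shown before. It enters the critical section and, while the queue is full, leaves it, waits on pro_sema for a consumer to free a slot and tries again. For this to work, the consumer side must signal pro_sema each time it removes an object; that part of the class is not shown in the listing above.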
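The sketch below expresses the same bounded behaviour with two std::condition_variable objects instead of two semaphores: producers wait while the queue holds limit objects, and each consume wakes one waiting producer. The bounded_sketch class, its peak method and the pump helper are hypothetical names; this is not MLIB's bounded_queue.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical bounded producer/consumer queue (not MLIB's bounded_queue).
template <class M>
class bounded_sketch
{
public:
  explicit bounded_sketch (std::size_t limit_) : limit (limit_) {}
  void produce (const M& obj)
  {
    std::unique_lock<std::mutex> l (update);
    not_full.wait (l, [this] { return q.size () < limit; });  // block while full
    q.push (obj);
    if (q.size () > max_seen)
      max_seen = q.size ();        // remember the largest size, for testing
    not_empty.notify_one ();       // wake one waiting consumer
  }
  M consume ()
  {
    std::unique_lock<std::mutex> l (update);
    not_empty.wait (l, [this] { return !q.empty (); });       // block while empty
    M result = q.front ();
    q.pop ();
    not_full.notify_one ();        // a slot is free: wake one producer
    return result;
  }
  std::size_t peak ()
  {
    std::lock_guard<std::mutex> l (update);
    return max_seen;
  }
private:
  std::size_t limit;
  std::size_t max_seen = 0;
  std::mutex update;
  std::condition_variable not_full, not_empty;
  std::queue<M> q;
};

// Usage: a producer thread pushes 1..n while the calling thread consumes;
// returns the sum of the consumed values.
long long pump (bounded_sketch<int>& q, int n)
{
  std::thread producer ([&q, n] {
    for (int i = 1; i <= n; i++)
      q.produce (i);
  });
  long long sum = 0;
  for (int i = 1; i <= n; i++)
    sum += q.consume ();
  producer.join ();
  return sum;
}
```

A condition variable re-checks its predicate under the lock after every wakeup, which is the same job the while loops in the article's produce and consume do with semaphores.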
If we change the prime numbers example to use a bounded_queue with 20 entries, the results look like this:
bounded_queue finished producing in 4.32sec
finished consuming in 4.32sec
Expecting 41538 primes, found 41538
Consumer 0 found 5103 primes.
Consumer 1 found 5156 primes.
Consumer 2 found 5192 primes.
Consumer 3 found 5240 primes.
Consumer 4 found 5227 primes.
Consumer 5 found 5091 primes.
Consumer 6 found 5267 primes.
Consumer 7 found 5262 primes.
The producer thread now takes as long as the consumers. That is because the producer is held up by the limited queue size: it can only run as far ahead of the consumers as the 20 queue entries allow.
Conclusion
The producer/consumer queues presented in this article provide an easy-to-use mechanism for inter-thread communication. The threading primitives shown here (thread, critical_section, semaphore, etc.) are part of the MLIB project. You can download the complete project from the GitHub project page.
History
- 4th October, 2020: Initial version