
Blocking Queue

15 Mar 2019, MIT license, 3 min read
Blocking queue

Where the producer-consumer pattern is used, one side is often faster than the other: a parsing producer reads records faster than a processing consumer can handle them; a disk-reading producer is faster than a network-sending consumer.

Producers and consumers often communicate through a queue: the producer puts items on the queue while the consumer pops items off it. What happens when the queue becomes full, or empty?
One approach is for the producer to try to put an item on the queue and, if the queue is full, yield the thread and retry. Similarly, the consumer can try to pop an item off the queue and, if the queue is empty, yield and retry. This try-fail-yield approach burns CPU cycles unnecessarily in tight loops that repeatedly attempt to put or pop items.
Another approach is to temporarily grow the queue, but that doesn't scale well: when do we stop growing? And once we stop, we have to fall back onto the try-fail-yield method.
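
For illustration, the try-fail-yield approach described above might look roughly like the sketch below. The `try_queue` class, its `try_push`/`try_pop` members, and the `spin_transfer` helper are hypothetical names made up for this example; they are not part of the queue presented in this article. The retry counter is only there to show how many attempts were wasted.

```cpp
#include <atomic>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical bounded queue whose operations fail instead of blocking.
template<typename T>
class try_queue
{
public:
    explicit try_queue(std::size_t capacity) : m_capacity(capacity) {}

    // Returns false when the queue is full.
    bool try_push(const T& item)
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_items.size() >= m_capacity) return false;
        m_items.push(item);
        return true;
    }

    // Returns false when the queue is empty.
    bool try_pop(T& item)
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_items.empty()) return false;
        item = m_items.front();
        m_items.pop();
        return true;
    }

private:
    std::size_t m_capacity;
    std::queue<T> m_items;
    std::mutex m_mutex;
};

// Moves 1..count through the queue with try-fail-yield loops on both sides.
// Returns the sum of the popped values; `failed` receives the number of
// attempts that found the queue full or empty and had to be retried.
long spin_transfer(int count, std::size_t capacity, unsigned long& failed)
{
    try_queue<int> q(capacity);
    std::atomic<unsigned long> retries{0};
    long sum = 0;

    std::thread producer([&] {
        for (int i = 1; i <= count; ++i)
            while (!q.try_push(i)) { ++retries; std::this_thread::yield(); }
    });
    std::thread consumer([&] {
        for (int i = 1; i <= count; ++i)
        {
            int v = 0;
            while (!q.try_pop(v)) { ++retries; std::this_thread::yield(); }
            sum += v;
        }
    });

    producer.join();
    consumer.join();
    failed = retries.load();
    return sum;
}
```

Every increment of the retry counter is a trip through the loop that accomplished nothing, which is exactly the cost a blocking operation avoids.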

What if we could implement a blocking queue instead? That is, a queue whose push operation blocks when the queue is full and unblocks only when another thread pops an item off the queue, and whose pop operation blocks when the queue is empty and unblocks only when another thread pushes an item onto the queue. An example of using such a queue would look like this (notice the fast producer and the slow consumer in the code below):

C++
#include <chrono>
#include <iostream>
#include <mutex>
#include <thread>
#include "queue.h"
using namespace std;

const int COUNT = 10;

int main(int argc, char** argv)
{
    blocking_queue<int> q(5);
    mutex cout_lock;

    thread producer([&]() {
        for(int i = 1; i <= COUNT; ++i)
        {
            q.push(i);
            {
                scoped_lock<mutex> lock(cout_lock);
                cout << "push v = " << i << endl;
            }
        }
    });

    thread consumer([&]() {
        for(int i = 1; i <= COUNT; ++i)
        {
            this_thread::sleep_for(1s);
            int v;
            q.pop(v);
            {
                scoped_lock<mutex> lock(cout_lock);
                cout << "pop  v = " << v << endl;
            }
        }
    });

    producer.join();
    consumer.join();

    return 0;
}

Notice there is no try-fail-yield code in the example above. The push operation of the fast producer simply blocks until the slow consumer makes room on the queue. The output is what we expect: the fast producer fills the queue and then blocks; a second later, the consumer starts slowly popping items off the queue; the two go in tandem for a while until the producer exits and the consumer drains the queue:

push v = 1
push v = 2
push v = 3
push v = 4
push v = 5
pop  v = 1
push v = 6
pop  v = 2
push v = 7
pop  v = 3
push v = 8
pop  v = 4
push v = 9
pop  v = 5
push v = 10
pop  v = 6
pop  v = 7
pop  v = 8
pop  v = 9
pop  v = 10
Program ended with exit code: 0

The trick to implementing such a queue is the ability to count both open and full slots of the queue, and block. A semaphore is a perfect mechanism to do just that. 🙂 In fact, we need two semaphores: one to count the open slots, and another to count the full slots. The open-slot semaphore starts with a count equal to the size of the queue. The full-slot semaphore starts with a count of zero. A push operation waits on the open-slot semaphore and signals the full-slot semaphore. A pop operation waits on the full-slot semaphore and signals the open-slot semaphore.
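
To make the bookkeeping concrete, here is a small single-threaded model of the two counters. The `slot_counts` struct and its member names are hypothetical, and plain integers stand in for the semaphores, so nothing actually blocks; the model only shows that each operation moves one unit from one counter to the other, which keeps their sum equal to the queue size.

```cpp
// Plain-integer stand-in for the two semaphores (hypothetical; nothing blocks).
struct slot_counts
{
    unsigned int open;   // slots a push may still claim
    unsigned int full;   // slots a pop may still claim

    explicit slot_counts(unsigned int size) : open(size), full(0) {}

    // A real push would block while open == 0; the model just reports failure.
    bool push()
    {
        if (open == 0) return false;
        --open;   // "wait" on the open-slot semaphore
        ++full;   // "signal" the full-slot semaphore
        return true;
    }

    // A real pop would block while full == 0; the model just reports failure.
    bool pop()
    {
        if (full == 0) return false;
        --full;   // "wait" on the full-slot semaphore
        ++open;   // "signal" the open-slot semaphore
        return true;
    }
};
```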

The blocking queue implementation below uses Boost semaphores to count and std::mutex to protect the critical section. In the next post, I will show how to make it safe in the presence of exceptions; currently, it misbehaves if T's copy constructor or assignment operator throws (it assumes T's destructor will never throw).

queue.h

C++
#pragma once

#include <mutex>
#include <new>
#include <boost/interprocess/sync/interprocess_semaphore.hpp>

template<typename T>
class blocking_queue
{
public:
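	// Allocates raw storage for `size` elements; elements are constructed on push.
	explicit blocking_queue(unsigned int size)
	: m_size(size), m_pushIndex(0), m_popIndex(0), m_count(0),
	m_data(static_cast<T*>(operator new(size * sizeof(T)))),
	m_openSlots(size), m_fullSlots(0) {}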

	blocking_queue(const blocking_queue&) = delete;
	blocking_queue(blocking_queue&&) = delete;
	blocking_queue& operator = (const blocking_queue&) = delete;
	blocking_queue& operator = (blocking_queue&&) = delete;

	~blocking_queue()
	{
		// Destroy any elements still in the queue before releasing the storage.
		while (m_count--)
		{
			m_data[m_popIndex].~T();
			m_popIndex = (m_popIndex + 1) % m_size;
		}
		operator delete(m_data);
	}

	void push(const T& item)
	{
		m_openSlots.wait();
		{
			std::lock_guard<std::mutex> lock(m_cs);
			// Copy-construct the item in place, then advance the ring index.
			new (m_data + m_pushIndex) T (item);
			m_pushIndex = (m_pushIndex + 1) % m_size;
			++m_count;
		}
		m_fullSlots.post();
	}

	void pop(T& item)
	{
		m_fullSlots.wait();
		{
			std::lock_guard<std::mutex> lock(m_cs);
			// Copy the oldest item out, destroy it in place, then advance the ring index.
			item = m_data[m_popIndex];
			m_data[m_popIndex].~T();
			m_popIndex = (m_popIndex + 1) % m_size;
			--m_count;
		}
		m_openSlots.post();
	}

	bool empty()
	{
		std::lock_guard<std::mutex> lock(m_cs);
		return m_count == 0;
	}

private:
	unsigned int m_size;
	unsigned int m_pushIndex;
	unsigned int m_popIndex;
	unsigned int m_count;
	T* m_data;

	boost::interprocess::interprocess_semaphore m_openSlots;
	boost::interprocess::interprocess_semaphore m_fullSlots;
	std::mutex m_cs;
};
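
As for the exception caveat mentioned above: one possible way to keep the slot counts consistent, sketched here under my own assumptions and not necessarily the approach taken in the follow-up post, is to hand the semaphore count back when the copy throws. The `safe_blocking_queue` and `counting_sem` names are hypothetical. To keep the sketch self-contained, a small mutex-plus-condition-variable semaphore stands in for the Boost one. T's destructor is still assumed not to throw.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <new>

// Minimal counting semaphore (hypothetical stand-in for the Boost semaphore).
class counting_sem
{
public:
    explicit counting_sem(unsigned int count) : m_count(count) {}
    void post()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        ++m_count;
        m_cv.notify_one();
    }
    void wait()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cv.wait(lock, [this] { return m_count > 0; });
        --m_count;
    }
private:
    std::mutex m_mutex;
    std::condition_variable m_cv;
    unsigned int m_count;
};

// Hypothetical variant of blocking_queue that restores the semaphore counts
// when T's copy constructor or copy assignment throws.
template<typename T>
class safe_blocking_queue
{
public:
    explicit safe_blocking_queue(unsigned int size)
    : m_size(size), m_data(static_cast<T*>(operator new(size * sizeof(T)))),
      m_openSlots(size), m_fullSlots(0) {}

    safe_blocking_queue(const safe_blocking_queue&) = delete;
    safe_blocking_queue& operator = (const safe_blocking_queue&) = delete;

    ~safe_blocking_queue()
    {
        for (; m_count > 0; --m_count)
        {
            m_data[m_popIndex].~T();
            m_popIndex = (m_popIndex + 1) % m_size;
        }
        operator delete(m_data);
    }

    void push(const T& item)
    {
        m_openSlots.wait();
        try
        {
            std::lock_guard<std::mutex> lock(m_cs);
            new (m_data + m_pushIndex) T(item);   // may throw
            m_pushIndex = (m_pushIndex + 1) % m_size;
            ++m_count;
        }
        catch (...)
        {
            m_openSlots.post();   // nothing was stored: hand the slot back
            throw;
        }
        m_fullSlots.post();
    }

    void pop(T& item)
    {
        m_fullSlots.wait();
        try
        {
            std::lock_guard<std::mutex> lock(m_cs);
            item = m_data[m_popIndex];            // may throw
            m_data[m_popIndex].~T();
            m_popIndex = (m_popIndex + 1) % m_size;
            --m_count;
        }
        catch (...)
        {
            m_fullSlots.post();   // the element is still in the queue
            throw;
        }
        m_openSlots.post();
    }

private:
    unsigned int m_size;
    unsigned int m_pushIndex = 0;
    unsigned int m_popIndex = 0;
    unsigned int m_count = 0;
    T* m_data;
    counting_sem m_openSlots;
    counting_sem m_fullSlots;
    std::mutex m_cs;
};
```

The mutex is released during stack unwinding before the catch block runs, so the semaphore is posted without holding the lock, just as in the non-throwing path. A push that throws leaves the queue exactly as it was, and a pop that throws leaves the element in place for a later attempt.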

P.S. It was suggested to me that the use of boost::interprocess::interprocess_semaphore is a heavy-handed approach. I agree. I only used it to keep the example code small and uncluttered with more utility classes. In production, you should have a lightweight semaphore class that uses a mutex and a condition variable. Like this 🙂

C++
#pragma once

#include <mutex>
#include <condition_variable>

class semaphore
{
public:
    semaphore(unsigned int count) : m_count(count) {}
    semaphore(const semaphore&) = delete;
    semaphore(semaphore&&) = delete;
    semaphore& operator = (const semaphore&) = delete;
    semaphore& operator = (semaphore&&) = delete;
    ~semaphore() = default;
    
    void post()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        ++m_count;
        m_cv.notify_one();
    }
    
    void wait()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cv.wait(lock, [&]{ return m_count > 0; });
        --m_count;
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_cv;
    unsigned int m_count;
};

License

This article, along with any associated source code and files, is licensed under The MIT License