Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles / Languages / C++

Atomic Blocking Queue

0.00/5 (No votes)
14 Mar 2019MIT2 min read 2.1K  
Atomic blocking queue

As I learn about atomics and memory model, I decided to take a stab at rewriting my blocking queue using atomic operations and eliminate the mutex around the critical section of code responsible for pushing and popping elements, effectively creating a fast path through the queue if no blocking is taking place.

Let’s jump straight into the code:

C++
template<typename Q = T>
typename std::enable_if<
	std::is_copy_constructible<Q>::value and
	std::is_nothrow_copy_constructible<Q>::value, void>::type
push(const T& item) noexcept
{
	m_openSlots.wait();

	auto pushIndex = m_pushIndex.fetch_add(1);
	new (m_data + (pushIndex % m_size)) T (item);
	++m_count;

	auto expected = m_pushIndex.load();
	while(!m_pushIndex.compare_exchange_strong(expected, m_pushIndex % m_size))
		expected = m_pushIndex.load();

	m_fullSlots.post();
}
  • Lines 1-4 are basically a template concept which specifies that this method will only be present if the type is no-throw copy constructible.
  • Line 7 is the same semaphore decrement and possible blocking if the queue is full. The fast path of this semaphore implementation uses only atomic operations, so if it doesn’t block, it will not engage a mutex (fast_semaphore code available on GitHub).
  • Line 9 is where the magic starts. We atomically increment the m_pushIndex while fetching its previous value into a temporary pushIndex. From now on, we work with the temporary.
  • Line 10 is where we insert the element by copy constructing it in the right open slot.
  • Line 11 is book-keeping needed during the destruction of the queue.
  • Line 13-15 is where we have to modulo the m_pushIndex with m_size, so it never overflows. It checks, in a loop, if it has changed, if it has, it loads it back into expected and checks again until it hasn’t changed in which case it atomically swaps m_pushIndex with m_pushIndex % m_size.
  • Line 17 signals to other blocked threads, if there are any, that the queue now has an element available for popping.

Other methods of the queue work in a very similar way so I will not be describing them in detail here. The only crux of this implementation is that it only works for no-throw copyable and movable types; so declare your constructors and assignment operators with noexcept if you want to use them with this queue. 🙂

Complete Listing

C++
#pragma once

#include <atomic>
#include <utility>
#include <type_traits>
#include <cassert>
#include "semaphore.h"

template<typename T>
class fast_blocking_queue
{
public:
	explicit fast_blocking_queue(unsigned int size)
	: m_size(size), m_pushIndex(0), m_popIndex(0), m_count(0),
	m_data((T*)operator new(size * sizeof(T))),
	m_openSlots(size), m_fullSlots(0)
	{
		assert(size != 0);
	}

	~fast_blocking_queue() noexcept
	{
		while (m_count--)
		{
			m_data[m_popIndex].~T();
			m_popIndex = ++m_popIndex % m_size;
		}
		operator delete(m_data);
	}

	template<typename Q = T>
	typename std::enable_if<
		std::is_copy_constructible<Q>::value and
		std::is_nothrow_copy_constructible<Q>::value, void>::type
	push(const T& item) noexcept
	{
		m_openSlots.wait();

		auto pushIndex = m_pushIndex.fetch_add(1);
		new (m_data + (pushIndex % m_size)) T (item);
		++m_count;

		auto expected = m_pushIndex.load();
		while(!m_pushIndex.compare_exchange_strong(expected, m_pushIndex % m_size))
			expected = m_pushIndex.load();

		m_fullSlots.post();
	}

	template<typename Q = T>
	typename std::enable_if<
		std::is_move_constructible<Q>::value and
		std::is_nothrow_move_constructible<Q>::value, void>::type
	push(T&& item) noexcept
	{
		m_openSlots.wait();

		auto pushIndex = m_pushIndex.fetch_add(1);
		new (m_data + (pushIndex % m_size)) T (std::move(item));
		++m_count;

		auto expected = m_pushIndex.load();
		while(!m_pushIndex.compare_exchange_strong(expected, m_pushIndex % m_size))
			expected = m_pushIndex.load();

		m_fullSlots.post();
	}

	template<typename Q = T>
	typename std::enable_if<
		not std::is_move_assignable<Q>::value and
		std::is_nothrow_copy_assignable<Q>::value, void>::type
	pop(T& item) noexcept
	{
		m_fullSlots.wait();

		auto popIndex = m_popIndex.fetch_add(1);
		item = m_data[popIndex % m_size];
		m_data[popIndex % m_size].~T();
		--m_count;

		auto expected = m_popIndex.load();
		while(!m_popIndex.compare_exchange_strong(expected, m_popIndex % m_size))
			expected = m_popIndex.load();

		m_openSlots.post();
	}

	template<typename Q = T>
	typename std::enable_if<
		std::is_move_assignable<Q>::value and
		std::is_nothrow_move_assignable<Q>::value, void>::type
	pop(T& item) noexcept
	{
		m_fullSlots.wait();

		auto popIndex = m_popIndex.fetch_add(1);
		item = std::move(m_data[popIndex % m_size]);
		m_data[popIndex % m_size].~T();
		--m_count;

		auto expected = m_popIndex.load();
		while(!m_popIndex.compare_exchange_strong(expected, m_popIndex % m_size))
			expected = m_popIndex.load();

		m_openSlots.post();
	}

	T pop() noexcept(std::is_nothrow_invocable_r<void, 
                     decltype(&fast_blocking_queue<T>::pop<T>), T&>::value)
	{
		T item;
		pop(item);
		return item;
	}

private:
	const unsigned int m_size;
	std::atomic_uint m_pushIndex;
	std::atomic_uint m_popIndex;
	std::atomic_uint m_count;
	T* m_data;

	fast_semaphore m_openSlots;
	fast_semaphore m_fullSlots;
};

License

This article, along with any associated source code and files, is licensed under The MIT License