As I learn about atomics and memory model, I decided to take a stab at rewriting my blocking queue using atomic operations and eliminate the mutex around the critical section of code responsible for pushing and popping elements, effectively creating a fast path through the queue if no blocking is taking place.
Let’s jump straight into the code:
template<typename Q = T>
typename std::enable_if<
std::is_copy_constructible<Q>::value and
std::is_nothrow_copy_constructible<Q>::value, void>::type
push(const T& item) noexcept
{
m_openSlots.wait();
auto pushIndex = m_pushIndex.fetch_add(1);
new (m_data + (pushIndex % m_size)) T (item);
++m_count;
auto expected = m_pushIndex.load();
while(!m_pushIndex.compare_exchange_strong(expected, m_pushIndex % m_size))
expected = m_pushIndex.load();
m_fullSlots.post();
}
- Lines 1-4 are basically a template concept which specifies that this method will only be present if the type is no-throw copy constructible.
- Line 7 is the same semaphore decrement and possible blocking if the queue is full. The fast path of this semaphore implementation uses only atomic operations, so if it doesn’t block, it will not engage a mutex (
fast_semaphore
code available on GitHub). - Line 9 is where the magic starts. We atomically increment the
m_pushIndex
while fetching its previous value into a temporary pushIndex
. From now on, we work with the temporary. - Line 10 is where we insert the element by copy constructing it in the right open slot.
- Line 11 is book-keeping needed during the destruction of the queue.
- Line 13-15 is where we have to modulo the
m_pushIndex
with m_size
, so it never overflows. It checks, in a loop, if it has changed, if it has, it loads it back into expected
and checks again until it hasn’t changed in which case it atomically swaps m_pushIndex
with m_pushIndex % m_size
. - Line 17 signals to other blocked threads, if there are any, that the queue now has an element available for popping.
Other methods of the queue work in a very similar way so I will not be describing them in detail here. The only crux of this implementation is that it only works for no-throw copyable and movable types; so declare your constructors and assignment operators with noexcept
if you want to use them with this queue.
Complete Listing
#pragma once
#include <atomic>
#include <utility>
#include <type_traits>
#include <cassert>
#include "semaphore.h"
template<typename T>
class fast_blocking_queue
{
public:
explicit fast_blocking_queue(unsigned int size)
: m_size(size), m_pushIndex(0), m_popIndex(0), m_count(0),
m_data((T*)operator new(size * sizeof(T))),
m_openSlots(size), m_fullSlots(0)
{
assert(size != 0);
}
~fast_blocking_queue() noexcept
{
while (m_count--)
{
m_data[m_popIndex].~T();
m_popIndex = ++m_popIndex % m_size;
}
operator delete(m_data);
}
template<typename Q = T>
typename std::enable_if<
std::is_copy_constructible<Q>::value and
std::is_nothrow_copy_constructible<Q>::value, void>::type
push(const T& item) noexcept
{
m_openSlots.wait();
auto pushIndex = m_pushIndex.fetch_add(1);
new (m_data + (pushIndex % m_size)) T (item);
++m_count;
auto expected = m_pushIndex.load();
while(!m_pushIndex.compare_exchange_strong(expected, m_pushIndex % m_size))
expected = m_pushIndex.load();
m_fullSlots.post();
}
template<typename Q = T>
typename std::enable_if<
std::is_move_constructible<Q>::value and
std::is_nothrow_move_constructible<Q>::value, void>::type
push(T&& item) noexcept
{
m_openSlots.wait();
auto pushIndex = m_pushIndex.fetch_add(1);
new (m_data + (pushIndex % m_size)) T (std::move(item));
++m_count;
auto expected = m_pushIndex.load();
while(!m_pushIndex.compare_exchange_strong(expected, m_pushIndex % m_size))
expected = m_pushIndex.load();
m_fullSlots.post();
}
template<typename Q = T>
typename std::enable_if<
not std::is_move_assignable<Q>::value and
std::is_nothrow_copy_assignable<Q>::value, void>::type
pop(T& item) noexcept
{
m_fullSlots.wait();
auto popIndex = m_popIndex.fetch_add(1);
item = m_data[popIndex % m_size];
m_data[popIndex % m_size].~T();
--m_count;
auto expected = m_popIndex.load();
while(!m_popIndex.compare_exchange_strong(expected, m_popIndex % m_size))
expected = m_popIndex.load();
m_openSlots.post();
}
template<typename Q = T>
typename std::enable_if<
std::is_move_assignable<Q>::value and
std::is_nothrow_move_assignable<Q>::value, void>::type
pop(T& item) noexcept
{
m_fullSlots.wait();
auto popIndex = m_popIndex.fetch_add(1);
item = std::move(m_data[popIndex % m_size]);
m_data[popIndex % m_size].~T();
--m_count;
auto expected = m_popIndex.load();
while(!m_popIndex.compare_exchange_strong(expected, m_popIndex % m_size))
expected = m_popIndex.load();
m_openSlots.post();
}
T pop() noexcept(std::is_nothrow_invocable_r<void,
decltype(&fast_blocking_queue<T>::pop<T>), T&>::value)
{
T item;
pop(item);
return item;
}
private:
const unsigned int m_size;
std::atomic_uint m_pushIndex;
std::atomic_uint m_popIndex;
std::atomic_uint m_count;
T* m_data;
fast_semaphore m_openSlots;
fast_semaphore m_fullSlots;
};