
MPSC Lock Free Intrusive Linked Queue with state

1 Feb 2015, CPOL, 3 min read

Introduction

Inter-thread communication (ITC) through queues has become a common pattern as the number of available cores grows. The problems being solved and the hardware limitations impose a set of requirements on the queue implementation. Currently, the following ITC queue patterns are common:

  • single producer - single consumer queue (SPSC)
  • multiple producers - single consumer queue (MPSC)
  • multiple producers - multiple consumers queue (MPMC)

Since Java 1.5 the standard library has provided several concurrent queues (some of them even lock-free), but none of them makes a strict usage implementation possible. What does that mean?

Background

Let's assume we have an event producer, an event consumer, and a queue as the transport between them. Usually the producer and the consumer are threads. The idea is to avoid any useless work, so a strict consumer implementation would look like this (pseudocode):

C++
while (1)
{
    if (queue.isEmpty())
        wait_not_empty();
    Item item = queue.get();
    process(item);
}        

And a strict producer implementation would be:

C++
queue.put(item);
if (queue_was_empty) // and only if
    wakeup_consumer_thread();
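The strict protocol in the two pseudocode fragments above can be made concrete with a plain lock. The following is a minimal sketch, not lock-free and not the queue this article builds: a synchronized ArrayDeque in which the emptiness check and the insertion happen under the same monitor, so put() wakes the consumer only on an empty-to-non-empty transition. The class name StrictLockedChannel and its methods are hypothetical.

```java
import java.util.ArrayDeque;

public class StrictLockedChannel {
    private final ArrayDeque<Integer> items = new ArrayDeque<>();
    private int wakeUps = 0; // number of times the producer signalled the consumer

    // Producer: queue.put(item); if (queue_was_empty) wakeup_consumer_thread();
    public synchronized void put(int item) {
        boolean wasEmpty = items.isEmpty(); // check and insert under one monitor
        items.addLast(item);
        if (wasEmpty) {
            wakeUps++;
            notify(); // wakeup_consumer_thread(); there is only one consumer
        }
    }

    // Consumer: if (queue.isEmpty()) wait_not_empty(); then take the item.
    public synchronized int take() throws InterruptedException {
        while (items.isEmpty()) {
            wait(); // the loop also guards against spurious wake-ups
        }
        return items.removeFirst();
    }

    public synchronized int wakeUps() {
        return wakeUps;
    }

    public static void main(String[] args) throws InterruptedException {
        StrictLockedChannel channel = new StrictLockedChannel();
        final int n = 100_000;
        final long[] sum = new long[1];
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) {
                    sum[0] += channel.take(); // process(item)
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        for (int i = 1; i <= n; i++) {
            channel.put(i);
        }
        consumer.join();
        // wake-ups is between 1 and n, depending on how often the consumer drained the queue.
        System.out.println("sum=" + sum[0] + ", wake-ups=" + channel.wakeUps());
    }
}
```

The lock makes "insert and check emptiness" trivially atomic; the rest of the article is about getting the same guarantee without a lock.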

All standard Java queues, and most open-source projects (which simply implement the java.util.Queue interface), provide no way to do this properly. The best a producer can do with java.util.Queue is:

Java
boolean wasEmpty = queue.isEmpty();
if (queue.offer(item) && wasEmpty)
    wakeup_consumer_thread();

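At best, this works only in the single-producer case. With several producers, checking emptiness and inserting are two separate steps, and other threads may change the queue in between, so the wake-up decision can be based on a stale state. For the multi-producer case we need an atomic add-item-and-check-emptiness operation on the queue. An MPSC queue interface that meets the requirements above therefore looks like:

Java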
// Return true if queue was empty
boolean put( Item item );
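The problem with the separate isEmpty()/offer() steps can be replayed deterministically in a single thread. The sketch below (class and method names are hypothetical) interleaves two producers that both call isEmpty() before either calls offer(): the queue goes from empty to non-empty only once, yet both producers decide to wake the consumer. With a counting semaphore, the surplus permit could later wake the consumer when there is nothing left to read. A put() that atomically reports the previous state lets exactly one of them see "was empty".

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class StaleEmptinessCheck {
    // Replays producers A and B using the "check isEmpty(), then offer()" pattern.
    public static int replayTwoProducers() {
        Queue<String> queue = new ConcurrentLinkedQueue<>();
        // Both producers look at the queue before either of them inserts.
        boolean aWasEmpty = queue.isEmpty(); // true
        boolean bWasEmpty = queue.isEmpty(); // true
        int wakeUps = 0;
        if (queue.offer("from A") && aWasEmpty) wakeUps++;
        if (queue.offer("from B") && bWasEmpty) wakeUps++;
        // One empty -> non-empty transition, but two wake-up decisions.
        return wakeUps;
    }

    public static void main(String[] args) {
        System.out.println("wake-ups: " + replayTwoProducers()); // wake-ups: 2
    }
}
```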

The getter should be similar: it needs to return two values, the next available item and the state of the queue (empty or not). Java has no convenient way to return two values from a method, so let's split the get into two steps:

Java
Item get();
Item get_next();

The first method is supposed to be called only when we definitely know the queue is not empty. The second method removes the first item from the queue and returns the next one, or null if the removal left the queue empty. The consumer side then looks like this:

Java
for (;;)
{
    sleep(); // block until a producer reports that the queue became non-empty
    // now we definitely know the queue is not empty
    Item item = queue.get();
    do
    {
        process(item);
        item = queue.get_next();
    }
    while (item != null);
}

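This approach has one more benefit: the current item is removed only after it has been processed, so the queue stays non-empty a bit longer. Producers therefore report an empty queue less often, which reduces the number of thread wake-ups.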
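To pin down the put()/get()/get_next() contract before going lock-free, here is a simple lock-based reference sketch. It is not the article's implementation and is not lock-free; the class name LockedStrictQueue is hypothetical, and it relies on ArrayDeque rejecting null elements so that null can safely mean "the queue became empty". The main() shows the wake-up saving described above: three puts in a row produce a single wake-up, and the consumer drains all of them in one pass.

```java
import java.util.ArrayDeque;

public class LockedStrictQueue<T> {
    private final ArrayDeque<T> items = new ArrayDeque<>();

    // Producer side: returns true if the queue was empty,
    // i.e. the caller must wake up the consumer.
    public synchronized boolean put(T item) {
        boolean wasEmpty = items.isEmpty();
        items.addLast(item); // throws NullPointerException for null items
        return wasEmpty;
    }

    // Consumer side: call only when the queue is known to be non-empty.
    public synchronized T get() {
        assert !items.isEmpty();
        return items.peekFirst();
    }

    // Consumer side: removes the current head and returns the next item,
    // or null if the queue became empty (the next put() will then return true).
    public synchronized T get_next() {
        assert !items.isEmpty();
        items.removeFirst();
        return items.peekFirst();
    }

    public static void main(String[] args) {
        LockedStrictQueue<String> q = new LockedStrictQueue<>();
        int wakeUps = 0;
        for (String s : new String[] {"a", "b", "c"}) {
            if (q.put(s)) wakeUps++; // only the first put() finds the queue empty
        }
        StringBuilder processed = new StringBuilder();
        String item = q.get();
        do {
            processed.append(item); // process(item)
            item = q.get_next();
        } while (item != null);
        System.out.println(processed + " after " + wakeUps + " wake-up(s)"); // abc after 1 wake-up(s)
    }
}
```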

Can we do anything else to make the queue better?
C/C++ makes it easy to use intrusive containers (containers that do not allocate any additional data per element). The Java GC is well tuned, but the opportunity to reduce GC pressure is still tempting. Let's try! We require each queue item to have a reference field we can use to build the linked list, plus an updater (AtomicReferenceFieldUpdater) for that field. That seems a small price to pay.
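The intrusive link and its updater can be tried in isolation. In this small sketch (IntrusiveLinkDemo, Event and linkAndSum are hypothetical names), each Event carries its own volatile next field, and a single static AtomicReferenceFieldUpdater reads, writes and compare-and-sets that field on any Event, so chaining items allocates no extra node objects. Note that AtomicReferenceFieldUpdater.newUpdater throws IllegalArgumentException if the field is not volatile.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

public class IntrusiveLinkDemo {
    // An item that carries its own link: no separate node object is allocated.
    static final class Event {
        volatile Event next; // newUpdater() rejects fields that are not volatile
        final int value;
        Event(int value) { this.value = value; }
    }

    // One shared, stateless updater for the 'next' field of every Event.
    static final AtomicReferenceFieldUpdater<Event, Event> NEXT =
            AtomicReferenceFieldUpdater.newUpdater(Event.class, Event.class, "next");

    // Chains the events through their own 'next' fields, then walks the chain.
    // Expects at least one event.
    static int linkAndSum(Event... events) {
        for (int i = 0; i + 1 < events.length; i++) {
            NEXT.set(events[i], events[i + 1]);
        }
        int sum = 0;
        for (Event e = events[0]; e != null; e = NEXT.get(e)) {
            sum += e.value;
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(linkAndSum(new Event(1), new Event(2), new Event(3))); // 6
        Event a = new Event(10);
        Event b = new Event(20);
        // compareAndSet succeeds only if the field still holds the expected value (null).
        System.out.println(NEXT.compareAndSet(a, null, b)); // true
        System.out.println(NEXT.compareAndSet(a, null, b)); // false: a.next is already b
    }
}
```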

The final version looks like:

Java
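import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

public class MpscIntrusiveLinkedQueue<T>
{
    // Updater for the item's own "next" field (the intrusive link).
    private final AtomicReferenceFieldUpdater<T, T> m_itemUpdater;
    // First item; used by the consumer, and set by the producer that finds the queue empty.
    private T m_head;
    // Last item, or null when the queue is empty.
    private final AtomicReference<T> m_tail;

    public MpscIntrusiveLinkedQueue( AtomicReferenceFieldUpdater<T, T> itemUpdater )
    {
        m_itemUpdater = itemUpdater;
        m_tail = new AtomicReference<T>();
    }

    // Producer side. Returns true if the queue was empty,
    // i.e. the caller is responsible for waking up the consumer.
    public final boolean put( T item )
    {
        assert( m_itemUpdater.get(item) == null );
        for (;;)
        {
            final T tail = m_tail.get();
            // Atomically make the new item the tail.
            if (m_tail.compareAndSet(tail, item))
            {
                if (tail == null)
                {
                    // The queue was empty: the new item is also the head.
                    m_head = item;
                    return true;
                }
                else
                {
                    // Link the previous tail to the new item. Until this store lands,
                    // the consumer may still see a null link (see get_next()).
                    m_itemUpdater.set( tail, item );
                    return false;
                }
            }
        }
    }

    // Consumer side: call only when the queue is known to be non-empty.
    final T get()
    {
        assert( m_head != null );
        return m_head;
    }

    // Consumer side: removes the head and returns the next item,
    // or null if the queue became empty.
    final T get_next()
    {
        assert( m_head != null );
        final T head = m_head;
        T next = m_itemUpdater.get( head );
        if (next == null)
        {
            m_head = null;
            // No successor yet: try to mark the queue as empty.
            if (m_tail.compareAndSet(head, null))
                return null;
            // A producer has already swapped the tail but has not linked it yet: wait for the link.
            while ((next = m_itemUpdater.get(head)) == null);
        }
        // Clear the removed item's link so the item can be put into a queue again.
        m_itemUpdater.lazySet( head, null );
        m_head = next;
        return m_head;
    }
}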

Using the code

Now we can implement strict ITC on top of the queue. A semaphore is probably the best choice for thread synchronization.

Java
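import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

class Item
{
    public volatile Item nextItem; // member to be used by the queue implementation
    public final int value;

    public Item( int value )
    {
        this.value = value;
    }
}

// Starts with no permits: the consumer blocks until a put() reports an empty queue.
final Semaphore sema = new Semaphore(0);
final MpscIntrusiveLinkedQueue<Item> queue = new MpscIntrusiveLinkedQueue<Item>(
        AtomicReferenceFieldUpdater.newUpdater(Item.class, Item.class, "nextItem"));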

Producer:

Java
if (queue.put(new Item(1)))
    sema.release(); // only the put() that found the queue empty wakes the consumer

Consumer:

Java
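try
{
    for (;;)
    {
        sema.acquire(); // sleep until a producer reports an empty -> non-empty transition
        Item item = queue.get(); // the queue is known to be non-empty here
        do
        {
            process(item); // application-specific handling
            item = queue.get_next();
        }
        while (item != null);
    }
}
catch (InterruptedException e)
{
    // Stop consuming and preserve the interrupt status for the caller.
    Thread.currentThread().interrupt();
}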
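To try the producer and consumer fragments together, here is a self-contained sketch with several producers and one consumer. It copies this article's put()/get()/get_next() algorithm into a nested class (specialised to Item instead of generic, so the example compiles on its own) and checks that every value arrives exactly once. The names MpscDemo, IntrusiveQueue and run() are hypothetical, and this is not the author's benchmark harness.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

public class MpscDemo {
    static final class Item {
        volatile Item nextItem; // intrusive link used by the queue
        final int value;
        Item(int value) { this.value = value; }
    }

    // The article's algorithm, copied and specialised to Item.
    static final class IntrusiveQueue {
        private static final AtomicReferenceFieldUpdater<Item, Item> NEXT =
                AtomicReferenceFieldUpdater.newUpdater(Item.class, Item.class, "nextItem");
        private Item head;
        private final AtomicReference<Item> tail = new AtomicReference<>();

        boolean put(Item item) { // true if the queue was empty
            for (;;) {
                Item t = tail.get();
                if (tail.compareAndSet(t, item)) {
                    if (t == null) { head = item; return true; }
                    NEXT.set(t, item);
                    return false;
                }
            }
        }

        Item get() { return head; }

        Item get_next() { // removes the head; returns the next item or null
            Item h = head;
            Item next = NEXT.get(h);
            if (next == null) {
                head = null;
                if (tail.compareAndSet(h, null)) return null;
                while ((next = NEXT.get(h)) == null) { } // producer has not linked yet
            }
            NEXT.lazySet(h, null);
            head = next;
            return next;
        }
    }

    // 'producers' threads each put values 1..perProducer; returns the consumer's total.
    static long run(int producers, int perProducer) throws InterruptedException {
        IntrusiveQueue queue = new IntrusiveQueue();
        Semaphore sema = new Semaphore(0);
        long total = (long) producers * perProducer;
        long[] sum = new long[1];
        Thread consumer = new Thread(() -> {
            long seen = 0;
            try {
                while (seen < total) {
                    sema.acquire();          // one permit per empty -> non-empty transition
                    Item item = queue.get(); // non-empty here
                    do {
                        sum[0] += item.value; // process(item)
                        seen++;
                        item = queue.get_next();
                    } while (item != null);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        Thread[] workers = new Thread[producers];
        for (int p = 0; p < producers; p++) {
            workers[p] = new Thread(() -> {
                for (int i = 1; i <= perProducer; i++) {
                    if (queue.put(new Item(i))) sema.release(); // wake only on a transition
                }
            });
            workers[p].start();
        }
        for (Thread w : workers) w.join();
        consumer.join();
        return sum[0];
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.nanoTime();
        long sum = run(4, 250_000); // expected: 4 * n * (n + 1) / 2 with n = 250_000
        long ms = (System.nanoTime() - start) / 1_000_000;
        System.out.println("sum=" + sum + " (" + ms + " ms)");
    }
}
```

Each successful tail CAS to null in get_next() is followed by exactly one put() that returns true, so the number of releases matches the number of acquires and no permits are left over.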

What about performance?

Let's check it (Intel Core i7-860 @ 2.8 GHz, Windows 7 64-bit). The test runs two threads, a producer and a consumer; the producer creates 1,000,000 events (a 1,000-event run is shown as well) and dispatches them to the consumer. Time is measured on both the producer and the consumer sides:

Text
test:
     [java] **** MpscIntrusiveLinkedQueue
     [java] 1000 events dispatched at 0.001243 sec (2 wake ups).
     [java] 1000 events processed at 0.001243 sec.
     [java] **** MpscIntrusiveLinkedQueue
     [java] 1000000 events dispatched at 0.053543 sec (3 wake ups).
     [java] 1000000 events processed at 0.053884 sec.

Across runs, the producer/consumer time ranges from 0.05 to 0.11 seconds for 1,000,000 events, depending on the number of thread wake-ups.
Is that good or bad? Hard to say without a reference point.
Let's run a similar test with something known to be fast, the LMAX Disruptor:

dtest:
     [java] **** Disruptor
     [java] 1000 events dispatched at 0.004439 sec.
     [java] Processed at 0.003985 sec.
     [java] **** Disruptor
     [java] Processed at 0.126224 sec.
     [java] 1000000 events dispatched at 0.126291 sec.

For both the Disruptor and MpscIntrusiveLinkedQueue, the best observed times are shown.

The source code is attached to the article and is also available on GitHub: https://github.com/js-labs/mpscilq

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)