Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles
(untagged)

Bounded Blocking Queue (One Lock)

0.00/5 (No votes)
17 Aug 2004 1  
A fast and flexible Bounded Blocking Queue. Great for general Producer/Consumer needs such as network queues and pipelines.

Introduction

This article includes a C# implementation of a Bound Blocking Queue (i.e. Bounded Circular Queue). In general, this is a queue that has a fixed number of elements it can contain (i.e., bounded), and will �block� a producer if queue is full, or block a consumer if queue is empty. This differs from the standard .NET Queue in that .NET�s Queue is unbounded and does not block.

A bounded blocking queue is a really handy object for Producer/Consumer applications such as network queues and pipelines. In a Producer/Consumer need, you normally don�t want your queue to grow unbounded as that can take up all available memory if not checked. Moreover, many times the producer and consumer are not balanced, so one will outrun the other. You normally don�t want a producer to produce 1000 objects per second if your consumer can only process 100 objects per second. You need a mechanism to throttle the producer(s) to allow the consumer(s) to catch up (and vice-versa). This situation is easily seen in network applications where you can get many more packets from the network than a consumer can handle, etc. You normally want to balance this to allow consumer thread(s) to work before more packets are queued. The �blocking� in Bound Blocking Queue relates to the fact that the queue will block producer(s) until a free slot is available, and automatically unblock producer(s) when free slot(s) become available. Conversely, the queue will block consumer(s) on an empty queue until objects are available to dequeue. The threads call their respective Enqueue or Dequeue methods, and the queue handles the blocking and unblocking automatically.

Background

I have looked at many blocking queue implementations such one lock queues, two lock queues, and queues locked with Semaphores. I have also ported a Non-Blocking queue implementation to C#.

In general, I find the one lock queue the best balance between performance, ease of implementation, and correctness validation (the hardest part in sync primitives and threading issues). A two lock queue would allow one consumer and one producer to work at the same time in a multi-CPU system. However, the enqueued object is still a shared object between the two threads, so you would need another lock to add the memory barrier to verify a consumer �sees� the object correctly that was produced. You can do this without a third lock, but requires much thought, and testing memory barriers issues, etc. Moreover, adding a third �shared� lock takes away any advantage of a two lock queue over a one lock queue, and would in fact be slower as each operation now requires two locks. Same issue with using two Semaphores for element counting. Most Semaphore queue implementations require two Semaphores (one to protect the enqueue and one for dequeue) and a third lock to protect the shared access in the middle (e.g., Sync-Sandwich). Each Semaphore internally uses a lock to sync itself, so you end up with two lock operations per dequeue and two per enqueue. A Non-Blocking (i.e. No-Lock) queue is a different animal all together and requires *very careful attention to memory barrier and threading issues. I have implemented one in C# but could not verify its correctness in the .NET memory model, so will not recommend a no lock queue until I see some other information on this in the CLR/.NET.

The Code

Below is the complete listing of the one lock Bounded Blocking Queue. Each method should be mostly self describing between the comments and the code itself. The code in the attached file should be used over this listing if they disagree. Please ping me if more explanation is required or you have comments or issues.

using System;
using System.Threading;
using System.Collections;

namespace Queues
{
    /// <summary>

    /// Author: William Stacey, MVP (staceyw@mvps.org)

    /// Modified Date: 08/03/2004

    /// One Lock Bounded Blocking Queue (e.g. Bounded Buffer).

    /// This queue is internally synchronized (thread-safe)

    /// and designed for one-many producers and one-many

    /// consumer threads.  This is ideal for pipelining

    /// or other consumer/producer needs.

    /// Fast and thread safe on single or multiple cpu machines.

    /// 

    /// Consumer thread(s) will block on Dequeue operations

    /// until another thread performs a Enqueue

    /// operation, at which point the first scheduled consumer

    /// thread will be unblocked and get the

    /// current object.  Producer thread(s) will block

    /// on Enqueue operations until another

    /// consumer thread calls Dequeue to free a queue slot,

    /// at which point the first scheduled producer

    /// thread will be unblocked to finish its Enqueue operation.

    /// No user code is needed to

    /// handle this "ping-pong" between locking/unlocking

    /// consumers and producers. 

    /// </summary>

    public sealed class BlockingQueue : ICollection
    {
        #region Fields
        // Buffer used to store queue objects with max "Size".

        private object[] buffer;
        // Current number of elements in the queue.

        private int count;
        // Max number of elements queue can hold without blocking.

        private int size;
        // Index of slot for object to remove on next Dequeue.

        private int head;
        // Index of slot for next Enqueue object.

        private int tail;
        // Object used to synchronize the queue.

        private readonly object syncRoot;
        #endregion

        #region Constructors
        /// <summary>

        /// Create instance of Queue with Bounded number of elements.

        /// After that many elements are used, another Enqueue

        /// operation will "block" or wait until a Consumer calls

        /// Dequeue to free a slot.  Likewise, if the queue

        /// is empty, a call to Dequeue will block until

        /// another thread calls Enqueue.

        /// </summary>

        /// <param name="size"></param>

        public BlockingQueue(int size)
        {
            if ( size < 1 )
                throw new ArgumentOutOfRangeException("size must" 
                                     + " be greater then zero.");
            syncRoot = new object();
            this.size = size;
            buffer = new object[size];
            count = 0;
            head = 0;
            tail = 0;
        }
        #endregion

        #region Properties
        /// <summary>

        /// Gets the object values currently in the queue.

        /// If queue is empty, this will return a zero length array.

        /// The returned array length can be 0 to Size.

        /// This method does not modify the queue,

        /// but returns a shallow copy

        /// of the queue buffer containing the objects

        /// contained in the queue.

        /// </summary>

        public object[] Values
        {
            get
            {
                // Copy used elements to a new array

                // of "count" size.  Note a simple

                // Buffer copy will not work as head

                // could be anywhere and we want

                // a zero based array.

                object[] values;
                lock(syncRoot)
                {
                    values = new object[count];
                    int pos = head;
                    for(int i = 0; i < count; i++)
                    {
                        values[i] = buffer[pos];
                        pos = (pos + 1) % size;
                    }
                }
                return values;
            }
        }

        #endregion

        #region Public Methods

        /// <summary>

        /// Adds an object to the end of the queue.

        /// If queue is full, this method will

        /// block until another thread calls one

        /// of the Dequeue methods.  This method will wait

        /// "Timeout.Infinite" until queue has a free slot.

        /// </summary>

        /// <param name="value"></param>

        public void Enqueue(object value)
        {
            Enqueue(value, Timeout.Infinite);
        }

        /// <summary>

        /// Adds an object to the end of the queue.

        /// If queue is full, this method will

        /// block until another thread calls one

        /// of the Dequeue methods or millisecondsTimeout

        /// expires.  If timeout, method will throw QueueTimeoutException.

        /// </summary>

        /// <param name="value"></param>

        public void Enqueue(object value, int millisecondsTimeout)
        {
            lock(syncRoot)
            {
                while(count == size)
                {
                    try
                    {
                        if (!Monitor.Wait(syncRoot, millisecondsTimeout))
                            throw new QueueTimeoutException();
                    }
                    catch
                    {
                        // Monitor exited with exception.

                        // Could be owner thread of monitor

                        // object was terminated or timeout

                        // on wait. Pulse any/all waiting

                        // threads to ensure we don't get

                        // any "live locked" producers.

                        Monitor.PulseAll(syncRoot);
                        throw;
                    }
                }
                buffer[tail] = value;
                tail = (tail + 1) % size;
                count++;
                if (count == 1) // Could have blocking Dequeue thread(s).

                    Monitor.PulseAll(syncRoot);
            }
        }

        /// <summary>

        /// Non-blocking version of Enqueue().

        /// If Enqueue is successfull, this will

        /// return true; otherwise false if queue is full.

        /// </summary>

        /// <param name="value"></param>

        /// <returns>true if successfull,

        /// otherwise false.</returns>

        public bool TryEnqueue(object value)
        {
            lock(syncRoot)
            {
                if ( count == size )
                    return false;
                buffer[tail] = value;
                tail = (tail + 1) % size;
                count++;
                if ( count == 1 ) 
                // Could have blocking Dequeue thread(s).

                    Monitor.PulseAll(syncRoot);
            }
            return true;
        }

        /// <summary>

        /// Removes and returns the object

        /// at the beginning of the Queue.

        /// If queue is empty, method will block until

        /// another thread calls one of

        /// the Enqueue methods. This method will wait

        /// "Timeout.Infinite" until another

        /// thread Enqueues and object.

        /// </summary>

        /// <returns></returns>

        public object Dequeue()
        {
            return Dequeue(Timeout.Infinite);
        }

        /// <summary>

        /// Removes and returns the object

        /// at the beginning of the Queue.

        /// If queue is empty, method will block until

        /// another thread calls one of

        /// the Enqueue methods or millisecondsTimeout expires.

        /// If timeout, method will throw QueueTimeoutException.

        /// </summary>

        /// <returns>The object that is removed from

        /// the beginning of the Queue.</returns>

        public object Dequeue(int millisecondsTimeout)
        {
            object value;
            lock(syncRoot)
            {
                while(count == 0)
                {
                    try
                    {
                        if (!Monitor.Wait(syncRoot, millisecondsTimeout))
                            throw new QueueTimeoutException();
                    }
                    catch
                    {
                        Monitor.PulseAll(syncRoot);
                        throw;
                    }
                }
                value = buffer[head];
                buffer[head] = null;
                head = (head + 1) % size;
                count--;
                if ( count == (size - 1) )
                // Could have blocking Enqueue thread(s).

                    Monitor.PulseAll(syncRoot);
            }
            return value;
        }

        /// <summary>

        /// Non-blocking version of Dequeue.

        /// Will return false if queue is empty and set

        /// value to null, otherwise will return true

        /// and set value to the dequeued object.

        /// </summary>

        /// <param name="value">The object that is removed from

        ///     the beginning of the Queue or null if empty.</param>

        /// <returns>true if successfull, otherwise false.</returns>

        public bool TryDequeue(out object value)
        {
            lock(syncRoot)
            {
                if ( count == 0 )
                {
                    value = null;
                    return false;
                }
                value = buffer[head];
                buffer[head] = null;
                head = (head + 1) % size;
                count--;
                if ( count == (size - 1) )
                // Could have blocking Enqueue thread(s).

                    Monitor.PulseAll(syncRoot);
            }
            return true;
        }

        /// <summary>

        /// Returns the object at the beginning

        /// of the queue without removing it.

        /// </summary>

        /// <returns>The object at the beginning

        /// of the queue.</returns>

        /// <remarks>

        /// This method is similar to the Dequeue method,

        /// but Peek does not modify the queue. 

        /// A null reference can be added to the Queue as a value. 

        /// To distinguish between a null value and the end of the queue,

        /// check the Count property or

        /// catch the InvalidOperationException,

        /// which is thrown when the Queue is empty.

        /// </remarks>

        /// <exception cref="InvalidOpertionException">

        ///                The queue is empty.</exception>

        public object Peek()
        {
            lock(syncRoot)
            {
                if (count == 0)
                    throw new InvalidOperationException("The Queue is empty.");
            
                object value = buffer[head];
                return value;
            }
        }

        /// <summary>

        /// Returns the object at the beginning

        /// of the Queue without removing it.

        /// Similar to the Peek method, however this method

        /// will not throw exception if

        /// queue is empty, but instead will return false.

        /// </summary>

        /// <param name="value">The object at the beginning

        ///          of the Queue or null if empty.</param>

        /// <returns>The object at the beginning of the Queue.</returns>

        public bool TryPeek(out object value)
        {
            lock(syncRoot)
            {
                if ( count == 0 )
                {
                    value = null;
                    return false;
                }
                value = buffer[head];
            }
            return true;
        }

        /// <summary>

        /// Removes all objects from the Queue.

        /// </summary>

        /// <remarks>

        /// Count is set to zero. Size does not change.

        /// </remarks>

        public void Clear()
        {
            lock(syncRoot)
            {
                count = 0;
                head = 0;
                tail = 0;
                for(int i = 0; i < buffer.Length; i++)
                {
                    buffer[i] = null;
                }
            }
        }

        #endregion

        #region ICollection Members
        /// <summary>

        /// Gets a value indicating whether access

        /// to the Queue is synchronized (thread-safe).

        /// </summary>

        public bool IsSynchronized
        {
            get    { return true; }
        }

        /// <summary>

        /// Returns the max elements allowed

        /// in the queue before blocking Enqueue

        /// operations.  This is the size set in the constructor.

        /// </summary>

        public int Size
        {
            get { return this.size;    }
        }

        /// <summary>

        /// Gets the number of elements contained in the Queue.

        /// </summary>

        public int Count
        {
            get    { lock(syncRoot) { return count; } }
        }

        /// <summary>

        /// Copies the Queue elements to an existing one-dimensional Array,

        /// starting at the specified array index.

        /// </summary>

        /// <param name="array">

              /// The one-dimensional Array that is the destination

              /// of the elements copied from Queue.

              /// The Array must have zero-based indexing.</param>

        /// <param name="index">The zero-based index

        ///     in array at which copying begins.</param>

        public void CopyTo(Array array, int index)
        {
            object[] tmpArray = Values;
            tmpArray.CopyTo(array, index);
        }

        /// <summary>

        /// Gets an object that can be used to synchronize

        /// access to the Queue.

        /// </summary>

        public object SyncRoot
        {
            get    { return this.syncRoot; }
        }
        #endregion

        #region IEnumerable Members
        /// <summary>

        /// GetEnumerator not implemented. You can't enumerate

        /// the active queue as you would an array as it is dynamic

        /// with active gets and puts. You could

        /// if you locked it first and unlocked after

        /// enumeration, but that does not work well for GetEnumerator.

        /// The recommended method is to Get Values

        /// and enumerate the returned array copy.

        /// That way the queue is locked for

        /// only a short time and a copy returned

        /// so that can be safely enumerated using

        /// the array's enumerator. You could also

        /// create a custom enumerator that would

        /// dequeue the objects until empty queue,

        /// but that is a custom need. 

        /// </summary>

        /// <returns></returns>

        public IEnumerator GetEnumerator()
        {
            throw new NotImplementedException("Not Implemented.");
        }
        #endregion
    } // End BlockingQueue


    public class QueueTimeoutException : Exception
    {
        public QueueTimeoutException() : base("Queue method timed out on wait.")
        {
        }
    }
}

Points of Interest

The zip file also includes a test class, BlockingQueueTest.cs, that will allow you to test the queue using multiple threads. This is a simple network simulation using two queues, one Listener object, and one Server object. The Listener simulates packets received from a network and places objects into its output queue. This queue is used as the input queue for the Server object; which dequeues objects, processes them, and places its objects into a second queue (i.e., the Server�s output queue). The Server�s output queue is also the Listener�s input queue as the Listener does double duty as both a listener and sender.

History

  • 08/17/2004 � V1 - First post.

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.

A list of licenses authors might use can be found here