Introduction
This article includes a C# implementation of a Bounded Blocking Queue (i.e., a bounded circular queue). In general, this is a queue that can contain a fixed number of elements (i.e., bounded), and will "block" a producer if the queue is full, or block a consumer if the queue is empty. This differs from the standard .NET Queue in that .NET's Queue is unbounded and does not block.
A bounded blocking queue is a really handy object for producer/consumer applications such as network queues and pipelines. In a producer/consumer scenario, you normally don't want your queue to grow without bound, as it can consume all available memory if left unchecked. Moreover, the producer and consumer are often not balanced, so one will outrun the other. You normally don't want a producer creating 1,000 objects per second if your consumer can only process 100 objects per second. You need a mechanism to throttle the producer(s) to allow the consumer(s) to catch up (and vice versa). This situation is easily seen in network applications, where packets can arrive from the network much faster than a consumer can handle them; you normally want to give the consumer thread(s) a chance to work before more packets are queued. The "blocking" in bounded blocking queue refers to the fact that the queue will block producer(s) until a free slot is available, and automatically unblock them when free slot(s) become available. Conversely, the queue will block consumer(s) on an empty queue until objects are available to dequeue. The threads simply call the Enqueue or Dequeue methods, and the queue handles the blocking and unblocking automatically.
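To make the throttling concrete, here is a minimal, self-contained sketch of the pattern. The TinyBoundedQueue class is only a condensed stand-in for the full BlockingQueue listed below (the class name, the capacity of 4, and the item count are illustrative): the small bound naturally paces the producer, because Enqueue blocks whenever the buffer is full.

```csharp
using System;
using System.Collections;
using System.Threading;

// Condensed one-lock bounded queue -- a stand-in for the full BlockingQueue listed below.
class TinyBoundedQueue
{
    private readonly Queue items = new Queue();
    private readonly int capacity;
    private readonly object gate = new object();

    public TinyBoundedQueue(int capacity) { this.capacity = capacity; }

    public void Enqueue(object value)
    {
        lock (gate)
        {
            while (items.Count == capacity)   // full: block the producer
                Monitor.Wait(gate);
            items.Enqueue(value);
            Monitor.PulseAll(gate);           // wake any blocked consumers
        }
    }

    public object Dequeue()
    {
        lock (gate)
        {
            while (items.Count == 0)          // empty: block the consumer
                Monitor.Wait(gate);
            object value = items.Dequeue();
            Monitor.PulseAll(gate);           // wake any blocked producers
            return value;
        }
    }
}

class ProducerConsumerDemo
{
    static TinyBoundedQueue queue = new TinyBoundedQueue(4); // small bound throttles the producer
    public static int total;

    static void Consume()
    {
        for (int i = 0; i < 100; i++)
            total += (int)queue.Dequeue();
    }

    public static void Main()
    {
        Thread consumer = new Thread(new ThreadStart(Consume));
        consumer.Start();

        for (int i = 1; i <= 100; i++)
            queue.Enqueue(i);                 // blocks whenever the queue is full

        consumer.Join();
        Console.WriteLine(total);             // prints 5050 (the sum 1..100)
    }
}
```

Note that the producer never has more than four items in flight; everything beyond that waits inside Enqueue until the consumer drains a slot.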
Background
I have looked at many blocking queue implementations, such as one-lock queues, two-lock queues, and queues synchronized with semaphores. I have also ported a non-blocking queue implementation to C#.
In general, I find the one-lock queue the best balance of performance, ease of implementation, and ease of correctness validation (the hardest part of synchronization primitives and threading). A two-lock queue would allow one consumer and one producer to work at the same time on a multi-CPU system. However, the enqueued object is still shared between the two threads, so you would need another lock to provide the memory barrier that guarantees a consumer "sees" the object the producer wrote. You can do this without a third lock, but it requires much thought and testing of memory barrier issues. Moreover, adding a third "shared" lock takes away any advantage of a two-lock queue over a one-lock queue, and would in fact be slower, as each operation now requires two locks.

The same issue applies to using two semaphores for element counting. Most semaphore-based queue implementations require two semaphores (one to guard enqueue and one to guard dequeue) plus a third lock to protect the shared access in the middle (a "sync sandwich"). Each semaphore internally uses a lock to synchronize itself, so you end up with two lock operations per dequeue and two per enqueue.

A non-blocking (i.e., lock-free) queue is a different animal altogether and requires very careful attention to memory barrier and threading issues. I have implemented one in C#, but could not verify its correctness under the .NET memory model, so I will not recommend a lock-free queue until I see more information on this in the CLR/.NET.
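One detail worth spelling out about the Monitor-based waiting used in the listing: Monitor.Wait releases the lock while blocked, reacquires it before returning, and returns false if the timeout elapses without a pulse. That false return is what the timed Enqueue/Dequeue overloads below turn into a QueueTimeoutException. A minimal self-contained check (the 100 ms timeout is arbitrary):

```csharp
using System;
using System.Threading;

class WaitTimeoutDemo
{
    static void Main()
    {
        object gate = new object();
        bool signaled;
        lock (gate)
        {
            // Nobody ever pulses 'gate', so this wait can only end by timeout.
            // Wait releases the lock while blocked and reacquires it on return.
            signaled = Monitor.Wait(gate, 100);
        }
        Console.WriteLine(signaled);   // prints False: the wait timed out
    }
}
```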
The Code
Below is the complete listing of the one-lock Bounded Blocking Queue. Each method should be mostly self-describing between the comments and the code itself. If this listing and the code in the attached file disagree, use the attached file. Please ping me if more explanation is required or you have comments or issues.
using System;
using System.Threading;
using System.Collections;

namespace Queues
{
    public sealed class BlockingQueue : ICollection
    {
        #region Fields
        private object[] buffer;            // circular buffer holding the queued objects
        private int count;                  // current number of queued objects
        private int size;                   // fixed capacity (the bound)
        private int head;                   // index of the next object to dequeue
        private int tail;                   // index of the next free slot to enqueue into
        private readonly object syncRoot;   // the single lock guarding all state
        #endregion

        #region Constructors
        public BlockingQueue(int size)
        {
            if (size < 1)
                throw new ArgumentOutOfRangeException("size",
                    "size must be greater than zero.");
            syncRoot = new object();
            this.size = size;
            buffer = new object[size];
            count = 0;
            head = 0;
            tail = 0;
        }
        #endregion

        #region Properties
        public object[] Values
        {
            get
            {
                object[] values;
                lock (syncRoot)
                {
                    values = new object[count];
                    int pos = head;
                    for (int i = 0; i < count; i++)
                    {
                        values[i] = buffer[pos];
                        pos = (pos + 1) % size;
                    }
                }
                return values;
            }
        }
        #endregion

        #region Public Methods
        public void Enqueue(object value)
        {
            Enqueue(value, Timeout.Infinite);
        }

        public void Enqueue(object value, int millisecondsTimeout)
        {
            lock (syncRoot)
            {
                while (count == size)
                {
                    try
                    {
                        if (!Monitor.Wait(syncRoot, millisecondsTimeout))
                            throw new QueueTimeoutException();
                    }
                    catch
                    {
                        // Wake any other waiters so a pulse is not lost.
                        Monitor.PulseAll(syncRoot);
                        throw;
                    }
                }
                buffer[tail] = value;
                tail = (tail + 1) % size;
                count++;
                if (count == 1)
                    Monitor.PulseAll(syncRoot); // queue was empty: wake blocked consumers
            }
        }

        public bool TryEnqueue(object value)
        {
            lock (syncRoot)
            {
                if (count == size)
                    return false;
                buffer[tail] = value;
                tail = (tail + 1) % size;
                count++;
                if (count == 1)
                    Monitor.PulseAll(syncRoot);
            }
            return true;
        }

        public object Dequeue()
        {
            return Dequeue(Timeout.Infinite);
        }

        public object Dequeue(int millisecondsTimeout)
        {
            object value;
            lock (syncRoot)
            {
                while (count == 0)
                {
                    try
                    {
                        if (!Monitor.Wait(syncRoot, millisecondsTimeout))
                            throw new QueueTimeoutException();
                    }
                    catch
                    {
                        // Wake any other waiters so a pulse is not lost.
                        Monitor.PulseAll(syncRoot);
                        throw;
                    }
                }
                value = buffer[head];
                buffer[head] = null;
                head = (head + 1) % size;
                count--;
                if (count == (size - 1))
                    Monitor.PulseAll(syncRoot); // queue was full: wake blocked producers
            }
            return value;
        }

        public bool TryDequeue(out object value)
        {
            lock (syncRoot)
            {
                if (count == 0)
                {
                    value = null;
                    return false;
                }
                value = buffer[head];
                buffer[head] = null;
                head = (head + 1) % size;
                count--;
                if (count == (size - 1))
                    Monitor.PulseAll(syncRoot);
            }
            return true;
        }

        public object Peek()
        {
            lock (syncRoot)
            {
                if (count == 0)
                    throw new InvalidOperationException("The Queue is empty.");
                object value = buffer[head];
                return value;
            }
        }

        public bool TryPeek(out object value)
        {
            lock (syncRoot)
            {
                if (count == 0)
                {
                    value = null;
                    return false;
                }
                value = buffer[head];
            }
            return true;
        }

        public void Clear()
        {
            lock (syncRoot)
            {
                count = 0;
                head = 0;
                tail = 0;
                for (int i = 0; i < buffer.Length; i++)
                {
                    buffer[i] = null;
                }
            }
        }
        #endregion

        #region ICollection Members
        public bool IsSynchronized
        {
            get { return true; }
        }

        public int Size
        {
            get { return this.size; }
        }

        public int Count
        {
            get { lock (syncRoot) { return count; } }
        }

        public void CopyTo(Array array, int index)
        {
            object[] tmpArray = Values;
            tmpArray.CopyTo(array, index);
        }

        public object SyncRoot
        {
            get { return this.syncRoot; }
        }
        #endregion

        #region IEnumerable Members
        public IEnumerator GetEnumerator()
        {
            throw new NotImplementedException("Not Implemented.");
        }
        #endregion
    }

    public class QueueTimeoutException : Exception
    {
        public QueueTimeoutException() : base("Queue method timed out on wait.")
        {
        }
    }
}
Points of Interest
The zip file also includes a test class, BlockingQueueTest.cs, that will allow you to test the queue using multiple threads. It is a simple network simulation using two queues, one Listener object, and one Server object. The Listener simulates packets received from a network and places objects into its output queue. This queue is used as the input queue for the Server object, which dequeues objects, processes them, and places the results into a second queue (i.e., the Server's output queue). The Server's output queue is also the Listener's input queue, as the Listener does double duty as both a listener and a sender.
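The two-queue arrangement described above can be sketched as follows. This is not the code from the zip: PipeQueue is a deliberately stripped-down stand-in for the article's BlockingQueue (the bound and timeouts are omitted for brevity), and the class and packet names are illustrative. It only shows the data flow: listener output feeds the server, and server output feeds back to the listener side.

```csharp
using System;
using System.Collections;
using System.Threading;

// Minimal one-lock blocking queue, standing in for the article's BlockingQueue.
class PipeQueue
{
    private readonly Queue items = new Queue();
    private readonly object gate = new object();

    public void Enqueue(object value)
    {
        lock (gate) { items.Enqueue(value); Monitor.PulseAll(gate); }
    }

    public object Dequeue()
    {
        lock (gate)
        {
            while (items.Count == 0)
                Monitor.Wait(gate);
            return items.Dequeue();
        }
    }
}

class PipelineDemo
{
    // listenerOut feeds the server; serverOut feeds back to the "listener" side.
    static PipeQueue listenerOut = new PipeQueue();
    static PipeQueue serverOut = new PipeQueue();

    static void Serve()
    {
        for (int i = 0; i < 10; i++)
        {
            string packet = (string)listenerOut.Dequeue(); // server's input queue
            serverOut.Enqueue(packet + "-processed");      // server's output queue
        }
    }

    public static void Main()
    {
        Thread server = new Thread(new ThreadStart(Serve));
        server.Start();

        for (int i = 0; i < 10; i++)
            listenerOut.Enqueue("pkt" + i);  // listener simulates received packets

        for (int i = 0; i < 10; i++)
            Console.WriteLine((string)serverOut.Dequeue()); // listener doubles as sender

        server.Join();
    }
}
```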
History
- 08/17/2004 - V1 - First post.