
A multi-thread queue for the producers/consumers process

15 Jan 2013 · CPOL · 2 min read
How to write a simple multi-thread queue for the typical producer-consumer process

Introduction

Multithreading is one of the most powerful techniques you can use to increase performance in your application, parallelize tasks, and perform work asynchronously.

There are many possible patterns; one of the most common is the classic producers/consumers process.

In this article I want to show a simple way to parallelize the consumers of an in-memory queue. Using this simple approach you can create a pool of consumers and then feed them from the queue.

Background 

.NET provides a simple class for handling a queue structure. .NET also provides a ConcurrentQueue class, but for this sample I've used the basic generic Queue class. I don't use the Synchronized method either, because locking is handled inside my class in order to dispatch elements to the different threads.
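For comparison, here is a minimal sketch of the built-in ConcurrentQueue mentioned above. It is thread-safe out of the box, but it has no blocking wait: TryDequeue simply returns false when the queue is empty, which is why a producer/consumer setup still needs a signaling mechanism like the one this article builds.

```csharp
using System;
using System.Collections.Concurrent;

class ConcurrentQueueDemo
{
    static void Main()
    {
        var queue = new ConcurrentQueue<double>();
        queue.Enqueue(1.5);
        queue.Enqueue(2.5);

        // TryDequeue is thread-safe and non-blocking;
        // it returns false as soon as the queue is empty.
        double value;
        while (queue.TryDequeue(out value))
        {
            Console.WriteLine(value);
        }
    }
}
```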

Using the code 

The library can be used in any project. This first draft is composed of only two classes and one interface. The first class is a generic class for creating a multi-thread queue: different threads can write new elements into this queue, and a pool of readers can be created to read the elements in parallel. The second is an abstract class for creating a reader thread.

C#
// Instantiate a multi-thread queue for double values.
MultiThreadQueueLib.MultiThreadQueue<double> queue = new MultiThreadQueueLib.MultiThreadQueue<double>();     

Reading the values inserted in the queue is very simple. You have to create a new class that inherits from MultiThreadDequeuer and define the OnNewElement method for this new class. This method is called every time a new element is available in the queue.

C#
class ThreadDequeuer : MultiThreadQueueLib.MultiThreadDequeuer
{
    …
    public ThreadDequeuer(MultiThreadQueueLib.IMultiThreadQueue queue) 
        : base(queue)
    {
    }
    protected override void OnNewElement(object element)
    {
        …
    }
} 
To start the reader(s), instantiate your dequeuer class and then call its Start method.

Points of interest 

Synchronization is done using the basic lock mechanism. Asynchronous access by the readers is implemented with a manual reset event. When a new element is added to the queue, the event is set, so every reader waiting on that event is released. After that, every reader tries to read; if an element is available, it is processed. When reading brings the queue size down to 0, the event is reset.
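The key property of the mechanism described above is that a single Set on a ManualResetEvent releases every waiting reader at once (unlike an AutoResetEvent, which wakes only one). The following self-contained sketch demonstrates that behavior with three blocked threads:

```csharp
using System;
using System.Threading;

class ManualResetEventDemo
{
    static void Main()
    {
        var newElementEvent = new ManualResetEvent(false);
        int released = 0;

        var readers = new Thread[3];
        for (int i = 0; i < readers.Length; i++)
        {
            readers[i] = new Thread(() =>
            {
                // Every reader blocks here until the event is set.
                newElementEvent.WaitOne();
                Interlocked.Increment(ref released);
            });
            readers[i].Start();
        }

        Thread.Sleep(100);      // give the readers time to reach WaitOne
        newElementEvent.Set();  // one Set releases ALL waiting readers

        foreach (var t in readers) t.Join();
        Console.WriteLine(released); // all three were released
    }
}
```

After being released, each reader in the article's design then competes for the lock and checks the queue; the last one to find it empty is the one whose dequeue triggers the Reset.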

This approach is simple and independent of the number of readers. It is also designed for cases in which write operations are very frequent, reducing the synchronization mechanism to a minimum; this means it is not the best-performing option. Finally, it works with framework 3.5, whereas frameworks 4.0 and 4.5 have introduced new mechanisms for thread synchronization.
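One of the newer mechanisms alluded to above is BlockingCollection, introduced in .NET 4.0, which wraps a ConcurrentQueue and handles the wait/signal logic that this article implements by hand. A minimal sketch:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class BlockingCollectionDemo
{
    static void Main()
    {
        using (var queue = new BlockingCollection<double>())
        {
            var consumer = Task.Run(() =>
            {
                // GetConsumingEnumerable blocks until elements arrive
                // and completes when CompleteAdding is called.
                foreach (double value in queue.GetConsumingEnumerable())
                    Console.WriteLine(value);
            });

            queue.Add(1.0);
            queue.Add(2.0);
            queue.CompleteAdding(); // signal that no more elements will come
            consumer.Wait();
        }
    }
}
```

Multiple consumer tasks can iterate GetConsumingEnumerable concurrently, giving the same reader-pool pattern as the article's library.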

C#
/// <summary> 
/// Enqueue a new element in the queue. If the queue was empty, the manual reset event is set in order to allow readers  
/// to process the incoming elements.
/// </summary> 
/// <param name="value">The value to insert in the queue.</param>
public void Enqueue(V value)
{
    lock (list)
    {
        list.Enqueue(value);
        if (list.Count == 1)
        {
            newElementEvent.Set();
        }
    }
}
/// <summary>
/// Check if the queue already contains the value.
/// </summary>
/// <param name="value">The value to check</param>
/// <returns>True if the queue contains the value, otherwise false.</returns>
public bool Contains(V value)
{
    lock (list)
    {
        return list.Contains(value);
    }
}
/// <summary>
/// Wait for new element and unqueue it. When the queue is empty the manual reset event is reset.
/// A timeout for the waiting could be defined.
/// </summary>
/// <param name="millisecondsTimeout">The timeout for waiting a new element.</param>
/// <returns>The new element to process. Null in case there's not a new element to process.</returns>
public object Unqueue(int millisecondsTimeout)
{
    bool flag = this.newElementEvent.WaitOne(millisecondsTimeout);
    if (!flag)
    {
        return null;
    }
    lock (list)
    {
        if (list.Count == 0)
        {
            return null;
        }
        V ret = list.Dequeue();
        if (list.Count == 0)
        {
            this.newElementEvent.Reset();
        }
        return ret;
    }
}
/// <summary>
/// Wait for new element and unqueue it. When the queue is empty the manual reset event is reset.
/// This method has no timeout.
/// </summary>
/// <returns>The new element to process. Null in case there's not a new element to process.</returns>
public object Unqueue()
{
    return this.Unqueue(-1);
}

So, the reader is always waiting for a new incoming element and when it gets the element, the OnNewElement method is called with this one.  

C#
/// <summary>
/// Thread loop. Wait for a new incoming element; when the thread gets the new element, the method OnNewElement is called.
/// </summary>
protected virtual void Run()
{
    while (!shutdown)
    {
        try
        {
            object newElement = this.sourceQueue.Unqueue();
            if (newElement != null)
            {
                OnNewElement(newElement);
            }
        }
        catch (ThreadInterruptedException)
        {
            // Interrupt is used to wake the thread; loop again and re-check shutdown.
        }
        catch (ThreadAbortException)
        {
            shutdown = true;
        }
        catch (Exception ex)
        {
            Console.Out.WriteLine(ex);
        }
    }
    Console.Out.WriteLine("Exit from Run. Thread is stopped.");
} 

The attached solution contains the library source code and a simple test console application.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)