Introduction
Multithreading is a powerful technique for increasing the performance of an application, parallelizing tasks, and performing work asynchronously.
There are many possible designs; one of the most common is the producer/consumer pattern.
In this article I want to show a simple way to parallelize the consumers of an in-memory queue. Using this approach you can create a pool of consumers and then feed them from the queue.
Background
.NET provides a simple class for handling a queue structure. It also provides a ConcurrentQueue class, but for this sample I have used the basic generic Queue class. I also do not use the Synchronized wrapper, because locking is handled inside my class in order to dispatch elements to the different threads.
Using the code
The library can be used in any project. This first draft is composed of only two classes and one interface. The first class is a generic class for creating a multithreaded queue: different threads can write new elements to it, and a pool of readers can be created to consume those elements in parallel. The second is an abstract base class for creating a reader thread.
MultiThreadQueueLib.MultiThreadQueue<double> queue = new MultiThreadQueueLib.MultiThreadQueue<double>();
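Any thread can then act as a producer by calling Enqueue (shown in full in the Points of interest section below):

queue.Enqueue(42.0);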
Reading the values inserted in the queue is very simple. You have to create a new class that inherits from the abstract MultiThreadDequeuer class and override its OnNewElement method. This method is called every time a new element is available on the queue.
class ThreadDequeuer : MultiThreadQueueLib.MultiThreadDequeuer
{
    …

    // The source queue is passed on to the base class.
    public ThreadDequeuer(MultiThreadQueueLib.IMultiThreadQueue queue)
        : base(queue)
    {
    }

    // Called every time an element has been dequeued for this reader.
    protected override void OnNewElement(object element)
    {
        …
    }
}
To start the reader(s), you have to instantiate your dequeuer class and then call its Start method.
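For example, a pool of two readers fed by a single producer could be wired up like this (a minimal sketch based on the classes above; error handling and shutdown are omitted):

MultiThreadQueueLib.MultiThreadQueue<double> queue =
    new MultiThreadQueueLib.MultiThreadQueue<double>();

// Create the pool of readers on the same queue and start them.
ThreadDequeuer reader1 = new ThreadDequeuer(queue);
ThreadDequeuer reader2 = new ThreadDequeuer(queue);
reader1.Start();
reader2.Start();

// Any thread can now feed the pool; each element is delivered
// to exactly one reader's OnNewElement.
for (int i = 0; i < 10; i++)
{
    queue.Enqueue(i * 0.5);
}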
Points of interest
Synchronization is done using the basic lock mechanism. Asynchronous access by the readers is realized with a manual-reset event. When a new element is added to the queue, the event is set, so every reader waiting on that event is released. After that, each reader tries to read; if an element is available, it is processed. When a read brings the queue size down to 0, the event is reset.
This approach is simple and independent of the number of readers. It is also designed for cases in which write operations are very frequent, keeping the synchronization overhead to a minimum, so it does not aim for the best possible performance in every scenario. Finally, it works with .NET Framework 3.5, whereas Framework 4.0 and 4.5 introduced new mechanisms for this kind of thread synchronization (see the comparison sketch near the end of this article).
public void Enqueue(V value)
{
    lock (list)
    {
        list.Enqueue(value);

        // The queue just went from empty to non-empty:
        // wake up the waiting readers.
        if (list.Count == 1)
        {
            newElementEvent.Set();
        }
    }
}

public bool Contains(V value)
{
    lock (list)
    {
        return list.Contains(value);
    }
}
public object Unqueue(int millisecondsTimeout)
{
    // Wait until at least one element has been signalled,
    // or give up after the timeout.
    bool flag = this.newElementEvent.WaitOne(millisecondsTimeout);
    if (!flag)
    {
        return null;
    }
    lock (list)
    {
        // Another reader may have consumed the element between
        // the wait and the lock.
        if (list.Count == 0)
        {
            return null;
        }
        V ret = list.Dequeue();

        // Last element taken: readers must wait again.
        if (list.Count == 0)
        {
            this.newElementEvent.Reset();
        }
        return ret;
    }
}

public object Unqueue()
{
    // -1 makes WaitOne block indefinitely.
    return this.Unqueue(-1);
}
So the reader is always waiting for a new incoming element; since the parameterless Unqueue waits indefinitely, it returns null only when another reader wins the race for the element. When the reader does get an element, the OnNewElement method is called with it.
protected virtual void Run()
{
    while (!shutdown)
    {
        try
        {
            // Blocks until an element is available.
            object newElement = this.sourceQueue.Unqueue();
            if (newElement != null)
            {
                OnNewElement(newElement);
            }
        }
        catch (ThreadInterruptedException)
        {
            // Ignored: the shutdown flag is re-checked
            // on the next iteration.
        }
        catch (ThreadAbortException)
        {
            shutdown = true;
        }
        catch (Exception ex)
        {
            Console.Out.WriteLine(ex);
        }
    }
    Console.Out.WriteLine("Exit from Run. Thread is stopped.");
}
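For comparison only, and not part of the attached library: on .NET Framework 4.0 and later, BlockingCollection<T> encapsulates the wait-and-signal logic implemented manually above, so an equivalent consumer can be sketched like this:

using System;
using System.Collections.Concurrent;
using System.Threading;

BlockingCollection<double> collection = new BlockingCollection<double>();

Thread consumer = new Thread(() =>
{
    // Blocks until an element is available,
    // and ends when CompleteAdding has been called.
    foreach (double element in collection.GetConsumingEnumerable())
    {
        Console.Out.WriteLine(element);
    }
});
consumer.Start();

collection.Add(42.0);
collection.CompleteAdding();   // Lets the foreach loop terminate.
consumer.Join();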
The attached solution contains the library source code and a simple test console application.