Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles / Languages / C#

A Thread Pool for the Producers/Consumers Process

0.00/5 (No votes)
27 Jan 2013CPOL2 min read 11.9K   198  
This tip is connected to my previous one. I show how to use .NET 4.0 BlockingCollection in a simple way.

Introduction

In the previous article, I’ve proposed a simple way to create a multi-thread queue for a producers-consumes process. As I said, the library was developed for .NET 3.5.

.NET 4.0 has a new set of classes that realized this kind of function, in details the BlockingCollection realized a queue.

Background

.NET 4.0 provides a new spacename System.Collections.Concurrent that contains the new classes for multi-thread collections. The use of these classes is not complicated, but I’ve written a simple library – like the previous one – that makes the job simpler. Particularly, this library provides a simple way for creating a pool of dequeuer threads.

Using the Code

The library can be used inside every project. This first draft is composed of 4 classes and one interface as described in the following table:

IMultiThreadBlockingQueue It’s the interface for a multi-thread collection. It’s necessary to have a non-type interface.
MultiThreadBlockingQueue It’s the generic class that implements the above interface.
MultiThreadDequeuer It’s the abstract class for a dequeuer thread. The OnNewElement method should be implemented in inherited classes.
MultiThreadDequeuerParams It’s the basic class with the params for instantiating the dequeuer.
MultiThreadDequeuerPool It’s the dequeuer pool.

To create the queue, you have to instance a type class of the MultiThreadBlockingQueue generic class.

C#
// Instance a blocking collection queue for double values.
MultiThreadBlockingQueueLib.MultiThreadBlockingQueue<double> queue =
    new MultiThreadBlockingQueueLib.MultiThreadBlockingQueue<double>(); 

Reading the values inserted in the queue is very simple. You have to create a new class inherited from MultiThreadDequeuer and define the method OnNewElement for this new class. This method is called every time a new element is available on the queue.

C#
class ThreadDequeuer : MultiThreadBlockingQueueLib.MultiThreadDequeuer
{
    …
    public ThreadDequeuer(MultiThreadBlockingQueueLib.MultiThreadDequeuerParams p) 
        : base(p)
    {
    }
    protected override void OnNewElement(object element)
    {
        …
    }
}

To start the reader(s), you have to instance your dequeuer class(es) and then call the method Start.

You can also quickly create a pool of dequeuers. You just have to instance a MultiThreadDequeuerPool class and then initialize the pool by the Init method to define the number of threads and the class to use as a dequeuer. Then start and stop all the threads by the pool methods Start and Shutdown. Also, you can dynamically add a new thread by the method AddDequeuer.

C#
MultiThreadBlockingQueueLib.MultiThreadDequeuerPool dequeuerPool = 
    new MultiThreadBlockingQueueLib.MultiThreadDequeuerPool();
dequeuerPool.Init(1, typeof(ThreadDequeuer), 
    new MultiThreadBlockingQueueLib.MultiThreadDequeuerParams(queue));

Points of Interest

BlockingCollection provides the method Take to wait for a new item in the collection and take it. If you want to stop this method, you have to use a System.Threading.CancellationToken structure.

Therefore, the dequeuer has to instance a System.Threading.CancellationTokenSource class in order to pass a CancellationToken to the Take method. When you want to stop, call the Cancel method of the CancellationTokenSource. See the MultiThreadDequeuer to check how it works.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)