Extended Thread Pool

6 Apr 2013 | Ms-PL | 3 min read
Your own extensible and configurable Thread Pool.

Introduction

Several thread pools are available, such as Smart Thread Pool and BlackHen.Threading, but none of them supports extensions. Their behavior is hard-coded, which makes them difficult to adapt. I've tried to address these issues:

  • A Thread Pool should be extensible and configurable
  • A Thread Pool should be as simple as possible

So I've created Tiny Thread Pool, written in C# 4.0.

Tiny Thread Pool Features

  • Really simple 
  • Extensible queues
  • Extensible Task Items 
  • Limit on the maximum number of working threads
  • Dynamic thread workers
  • Task priority support 
  • Extensible logging

Tiny Thread Pool Design

  • ITaskItem represents a task
  • ITaskQueue represents the task queue logic 
  • ITaskQueueController represents the communication logic between the consumer and the producer (thread safe)
  • WorkThread represents a thread worker
  • TinyThreadPool controls work threads

Let's take a look at each class more deeply:

  • The ITaskItem represents some work that should be done.
C#
public interface ITaskItem
{
    void DoWork();
}
  • TaskItemPriority - WorkThread priority can be specified for each task.
  • The ITaskQueue is another simple interface that manages the task queue.
C#
/// <summary>
/// Represents the task queue.
/// </summary>
public interface ITaskQueue
{
    /// <summary>
    /// Number of work items in the queue.
    /// </summary>
    int Count { get; }

    /// <summary>
    /// Dequeue the work item.
    /// </summary>
    /// <returns>The work item.</returns>
    IWorkItem Dequeue();

    /// <summary>
    /// Enqueue the work item.
    /// </summary>
    /// <param name="item">The work item.</param>
    void Enqueue(IWorkItem item);
}
  • ITaskQueueController provides communication logic between the consumer and the producer.
C#
public interface ITaskQueueController : IDisposable
{
    int ConsumersWaiting { get; }
    IWorkItem Dequeue();
    void Enqueue(IWorkItem item);
}

I've implemented two task queue controllers derived from ITaskQueueController:

  • DefaultTaskQueueController
  • BoundedTaskQueueController

Default Task Queue Controller

DefaultTaskQueueController is a thread-safe wrapper for the ITaskQueue:

C#
public sealed class DefaultTaskQueueController : TaskQueueController
{
    public DefaultTaskQueueController(ITaskQueue taskQueue)
        : base(taskQueue)
    {
    }

    protected override IWorkItem DequeueCore()
    {
        lock (_locker)
        {
            while (_taskQueue.Count == 0 && !_isDisposed)
            {
                _consumersWaiting++;
                Monitor.Wait(_locker);
                _consumersWaiting--;
            }
            if (_isDisposed)
            {
                return null;
            }
            return _taskQueue.Dequeue();
        }
    }

    protected override void EnqueueCore(IWorkItem item)
    {
        lock (_locker)
        {
            _taskQueue.Enqueue(item);
            if (_consumersWaiting > 0)
            {
                Monitor.PulseAll(_locker);
            }
        }
    }
}
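
The wait-and-pulse pattern used by DequeueCore and EnqueueCore can be tried in isolation. The following is a minimal sketch, not the library's code: BlockingStringQueue and BlockingQueueDemo are hypothetical names, the queue holds plain strings instead of IWorkItem, and disposal handling is left out.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in for a queue controller: a blocking queue of strings.
public sealed class BlockingStringQueue
{
    private readonly object _locker = new object();
    private readonly Queue<string> _items = new Queue<string>();
    private int _consumersWaiting;

    public string Dequeue()
    {
        lock (_locker)
        {
            // Re-check the condition in a loop: a woken consumer may find
            // that another consumer has already taken the item.
            while (_items.Count == 0)
            {
                _consumersWaiting++;
                Monitor.Wait(_locker); // releases the lock while waiting
                _consumersWaiting--;
            }
            return _items.Dequeue();
        }
    }

    public void Enqueue(string item)
    {
        lock (_locker)
        {
            _items.Enqueue(item);
            // Wake consumers only if at least one is actually waiting.
            if (_consumersWaiting > 0)
            {
                Monitor.PulseAll(_locker);
            }
        }
    }
}

public static class BlockingQueueDemo
{
    // Starts a consumer that may block on the empty queue, then feeds it one item.
    public static string Run()
    {
        var queue = new BlockingStringQueue();
        string received = null;
        var consumer = new Thread(() => received = queue.Dequeue());
        consumer.Start();
        queue.Enqueue("hello");
        consumer.Join();
        return received;
    }
}
```

Calling BlockingQueueDemo.Run() returns the enqueued string whether the consumer starts waiting before or after the item arrives, because the condition is checked under the lock.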

Bounded Task Queue Controller

BoundedTaskQueueController is also thread-safe. If producers create items faster than consumers can process them, the queue grows without limit and memory usage becomes unbounded. BoundedTaskQueueController places a limit on the queue size; once the limit is reached, the producer blocks until a consumer removes an item.

C#
public sealed class BoundedTaskQueueController : TaskQueueController
{
    private readonly int _maxTasksCount;
    private int _producersWaiting;

    public BoundedTaskQueueController(ITaskQueue taskQueue, int maxTasksCount)
        : base(taskQueue)
    {
        if (maxTasksCount < 1)
        {
            throw new ArgumentException("MaxTasksCount should be greater than 0");
        }
        _maxTasksCount = maxTasksCount;
    }

    protected override IWorkItem DequeueCore()
    {
        IWorkItem taskItem;
        lock (_locker)
        {
            while (_taskQueue.Count == 0 && !_isDisposed)
            {
                _consumersWaiting++;
                Monitor.Wait(_locker);
                _consumersWaiting--;
            }
            if (_isDisposed)
            {
                return null;
            }
            taskItem = _taskQueue.Dequeue();
            if (_producersWaiting > 0)
            {
                Monitor.PulseAll(_locker);
            }
        }
        return taskItem;
    }

    protected override void EnqueueCore(IWorkItem item)
    {
        lock (_locker)
        {
            // Block the producer while the queue is full.
            while (_taskQueue.Count >= _maxTasksCount && !_isDisposed)
            {
                _producersWaiting++;
                Monitor.Wait(_locker);
                _producersWaiting--;
            }
            _taskQueue.Enqueue(item);
            if (_consumersWaiting > 0)
            {
                Monitor.PulseAll(_locker);
            }
        }
    }
}
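
To see the producer being held back, the bounded idea can be reduced to a small standalone sketch. This is an illustration under assumptions, not the library's implementation: BoundedIntQueue, PeakCount and BoundedQueueDemo are hypothetical names, the queue stores integers, it pulses on every change instead of tracking waiting counters, and disposal is omitted.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical bounded queue: Enqueue blocks while the queue is full.
public sealed class BoundedIntQueue
{
    private readonly object _locker = new object();
    private readonly Queue<int> _items = new Queue<int>();
    private readonly int _capacity;

    public BoundedIntQueue(int capacity)
    {
        if (capacity < 1)
        {
            throw new ArgumentOutOfRangeException("capacity");
        }
        _capacity = capacity;
    }

    // Largest queue length ever observed; used only by the demo.
    public int PeakCount { get; private set; }

    public void Enqueue(int item)
    {
        lock (_locker)
        {
            while (_items.Count >= _capacity)
            {
                Monitor.Wait(_locker); // producer waits for free space
            }
            _items.Enqueue(item);
            PeakCount = Math.Max(PeakCount, _items.Count);
            Monitor.PulseAll(_locker); // wake any waiting consumer
        }
    }

    public int Dequeue()
    {
        lock (_locker)
        {
            while (_items.Count == 0)
            {
                Monitor.Wait(_locker); // consumer waits for an item
            }
            int item = _items.Dequeue();
            Monitor.PulseAll(_locker); // wake any waiting producer
            return item;
        }
    }
}

public static class BoundedQueueDemo
{
    // Pushes 10 items through a queue of capacity 2 and returns the peak length.
    public static int Run()
    {
        var queue = new BoundedIntQueue(2);
        var producer = new Thread(() =>
        {
            for (int i = 0; i < 10; i++) queue.Enqueue(i);
        });
        producer.Start();
        int sum = 0;
        for (int i = 0; i < 10; i++) sum += queue.Dequeue();
        producer.Join();
        if (sum != 45) throw new InvalidOperationException("Items were lost.");
        return queue.PeakCount;
    }
}
```

However the two threads interleave, the value returned by BoundedQueueDemo.Run() never exceeds the capacity of 2, which is the property the bounded controller is meant to guarantee.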

Tiny Thread Pool

The TinyThreadPool manages the ITaskQueueController. A TinyThreadPool is created through the Create method, for example:

C#
var threadPool = TinyThreadPool.Create(x =>
{
    x.Name = "My ThreadPool";
    x.MinThreads = 2;
    x.MaxThreads = 10;
    x.MultiThreadingCapacity = MultiThreadingCapacity.Global;
});

All TinyThreadPool properties have default values, so the simplest way to get a TinyThreadPool is to use the Default property:

C#
var threadPool = TinyThreadPool.Default;

MultiThreadingCapacityType represents the threading capacity:

C#
public enum MultiThreadingCapacityType
{
    /// <summary>
    /// Represents all processors.
    /// </summary>
    Global,
    /// <summary>
    /// Represents one processor.
    /// </summary>
    PerProcessor
}

The AddTask method adds a task to the task pipeline. If the maximum thread limit has not been reached and ConsumersWaiting = 0, a new WorkThread is created.
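
The growth rule can be written as a single predicate. This is only a sketch of the rule as stated here; ThreadGrowthPolicy and ShouldStartWorker are hypothetical names, and the real pool may check further conditions.

```csharp
// Hypothetical helper that encodes the rule described in the text.
public static class ThreadGrowthPolicy
{
    // A new worker is started only when the pool is below its limit
    // and no existing worker is idle (waiting for a task).
    public static bool ShouldStartWorker(int currentThreads, int maxThreads, int consumersWaiting)
    {
        return currentThreads < maxThreads && consumersWaiting == 0;
    }
}
```

With this rule an idle worker always picks up the new task before the pool grows, so the pool only adds threads when all existing workers are busy.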

You can add a task with a TaskItemPriority. Note that DefaultTaskQueue does not take TaskItemPriority into account; use PriorityTaskQueue for prioritized tasks.

C#
/// <summary>
/// Add new task.
/// </summary>
/// <param name="taskItem">The task to run.</param>
/// <param name="priority">Task priority.</param>
/// <remarks>Default task priority is <see cref="TaskItemPriority.Normal" />.</remarks>
public void AddTask(ITaskItem taskItem, TaskItemPriority priority = TaskItemPriority.Normal)
{
    IWorkItem workItem = WorkItem.FromTaskItem(taskItem, priority);
    AddWorkItem(workItem);
}

/// <summary>
/// Add new task as <see cref="Action" />.
/// </summary>
/// <param name="action">The task to run.</param>
/// <param name="priority">Task priority.</param>
/// <remarks>Default task priority is <see cref="TaskItemPriority.Normal" />.</remarks>
public void AddTask(Action action, TaskItemPriority priority = TaskItemPriority.Normal)
{
    IWorkItem workItem = WorkItem.FromAction(action, priority);
    AddWorkItem(workItem);
}
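
The article does not show how PriorityTaskQueue orders items, so the following is one possible approach rather than its actual implementation: one FIFO bucket per priority level, scanned from highest to lowest on dequeue. SketchPriority and SketchPriorityQueue are hypothetical names, the Low and High levels are assumptions (only TaskItemPriority.Normal appears in the text), and strings stand in for work items.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical priority levels; the article only names TaskItemPriority.Normal.
public enum SketchPriority { Low = 0, Normal = 1, High = 2 }

// Not thread-safe on its own; a queue controller would provide the locking.
public sealed class SketchPriorityQueue
{
    // One FIFO bucket per priority level, indexed by the enum value.
    private readonly Queue<string>[] _buckets =
    {
        new Queue<string>(), new Queue<string>(), new Queue<string>()
    };

    public int Count
    {
        get { return _buckets[0].Count + _buckets[1].Count + _buckets[2].Count; }
    }

    public void Enqueue(string item, SketchPriority priority = SketchPriority.Normal)
    {
        _buckets[(int)priority].Enqueue(item);
    }

    public string Dequeue()
    {
        // Scan from the highest priority down; FIFO order within a level.
        for (int level = _buckets.Length - 1; level >= 0; level--)
        {
            if (_buckets[level].Count > 0)
            {
                return _buckets[level].Dequeue();
            }
        }
        throw new InvalidOperationException("The queue is empty.");
    }
}
```

Items with equal priority keep their insertion order, while a later high-priority item overtakes earlier normal and low ones. A steady stream of high-priority items can starve the lower levels, which is a known trade-off of this simple scheme.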

Work Thread

The WorkThread class executes a task item and provides logging.

C#
private void DoWork()
{
    while (_isRun)
    {
        try
        {
            IWorkItem workItem = _taskQueueController.Dequeue();
            if (workItem == null)
            {
                continue;
            }
            ProcessItem(workItem);
        }
        catch (Exception ex)
        {
            _log.Error(ex);
        }
    }
}
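
The try/catch sits inside the loop so that one failing task does not terminate the worker. A reduced sketch of that idea, with the assumptions spelled out: WorkerLoopSketch and RunAll are hypothetical names, plain Action delegates replace IWorkItem, and a list of messages replaces the logger.

```csharp
using System;
using System.Collections.Generic;

public static class WorkerLoopSketch
{
    // Runs every action in order. A failing action is recorded and skipped
    // so that later actions still run. Returns the collected error messages.
    public static List<string> RunAll(IEnumerable<Action> tasks)
    {
        var errors = new List<string>();
        foreach (Action task in tasks)
        {
            try
            {
                task();
            }
            catch (Exception ex)
            {
                errors.Add(ex.Message); // stands in for _log.Error(ex)
            }
        }
        return errors;
    }
}
```

If the try/catch wrapped the whole loop instead, the first exception would end the loop and the remaining tasks would never run.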

Thread Pool Extensibility 

If you need a more powerful task queue, implement ITaskQueue. You do not need to worry about thread safety there, because the ITaskQueueController wraps the queue and synchronizes access to it. You can also create your own ITaskQueueController.
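
As an illustration of a custom queue, here is a sketch of a last-in, first-out ITaskQueue. It is self-contained, so it declares local stand-ins: ITaskQueue is copied from the interface shown earlier, IWorkItem is left empty because its members are not shown in this article, and LifoTaskQueue and NamedItem are hypothetical names.

```csharp
using System.Collections.Generic;

// Local stand-ins so the sketch compiles on its own.
// IWorkItem's members are not shown in the article, so it is left empty here.
public interface IWorkItem { }

public interface ITaskQueue
{
    int Count { get; }
    IWorkItem Dequeue();
    void Enqueue(IWorkItem item);
}

// Hypothetical LIFO queue: the most recently added item is dequeued first.
// No locking is needed here if a queue controller serializes access.
public sealed class LifoTaskQueue : ITaskQueue
{
    private readonly Stack<IWorkItem> _items = new Stack<IWorkItem>();

    public int Count
    {
        get { return _items.Count; }
    }

    public IWorkItem Dequeue()
    {
        return _items.Pop();
    }

    public void Enqueue(IWorkItem item)
    {
        _items.Push(item);
    }
}

// Trivial work item used only to show the ordering.
public sealed class NamedItem : IWorkItem
{
    public NamedItem(string name)
    {
        Name = name;
    }

    public string Name { get; private set; }
}
```

Enqueuing items named "a" and then "b" and calling Dequeue returns "b" first. In a real project the class would implement the library's own ITaskQueue and be passed to a queue controller instead of using these stand-ins.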

Example

The example below creates a TinyThreadPool with custom settings and adds 50 tasks to it. SampleTask implements ITaskItem.

C#
using System;
using System.Threading;
// Also add a using directive for the Nelibur.Sword namespace that contains TinyThreadPool.

internal class Program
{
    private static ITinyThreadPool _threadPool;

    private static void AddTasks()
    {
        for (int taskIndex = 0; taskIndex < 50; taskIndex++)
        {
            _threadPool.AddTask(new SampleTask(taskIndex));
        }
    }

    private static void Main()
    {
        // Use the default TinyThreadPool instance, or create one through TinyThreadPool.Create.
        // _threadPool = TinyThreadPool.Default;

        _threadPool = TinyThreadPool.Create(x =>
        {
            x.Name = "My ThreadPool";
            x.MinThreads = 2;
            x.MaxThreads = 10;
            x.MultiThreadingCapacity = MultiThreadingCapacity.Global;
        });

        AddTasks();
        Console.ReadKey();
    }

    private sealed class SampleTask : ITaskItem
    {
        private readonly int _taskIndex;

        public SampleTask(int taskIndex)
        {
            _taskIndex = taskIndex;
        }

        public void DoWork()
        {
            Thread.Sleep(100);
            Console.WriteLine("Task {0} has been finished", _taskIndex);
        }
    }
}

History

  • 29 Jun 2008: Initial version.
  • 02 Jul 2008
    • + TransactionalMsmqTaskItem, DefaultTaskQueue.
    • * TaskQueueController classes added disposable support.
    • * WorkThread updated stop logic.
    • * Sample projects: 
      • CoreDefaultMsmqSample uses Core.Threading.ThreadPools.TaskQueues.DefaultTaskQueue (see the App.config file for more details).
      • CoreDefaultSample uses CoreDefaultSample.TaskQueue (see the App.config file for more details).
  • 10 Jul 2008
    • Added Mike.Strobel's suggestion.
    • + ActionTaskItem.
    • * ExtendedThreadPool added AddTask(Action action) and AddTask(Action action, ThreadPriority priority) methods.
    • Added more tests
  • 02 Oct 2009
    • + PriorityTaskQueue.
    • + StatisticController.
    • * ExtendedThreadPool added support for StatisticController and MultiThreadingCapacityType.
    • Added support for Unity 1.2
    • Added CorePrioritySample. The project uses Core.Threading.ThreadPools.TaskQueues.PriorityTaskQueue (see the App.config file for more details).
    • Added more tests 
  • 07 Apr 2013 
    • ExtendedThreadPool v2
    • Unity has been removed
    • ExtendedThreadPool creation has been simplified
  • 22 Apr 2015
    • ExtendedThreadPool -> TinyThreadPool
    • TinyThreadPool has a new lambda-based creation API
    • TinyThreadPool is a part of the Nelibur.Sword NuGet package

License

This article, along with any associated source code and files, is licensed under The Microsoft Public License (Ms-PL)