Introduction
Several thread pool libraries are available, such as Smart Thread Pool and BlackHen.Threading, but none of them supports extensions. They are all hard-coded, which makes them difficult to work with. I've tried to address these issues:
- A Thread Pool should be extensible and configurable
- A Thread Pool should be as simple as possible
So, I've created a Tiny Thread Pool. This Thread Pool is written using C# 4.0.
Tiny Thread Pool Features
- Really simple
- Extensible queues
- Extensible Task Items
- Limit on the maximum number of working threads
- Dynamic thread workers
- Task priority support
- Extensible logging
Tiny Thread Pool Design
- ITaskItem represents a task
- ITaskQueue represents the task queue logic
- ITaskQueueController represents the communication logic between the consumer and the producer (thread safe)
- WorkThread represents a thread worker
- TinyThreadPool controls work threads
Let's take a look at each class more deeply:
- The ITaskItem represents some work that should be done.
public interface ITaskItem
{
    void DoWork();
}
- TaskItemPriority: a WorkThread priority can be specified for each task.
- The ITaskQueue is another simple interface that manages the task queue.
public interface ITaskQueue
{
    int Count { get; }
    IWorkItem Dequeue();
    void Enqueue(IWorkItem item);
}
- The ITaskQueueController provides communication logic between the consumer and the producer.
public interface ITaskQueueController : IDisposable
{
    int ConsumersWaiting { get; }
    IWorkItem Dequeue();
    void Enqueue(IWorkItem item);
}
I've implemented two task queue controllers derived from ITaskQueueController:
- DefaultTaskQueueController
- BoundedTaskQueueController
Default Task Queue Controller
DefaultTaskQueueController is a thread-safe wrapper for the ITaskQueue:
public sealed class DefaultTaskQueueController : TaskQueueController
{
    public DefaultTaskQueueController(ITaskQueue taskQueue)
        : base(taskQueue)
    {
    }

    protected override IWorkItem DequeueCore()
    {
        lock (_locker)
        {
            // Block the consumer until an item arrives or the controller is disposed.
            while (_taskQueue.Count == 0 && !_isDisposed)
            {
                _consumersWaiting++;
                Monitor.Wait(_locker);
                _consumersWaiting--;
            }
            if (_isDisposed)
            {
                return null;
            }
            return _taskQueue.Dequeue();
        }
    }

    protected override void EnqueueCore(IWorkItem item)
    {
        lock (_locker)
        {
            _taskQueue.Enqueue(item);
            // Wake waiting consumers only when there are any.
            if (_consumersWaiting > 0)
            {
                Monitor.PulseAll(_locker);
            }
        }
    }
}
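The controller above derives from a TaskQueueController base class that is not listed here. The following is only a sketch of what such a base class might look like. The field names (_locker, _taskQueue, _consumersWaiting, _isDisposed) and the DequeueCore/EnqueueCore methods are taken from the derived class; the method bodies, the Dispose logic, and the DoWork member of IWorkItem are assumptions made so that the sketch compiles on its own.

```csharp
using System;
using System.Threading;

// Stand-ins for the library types so the sketch is self-contained.
// The DoWork member of IWorkItem is an assumption.
public interface IWorkItem { void DoWork(); }

public interface ITaskQueue
{
    int Count { get; }
    IWorkItem Dequeue();
    void Enqueue(IWorkItem item);
}

public interface ITaskQueueController : IDisposable
{
    int ConsumersWaiting { get; }
    IWorkItem Dequeue();
    void Enqueue(IWorkItem item);
}

// Hypothetical base class: member names follow the derived controllers,
// the bodies are a guess at the simplest behavior that fits them.
public abstract class TaskQueueController : ITaskQueueController
{
    protected readonly object _locker = new object();
    protected readonly ITaskQueue _taskQueue;
    protected int _consumersWaiting;
    protected bool _isDisposed;

    protected TaskQueueController(ITaskQueue taskQueue)
    {
        if (taskQueue == null) throw new ArgumentNullException("taskQueue");
        _taskQueue = taskQueue;
    }

    public int ConsumersWaiting
    {
        get { lock (_locker) { return _consumersWaiting; } }
    }

    public IWorkItem Dequeue()
    {
        return DequeueCore();
    }

    public void Enqueue(IWorkItem item)
    {
        if (item == null) throw new ArgumentNullException("item");
        EnqueueCore(item);
    }

    public void Dispose()
    {
        lock (_locker)
        {
            if (_isDisposed) return;
            _isDisposed = true;
            // Wake every blocked thread so it can observe _isDisposed and exit.
            Monitor.PulseAll(_locker);
        }
    }

    protected abstract IWorkItem DequeueCore();
    protected abstract void EnqueueCore(IWorkItem item);
}
```

With a base class along these lines, a derived controller only has to decide when a caller blocks; waking the blocked threads on shutdown is handled in one place.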
Bounded Task Queue Controller
BoundedTaskQueueController is also thread safe. If the producer task or tasks create items faster than the consumer can process them, the queue can grow without bound and exhaust memory. The BoundedTaskQueueController lets you place a limit on the size that the queue may reach before the producer is forced to block.
public sealed class BoundedTaskQueueController : TaskQueueController
{
    private readonly int _maxTasksCount;
    private int _producersWaiting;

    public BoundedTaskQueueController(ITaskQueue taskQueue, int maxTasksCount)
        : base(taskQueue)
    {
        if (maxTasksCount < 1)
        {
            throw new ArgumentException("MaxTasksCount should be greater than 0");
        }
        _maxTasksCount = maxTasksCount;
    }

    protected override IWorkItem DequeueCore()
    {
        IWorkItem taskItem;
        lock (_locker)
        {
            // Block the consumer until an item arrives or the controller is disposed.
            while (_taskQueue.Count == 0 && !_isDisposed)
            {
                _consumersWaiting++;
                Monitor.Wait(_locker);
                _consumersWaiting--;
            }
            if (_isDisposed)
            {
                return null;
            }
            taskItem = _taskQueue.Dequeue();
            // A slot has been freed, so wake any blocked producers.
            if (_producersWaiting > 0)
            {
                Monitor.PulseAll(_locker);
            }
        }
        return taskItem;
    }

    protected override void EnqueueCore(IWorkItem item)
    {
        lock (_locker)
        {
            // Block the producer while the queue is full. Comparing against
            // _maxTasksCount (rather than _maxTasksCount - 1) lets the queue
            // hold the full limit and avoids blocking forever when the limit is 1.
            while (_taskQueue.Count >= _maxTasksCount && !_isDisposed)
            {
                _producersWaiting++;
                Monitor.Wait(_locker);
                _producersWaiting--;
            }
            _taskQueue.Enqueue(item);
            if (_consumersWaiting > 0)
            {
                Monitor.PulseAll(_locker);
            }
        }
    }
}
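To see the blocking behavior in isolation, here is a minimal stand-alone sketch that uses the same Monitor.Wait/Monitor.PulseAll pattern. BoundedBuffer, PeakCount, and BoundedBufferDemo are hypothetical names introduced for this illustration and are not part of TinyThreadPool. A fast producer feeds a slow consumer, and the buffer records the largest queue length it ever reached.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Illustrative bounded buffer; not part of the library.
public sealed class BoundedBuffer<T>
{
    private readonly object _locker = new object();
    private readonly Queue<T> _items = new Queue<T>();
    private readonly int _capacity;
    private int _peakCount;

    public BoundedBuffer(int capacity)
    {
        if (capacity < 1) throw new ArgumentOutOfRangeException("capacity");
        _capacity = capacity;
    }

    // Largest number of items that were ever queued at the same time.
    public int PeakCount
    {
        get { lock (_locker) { return _peakCount; } }
    }

    public void Enqueue(T item)
    {
        lock (_locker)
        {
            // The producer blocks here while the buffer is full.
            while (_items.Count >= _capacity)
            {
                Monitor.Wait(_locker);
            }
            _items.Enqueue(item);
            _peakCount = Math.Max(_peakCount, _items.Count);
            Monitor.PulseAll(_locker);
        }
    }

    public T Dequeue()
    {
        lock (_locker)
        {
            // The consumer blocks here while the buffer is empty.
            while (_items.Count == 0)
            {
                Monitor.Wait(_locker);
            }
            T item = _items.Dequeue();
            Monitor.PulseAll(_locker);
            return item;
        }
    }
}

public static class BoundedBufferDemo
{
    // Pushes `total` integers through a buffer of the given capacity
    // and returns the peak queue length that was observed.
    public static int Run(int capacity, int total)
    {
        var buffer = new BoundedBuffer<int>(capacity);
        var consumer = new Thread(() =>
        {
            for (int i = 0; i < total; i++)
            {
                buffer.Dequeue();
                Thread.Sleep(1); // simulate a slow consumer
            }
        });
        consumer.Start();
        for (int i = 0; i < total; i++)
        {
            buffer.Enqueue(i); // the fast producer is throttled by the bound
        }
        consumer.Join();
        return buffer.PeakCount;
    }
}
```

Calling BoundedBufferDemo.Run(3, 20) never returns a value above 3, however much faster the producer is than the consumer, which is the memory guarantee the bounded controller is meant to give.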
Tiny Thread Pool
The TinyThreadPool manages the ITaskQueueController. A TinyThreadPool is created through the Create method, for example:
var threadPool = TinyThreadPool.Create(x =>
{
    x.Name = "My ThreadPool";
    x.MinThreads = 2;
    x.MaxThreads = 10;
    x.MultiThreadingCapacity = MultiThreadingCapacity.Global;
});
All TinyThreadPool properties have default values, so the simplest way to get a TinyThreadPool is to use the Default property:
var threadPool = TinyThreadPool.Default;
MultiThreadingCapacityType represents the threading capacity:
public enum MultiThreadingCapacityType
{
    Global,
    PerProcessor
}
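The article does not spell out how the two values change the thread limits. One plausible reading, and it is only an assumption, is that Global applies the configured limit as is, while PerProcessor multiplies it by the number of logical processors. The ThreadLimits helper below is a hypothetical illustration of that reading, not the library's implementation.

```csharp
using System;

public enum MultiThreadingCapacityType
{
    Global,
    PerProcessor
}

// Hypothetical helper; the scaling rule is an assumption, not the library's code.
public static class ThreadLimits
{
    public static int Effective(int configured, MultiThreadingCapacityType capacity, int processorCount)
    {
        if (configured < 1) throw new ArgumentOutOfRangeException("configured");
        if (processorCount < 1) throw new ArgumentOutOfRangeException("processorCount");

        // Assumed rule: PerProcessor scales the limit by the processor count,
        // Global uses the configured value unchanged.
        return capacity == MultiThreadingCapacityType.PerProcessor
            ? configured * processorCount
            : configured;
    }
}
```

Under this assumption, ThreadLimits.Effective(10, MultiThreadingCapacityType.PerProcessor, Environment.ProcessorCount) would allow ten threads per logical processor on the current machine.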
The AddTask method is used to add a task into the task pipeline. If the maximum thread limit is not reached and ConsumersWaiting = 0, a new WorkThread will be created.
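The rule for growing the pool can be written as a single predicate. This is a sketch of the condition described above; ThreadGrowthRule and its parameter names are hypothetical and are not taken from the library.

```csharp
// Hypothetical helper that restates the rule from the text.
public static class ThreadGrowthRule
{
    // A new worker is started only when the pool is below its maximum
    // and no existing worker is idle, waiting for a task.
    public static bool ShouldStartWorker(int currentThreads, int maxThreads, int consumersWaiting)
    {
        return currentThreads < maxThreads && consumersWaiting == 0;
    }
}
```

For example, ShouldStartWorker(2, 10, 0) is true because every worker is busy and the limit is not reached, while ShouldStartWorker(2, 10, 1) is false because an idle worker can pick up the task.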
You can add a task with a TaskItemPriority. Please note that DefaultTaskQueue does not take TaskItemPriority into account; use PriorityTaskQueue for priority tasks.
public void AddTask(ITaskItem taskItem, TaskItemPriority priority = TaskItemPriority.Normal)
{
    IWorkItem workItem = WorkItem.FromTaskItem(taskItem, priority);
    AddWorkItem(workItem);
}

public void AddTask(Action action, TaskItemPriority priority = TaskItemPriority.Normal)
{
    IWorkItem workItem = WorkItem.FromAction(action, priority);
    AddWorkItem(workItem);
}
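To illustrate what priority ordering means for a queue, here is a minimal sketch that keeps one FIFO bucket per priority level and always serves the highest non-empty bucket first. It is not the library's PriorityTaskQueue. SimplePriorityQueue is a hypothetical name, and the Low and High members of TaskItemPriority are assumptions (only Normal appears in the code above).

```csharp
using System;
using System.Collections.Generic;

// Assumed shape of the enum; only Normal is confirmed by the article.
public enum TaskItemPriority
{
    Low = 0,
    Normal = 1,
    High = 2
}

// Illustrative queue: one FIFO bucket per priority level.
// Not thread safe on purpose; a queue controller is expected to do the locking.
public sealed class SimplePriorityQueue<T>
{
    private readonly Queue<T>[] _buckets;

    public SimplePriorityQueue()
    {
        int levels = Enum.GetValues(typeof(TaskItemPriority)).Length;
        _buckets = new Queue<T>[levels];
        for (int i = 0; i < levels; i++)
        {
            _buckets[i] = new Queue<T>();
        }
    }

    public int Count { get; private set; }

    public void Enqueue(T item, TaskItemPriority priority)
    {
        _buckets[(int)priority].Enqueue(item);
        Count++;
    }

    public T Dequeue()
    {
        // Scan from the highest priority down; items of equal priority stay FIFO.
        for (int i = _buckets.Length - 1; i >= 0; i--)
        {
            if (_buckets[i].Count > 0)
            {
                Count--;
                return _buckets[i].Dequeue();
            }
        }
        throw new InvalidOperationException("The queue is empty.");
    }
}
```

If "a" is enqueued as Low, "b" as High, "c" as Normal, and "d" as High, the items come out in the order b, d, c, a. A plain FIFO queue would return them in insertion order regardless of the priority passed to AddTask.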
Work Thread
The WorkThread class executes a task item and provides logging.
private void DoWork()
{
    while (_isRun)
    {
        try
        {
            IWorkItem workItem = _taskQueueController.Dequeue();
            // Dequeue returns null when the controller has been disposed.
            if (workItem == null)
            {
                continue;
            }
            ProcessItem(workItem);
        }
        catch (Exception ex)
        {
            // A failing task is logged and must not kill the worker thread.
            _log.Error(ex);
        }
    }
}
Thread Pool Extensibility
If you need a more powerful task queue, implement the ITaskQueue. You don't need to worry about thread safety, because the ITaskQueueController wraps the queue and takes care of it. You can create your own ITaskQueueController as well.
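As an illustration of a custom queue, here is a minimal sketch of an ITaskQueue that serves the most recently added item first. LifoTaskQueue and NamedWorkItem are hypothetical names, and the DoWork member of IWorkItem is an assumption, since the article does not list that interface. The interfaces are repeated so that the sketch compiles on its own.

```csharp
using System;
using System.Collections.Generic;

// Stand-ins for the library types; the DoWork member of IWorkItem is an assumption.
public interface IWorkItem { void DoWork(); }

public interface ITaskQueue
{
    int Count { get; }
    IWorkItem Dequeue();
    void Enqueue(IWorkItem item);
}

// Hypothetical custom queue: last in, first out.
// It contains no locking because the queue controller serializes access.
public sealed class LifoTaskQueue : ITaskQueue
{
    private readonly Stack<IWorkItem> _items = new Stack<IWorkItem>();

    public int Count
    {
        get { return _items.Count; }
    }

    public IWorkItem Dequeue()
    {
        return _items.Pop();
    }

    public void Enqueue(IWorkItem item)
    {
        if (item == null) throw new ArgumentNullException("item");
        _items.Push(item);
    }
}

// Illustrative work item used only to exercise the queue.
public sealed class NamedWorkItem : IWorkItem
{
    public NamedWorkItem(string name)
    {
        Name = name;
    }

    public string Name { get; private set; }

    public void DoWork()
    {
        Console.WriteLine("Running " + Name);
    }
}
```

Because the queue only has to provide Count, Enqueue, and Dequeue, the ordering policy can change without touching the blocking and signalling code in the controller.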
Example
We create a TinyThreadPool with custom settings. SampleTask implements ITaskItem; see the details below.
using System;
using System.Threading;
// Also import the namespace that contains TinyThreadPool (from the Nelibur.Sword package).

internal class Program
{
    private static ITinyThreadPool _threadPool;

    private static void AddTasks()
    {
        for (int taskIndex = 0; taskIndex < 50; taskIndex++)
        {
            _threadPool.AddTask(new SampleTask(taskIndex));
        }
    }

    private static void Main()
    {
        _threadPool = TinyThreadPool.Create(x =>
        {
            x.Name = "My ThreadPool";
            x.MinThreads = 2;
            x.MaxThreads = 10;
            x.MultiThreadingCapacity = MultiThreadingCapacity.Global;
        });
        AddTasks();
        // Keep the process alive until the queued tasks have been processed.
        Console.ReadKey();
    }

    private sealed class SampleTask : ITaskItem
    {
        private readonly int _taskIndex;

        public SampleTask(int taskIndex)
        {
            _taskIndex = taskIndex;
        }

        public void DoWork()
        {
            // Simulate some work.
            Thread.Sleep(100);
            Console.WriteLine("Task {0} has been finished", _taskIndex);
        }
    }
}
History
- 29 Jun 2008: Initial version.
- 02 Jul 2008
  - + TransactionalMsmqTaskItem, DefaultTaskQueue.
  - * TaskQueueController classes added disposable support.
  - * WorkThread updated stop logic.
  - * Sample projects:
    - CoreDefaultMsmqSample uses Core.Threading.ThreadPools.TaskQueues.DefaultTaskQueue (see the App.config file for more details).
    - CoreDefaultSample uses CoreDefaultSample.TaskQueue (see the App.config file for more details).
- 10 Jul 2008
  - Added Mike.Strobel's suggestion.
  - + ActionTaskItem.
  - * ExtendedThreadPool added AddTask(Action action) and AddTask(Action action, ThreadPriority priority) methods.
  - Added more tests
- 02 Oct 2009
  - + PriorityTaskQueue.
  - + StatisticController.
  - * ExtendedThreadPool added support for StatisticController, MultiThreadingCapacityType.
  - Added support for Unity 1.2
  - Added CorePrioritySample. The project uses Core.Threading.ThreadPools.TaskQueues.PriorityTaskQueue (see the App.config file for more details).
  - Added more tests
- 07 Apr 2013
  - ExtendedThreadPool v2
  - Unity has been removed
  - ExtendedThreadPool creation has been simplified
- 22 Apr 2015
  - ExtendedThreadPool -> TinyThreadPool
  - TinyThreadPool: new creation API based on a lambda
  - TinyThreadPool is a part of the Nelibur.Sword NuGet package