Introduction
My current project is to build a real-time distributed quotation engine. Online customers enter their details and submit a request to the quote engine. The engine creates a list of quote providers, which communicate B2B with various backend systems over the web. Each provider has a custom implementation with provider-specific mapping and transport requirements.
A requirement of the quote engine was that the whole process should take no more than 15 seconds, because web-based customers simply leave if they are left hanging too long.
The time to retrieve a quote from each provider was estimated at 3-7 seconds, so sequential processing was out of the question. We were also aware that if the system waited for every quote before returning results to the customer, a problem with just one provider (for example, a target system that was down or running slowly) would hold up the whole response.
Solving the Problem
At the heart of the code is a standard ThreadPool.QueueUserWorkItem call to run my providers, and a WaitHandle.WaitAll(waitHandles, timeOut) call, which waits for all providers to finish or returns after a set period of time.
The main addition is a task wrapper, ThreadTaskInfo, that keeps track of the state of the running provider, including any exceptions, and a static manager class, ThreadUtil, that runs an action delegate on any list of objects in parallel and manages the state of each action via the associated ThreadTaskInfo.
The ThreadUtil class is designed to work on any class that you wish to run in parallel. There is no need to extend a base class or implement an interface. All you need is a list of objects that share a common Action delegate (Command pattern); pass that list and the action delegate to the ThreadUtil.RunAsynchronously<T> method.
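As a rough illustration of the pattern described above, here is a minimal sketch; it is not the code from the download. The names WaitAllSketch and RunWithDeadline are made up for this example, and Thread.Sleep stands in for a real provider call.

```csharp
using System;
using System.Linq;
using System.Threading;

public static class WaitAllSketch
{
    // Hypothetical helper: runs one simulated provider call per delay and
    // reports which of them finished before the shared deadline.
    public static bool[] RunWithDeadline(int[] delaysMs, int timeOutMs)
    {
        if (delaysMs == null || delaysMs.Length == 0)
        {
            return new bool[0]; // WaitAll rejects an empty handle array
        }

        var handles = delaysMs.Select(d => new ManualResetEvent(false)).ToArray();

        for (var i = 0; i < delaysMs.Length; i++)
        {
            var index = i; // copy so each work item captures its own index
            ThreadPool.QueueUserWorkItem(state =>
            {
                Thread.Sleep(delaysMs[index]); // stand-in for a provider call
                handles[index].Set();          // signal that this item is done
            });
        }

        // Blocks until every handle is signalled or the timeout elapses.
        WaitHandle.WaitAll(handles, timeOutMs);

        // A zero-timeout WaitOne polls whether a handle has been signalled.
        return handles.Select(h => h.WaitOne(0)).ToArray();
    }
}
```

Calling WaitAllSketch.RunWithDeadline(new[] { 10, 2000 }, 500) should report the first item as finished and the second as still running when the deadline passes. The slow work item is not cancelled; it simply carries on in the background, which is also how the approach in this article behaves.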
The solution provided in this article is similar to the .NET 4 Task Parallel Library (TPL). When I wrote the code, I was not aware of the Parallel Extensions library. I have since found that the links to the Parallel Extensions for .NET 3.5 all seem to be broken, and there are currently no plans for us to move to .NET 4.
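For readers who can use .NET 4 or later, the same run-with-a-deadline idea might look roughly like this with the TPL. This is a sketch for comparison only and is not part of the download; TplSketch and RunWithDeadline are hypothetical names.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

public static class TplSketch
{
    // Hypothetical helper: starts every action as a Task, waits up to
    // timeOutMs for all of them, then reports each task's status.
    public static TaskStatus[] RunWithDeadline(Action[] actions, int timeOutMs)
    {
        var tasks = actions.Select(a => Task.Factory.StartNew(a)).ToArray();

        try
        {
            // Returns false if the timeout elapses before all tasks finish.
            Task.WaitAll(tasks, timeOutMs);
        }
        catch (AggregateException)
        {
            // Thrown when tasks have failed; failures are reported below.
        }

        foreach (var task in tasks.Where(t => t.IsFaulted))
        {
            // Reading Exception also marks the failure as observed.
            Console.WriteLine("Task failed: " + task.Exception.InnerException.Message);
        }

        return tasks.Select(t => t.Status).ToArray();
    }
}
```

A task that is still running when the timeout elapses keeps a non-final status, which plays the same role as FailedTimeout in this article; a task that threw reports Faulted, which corresponds to Failed.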
Main Code
ThreadTaskInfo wraps your execution classes and represents a single task, storing the current execution state of the thread. It also stores any exception thrown by your class, or records a timeout if your code does not complete in a timely manner.
using System;
using System.Threading;

public class ThreadTaskInfo<T>
{
    private readonly object _synchLock = new object();

    public ThreadTaskInfo(T task)
    {
        Task = task;
        State = ThreadTaskStateType.Created;
        TaskComplete = new ManualResetEvent(false);
    }

    public T Task { get; private set; }

    // Signalled when the task finishes, whether it succeeded or threw.
    public ManualResetEvent TaskComplete { get; private set; }

    public Exception Exception { get; set; }

    public ThreadTaskStateType State { get; private set; }

    // Applies a state change only if it is a valid transition from the
    // current state; any other request is silently ignored.
    public void SetState(ThreadTaskStateType state)
    {
        lock (_synchLock)
        {
            switch (state)
            {
                case ThreadTaskStateType.Created:
                    throw new Exception("State cannot be set to created");

                case ThreadTaskStateType.Started:
                    // Created -> Started
                    if (State == ThreadTaskStateType.Created)
                    {
                        State = state;
                    }
                    return;

                case ThreadTaskStateType.Completed:
                    // Started -> Completed
                    if (State == ThreadTaskStateType.Started)
                    {
                        State = state;
                    }
                    return;

                case ThreadTaskStateType.Failed:
                case ThreadTaskStateType.FailedTimeout:
                    // Started or Completed -> Failed / FailedTimeout
                    if (State == ThreadTaskStateType.Started ||
                        State == ThreadTaskStateType.Completed)
                    {
                        State = state;
                    }
                    return;
            }
        }
    }
}
The ThreadTaskStateType enumeration represents the state of each running task.
public enum ThreadTaskStateType
{
    Created,
    Started,
    Completed,
    Failed,
    FailedTimeout,
}
ThreadUtil provides a single static method, RunAsynchronously<T>, which runs your action delegate against the list of tasks in parallel. This method handles state management, exceptions, and timeouts, and records the information against each ThreadTaskInfo object.
When all the tasks are complete, or if the timeout is reached, a list of ThreadTaskInfo objects is returned that you can interrogate to see which tasks ran successfully and within the allotted time.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

public static class ThreadUtil
{
    public static List<ThreadTaskInfo<T>> RunAsynchronously<T>(
        IList<T> tasks, Action<T> action, int timeOut)
    {
        var result = new List<ThreadTaskInfo<T>>();
        if (tasks == null || tasks.Count == 0 || action == null)
        {
            return result;
        }

        foreach (var taskInfo in tasks.Select(task => new ThreadTaskInfo<T>(task)))
        {
            result.Add(taskInfo);

            // Copy to a local so the lambda captures this iteration's task.
            var info = taskInfo;
            ThreadPool.QueueUserWorkItem(state =>
            {
                try
                {
                    info.SetState(ThreadTaskStateType.Started);
                    action.Invoke(info.Task);
                    info.SetState(ThreadTaskStateType.Completed);
                    info.TaskComplete.Set();
                }
                catch (Exception ex)
                {
                    // Store the exception first so it is already in place
                    // when the state becomes Failed.
                    info.Exception = ex;
                    info.SetState(ThreadTaskStateType.Failed);
                    info.TaskComplete.Set();
                }
            });
        }

        // Block until every task has signalled or the timeout elapses.
        var waitHandles = result.Select(taskInfo => taskInfo.TaskComplete).ToArray();
        WaitHandle.WaitAll(waitHandles, timeOut);

        // Anything still in the Started state has exceeded the timeout.
        foreach (var taskInfo in result.Where(taskInfo =>
            taskInfo.State == ThreadTaskStateType.Started))
        {
            taskInfo.SetState(ThreadTaskStateType.FailedTimeout);
        }

        return result;
    }
}
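One caveat with WaitHandle.WaitAll as used in RunAsynchronously: on Windows it accepts at most 64 handles, and it throws NotSupportedException for multiple handles when called from an STA thread. If you need to run more tasks than that, one possible alternative is to wait on each handle in turn against a single shared deadline. The following is only a sketch of that idea; DeadlineWait and WaitAllWithin are hypothetical names, not part of the download.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;

public static class DeadlineWait
{
    // Hypothetical helper: waits for every handle while sharing one overall
    // timeout between them. Returns true only if all handles were signalled
    // before the deadline.
    public static bool WaitAllWithin(IList<WaitHandle> handles, int timeOutMs)
    {
        var clock = Stopwatch.StartNew();

        foreach (var handle in handles)
        {
            // Time left on the shared deadline; never negative.
            var remaining = timeOutMs - (int)clock.ElapsedMilliseconds;
            if (remaining < 0)
            {
                remaining = 0; // a zero timeout still polls the handle
            }

            if (!handle.WaitOne(remaining))
            {
                return false; // this handle was not signalled in time
            }
        }

        return true;
    }
}
```

Because each handle is waited on individually, the order of the handles does not change the result: the method returns true only when every handle has been signalled before the deadline, which matches what the WaitAll call reports.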
Testing the Code
The test classes demonstrate various examples and put the manager through different scenarios.
- BaseTestTask - The ThreadUtil.RunAsynchronously<T>(IList<T> tasks, Action<T> action, int timeOut) method requires that all tasks be of type T, so for the example, each of the different tasks derives from BaseTestTask and implements the Execute method.
- PrintTask - Prints some text to the console.
- GoogleTask - Runs a search against Google.
- LongRunTask - Pauses for a period of time using Thread.Sleep.
- ExceptionTask - Throws an exception.
The unit tests demonstrate the following scenarios:
- Success - Run ten separate tasks in parallel successfully.
- Timeout - Run three separate tasks but one of the tasks will take too long.
- Exception - Run three separate tasks but one of them will throw an exception.
Sample Tasks
// Note: _.P and FormatWith are helper methods that are not shown in this article.
public abstract class BaseTestTask
{
    protected BaseTestTask(string taskName)
    {
        TaskName = taskName;
    }

    public string TaskName { get; set; }

    public abstract void Execute();
}

public class PrintTask : BaseTestTask
{
    public PrintTask(string taskName) : base(taskName) { }

    public override void Execute()
    {
        _.P("Print Task: {0}", TaskName);
    }
}

public class GoogleTask : BaseTestTask
{
    public GoogleTask(string taskName, string query) : base(taskName)
    {
        Query = query;
    }

    public string Query { get; set; }

    public string Content { get; set; }

    public override void Execute()
    {
        var query =
            "http://www.google.com/search?q={0}".FormatWith(
                HttpUtility.UrlEncode(Query));
        _.P("Google Task: {0} : {1}", TaskName, query);

        // Dispose the WebClient once the download has finished.
        using (var client = new WebClient())
        {
            Content = client.DownloadString(query);
        }
    }
}

public class LongRunTask : BaseTestTask
{
    public LongRunTask(string taskName, int pauseFor) : base(taskName)
    {
        _.P("Long Task: {0}", TaskName);
        PauseFor = pauseFor;
    }

    public int PauseFor { get; set; }

    public override void Execute()
    {
        _.P("Long Task: {0} - Timeout: {1}", TaskName, PauseFor);
        Thread.Sleep(PauseFor);
    }
}

public class ExceptionTask : BaseTestTask
{
    public ExceptionTask(string taskName,
        string exceptionMessage) : base(taskName)
    {
        ExceptionMessage = exceptionMessage;
    }

    public string ExceptionMessage { get; set; }

    public override void Execute()
    {
        _.P("Exception Task: {0}", TaskName);
        throw new Exception(ExceptionMessage);
    }
}
Unit Tests
[Test]
public void RunParallelTasksSuccess()
{
    var tasks = new List<BaseTestTask>
    {
        new PrintTask("David"),
        new PrintTask("Cruwys"),
        new GoogleTask("Query 1", "www.davidcruwys.com"),
        new GoogleTask("Query 2", "www.infochoice.com"),
        new GoogleTask("Query 3", "www.my-trading-journal.com"),
        new GoogleTask("Query 4", "www.tradingtothemax.com"),
        new GoogleTask("Query 5", "Cheap travel"),
        new GoogleTask("Query 6", "Home loans"),
        new GoogleTask("Query 7", "Credit cards"),
        new LongRunTask("But within timeout", 8000),
    };

    var taskInfos = ThreadUtil.RunAsynchronously(tasks, t => t.Execute(), 10000);

    _.L("Print Info");
    taskInfos.ForEach(Print);

    Assert.That(taskInfos.Count, Is.EqualTo(10));
    Assert.That(taskInfos[0].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 1");
    Assert.That(taskInfos[1].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 2");
    Assert.That(taskInfos[2].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 3");
    Assert.That(taskInfos[3].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 4");
    Assert.That(taskInfos[4].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 5");
    Assert.That(taskInfos[5].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 6");
    Assert.That(taskInfos[6].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 7");
    Assert.That(taskInfos[7].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 8");
    Assert.That(taskInfos[8].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 9");
    Assert.That(taskInfos[9].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 10");
}

[Test]
public void RunParallelTasksTimeOut()
{
    var tasks = new List<BaseTestTask>
    {
        new LongRunTask("Too Long", 3500),
        new PrintTask("David"),
        new PrintTask("Cruwys"),
    };

    _.L("Print Info");
    var taskInfos = ThreadUtil.RunAsynchronously(tasks, t => t.Execute(), 3000);
    taskInfos.ForEach(Print);

    Assert.That(taskInfos.Count, Is.EqualTo(3));
    Assert.That(taskInfos[0].State, Is.EqualTo(ThreadTaskStateType.FailedTimeout), "Task 1");
    Assert.That(taskInfos[1].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 2");
    Assert.That(taskInfos[2].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 3");
}

[Test]
public void RunParallelTasksException()
{
    var tasks = new List<BaseTestTask>
    {
        new PrintTask("David"),
        new ExceptionTask("Bad Task", "Failure in task"),
        new PrintTask("Cruwys"),
    };

    _.L("Print Info");
    var taskInfos = ThreadUtil.RunAsynchronously(tasks, t => t.Execute(), 1000);
    taskInfos.ForEach(Print);

    Assert.That(taskInfos.Count, Is.EqualTo(3));
    Assert.That(taskInfos[0].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 1");
    Assert.That(taskInfos[1].State, Is.EqualTo(ThreadTaskStateType.Failed), "Task 2");
    Assert.That(taskInfos[1].Exception, Is.Not.Null, "Task 2");
    Assert.That(taskInfos[1].Exception.Message, Is.EqualTo("Failure in task"), "Task 2");
    Assert.That(taskInfos[2].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 3");
}
Business Test Case
The following code is not included in the download. It is just an extract from our quote engine that tests running 15 separate customer requests with varying request data structures against the quote engine, which in turn calls through to 6 quote providers to fulfill the requests.
It is just included here as another example of running tasks in parallel.
[Test]
[QuoteRequestCleanup(CleanupAction)]
public void BuildProductQuotes99_SendMultipleMessages()
{
    var tasks = new List<MessageRunner>
    {
        new MessageRunner("01_Success.xml"),
        new MessageRunner("02_Success_CarModified.xml"),
        new MessageRunner("03_Success_CarDamaged.xml"),
        new MessageRunner("10_RemoteServer_TimeoutA.xml"),
        new MessageRunner("10_RemoteServer_TimeoutB.xml"),
        new MessageRunner("11_RemoteServer_NullMessage.xml"),
        new MessageRunner("12_RemoteServer_InvalidResultStructure.xml"),
        new MessageRunner("13_RemoteServer_UnknownProducts.xml"),
        new MessageRunner("14_RemoteServer_ZeroProducts.xml"),
        new MessageRunner("15_RemoteServer_ProductKnockout.xml"),
        new MessageRunner("20_QuoteProvider_BuildMessageException.xml"),
        new MessageRunner("21_QuoteProvider_SendMessageException.xml"),
        new MessageRunner("22_QuoteProvider_SendMessageTimeoutException.xml"),
        new MessageRunner("23_QuoteProvider_SendMessageAuthorizationException.xml"),
        new MessageRunner("24_QuoteProvider_ProcessMessageException.xml")
    };

    var taskInfos = ThreadUtil.RunAsynchronously(tasks, t => t.Execute(), 20000);

    _.L("Print Info");
    taskInfos.ForEach(t => t.Task.Print());
}

public class MessageRunner
{
    public MessageRunner(string dataFileName)
    {
        DataFileName = dataFileName;
        QuoteRequestController = Ioc.GetInstance<IQuoteRequestController>();
        SecurityController = Ioc.GetInstance<ISecurityController>();
        SampleUser = SecurityController.AuthenticateUser(
            "TestPartnerName", "TestPassword");
    }

    private IQuoteRequestController QuoteRequestController { get; set; }
    private ISecurityController SecurityController { get; set; }
    private IUser SampleUser { get; set; }
    private string DataFileName { get; set; }

    public CarInsuranceQuoteRequestResultDto Result { get; set; }

    public void Execute()
    {
        var quoteRequest =
            T.TestData.QuoteRequestMessage.GetQuoteRequest(DataFileName);
        Result = QuoteRequestController.QuoteRequest(SampleUser, quoteRequest);
    }

    public void Print()
    {
        _.SerializeToString(Result);
    }
}
History
- May 5, 2010 - Original posted.
- May 6, 2010 - Updates based on comments.