Parallel Tasks in .NET 3.0

6 May 2010 | CPOL | 4 min read
Provide a mechanism to execute a list of tasks in parallel on multiple threads and communicate back to the calling thread useful states such as exceptions, timeouts, and successful task completion.

Introduction

My current project is to build a real-time distributed quotation engine. Online customers enter their details and submit a request to the quote engine. The engine creates a list of quote providers that communicate B2B with various backend systems over the web. Each provider has a custom implementation with provider-specific mapping and transport requirements.

A requirement of the quote engine was that the process should not take more than 15 seconds, as web-based customers would simply leave if left waiting too long.

The time to retrieve a quote from each provider was estimated at 3-7 seconds, so sequential processing was out of the question: three providers at 7 seconds each would already exceed the 15-second limit. We were also aware that if the system waited for every quote before returning results to the consumer, a single provider that was down or just running slowly would drag down the whole response.

Solving the Problem

At the heart of the code is a standard ThreadPool.QueueUserWorkItem to run my providers, and a WaitHandle.WaitAll(waitHandles, timeOut) which waits for all providers to finish or exits after a set period of time.
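
The core pattern can be sketched in isolation. The following is a minimal illustration, not the article's ThreadUtil: the class name WaitAllSketch, the RunAll method, and the Thread.Sleep stand-in for a provider call are all assumptions made for the example. Note that WaitHandle.WaitAll accepts at most 64 handles, so the pattern suits a modest number of providers.

```csharp
using System;
using System.Threading;

public static class WaitAllSketch
{
    // Queues one sleeping work item per delay and waits for all of them,
    // giving up after timeoutMs. Returns true if every item signalled in time.
    public static bool RunAll(int[] delaysMs, int timeoutMs)
    {
        var handles = new ManualResetEvent[delaysMs.Length];

        for (var i = 0; i < delaysMs.Length; i++)
        {
            var done = new ManualResetEvent(false);
            var delay = delaysMs[i]; // per-iteration copy captured by the lambda
            handles[i] = done;

            ThreadPool.QueueUserWorkItem(state =>
            {
                Thread.Sleep(delay); // hypothetical stand-in for a provider call
                done.Set();          // signal the waiting thread
            });
        }

        // Blocks until all handles are set or the timeout elapses
        return WaitHandle.WaitAll(handles, timeoutMs);
    }

    public static void Main()
    {
        // Both items finish well inside the timeout
        Console.WriteLine(RunAll(new[] { 50, 100 }, 2000));

        // The second item sleeps longer than the timeout allows
        Console.WriteLine(RunAll(new[] { 50, 3000 }, 500));
    }
}
```

The return value of WaitAll only says whether everything finished in time. It does not say which item was late or whether one threw, which is the gap the wrapper classes described next are meant to fill.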

My main addition is a task wrapper, ThreadTaskInfo, that keeps track of the state of the running provider, including any exceptions, and a static manager class, ThreadUtil, that runs an action delegate on any list of objects in parallel and manages the state of each action via the associated ThreadTaskInfo.

The ThreadUtil class is designed to work on any class that you wish to run in parallel. There is no need to extend a base class or implement an interface. All you need is a list of objects of a common type T and an Action<T> delegate to run against each of them (Command pattern); pass both to the ThreadUtil.RunAsynchronously<T> method.

The solution provided in this article is similar to the Task Parallel Library (TPL) in .NET 4. When I wrote the code, I was not aware of the Parallel Extensions library. I have since found that the links to the Parallel Extensions for .NET 3.5 all seem to be broken, and there are currently no plans for us to move to .NET 4.
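
For readers who can use .NET 4, a rough TPL counterpart might look like the sketch below. It is an illustration only and not part of the article's download: the TplSketch class and its RunAll method are hypothetical names, while Task.Factory.StartNew, Task.ContinueWith, and Task.WaitAll are standard TPL calls.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class TplSketch
{
    // Starts every action as a Task, waits up to timeoutMs for all of them,
    // and reports the status each task had when the wait ended.
    public static TaskStatus[] RunAll(Action[] actions, int timeoutMs)
    {
        var tasks = actions.Select(a => Task.Factory.StartNew(a)).ToArray();

        foreach (var task in tasks)
        {
            // Observe the exception even if the task faults after the timeout
            task.ContinueWith(t => { var ignored = t.Exception; },
                              TaskContinuationOptions.OnlyOnFaulted);
        }

        try
        {
            Task.WaitAll(tasks, timeoutMs);
        }
        catch (AggregateException)
        {
            // Failures are reported through each task's Status instead
        }

        return tasks.Select(t => t.Status).ToArray();
    }

    public static void Main()
    {
        var statuses = RunAll(new Action[]
        {
            () => Thread.Sleep(50),
            () => { throw new InvalidOperationException("boom"); },
            () => Thread.Sleep(3000), // longer than the timeout below
        }, 500);

        foreach (var status in statuses)
        {
            Console.WriteLine(status);
        }
    }
}
```

In this sketch, TaskStatus.RanToCompletion and TaskStatus.Faulted play roughly the roles of Completed and Failed in this article, and a task that has not finished when the wait ends corresponds to FailedTimeout.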

Main Code

ThreadTaskInfo wraps your execution classes and represents a single task that stores the current execution state of the thread. It will also store any exceptions thrown by your classes, or timeouts if your code does not complete in a timely manner.

C#
using System;
using System.Threading;

public class ThreadTaskInfo<T>
{
    private readonly object _synchLock = new object();

    /// <summary>
    /// Initializes a new instance of the
    /// <see cref="ThreadTaskInfo{T}"/> class.
    /// </summary>
    /// <param name="task">The task.</param>
    public ThreadTaskInfo(T task)
    {
        Task = task;

        // Task is created, but has not started running yet
        State = ThreadTaskStateType.Created;

        // Task is not set to completed
        TaskComplete = new ManualResetEvent(false);
    }

    /// <summary>
    /// Gets or sets the task that is running on the thread
    /// </summary>
    /// <value>The task.</value>
    public T Task { get; private set; }

    /// <summary>
    /// Gets or sets the task complete signal (Wait Handle).
    /// </summary>
    /// <value>The task complete.</value>
    public ManualResetEvent TaskComplete { get; private set; }

    /// <summary>
    /// Gets or sets an exception if it is thrown by the task.
    /// </summary>
    /// <value>The exception.</value>
    public Exception Exception { get; set; }

    /// <summary>
    /// Gets or sets the state of currently running threaded task.
    /// </summary>
    /// <value>The state.</value>
    public ThreadTaskStateType State { get; private set; }

    /// <summary>
    /// Set the state of the task.
    /// </summary>
    /// <param name="state">The state.</param>
    public void SetState(ThreadTaskStateType state)
    {
        // Lock the writing of the State property
        lock (_synchLock)
        {
            switch (state)
            {
                case ThreadTaskStateType.Created:
                    throw new Exception("State cannot be set to created");

                case ThreadTaskStateType.Started:
                    if (State != ThreadTaskStateType.Created)
                    {
                        return;
                    }
                    State = state;
                    return;

                case ThreadTaskStateType.Completed:
                    if (State != ThreadTaskStateType.Started)
                    {
                        return;
                    }
                    State = state;
                    return;

                case ThreadTaskStateType.Failed:
                    if (State == ThreadTaskStateType.Started || State == 
                                 ThreadTaskStateType.Completed)
                    {
                        State = state;
                    }
                    return;

                case ThreadTaskStateType.FailedTimeout:
                    if (State == ThreadTaskStateType.Started || State == 
                                 ThreadTaskStateType.Completed)
                    {
                        State = state;
                    }
                    return;
            }
        }
    }
}
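
The transition rules enforced by SetState can be summarized as a small predicate. This is only a restatement for illustration: the TransitionSketch class, its local State enumeration (a copy of the enumeration shown below, so that the sketch compiles on its own), and the IsAllowed method are hypothetical names that do not appear in the download.

```csharp
using System;

public static class TransitionSketch
{
    // Local copy of the article's state values so the sketch is self-contained
    public enum State { Created, Started, Completed, Failed, FailedTimeout }

    // Returns true when SetState would move a task from 'from' to 'to'
    public static bool IsAllowed(State from, State to)
    {
        switch (to)
        {
            case State.Started:
                return from == State.Created;
            case State.Completed:
                return from == State.Started;
            case State.Failed:
            case State.FailedTimeout:
                return from == State.Started || from == State.Completed;
            default:
                return false; // Created is never a valid target (SetState throws)
        }
    }

    public static void Main()
    {
        Console.WriteLine(IsAllowed(State.Created, State.Started));         // True
        Console.WriteLine(IsAllowed(State.FailedTimeout, State.Completed)); // False
        Console.WriteLine(IsAllowed(State.Created, State.FailedTimeout));   // False
    }
}
```

The second case shows that a task marked as timed out keeps that state even if its work finishes later. The third shows that a task the thread pool has not yet started cannot be marked FailedTimeout and stays in Created.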

The ThreadTaskStateType enumeration represents the state of each running task.

C#
/// <summary>
/// State of the currently running threaded task
/// </summary>
public enum ThreadTaskStateType
{
    /// <summary>
    /// Created state, not yet put onto the thread pool
    /// </summary>
    Created,

    /// <summary>
    /// Thread is started, the task is ready to execute
    /// </summary>
    Started,

    /// <summary>
    /// Thread is complete, the task has finished executing
    /// </summary>
    Completed,

    /// <summary>
    /// Thread failed to complete
    /// </summary>
    Failed,

    /// <summary>
    /// Thread failed to complete in specified time
    /// </summary>
    FailedTimeout,
}

ThreadUtil provides a single static method RunAsynchronously<T>, which will run your action delegate against the list of tasks in parallel. This method will handle state management, exceptions, and timeouts, and record the information against each ThreadTaskInfo object.

When all the tasks are complete or if a timeout is reached, a list of ThreadTaskInfo will be returned that you can interrogate and see which tasks ran successfully and within the allotted time.

C#
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

public static class ThreadUtil
{
    /// <summary>
    /// Execute a list of tasks asynchronously.
    /// </summary>
    /// <typeparam name="T">The type of the tasks to execute.</typeparam>
    /// <param name="tasks">The tasks to execute.</param>
    /// <param name="action">The action on the object to run.</param>
    /// <param name="timeOut">Timeout in milliseconds.</param>
    /// <returns>List of ThreadTaskInfo storing information about the execution
    /// and a reference to the tasks themselves</returns>
    public static List<ThreadTaskInfo<T>> 
           RunAsynchronously<T>(IList<T> tasks, 
                             Action<T> action, int timeOut)
    {
        var result = new List<ThreadTaskInfo<T>>();

        if (tasks == null || tasks.Count == 0 || action == null)
        {
            return result;
        }

        // Create a list of ThreadTaskInfo wrappers for each task to run
        foreach (var taskInfo in tasks.Select(task => new ThreadTaskInfo<T>(task)))
        {
            result.Add(taskInfo);

            var info = taskInfo;

            // Wrap the execution method in a delegate with a thread complete
            // signal to inform the main thread that the worker thread is complete
            ThreadPool.QueueUserWorkItem(state =>
            {
                try
                {
                    info.SetState(ThreadTaskStateType.Started);

                    // Execute task
                    action.Invoke(info.Task);

                    info.SetState(ThreadTaskStateType.Completed);

                    // Signal the main thread, that this execution thread is complete
                    info.TaskComplete.Set();
                }
                catch (Exception ex)
                {
                    info.SetState(ThreadTaskStateType.Failed);
                    info.Exception = ex;
                    info.TaskComplete.Set();
                }
            });
        }

        // waitHandles is used to signal when a thread is complete
        // Grab all wait handles assigned to the task list
        var waitHandles = result.Select(taskInfo => taskInfo.TaskComplete).ToArray();

        WaitHandle.WaitAll(waitHandles, timeOut);

        // If any tasks are still running then mark them with Timeout Failure status
        foreach (var taskInfo in result.Where(taskInfo => 
                 taskInfo.State == ThreadTaskStateType.Started))
        {
            taskInfo.SetState(ThreadTaskStateType.FailedTimeout);
        }

        return result;
    }
}

Testing the Code

The test classes demonstrate various examples and put the manager through different scenarios.

  • BaseTestTask - The ThreadUtil.RunAsynchronously<T>(IList<T> tasks, Action<T> action, int timeOut) method requires that all tasks be of type T, so for the example, each of the different tasks derives from BaseTestTask and implements the Execute method.
  • PrintTask - Prints some text to the console.
  • GoogleTask - Runs a search against Google.
  • LongRunTask - Pauses for a period of time using Thread.Sleep.
  • ExceptionTask - Throws an exception.

The unit tests demonstrate the following scenarios:

  • Success - Run ten separate tasks in parallel successfully.
  • Timeout - Run three separate tasks but one of the tasks will take too long.
  • Exception - Run three separate tasks but one of them will throw an exception.

Sample Tasks

C#
public abstract class BaseTestTask
{
    protected BaseTestTask(string taskName)
    {
        TaskName = taskName;
    }

    public string TaskName { get; set; }

    public abstract void Execute();
}

public class PrintTask : BaseTestTask
{
    public PrintTask(string taskName) : base(taskName) { }

    public override void Execute()
    {
        _.P("Print Task: {0}", TaskName);
    }
}

public class GoogleTask : BaseTestTask
{
    public GoogleTask(string taskName, string query) : base(taskName)
    {
        Query = query;
    }

    public string Query { get; set; }

    public override void Execute()
    {
        var query = 
            "http://www.google.com/search?q={0}".FormatWith(
            HttpUtility.UrlEncode(Query));

        _.P("Google Task: {0} : {1}", TaskName, query);

        // Dispose the WebClient once the download has finished
        using (var client = new WebClient())
        {
            Content = client.DownloadString(query);
        }
    }

    public string Content { get; set; }
}

public class LongRunTask : BaseTestTask
{
    public LongRunTask(string taskName, int pauseFor) : base(taskName)
    {
        _.P("Long Task: {0}", TaskName);

        PauseFor = pauseFor;
    }

    public int PauseFor { get; set; }

    public override void Execute()
    {
        _.P("Long Task: {0} - Timeout: {1}", TaskName, PauseFor);

        Thread.Sleep(PauseFor);
    }
}

public class ExceptionTask : BaseTestTask
{
    public ExceptionTask(string taskName, 
           string exceptionMessage) : base(taskName)
    {
        ExceptionMessage = exceptionMessage;
    }

    public string ExceptionMessage { get; set; }

    public override void Execute()
    {
        _.P("Exception Task: {0}", TaskName);

        throw new Exception(ExceptionMessage);
    }
}

Unit Tests

C#
// *********************************************************************************
// Test Cases
// *********************************************************************************
[Test]
public void RunParallelTasksSuccess()
{
    var tasks = new List<BaseTestTask>
                   {
                       new PrintTask("David"),
                       new PrintTask("Cruwys"),
                       new GoogleTask("Query 1", "www.davidcruwys.com"),
                       new GoogleTask("Query 2", "www.infochoice.com"),
                       new GoogleTask("Query 3", "www.my-trading-journal.com"),
                       new GoogleTask("Query 4", "www.tradingtothemax.com"),
                       new GoogleTask("Query 5", "Cheap travel"),
                       new GoogleTask("Query 6", "Home loans"),
                       new GoogleTask("Query 7", "Credit cards"),
                       new LongRunTask("But within timeout", 8000),
                   };

    var taskInfos = ThreadUtil.RunAsynchronously(tasks, t => t.Execute(), 10000);

    _.L("Print Info");
    taskInfos.ForEach(Print);

    Assert.That(taskInfos.Count, Is.EqualTo(10));

    Assert.That(taskInfos[0].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 1");
    Assert.That(taskInfos[1].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 2");
    Assert.That(taskInfos[2].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 3");
    Assert.That(taskInfos[3].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 4");
    Assert.That(taskInfos[4].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 5");
    Assert.That(taskInfos[5].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 6");
    Assert.That(taskInfos[6].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 7");
    Assert.That(taskInfos[7].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 8");
    Assert.That(taskInfos[8].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 9");
    Assert.That(taskInfos[9].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 10");
}

[Test]
public void RunParallelTasksTimeOut()
{
    var tasks = new List<BaseTestTask>
                   {
                       new LongRunTask("Too Long", 3500),
                       new PrintTask("David"),
                       new PrintTask("Cruwys"),
                   };

    _.L("Print Info");
    var taskInfos = ThreadUtil.RunAsynchronously(tasks, t => t.Execute(), 3000);

    taskInfos.ForEach(Print);

    Assert.That(taskInfos.Count, Is.EqualTo(3));

    Assert.That(taskInfos[0].State, Is.EqualTo(ThreadTaskStateType.FailedTimeout), "Task 1");
    Assert.That(taskInfos[1].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 2");
    Assert.That(taskInfos[2].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 3");
}

[Test]
public void RunParallelTasksException()
{
    var tasks = new List<BaseTestTask>
                   {
                       new PrintTask("David"),
                       new ExceptionTask("Bad Task", "Failure in task"),
                       new PrintTask("Cruwys"),
                   };

    _.L("Print Info");
    var taskInfos = ThreadUtil.RunAsynchronously(tasks, t => t.Execute(), 1000);

    taskInfos.ForEach(Print);

    Assert.That(taskInfos.Count, Is.EqualTo(3));

    Assert.That(taskInfos[0].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 1");
    Assert.That(taskInfos[1].State, Is.EqualTo(ThreadTaskStateType.Failed), "Task 2");
    Assert.That(taskInfos[1].Exception, Is.Not.Null, "Task 2");
    Assert.That(taskInfos[1].Exception.Message, Is.EqualTo("Failure in task"), "Task 2");
    Assert.That(taskInfos[2].State, Is.EqualTo(ThreadTaskStateType.Completed), "Task 3");
}

Business Test Case

The following code is not included in the download. It is just an extract from our quote engine that tests running 15 separate customer requests with varying request data structures against the quote engine, which in turn calls through to 6 quote providers to fulfill the requests.

It is just included here as another example of running tasks in parallel.

C#
[Test]
[QuoteRequestCleanup(CleanupAction)]
public void BuildProductQuotes99_SendMultipleMessages()
{
    var tasks = new List<MessageRunner>
        {
            new MessageRunner("01_Success.xml"),
            new MessageRunner("02_Success_CarModified.xml"),
            new MessageRunner("03_Success_CarDamaged.xml"),
            new MessageRunner("10_RemoteServer_TimeoutA.xml"),
            new MessageRunner("10_RemoteServer_TimeoutB.xml"),
            new MessageRunner("11_RemoteServer_NullMessage.xml"),
            new MessageRunner("12_RemoteServer_InvalidResultStructure.xml"),
            new MessageRunner("13_RemoteServer_UnknownProducts.xml"),
            new MessageRunner("14_RemoteServer_ZeroProducts.xml"),
            new MessageRunner("15_RemoteServer_ProductKnockout.xml"),
            new MessageRunner("20_QuoteProvider_BuildMessageException.xml"),
            new MessageRunner("21_QuoteProvider_SendMessageException.xml"),
            new MessageRunner("22_QuoteProvider_SendMessageTimeoutException.xml"),
            new MessageRunner("23_QuoteProvider_SendMessageAuthorizationException.xml"),
            new MessageRunner("24_QuoteProvider_ProcessMessageException.xml")
        };

    var taskInfos = ThreadUtil.RunAsynchronously(tasks, t => t.Execute(), 20000);

    _.L("Print Info");
    taskInfos.ForEach(t => t.Task.Print());
}

public class MessageRunner
{
    public MessageRunner(string dataFileName)
    {
        DataFileName = dataFileName;

        QuoteRequestController = Ioc.GetInstance<IQuoteRequestController>();
        SecurityController = Ioc.GetInstance<ISecurityController>();
        SampleUser = SecurityController.AuthenticateUser(
                      "TestPartnerName", "TestPassword");
    }

    private IQuoteRequestController QuoteRequestController { get; set; }
    private ISecurityController SecurityController { get; set; }
    private IUser SampleUser { get; set; }

    private string DataFileName { get; set; }

    public CarInsuranceQuoteRequestResultDto Result { get; set; }

    public void Execute()
    {
        var quoteRequest = 
          T.TestData.QuoteRequestMessage.GetQuoteRequest(DataFileName);

        Result = QuoteRequestController.QuoteRequest(SampleUser, quoteRequest);
    }

    public void Print()
    {
        _.SerializeToString(Result);
    }
}

History

  • May 5, 2010 - Original posted.
  • May 6, 2010 - Updates based on comments.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL).