NParallel, A Small Parallel Execution Library

19 Dec 2007
A simple library that lets you write asynchronous code easily, in an almost synchronous style.

Important Updates

NParallel 0.2 has been released with loop parallelism and multi-task support (For, ForEach, Do), together with many other enhancements (parameterized asynchronous calls, for instance). The code has been almost entirely rewritten and is heavily commented for interested users. The VS2005 version will no longer be supported, because later versions will rely heavily on functional programming concepts and C# 3.0 lambdas.

This article still covers the core constructs of NParallel. For loop parallelism, I have posted a separate article that discusses its design and usage in NParallel; you can also refer to the attached test cases.

Introduction

There are many ways to write multithreaded code in .NET, such as using asynchronous method invocation or creating a thread. But most of these methods change the way you write code and introduce new concepts, like IAsyncResult and callback delegates, that are largely irrelevant to the application logic. They usually make the code hard to read and maintain, and they require application developers to know how the underlying threading works.

My previous project relied heavily on the Begin/EndInvoke model. After defining many delegates and function pairs, I got bored with the Begin/End code and decided to write a wrapper library that hides the complexity, makes the code easy to read and write, and provides the flexibility to change the underlying threading mechanism later on.

Then I came up with NParallel, which I am going to introduce to you in this article.

The Problem: Time Consuming Tasks

Let's first have a look at the old ways of executing code in parallel. Suppose that we have a query that takes a long time to complete:

// Sample code 1: Time Consuming Operation

int LongRunOperation(int queryID)
{
    Thread.Sleep(10000);
    // Do some query

    return queryID * 2;
}

// synchronous call to the method     

int result = LongRunOperation(10);
We defined the method LongRunOperation and called it with a given parameter. Everything looks straightforward, but the calling thread is blocked for 10 seconds! For applications that cannot afford this blocking, we have to make the call run in parallel.

The old ways, asynchronous method invocation

There are many ways to make code run in parallel; threading and delegate invocation are the most commonly used in .NET. In asynchronous method invocation (AMI) mode, the invocation of a method is split into a method dispatcher and a callback. This is much easier to write than explicit threading. The following code shows the function call above in AMI mode:
// asynchronous call to the method
// (AsyncResult lives in System.Runtime.Remoting.Messaging)

void BeginLongRunOperation(int queryID)
{
    Func<int, int> operation = LongRunOperation;
    IAsyncResult ar = operation.BeginInvoke(queryID, EndLongRunOperation, null);
}

void EndLongRunOperation(IAsyncResult ar)
{
    // AsyncDelegate is typed as object, so it must be cast back by hand
    Func<int, int> operation2 = (Func<int, int>)((AsyncResult)ar).AsyncDelegate;
    int result = operation2.EndInvoke(ar);
    // process with the return value

}

In the code above I used the predefined System.Func delegate instead of an old-fashioned user-defined delegate. Even so, the code still looks tedious and error-prone because:

  • The method call became operation.BeginInvoke(queryID, EndLongRunOperation, null);. This separates the parameters from the method itself.
  • When writing the callback method, a parameter IAsyncResult ar is introduced, which is not related to the application logic itself.
  • The callback method contains a couple of type casts whose correctness cannot be checked at compile time. If you use the wrong delegate type, there is no way to find out until a runtime error is thrown.

The NParallel way

When I started to define NParallel, the goals were to:

  • Make the code easy to read and write, similar to synchronous calls.
  • Not break the way a method is called; that is, keep the function and its parameters together.
  • Make anything async-able, which means allowing any code to be invoked asynchronously.

I came up with an idea like this:

// NParallel pseudocode

NResult pResult = NParallel.Execute(parallel_code_block);
if(pResult.IsDone())
{
    var result = pResult.Value;
    // work with the result

}

I expected the parallel_code_block to be a method invocation, a delegate, a lambda expression, or even a LINQ query. After trying several approaches, I found the anonymous delegate to be an ideal implementation for parallel_code_block. The actual code looks like this:

// Current NParallel syntax, simple asynchronous call with callback:

NResult<int> pResult = NParallel.Execute<int>(()=>
    {return LongRunOperation(10); },     /*Asynchronous code block*/
    EndBeginLongRunOperation             /*Callback operation*/
);
void EndBeginLongRunOperation(int theResult)
{
    // process with the return value of LongRunOperation;

}

Closer look at NParallel

NParallel works by wrapping the code to be executed asynchronously in an anonymous delegate. Because an anonymous delegate can use all the local variables in the current scope, I don't need to define different versions of the Execute method for each parameter list. I only defined a generic and a non-generic version of Execute, for code blocks that return a value and code blocks that return nothing. Below are the signatures of the two methods:

// Generic method signature for the invoker.

NResult<T> Execute<T>(Func<T> asyncRoutine, NResultDel<T> callbackRoutine,
    NExceptionDel exceptionRoutine, CallbackMode callbackMode);

// Method for executing non-result code blocks

NResult Execute<T>(T state, NStateVoidFunc<T> parallelRoutine, NVoidFunc callback,
    NStateVoidFunc<Exception> exceptionRoutine, CallbackMode callbackMode);

Both versions of Execute have many overloads; refer to the code for details. The first parameter defines the code block to be called asynchronously, the second defines a callback, the third defines how exceptions are handled, and the last defines how the callback methods are called. All parameters except the first are optional. We will look at each parameter in turn.
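To make the wrapping idea concrete, here is a minimal sketch of how an Execute-style method could run a parameterless code block and pass its result to a callback. It is not NParallel's implementation: MiniParallel is a hypothetical name, it uses ThreadPool.QueueUserWorkItem rather than delegate Begin/EndInvoke, and it leaves out exception handling and callback modes.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for an Execute<T>-style invoker; not the library's code.
public static class MiniParallel
{
    // Runs asyncRoutine on a thread-pool thread, then passes its result to callback.
    // Returns a wait handle that is signaled once the routine and callback have run.
    // Assumption: the routine does not throw (exception handling is omitted here).
    public static ManualResetEvent Execute<T>(Func<T> asyncRoutine, Action<T> callback)
    {
        var done = new ManualResetEvent(false);
        ThreadPool.QueueUserWorkItem(_ =>
        {
            try
            {
                // The routine is a closure, so it already carries its own arguments.
                T result = asyncRoutine();
                if (callback != null) callback(result);
            }
            finally
            {
                done.Set();
            }
        });
        return done;
    }
}
```

Because the code block is a closure, one generic method is enough: the caller writes the call with its arguments inside the lambda, and the invoker only needs to know the result type.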

First Parameter: The Code Block to Call

Let's first have a look at the signatures of the two delegates for the first parameter:

// Generic code block delegate

public delegate TResult Func<TResult>();
// Usage in NParallel

delegate() // or just use ()=>
{
    T result;
    //your code block here

    return result;
}
// Void call code block delegate

public delegate void DelegateVoid();
// Usage in NParallel

delegate()
{
    //your code block here

}
You can put almost anything into the delegate code block. If the code block changes shared variables, you are responsible for managing the locking. Below are some more complex code blocks you can put into the delegate. Looks cool, eh?
// Current NParallel syntax, calling asynchronous code blocks:

NResult<int> pResult = NParallel.Execute<int>(delegate()    
    {
        int op1 = PrepareParam(localVar1);
        int op2 = PrepareParam(localVar2);
        return LongRunOperation(op1 + op2); 
    }     /*Asynchronous code block*/
);
// calling a linq query in NParallel

NParallel.Execute<IList<int>>
(
     delegate()
     {
         return (from i in Enumerable.Range(10, 100)
                        where i % 5 == 0
                        select i).ToList();
     }/*Asynchronous code block in LINQ*/
);
The best thing about NParallel is that the code is easy to read and looks almost the same as a synchronous call. You don't have to break anything apart; you just mark the calls you want to run in parallel.

Important remarks:

When you use a delegate as in the scenarios above, it is easy to misuse closure variables (local variables captured by the anonymous delegate). A captured variable is shared with the delegate, not copied. For example, if you use a variable called curCount inside the anonymous delegate and change it after the call to Execute, the NParallel engine cannot detect the change, and the asynchronous code may see the changed value. If you want to pass a value into the execution thread, use the state version of Execute instead:

int localVariable = 6;
NParallel.Execute<int, IList<int>>
(
     localVariable,           // the state variable to send to the execution thread
     delegate(int criteria)   // the state arrives as a typed parameter
     {
         return (from i in Enumerable.Range(10, 100)
                        where i % criteria == 0
                        select i).ToList();
     }
);
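The difference between capturing a local variable and passing it as state can be shown without any threading. The following is a small sketch; ClosureDemo and its Bind helper are hypothetical names used only for illustration and are not part of NParallel.

```csharp
using System;

public static class ClosureDemo
{
    // The lambda captures the variable itself, so a later write is visible to it.
    public static int CapturedVariable()
    {
        int curCount = 1;
        Func<int> routine = () => curCount * 10;
        curCount = 5;          // changed after the delegate was created
        return routine();      // evaluated with curCount == 5
    }

    // The value is handed over as state, so the routine works on its own copy.
    public static int StateCopy()
    {
        int curCount = 1;
        Func<int> routine = Bind(curCount, c => c * 10);
        curCount = 5;          // does not affect the bound copy
        return routine();      // evaluated with the original value 1
    }

    // Hypothetical helper: binds a snapshot of 'state' to a one-argument routine.
    private static Func<TResult> Bind<TState, TResult>(TState state, Func<TState, TResult> routine)
    {
        return () => routine(state);
    }
}
```

CapturedVariable returns 50 and StateCopy returns 10. With a real asynchronous call the captured case is worse, because whether the routine sees the old or the new value depends on thread timing.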

Second Parameter: Provide Your Own Callback

A callback is a method that is called after the asynchronous code block has finished. You can provide a delegate as the callback in NParallel. Unlike asynchronous delegate invocation, your callback does not need to deal with IAsyncResult; it only needs to process the result you expect from the method. The signatures of the callbacks are defined as follows:
// Generic callback delegate

public delegate void TResultDel<T>(T result);
// void callback delegate

public delegate void DelegateVoid();
// Sample: provide a callback when invoking the method:

void PrintResult(int value) { /* print the value */ }

NParallel.Execute(delegate(){return temp1 + temp2;},
    PrintResult);

If no callback method is provided, a default callback is called, and EndInvoke() is called in any case, so you can simply fire and forget.

Third Parameter: The Exception Handling Routine

If an exception is raised while an asynchronous code block is executing, NParallel catches it and calls the exception handling routine, which you can specify with this parameter. If no routine is specified, the exception is thrown when Value is read.

public delegate void NExceptionDel(Exception exc);
You can provide a global exception handling routine by setting NParallel.ExceptionRoutine.
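The behavior described here, where the handler runs if one is present and the exception otherwise surfaces when the value is read, could be sketched as follows. DeferredResult is a hypothetical type, not NParallel's NResult. Wrapping the stored exception in InvalidOperationException is my own choice for the sketch; the article does not say how the library rethrows.

```csharp
using System;
using System.Threading;

// Hypothetical result holder illustrating deferred exceptions; not NParallel's code.
public sealed class DeferredResult<T>
{
    private readonly ManualResetEvent _done = new ManualResetEvent(false);
    private T _value;
    private Exception _error;

    public DeferredResult(Func<T> routine, Action<Exception> exceptionRoutine)
    {
        ThreadPool.QueueUserWorkItem(_ =>
        {
            try
            {
                _value = routine();
            }
            catch (Exception ex)
            {
                if (exceptionRoutine != null) exceptionRoutine(ex); // caller-supplied routine
                else _error = ex;                                   // kept until Value is read
            }
            finally
            {
                _done.Set();
            }
        });
    }

    // Blocks until the routine has finished. Rethrows a stored exception (wrapped),
    // or returns default(T) when a handler has already consumed the exception.
    public T Value
    {
        get
        {
            _done.WaitOne();
            if (_error != null)
                throw new InvalidOperationException("Asynchronous routine failed.", _error);
            return _value;
        }
    }
}
```

Catching on the worker thread matters because an unhandled exception on a thread-pool thread would otherwise terminate the process instead of reaching the caller.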

Fourth Parameter: How the Call is Finished

When using the Begin/EndInvoke model (the Asynchronous Programming Model, APM), the callback is called automatically when the method is done, and it runs on the same thread-pool thread that executed the method. In some circumstances this is not what we want: we may want the callbacks to be marshaled through a centralized queue, or we may want to decide at run time when the callback happens. The fourth parameter of NParallel.Execute enables this. CallbackMode is an enum defined as follows:

public enum CallbackMode 
{
    Manual,
    Queue,
    Auto
}
  • When CallbackMode is set to Auto, EndInvoke and Callback methods are invoked automatically, just like what you did with Begin/EndInvoke.
  • If you set CallbackMode to Queue, all the asynchronous calls will be marshaled within the queue (see Queue).
  • If you set CallbackMode to Manual, you have to manually call NResult.Wait() to do the EndInvoke and the Callback. (see NResult)

The default value for CallbackMode can be set via NParallel.DefaultMarshal; by default it is Marshal.Auto. You can mix the three callback modes within one application.

The NResult Class 

NResult has two versions, NResult and NResult<T>. Below are their most important members:
// For NResult<T> only
public T Value { get; }

// Block the current thread and wait for the method to finish
public void Wait();

// Get the current status of the task.
public bool IsDone();
Wait blocks the current thread, then calls EndInvoke and the provided callback method to obtain the result of the code block.

Reading Value results in a call to Wait().
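As an illustration of these three members under manual callback handling, here is a sketch of a result holder whose callback runs only when the caller waits. ManualResult is a hypothetical type built on a thread-pool work item and a ManualResetEvent; the real NResult wraps EndInvoke instead, and exception handling is omitted here.

```csharp
using System;
using System.Threading;

// Hypothetical holder: the callback is deferred until Wait() is called.
public sealed class ManualResult<T>
{
    private readonly ManualResetEvent _finished = new ManualResetEvent(false);
    private readonly object _gate = new object();
    private readonly Action<T> _callback;
    private bool _callbackInvoked;
    private T _value;

    public ManualResult(Func<T> routine, Action<T> callback)
    {
        _callback = callback;
        ThreadPool.QueueUserWorkItem(_ =>
        {
            // Assumption: the routine does not throw.
            try { _value = routine(); }
            finally { _finished.Set(); }
        });
    }

    // True once the routine has finished; never blocks.
    public bool IsDone()
    {
        return _finished.WaitOne(0);
    }

    // Blocks until the routine finishes, then runs the callback once
    // on the thread that called Wait().
    public void Wait()
    {
        _finished.WaitOne();
        lock (_gate)
        {
            if (_callbackInvoked) return;
            _callbackInvoked = true;
        }
        if (_callback != null) _callback(_value);
    }

    // Reading Value implies Wait().
    public T Value
    {
        get { Wait(); return _value; }
    }
}
```

The lock and flag ensure the callback runs at most once, even if Wait() and Value are used together or from different threads.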

Use the Queue 

NQueue holds the callbacks of all tasks invoked with CallbackMode.Queue. It needs to be pumped from a regular loop of the application, such as the application message loop or a timer: just call NParallel.Queue.Update();. The callbacks are then invoked on the thread that calls Update.
//Call code block with CallbackMode.Queue

NParallel.Execute(()=>{
    return 0; // your method here

},
CallbackMode.Queue);

//Update NQueue within a timer proc

void TimerProc()
{
    NParallel.Queue.Update();
}
The callback will be called in the same thread as the TimerProc.
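A queue of this kind can be very small. The sketch below shows the general technique, with worker threads enqueuing finished callbacks and one designated thread draining them. CallbackQueue is a hypothetical class and makes no claim about how NQueue is implemented.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical callback queue in the spirit of NParallel.Queue; not the library's code.
public sealed class CallbackQueue
{
    private readonly Queue<Action> _pending = new Queue<Action>();
    private readonly object _gate = new object();

    // Called from worker threads when an asynchronous routine has finished.
    public void Enqueue(Action callback)
    {
        lock (_gate) { _pending.Enqueue(callback); }
    }

    // Called from the application's loop or timer. Runs all queued callbacks
    // on the calling thread and returns how many were run.
    public int Update()
    {
        int count = 0;
        while (true)
        {
            Action next;
            lock (_gate)
            {
                if (_pending.Count == 0) return count;
                next = _pending.Dequeue();
            }
            next();   // run outside the lock so a callback may enqueue more work
            count++;
        }
    }
}
```

Since every callback runs on the thread that calls Update, code inside the callbacks can touch state owned by that thread (UI controls, for example) without extra locking.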

Cancel the Task

The library ships with simple cancellation functionality. You can call Cancel on an unfinished NResult; this stops the callback from being called.
NResult<T> result;
result.Cancel();
Note that if the task has already finished, Cancel returns false. Even when Cancel is called, EndInvoke is still called for the asynchronous routine.
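The semantics described above, where the routine keeps running, only the callback is suppressed, and Cancel reports false once the task is done, can be sketched like this. CancellableCall is a hypothetical type for illustration only, and it assumes the routine does not throw.

```csharp
using System;
using System.Threading;

// Hypothetical cancellable call: Cancel() only suppresses the callback,
// the routine itself still runs to completion.
public sealed class CancellableCall<T>
{
    private readonly object _gate = new object();
    private readonly ManualResetEvent _finished = new ManualResetEvent(false);
    private bool _completed;
    private bool _cancelled;

    public CancellableCall(Func<T> routine, Action<T> callback)
    {
        ThreadPool.QueueUserWorkItem(_ =>
        {
            T result = routine();
            bool invoke;
            lock (_gate)
            {
                _completed = true;
                invoke = !_cancelled;   // decide under the lock to avoid racing with Cancel()
            }
            if (invoke && callback != null) callback(result);
            _finished.Set();
        });
    }

    // Returns false when the routine has already completed;
    // otherwise marks the call as cancelled so the callback is skipped.
    public bool Cancel()
    {
        lock (_gate)
        {
            if (_completed) return false;
            _cancelled = true;
            return true;
        }
    }

    // Blocks until the routine (and the callback, if it runs) has finished.
    public void Wait()
    {
        _finished.WaitOne();
    }
}
```

Both the completion flag and the cancel flag are read and written under the same lock, so a Cancel that returns true guarantees that the callback will not run.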

Behind the Scene

The library currently just wraps the delegate Begin/EndInvoke calls with NDefaultStrategy/NDefaultResult. This is only one of the asynchronous models you could adopt; you might want to use explicit threads or your own thread pool instead. You can change the threading policy by replacing the policy layer through the IParallelStrategy interface:

    public interface IParallelStrategy
    {
        /// <summary>
        /// Execute a code block asynchronously, with return value
        /// </summary>
        /// <typeparam name="TReturnValue">Result type of the code block</typeparam>
        /// <param name="callerState">CallerState variable</param>
        /// <param name="asyncRoutine">The code block to execute</param>
        /// <param name="callbackRoutine">The callback routine</param>
        /// <param name="callbackMode"></param>
        /// <param name="globalExceptionRoutine">The Exception Routine</param>
        /// <returns>Holder of the result for the code block, canbe used to wait</returns>
        NResult<TReturnValue> Execute<TReturnValue, TState>(TState state, Func<TState, TReturnValue> asyncRoutine, NStateVoidFunc<TReturnValue> callbackRoutine,
            NStateVoidFunc<Exception> exceptionRoutine, CallbackMode callbackMode);

        /// <summary>
        /// Execute a code block asynchronously, without return value
        /// </summary>
        /// <typeparam name="TReturnValue">type of the callerState variable</typeparam>
        /// <param name="callerState">CallerState variable</param>
        /// <param name="asyncRoutine">The code block to execute</param>
        /// <param name="callbackRoutine">The callback routine</param>
        /// <param name="callbackMode"></param>
        /// <param name="globalExceptionRoutine">The Exception Routine</param>
        /// <returns>Holder of the result for the code block, canbe used to wait</returns>
        NResult Execute<TState>(TState state, NStateVoidFunc<TState> asyncRoutine, NVoidFunc callbackRoutine,
            NStateVoidFunc<Exception> exceptionRoutine, CallbackMode callbackMode);

        /// <summary>
        /// The overall exception handling routine,
        /// will be used when no exception routine is provided for the function
        /// </summary>
        NStateVoidFunc<Exception> ExceptionHandler { get; set; }
    }

    // NResult abstract class
    public abstract class NResult
    {
        public abstract bool IsDone();
        internal abstract object CallerRoutine { get; }
        public abstract void Wait(/* int milli-sec-to-wait */);
        public virtual bool Cancel();
    }

    public abstract class NResult<T> : NResult
    {
        public abstract T Value { get; }
    }
You have to implement the IParallelStrategy interface and then set NParallel.Strategy to replace the default NAMPStrategyImpl. You will also have to implement your own NResult.
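To show what swapping the policy layer buys you, here is a deliberately simplified sketch of the pattern. ISimpleStrategy, ThreadPoolStrategy, DedicatedThreadStrategy, and Runner are all hypothetical names. The real IParallelStrategy has richer signatures (state, callbacks, exception routine, callback mode) and returns NResult objects rather than wait handles.

```csharp
using System;
using System.Threading;

// Simplified, hypothetical strategy interface for illustration only.
public interface ISimpleStrategy
{
    // Starts 'routine' asynchronously and returns a handle signaled on completion.
    WaitHandle Run(Action routine);
}

// Runs routines on the shared thread pool.
public sealed class ThreadPoolStrategy : ISimpleStrategy
{
    public WaitHandle Run(Action routine)
    {
        var done = new ManualResetEvent(false);
        ThreadPool.QueueUserWorkItem(_ => { try { routine(); } finally { done.Set(); } });
        return done;
    }
}

// Runs each routine on its own dedicated background thread.
public sealed class DedicatedThreadStrategy : ISimpleStrategy
{
    public WaitHandle Run(Action routine)
    {
        var done = new ManualResetEvent(false);
        var thread = new Thread(() => { try { routine(); } finally { done.Set(); } });
        thread.IsBackground = true;
        thread.Start();
        return done;
    }
}

// Hypothetical facade: calling code stays the same when the strategy is swapped.
public static class Runner
{
    public static ISimpleStrategy Strategy = new ThreadPoolStrategy();

    public static WaitHandle Execute(Action routine)
    {
        return Strategy.Run(routine);
    }
}
```

Callers only ever see Runner.Execute, so replacing the value of Runner.Strategy changes where the work runs without touching any call site, which is the same separation the policy layer aims for.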

I have used the code in many small tests and in an application I am working on. It makes asynchronous programming quite easy and fun, and I hope it is useful for you too. You may modify the code and use it in your own projects at will; just please keep the credits. If you find defects or bugs in the code, please let me know.

Known Issues

About the Code

The package for VS2005 contains three projects: NParallel, NParallelCore, and Testcase. The latter two belong to the very first version of NParallel and contain the early quick-and-dirty code and some incomplete functions. You can have a look at them if you are interested. If you only want NParallel, you can ignore the other two projects.

The VS2005 version of NParallel will no longer be supported in later versions (from 0.2 on).

The package for VS2008 contains only one console project; you can simply change its output type to library if you want a DLL.

V0.2 contains three test projects, including a GUI image processing library.

History

  • 2007-12-19 V0.2. Loop parallelism supported, bundled with other enhancements.
  • 2007-11-30 v0.1 bug fix: wrong callback invocation. Updated source code. Thanks to radioman.lt@gmail.com
  • 2007-11-30 Added source code for VS2005.
  • 2007-11-28 Initial version of the documentation and initial release 0.1.

References

I have seen many articles on asynchronous programming, and they are all great; you can search for them on MSDN.

Functional Programming For The Rest of Us

One article on CodeProject that I found good for beginners can be found here.

You can now download the ParallelFX CTP, a parallel extension to .NET 3.5, from MSDN.

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.
