Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles / desktop / WPF

A Simple Computation API with Progress Tracking

4.83/5 (20 votes)
25 Apr 2011CPOL8 min read 43.7K   1.5K  
A computation wrapper with built-in progress tracking using Reactive Extensions for .NET (and AsyncCTP).

Table of Contents

  1. Introduction
  2. Background
  3. The API
  4. Source Code, Demo Project, and Binaries
  5. Possible Pitfalls
  6. Future Work
  7. Conclusion

1 Introduction

I always hate when an application does some computation and does not notify the user about its progress, or even worse, does not run the computations asynchronously. So, I have decided that in my own applications, the computations will tell the user what the hell is going on while the UI remains responsive. To achieve this goal, I have implemented the Computation<T> type and when I was finished, the final product kind of "felt right" so I have decided to share it as the almighty Simple Computation Fluent APItm with Progress Tracking (what a fancy name :)). Anyway, the Fluent part idea is actually greatly inspired by this article by Sacha Barber.

The library uses Reactive Extensions for .NET (Rx) and optionally AsyncCTP (SP1), and allows code constructs such as these:

C#
var computation = Computation
  .Create(progress => ComputeSomeStuff(progress))
  .WhenProgressChanged(p => BusyIndicator.Status = p.StatusText)
  .WhenCompleted(result => DisplayResult(result));
computation.RunAsync();

Alternatively, using AsyncCTP:

C#
var result = await Computation
  .Create(progress => DoSomeWork(progress))
  .WhenProgressChanged(p => BusyIndicator.Status = p.StatusText);
DisplayResult(result);

Thanks to Rx, the same code works on both .NET and Silverlight.

2 Background

The basic idea behind the code is to let Rx handle all the asynchronous and scheduling work. Therefore, the asynchronous execution of the code is handled by just one line of code:

C#
new Func<TResult>(RunSynchronously).ToAsync(executedOn)().Subscribe();

Where executedOn is the Scheduler used to run the computation (determines on which thread the code is executed). The AsyncCTP support is separate (because it's still "just a CTP") and is described in this section.

Even though the code uses Reactive Extensions, no knowledge about Rx is required for basic usage of the code (let's face it, Rx isn't exactly easy to grasp). But it is actually thanks to Rx, the code works so well. Although it does not seem like it from the code, as Rx is barely used, Rx's (I)Scheduler does a lot of work. Especially in elegantly preventing invalid cross-thread access issues.

3 The API

3.1 Structure

The basis for the computation type are the interfaces IComputationBase and IComputation<TResult, TProgress>:

C#
interface IComputationBase
{
  ComputationProgress Progress { get; }
  void Cancel();
}

interface IComputation<TResult, TProgress> : IComputationBase
{
  void RunAsync(Action<TResult> onCompleted, Action onCancelled, 
                Action<Exception> onError);
  TResult RunSynchronously();
}

The reason for having the IComputationBase interface is to allow for easy "anonymous" cancelling and progress tracking.

Now, there is an abstract class implementation of the IComputation interface called Computation<TResult> that has the following "visible" fields and methods (omitted some overloads for the sake of clarity):

C#
abstract class Computation<TResult> : IComputation<TResult, ComputationProgress>
{
  ComputationProgress Progress { get; }
  bool IsCancelled { get; }

  // The actual computation happens here.
  protected abstract TResult OnRun();           

  // This is to satisfy the IComputation interface.
  // Internally only calls When* functions and then RunAsync()
  void RunAsync(Action<TResult> onCompleted, 
       Action onCancelled, Action<Exception> onError);
  
  // Runs the computation on the scheduler specified by ExecutedOn
  void RunAsync();                
  
  // Runs the computation on the current thread and immediately returns a result.
  // Returns default(TResult) if something goes wrong.
  TResult RunSynchronously();

  // Cancels the computation if Progress.CanCancel is set to true.
  void Cancel();

  // Sets the scheduler the computation is observed on.
  // The "latest" one is used for all the calls.
  // By default, the observer scheduler is assigned
  // to Scheduler.Dispatcher when the Computation object is created.
  // Therefore, in most cases the observer functions
  // are dispatched to the UI thread.
  Computation<TResult> ObservedOn(IScheduler scheduler);
  
  // Sets the scheduler the computation is executed on.
  // The "latest" one is used for all the calls.
  // ThreadPool scheduler is used by default.
  Computation<TResult> ExecutedOn(IScheduler scheduler);
  
  // Called right before the computation starts executing.
  Computation<TResult> WhenStarted(Action action);  
  // Called if the computation provides a result.
  Computation<TResult> WhenCompleted(Action<TResult> action);
  // Called when progress of the computation is called.
  Computation<TResult> WhenProgressUpdated(Action<ProgressTick> observer);  
  // Called when the computation is cancelled.
  Computation<TResult> WhenCancelled(Action action);
  // Called when an exception occurs duting the computation.
  Computation<TResult> WhenError(Action<Exception> action);
  // Always called when the computation stops
  // regardless of cancellation or error.
  Computation<TResult> WhenFinished(Action action);
}

For convenience, a derived class Computation : Computation<Unit> is added for functions that only have side effects (e.g., Action, the Unit type is implemented as an empty struct and represents void). Internally, there are two classes: RelayComputation : Computation and RelayComputation<TResult> : Computation<TResult> to quickly turn lambdas into asynchronous computations with progress tracking (where the progress tracking object is passed as a parameter):

C#
new RelayComputation<int>(p => 
  { 
    p.UpdateStatus("...");
    //...
    return result; 
  });

To create an instance of RelayComputation, the function Computation.Create(lambda) has to be used. For example:

C#
var computation = Computation.Create(() => 1);

The same can also be achieved by calling the extension method AsComputaion on Func and Action types:

C#
Func<ComputationProgress, int> f = p => DoSomething(p);
var computation = f.AsComputation();

To track and update the progress, the class ComputationProgress is used. It contains the following members (again, some are omitted for clarity):

C#
class ComputationProgress : INotifyPropertyChanged
{
  bool IsComputing { get; }
  bool CanCancel { get; }
  string StatusText { get; }
  bool IsIndeterminate { get; }
  int Current { get; }
  int Length { get; }  

  // This throw a ComputationCancelledException that is caught by the Computation object.
  // Reason for using a custom implementation instead of a CancellationToken is
  // that Silverlight does not currently contain a CancellationToken without using AsyncCTP.
  void ThrowIfCancellationRequested();
  
  void UpdateStatus(string statusText);
  void UpdateCanCancel(bool canCancel);
  void UpdateIsIndeterminate(bool isIndeterminate);
  void UpdateProgress(int current, int length);
  
  void NotifyPropertyChanged(string propertyName)
  {
    PropertyChangedEventHandler handler = PropertyChanged;
    if (handler != null)
    {
      // Make sure the handler is executed on the correct thread.
      // ObserverScheduler is set by the Computation object.
      ObserverScheduler.Schedule(() => handler(this, 
                        new PropertyChangedEventArgs(propertyName)));
    }
  }
}

The reason for implementing INotifyPropertyChanged is to allow seamless data binding. For example:

XML
<StackPanel DataContext="{Binding CurrentComputation.Progress, Mode=OneWay}" >
  <TextBlock Text="{Binding StatusText, Mode=OneWay}" />
  <ProgressBar IsIndeterminate="{Binding IsIndeterminate, Mode=OneWay}"
               Minimum="0"
               Maximum="{Binding Length, Mode=OneWay}"
               Value="{Binding Current, Mode=OneWay}" />
</StackPanel>

Finally, the progress update callbacks are passed an argument of type ProgressTick:

C#
class ProgressTick
{
  string StatusText { get; }
  bool IsIndeterminate { get; }
  bool CanCancel { get; }
  int Current { get; }
  int Length { get; }
}

The reason for having a separate object for progress ticks is not to directly expose the Update* methods to the observer.

3.2 Custom Computations

Custom computations can be created by deriving from the Computation<TResult> (or Compuation) class by overriding the OnRun method. For example:

C#
class UltimateComputation : Computation<int>
{
  int result;
  
  public UltimateAnswer(int result) 
  {
    this.result = result;
  }
  
  protected override int OnRun()
  { 
    Progress.UpdateStatus("Computing the ultimate answer ...");
    Progress.UpdateProgress(1, 100);
    //...
    Progress.UpdateStatus("Pretend doing some work ...");
    //...
    Progress.UpdateStatus("Almost there ...");
    //...
    Progress.UpdateProgess(100);
    
    return result;
  }
}

and then used as:

C#
new UltimateComputation(42)
  .WhenProgressChanged(p => ShowProgress(p))
  .WhenCompleted(r => DisplayResult(r))
  .RunAsync();

One way I am using the code is for repeated transformations of large objects:

C#
class SomeLargeObject
{
  Graph aLargeGraph;
  
  public Computation UpdateAsync(double parameter)
  {
    return Computation.Create(p => 
    { 
      Phase1Modification(aLargeGraph, parameter);
      p.UpdateStatus("I did something interesting");
      Phase2Modification(aLargeGraph, parameter);
    });
  }
}

3.3 Exceptions

If an exception occurs in the underlying computation, it is eaten by the Computation object and the WhenError observers are notified. This behavior is different when AsyncCTP is used, where the exception is re-thrown and can be caught in a try/catch block.

3.4 Cancellation

The method Cancel provides a way of cancelling computations. What happens when Cancel is called is:

  • The cancel flag is set to true so that the next call to ThrowIfCancellationRequested throws the ComputationCancelledException. If the computation does not internally support cancellation via ThrowIfCancellationRequested, it will keep running until it is finished.
  • "Cancel message" is sent to the progress observers.
  • WhenFinished observers are notified.
  • All observers are disposed. As a result, even if the computation does not internally support cancellation, there will be no more notifications.

3.5 AsyncCTP Support

The AsyncCTP support allows to await the computation object. This is achieved through a ComputationAwaiter<TResult> object and the extension method GetAwaiter:

C#
public static ComputationAwaiter<TResult> GetAwaiter<TResult>(
              this Computation<TResult> computation)
{
  return new ComputationAwaiter<TResult>(computation);
}

The ComputationAwaiter<TResult> works by subscribing When* functions in the OnCompleted function:

C#
computation
  // if cancelled, set the cancelled flag to true
  .WhenCancelled(() => this.cancelled = true)
  // if an exception occured, store it
  .WhenError(e => this.exception = e)
  // if the computation yielded a result, store it
  .WhenCompleted(r => this.result = r)
  // after computation has finished (either completed, cancelled or by exception)
  // invoke the continuation prepared by the async builder.
  .WhenFinished(() => continuation())
  // run the computation
  .RunAsync();

The EndAwait only checks what type of result the computation provided and forwards it:

C#
if (this.cancelled) throw new ComputationCancelledException();
if (this.exception != null) throw this.exception;
return this.result;

The correct usage of the code when using AsyncCTP is:

C#
var computation = Computation.Create(...);
try
{
  var result = await computation;
  Display(result);
}
catch (ComputationCancelledException)
{
  // ...
}
catch (Excetion e)
{
  // ...
}

Of course, if you know your computation does not support cancellation/throw exceptions, you can omit the try/catch. Additionally, even though the exceptions are re-thrown when using AsyncCTP, the WhenError subscribers are still invoked.

4 Source Code, Demo Project, and Binaries

There are seven projects in the source code solution:

  • SimpleComputation - Contains the .NET version of the API.
  • SimpleComputation.AsyncCTP - Contains the .NET version of the AsyncCTP extension. To compile this, AsyncCTP needs to be installed on your machine. If you are too lazy to install the CTP, simply exclude the project from the solution.
  • SimpleComputation.Silverlight - Contains the Silverlight version of the API. These are just file links to the SimpleComputation project and different reference assemblies.
  • SimpleComputation.Silverlight.AsyncCTP - Contains the Silverlight version of the AsyncCTP extension. Again, just a file link to the .NET project. To compile this, AsyncCTP needs to be installed on your machine.
  • SimpleComputation.Tests - Some very basic unit testing.
  • SimpleComputation.WpfDemo - WPF demo application.
  • SimpleComputation.SilverlightDemo - Silverlight demo application. Shares the same View Model with the WPF demo.

The usage of the binaries (do not forget to 'unlock' them):

  • .NET 4: Add SimpleComputation.dll and references to System.CoreEx, System.Interactive, and System.Reactive from the Reactive Extensions for .NET library (version 1.0.2856.104 and above, these are included in case you don't have them installed on your computer). To add AsyncCTP support, link SimpleComputation.AsyncCTP.dll and AsyncCTP.dll (to compile the project, AsyncCTP has to be installed in your Visual Studio).
  • Silverlight 4: Add SimpleComputation.Silverlight.dll and references to System.CoreEx, System.Interactive, System.Reactive, and System.Observable from the Reactive Extensions for Silverlight library (version 1.0.2856.104 and above, these are included in case you don't have them installed on your computer). To add AsyncCTP support, link SimpleComputation.AsyncCTP.Silverlight.dll and AsyncCTP_Silverlight.dll (to compile the project, AsyncCTP has to be installed in your Visual Studio).

5 Possible Pitfalls

  • The RunAsync/Synchronously function can only be called once. If called the second time, an InvalidOperationException is thrown.
  • If the computation does not internally support cancellation via ThrowIfCancellationRequested, it will keep running until it is finished when Cancel is called. However, on the outside, it will look as if the computation was terminated.
  • The code is designed to be executed from WPF/Silverlight applications, and thus the default scheduler for computation progress, completed, error, and cancelled notifications is set to Scheduler.DispatcherScheduler. Now, I am not exactly sure how this works in WinForms, therefore the code might not work without calling the ObservedOn function.
  • One needs to be very careful when cancelling operations with side effects.

6 Future Work

There are several features I would like to implement in the future:

  • A ProgressiveComputation type that would provide a basis for computations with intermediate results. For example, imagine generating a plot of a function as the points are computed, or displaying intermediate results of some iterative algorithm and allowing the user to say "stop" when he/she is satisfied with the result. The API would look something like this:
  • C#
    Computation
      .Create(p => IProduceIntermediateResults(p))
      .ResultsNoMoreOftenThan(TimeSpan.FromSeconds(1))
      .OnIntermediateResult(r => Display(r));

    However, this will probably be another project/article, because I want to keep this as lightweight and simple as possible.

  • Allowing per observer scheduler. Currently, the same scheduler is used for all When* events. This is probably a way towards a clean implementation of the next feature.
  • Add "native" support for composing computations with progress tracking. For example, being able to do something like computationA.FollowedBy(computationB), computationB.ExecutedAtTheSameTimeAs(computationB), or computationA.ReplacedByIfFinishedSooner(computationB).
  • Add time elapsed/estimated time remaining support.

7 Conclusion

Although the code is somewhat tested in "production code", as it turns out, there is a lot of subtle things that could go wrong and thus it is still in development. If you use the code and find something wrong with it, please let me know.

References

History

  • April 6th
    • Initial release.
  • April 8th
    • Removed "all in one" source because it was hard to maintain. Added .NET and Silverlight binaries instead.
    • Removed the IDisposable computation that stored the "async subscription". It does not work the way I imagined it would :)
    • Made the class RelayComputation private.
    • Added a class diagram with the visible classes and methods to the project.
    • Added a section about cancellation.
    • Clarified what happens when the computation is called twice (exception is thrown).
  • April 10th
    • Changed the structure of the article considerably.
    • Mostly because of my laziness/ignorance, the AsyncCTP support was kind of "wrong" (it worked but not when exception/cancellation occurred). I have remedied that by writing a custom awaiter for the computation object.
    • ComputationCancelledException is now public.
  • April 25th
    • Updated to AsyncCTP SP1.
    • Demo project no longer dependent on AsyncCTP.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)