Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles / desktop / WPF

A Generic Class for Wrapping Asynchronous Begin/End Operations, Using Reactive Extensions for .NET (Rx)

4.65/5 (13 votes)
3 Feb 2010CPOL4 min read 69.1K   18  
This article presents a reusable class and technique for easily doing Asynchronous Programming using the Begin/End Pattern and the new Reactive Extensions for .NET (Rx) library.

Introduction

One of the patterns you will come across as a .NET programmer is the Begin/End pattern for making asynchronous function calls. The purpose of the pattern is to allow long-running operations to execute on a different thread than the calling thread, leaving the calling thread free (non-blocked) to continue execution. This is an important technique for building responsive GUIs, as well as for making remote calls effectively (whether you are calling a WCF service, using .NET remoting, accessing some REST-based Web Service, etc.). In case you haven't seen it before, it looks like this:

C#
IAsyncResult BeginOperation(...some number of parameters as input, 
             AsyncCallback callback, object state);
SomeResult EndOperation(IAsyncResult);

While the BeginInvoke/EndInvoke pattern is powerful, it is awkward and non-intuitive to work with, especially if the EndOperation portion returns a value. If you look at the pattern above, you can see that the Begin portion takes in a callback delegate and an object "state", and returns an IAsyncResult. It is not readily apparent what you are supposed to pass in as the callback or the state, and it is not immediately clear what you are supposed to do with the IAsyncResult you get upon calling the function! Most importantly, it is not apparent how to get the result of the EndOperation.

To make it easier to use the Begin/End pattern, Microsoft recommends an approach called the Event-based Asynchronous Pattern. While this particular pattern is an improvement, we can do one step better using the new Reactive Extensions for .NET (Rx), or Reactive LINQ, library.

Background

Using the code

In order to use the wrapper, simply create a new instance of the generic type AsyncPatternWrapper. You must use at least one generic type parameter - that parameter will represent what will be returned in the EndOperation. If the BeginOperation takes in additional parameters (besides the callback and state parameters), then you must pass in additional generic parameters.

For example, if the BeginOperation takes in a string, and the EndOperation returns an int, then you would declare a new instance of AsyncPatternWrapper<string, int>(BeginOperation, EndOperation).

The constructor for any of the AsyncPatternWrapper classes takes in two parameters - the BeginOperation, and its corresponding EndOperation.

The AsyncPatternWrapper classes implement the interface IObservable<TResult>, where TResult is the result of the EndOperation call. IObservable is part of the new Reactive Extensions for .NET library.

Aside from implementing IObservable, the AsyncPatternWrapper only has one function - Invoke(). All that does is call the BeginOperation. When that completes, another result will come in on the IObservable stream. As long as you have subscribed to the IObservable stream (via the "Subscribe" function), you will automatically process the results of the EndOperation as they complete.

Example 1 (Consuming a REST Web Service)

C#
WebRequest request = HttpWebRequest.Create(
  @"http://services.digg.com/containers?appkey=http%3A%2F%2Fapidoc.digg.com");
var wrapper = new AsyncPatternWrapper<WebResponse>(
  request.BeginGetResponse, request.EndGetResponse);

wrapper.Subscribe(webResponse => Console.WriteLine(webResponse.ContentLength));
wrapper.Invoke();

The cool thing about the new Reactive Extensions library (which used to be called Reactive LINQ), is that you can use the standard LINQ querying operators over the stream of events. So in Example 1 above, instead of printing the ContentLength of every WebResponse that comes in, you can only print out the WebResponses where the ContentLength is greater than 3000, for example:

C#
wrapper
    .Where(webResponse => webResponse.ContentLength > 3000)
    .Subscribe(webResponse => Console.WriteLine(webResponse.ContentLength));

Example 2 (Consuming a WCF Service)

C#
// Some WCF service interface
[ServiceContract]
public interface IService
{
    [OperationContract(AsyncPattern = true)]
    IAsyncResult BeginGetCustomers(AsyncCallback callback, object state);

    List<Customer> EndGetCustomers(IAsyncResult result);
}

// Code snippet - illustrates consumption of the WCF service on the client-side
            
var cf = new ChannelFactory<IService>(new BasicHttpBinding(), 
         new EndpointAddress(@"http://localhost:8085"));
var service = cf.CreateChannel();

var wrapper = new AsyncPatternWrapper<List<Customer>>(
                  service.BeginGetCustomers, service.EndGetCustomers);
wrapper.ObserveOnDispatcher().Subscribe(customers => UpdateUI(customers));

// refreshButton is some button on the UI - every time you
// click it, you will asynchronously refresh the list of customers:
refreshButton.Click += (s,e) => wrapper.Invoke();

Notice that in Example 2 above, you don't have to switch back to the UI thread via the Dispatcher in order to update the UI! As a result, your code ends up being much cleaner and much more readable.

The other takeaway from this is that instead of having to rely on the WCF proxy generator, you can very easily roll your own proxy by using the AsyncPatternWrapper class. This is invaluable for times when the WCF service doesn't expose a .svc file, or in cases where there can't possibly be one, such as WCF REST.

AsyncPatternWrapper

This is the source code for AsyncPatternWrapper for no arguments in the Begin operation, and for one argument in the Begin operation. See the attached zipped file for AsyncPatternWrapper classes all the way up to four arguments in the Begin operation.

C#
using System;
using System.Linq;

namespace System
{
    #region No arguments

    public class AsyncPatternWrapper<TResult> : IObservable<TResult>
    {
        Func<AsyncCallback, object, IAsyncResult> beginOp;
        Func<IAsyncResult, TResult> endOp;
        IObservable<TResult> observable;
        event EventHandler<EventArgs<TResult>> done;

        public AsyncPatternWrapper(
            Func<AsyncCallback, object, IAsyncResult> beginOp,
            Func<IAsyncResult, TResult> endOp)
        {
            this.beginOp = beginOp;
            this.endOp = endOp;

            observable = Observable.FromEvent<EventArgs<TResult>>(
                e => this.done += e, e => this.done -= e)
                .Select(s => s.EventArgs.Item);
        }

        public void Invoke()
        {
            IAsyncResult result = null;
            result = beginOp(
                new AsyncCallback(notUsed =>
                {
                    try
                    {
                        var res = endOp(result);
                        if (done != null)
                            done(null, new EventArgs<TResult> { Item = res });
                    }
                    catch (Exception)
                    {
                        // Don't raise an event if there was an exception
                    }
                }),
                null);
        }

        #region IObservable<TResult> Members

        public IDisposable Subscribe(IObserver<TResult> observer)
        {
            return observable.Subscribe(observer);
        }

        #endregion
    }

    #endregion


    #region One argument

    public class AsyncPatternWrapper<T, TResult> : IObservable<TResult>
    {
        Func<T, AsyncCallback, object, IAsyncResult> beginOp;
        Func<IAsyncResult, TResult> endOp;
        IObservable<TResult> observable;
        event EventHandler<EventArgs<TResult>> done;

        public AsyncPatternWrapper(
            Func<T, AsyncCallback, object, IAsyncResult> beginOp,
            Func<IAsyncResult, TResult> endOp)
        {
            this.beginOp = beginOp;
            this.endOp = endOp;

            observable = Observable.FromEvent<EventArgs<TResult>>(
                e => this.done += e, e => this.done -= e)
                .Select(s => s.EventArgs.Item);
        }

        public void Invoke(T param)
        {
            IAsyncResult result = null;
            result = beginOp(
                param,
                new AsyncCallback(notUsed =>
                {
                    try
                    {
                        var res = endOp(result);
                        if (done != null)
                            done(null, new EventArgs<TResult> { Item = res });
                    }
                    catch (Exception)
                    {
                        // Don't raise an event if there was an exception
                    }
                }),
                null);
        }

        public IDisposable Subscribe(IObserver<TResult> observer)
        {
            return observable.Subscribe(observer);
        }
    }

    #endregion

    internal class EventArgs<T> : EventArgs
    {
        public T Item { get; set; }
    }
}

Note in the code above that exceptions are just discarded - if you find it more useful to re-throw the exception, or alternatively to expose another IObservable stream dedicated solely to exceptions, it's an easy enough change.

I hope you found this article useful, and enjoyed reading it!

History

  • February 2, 2010 - Version 1.0.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)