Introduction
One of the patterns you will come across as a .NET programmer is the Begin/End pattern for making asynchronous function calls. The purpose of the pattern is to allow long-running operations to execute on a different thread than the calling thread, leaving the calling thread free (non-blocked) to continue execution. This is an important technique for building responsive GUIs, as well as for making remote calls effectively (whether you are calling a WCF service, using .NET remoting, accessing some REST-based Web Service, etc.). In case you haven't seen it before, it looks like this:
IAsyncResult BeginOperation(...some number of parameters as input,
AsyncCallback callback, object state);
SomeResult EndOperation(IAsyncResult);
While the BeginInvoke/EndInvoke pattern is powerful, it is awkward and non-intuitive to work with, especially if the EndOperation portion returns a value. If you look at the pattern above, you can see that the Begin portion takes in a callback delegate and an object "state", and returns an IAsyncResult
. It is not readily apparent what you are supposed to pass in as the callback or the state, and it is not immediately clear what you are supposed to do with the IAsyncResult
you get upon calling the function! Most importantly, it is not apparent how to get the result of the EndOperation.
To make it easier to use the Begin/End pattern, Microsoft recommends an approach called the Event-based Asynchronous Pattern. While this particular pattern is an improvement, we can do one step better using the new Reactive Extensions for .NET (Rx), or Reactive LINQ, library.
Background
Using the code
In order to use the wrapper, simply create a new instance of the generic type AsyncPatternWrapper
. You must use at least one generic type parameter - that parameter will represent what will be returned in the EndOperation. If the BeginOperation takes in additional parameters (besides the callback and state parameters), then you must pass in additional generic parameters.
For example, if the BeginOperation takes in a string
, and the EndOperation returns an int
, then you would declare a new instance of AsyncPatternWrapper<string, int>(BeginOperation, EndOperation)
.
The constructor for any of the AsyncPatternWrapper
classes takes in two parameters - the BeginOperation, and its corresponding EndOperation.
The AsyncPatternWrapper
classes implement the interface IObservable<TResult>
, where TResult
is the result of the EndOperation call. IObservable
is part of the new Reactive Extensions for .NET library.
Aside from implementing IObservable
, the AsyncPatternWrapper
only has one function - Invoke()
. All that does is call the BeginOperation. When that completes, another result will come in on the IObservable
stream. As long as you have subscribed to the IObservable
stream (via the "Subscribe" function), you will automatically process the results of the EndOperation as they complete.
Example 1 (Consuming a REST Web Service)
WebRequest request = HttpWebRequest.Create(
@"http://services.digg.com/containers?appkey=http%3A%2F%2Fapidoc.digg.com");
var wrapper = new AsyncPatternWrapper<WebResponse>(
request.BeginGetResponse, request.EndGetResponse);
wrapper.Subscribe(webResponse => Console.WriteLine(webResponse.ContentLength));
wrapper.Invoke();
The cool thing about the new Reactive Extensions library (which used to be called Reactive LINQ), is that you can use the standard LINQ querying operators over the stream of events. So in Example 1 above, instead of printing the ContentLength
of every WebResponse
that comes in, you can only print out the WebResponse
s where the ContentLength
is greater than 3000, for example:
wrapper
.Where(webResponse => webResponse.ContentLength > 3000)
.Subscribe(webResponse => Console.WriteLine(webResponse.ContentLength));
Example 2 (Consuming a WCF Service)
[ServiceContract]
public interface IService
{
[OperationContract(AsyncPattern = true)]
IAsyncResult BeginGetCustomers(AsyncCallback callback, object state);
List<Customer> EndGetCustomers(IAsyncResult result);
}
var cf = new ChannelFactory<IService>(new BasicHttpBinding(),
new EndpointAddress(@"http://localhost:8085"));
var service = cf.CreateChannel();
var wrapper = new AsyncPatternWrapper<List<Customer>>(
service.BeginGetCustomers, service.EndGetCustomers);
wrapper.ObserveOnDispatcher().Subscribe(customers => UpdateUI(customers));
refreshButton.Click += (s,e) => wrapper.Invoke();
Notice that in Example 2 above, you don't have to switch back to the UI thread via the Dispatcher in order to update the UI! As a result, your code ends up being much cleaner and much more readable.
The other takeaway from this is that instead of having to rely on the WCF proxy generator, you can very easily roll your own proxy by using the AsyncPatternWrapper
class. This is invaluable for times when the WCF service doesn't expose a .svc file, or in cases where there can't possibly be one, such as WCF REST.
AsyncPatternWrapper
This is the source code for AsyncPatternWrapper
for no arguments in the Begin operation, and for one argument in the Begin operation. See the attached zipped file for AsyncPatternWrapper
classes all the way up to four arguments in the Begin operation.
using System;
using System.Linq;
namespace System
{
#region No arguments
public class AsyncPatternWrapper<TResult> : IObservable<TResult>
{
Func<AsyncCallback, object, IAsyncResult> beginOp;
Func<IAsyncResult, TResult> endOp;
IObservable<TResult> observable;
event EventHandler<EventArgs<TResult>> done;
public AsyncPatternWrapper(
Func<AsyncCallback, object, IAsyncResult> beginOp,
Func<IAsyncResult, TResult> endOp)
{
this.beginOp = beginOp;
this.endOp = endOp;
observable = Observable.FromEvent<EventArgs<TResult>>(
e => this.done += e, e => this.done -= e)
.Select(s => s.EventArgs.Item);
}
public void Invoke()
{
IAsyncResult result = null;
result = beginOp(
new AsyncCallback(notUsed =>
{
try
{
var res = endOp(result);
if (done != null)
done(null, new EventArgs<TResult> { Item = res });
}
catch (Exception)
{
}
}),
null);
}
#region IObservable<TResult> Members
public IDisposable Subscribe(IObserver<TResult> observer)
{
return observable.Subscribe(observer);
}
#endregion
}
#endregion
#region One argument
public class AsyncPatternWrapper<T, TResult> : IObservable<TResult>
{
Func<T, AsyncCallback, object, IAsyncResult> beginOp;
Func<IAsyncResult, TResult> endOp;
IObservable<TResult> observable;
event EventHandler<EventArgs<TResult>> done;
public AsyncPatternWrapper(
Func<T, AsyncCallback, object, IAsyncResult> beginOp,
Func<IAsyncResult, TResult> endOp)
{
this.beginOp = beginOp;
this.endOp = endOp;
observable = Observable.FromEvent<EventArgs<TResult>>(
e => this.done += e, e => this.done -= e)
.Select(s => s.EventArgs.Item);
}
public void Invoke(T param)
{
IAsyncResult result = null;
result = beginOp(
param,
new AsyncCallback(notUsed =>
{
try
{
var res = endOp(result);
if (done != null)
done(null, new EventArgs<TResult> { Item = res });
}
catch (Exception)
{
}
}),
null);
}
public IDisposable Subscribe(IObserver<TResult> observer)
{
return observable.Subscribe(observer);
}
}
#endregion
internal class EventArgs<T> : EventArgs
{
public T Item { get; set; }
}
}
Note in the code above that exceptions are just discarded - if you find it more useful to re-throw the exception, or alternatively to expose another IObservable
stream dedicated solely to exceptions, it's an easy enough change.
I hope you found this article useful, and enjoyed reading it!
History
- February 2, 2010 - Version 1.0.