Preface
Reactive Extensions have been out there in the wild for some time, and in this post we'll discuss Reactive Extensions in a bit more detail. We'll also touch on IQbservables, the most mysteriously named thing/interface in the world, maybe after the Higgs boson. Push and pull sequences are everywhere, and now, with devices on one end and the cloud at the other, most data transactions happen via push/pull sequences. Hence, it is essential to grasp the basic concepts behind the programming models around them.
First Things First
Let us take a step back and discuss IEnumerable and IQueryable first, before discussing Reactive IObservable and IQbservable (Qbservables = Queryable Observables; oh yea, funny name).
IEnumerable<T>
As you may be aware, the IEnumerable model can be viewed as a pull operation. You get an enumerator, and then you iterate the collection by moving forward with MoveNext over the set of items until you reach the final item. Pull models are useful when the environment is requesting data from an external source. To cover some basics: IEnumerable has a GetEnumerator method, which returns an enumerator with a MoveNext() method and a Current property. Offline tip: a C# foreach statement can iterate over any type that exposes a suitable GetEnumerator method, even one that does not implement IEnumerable.
Anyway, here is what the non-generic version of IEnumerable looks like:
public interface IEnumerable
{
IEnumerator GetEnumerator();
}
public interface IEnumerator
{
Object Current {get;}
bool MoveNext();
void Reset();
}
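To see the pull model in action, here is a small sketch using only the BCL: it drives the non-generic enumerator by hand with MoveNext and Current, then does the same with foreach, which the compiler expands into roughly that loop. The sample strings are arbitrary.

```csharp
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;

// A pull sequence: the consumer asks for each item by calling MoveNext.
IEnumerable source = new List<string> { "alpha", "beta", "gamma" };

var pulled = new List<object>();
IEnumerator enumerator = source.GetEnumerator();
while (enumerator.MoveNext())          // pull the next item; false means "no more items"
{
    pulled.Add(enumerator.Current);    // read the item we just pulled
}

// foreach over the same source is roughly the loop above, generated by the compiler.
var viaForeach = new List<object>();
foreach (var item in source)
{
    viaForeach.Add(item);
}

Console.WriteLine(string.Join(", ", pulled));       // alpha, beta, gamma
Console.WriteLine(pulled.SequenceEqual(viaForeach)); // True
```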
Now, LINQ defines a set of operators as extension methods on top of the generic version of IEnumerable, i.e., IEnumerable<T>. By leveraging type inference for generic methods, you can invoke these methods on any IEnumerable<T> without specifying the type; i.e., you can write someStringArray.Count() instead of someStringArray.Count<String>(). You can explore the Enumerable class to find these static extension methods.
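A quick sketch of the type inference point, assuming a plain string array named someStringArray as in the text: all three calls below resolve to the same Enumerable.Count method.

```csharp
using System;
using System.Linq;

string[] someStringArray = { "pull", "push", "cloud" };

// The compiler infers TSource = string from someStringArray.
int inferred = someStringArray.Count();
int spelledOut = someStringArray.Count<string>();

// Extension methods are ordinary static methods on System.Linq.Enumerable.
int asStaticCall = Enumerable.Count(someStringArray);

Console.WriteLine($"{inferred} {spelledOut} {asStaticCall}"); // 3 3 3
```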
The actual query operators in this case (like Where, Count, etc.) and the related expressions are compiled to IL, and they run in-process much like any other IL code executed by the CLR. From an implementation point of view, the parameter of LINQ clauses like Where is a lambda expression (as you may already know, from.. select is just syntactic sugar that gets expanded to calls to the extension methods of IEnumerable<T>), and in most cases a delegate like Func<T,..> can represent such an expression from an in-memory perspective. But what if you want to apply query operators to items sitting somewhere else? For example, how do you apply LINQ operators to a set of data rows stored in a database table, possibly in the cloud, instead of to an in-memory collection that is an IEnumerable<T>? That is exactly what IQueryable<T> is for.
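Here is a minimal sketch contrasting the two worlds, using only System.Linq and System.Linq.Expressions: query syntax and method syntax produce the same in-memory result, and the same lambda text becomes either an executable Func delegate or an inspectable Expression tree, depending on the type it is assigned to.

```csharp
using System;
using System.Linq;
using System.Linq.Expressions;

int[] numbers = { 5, 12, 8, 21 };

// Query syntax is sugar for extension-method calls on IEnumerable<T>.
var viaQuery = from n in numbers where n > 10 select n;
var viaMethods = numbers.Where(n => n > 10);

// Assigned to Func<...>, the lambda is compiled IL that you can only invoke.
Func<int, bool> asDelegate = n => n > 10;

// Assigned to Expression<...>, the same lambda becomes data you can inspect.
Expression<Func<int, bool>> asExpression = n => n > 10;

Console.WriteLine(viaQuery.SequenceEqual(viaMethods)); // True
Console.WriteLine(asDelegate(12));                     // True
Console.WriteLine(asExpression.Body);                  // (n > 10)
Console.WriteLine(asExpression.Body.NodeType);         // GreaterThan
```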
IQueryable<T>
IQueryable<T> is an IEnumerable<T> (it inherits from IEnumerable<T>), and it points to a query expression that can be executed in a remote world. The LINQ operators for querying objects of type IQueryable<T> are defined in the Queryable class. Instead of delegates, they accept lambdas as Expression<Func<T..>>, and operators like Where return a new IQueryable<T> whose Expression property holds a System.Linq.Expressions.Expression tree (you can read about expression trees here). This tree is translated to the remote world (say, a SQL system) by a query provider. So, essentially, concrete IQueryable implementations point to a query expression and a query provider; it is the job of the query provider to translate the query expression into the query language of the remote world where it gets executed. From an implementation point of view, the lambda you pass to a LINQ operator applied on an IQueryable is assigned to an Expression<Func<T,..>> instead of a delegate. Expression trees in .NET provide a way to represent code as data, a kind of abstract syntax tree. Later, the query provider walks through this tree to construct an equivalent query in the remote world.
public interface IQueryable : IEnumerable {
Type ElementType { get; }
Expression Expression { get; }
IQueryProvider Provider { get; }
}
public interface IQueryable<T> : IEnumerable<T>, IQueryable, IEnumerable {
..
}
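To poke at these three members without a database, you can wrap an in-memory array with AsQueryable, which the BCL backs with a LINQ to Objects provider. The sketch below shows that Queryable.Where only builds a tree (a method-call node), and that enumerating the query is what hands the tree to the provider for execution.

```csharp
using System;
using System.Linq;
using System.Linq.Expressions;

// AsQueryable wraps an in-memory array in an IQueryable<int>.
IQueryable<int> source = new[] { 5, 12, 8, 21 }.AsQueryable();

// Queryable.Where filters nothing yet; it returns a query with a bigger expression tree.
IQueryable<int> query = source.Where(n => n > 10);

Console.WriteLine(query.ElementType);           // System.Int32
Console.WriteLine(query.Expression.NodeType);   // Call (a call to Queryable.Where)
Console.WriteLine(query.Provider.GetType().Name);

// Enumerating hands the tree to the provider, which executes it.
Console.WriteLine(string.Join(", ", query));    // 12, 21
```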
For example, in LINQ to Entities (Entity Framework) or LINQ to SQL, the query provider converts the expressions to SQL and hands it over to the database server. You can even view the translation to the target query language (SQL) just by inspecting the query object. In short, the LINQ query operators you apply on IQueryable are used to build an expression tree, and the query provider translates this tree to build and execute a query in a remote world. Read this article if you are not clear about how expression trees are built from lambdas using Expression<T>.
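To make the provider's translation step concrete, here is a toy translator, not how Entity Framework or LINQ to SQL actually work, that walks a small predicate tree and emits SQL-like text. ToSql, Op, the Events table and the handful of supported node types are all hypothetical, chosen just for illustration; a real provider handles far more node types and sends constants as parameters instead of inlining them.

```csharp
using System;
using System.Globalization;
using System.Linq.Expressions;

// Hypothetical toy "provider" step: recursively walk the tree and emit SQL-like text.
string ToSql(Expression node) => node switch
{
    LambdaExpression lambda => ToSql(lambda.Body),
    BinaryExpression b => $"({ToSql(b.Left)} {Op(b.NodeType)} {ToSql(b.Right)})",
    MemberExpression m => m.Member.Name,                 // d.Year -> Year
    ConstantExpression { Value: null } => "NULL",
    ConstantExpression { Value: string s } => $"'{s}'",
    ConstantExpression c => Convert.ToString(c.Value, CultureInfo.InvariantCulture) ?? "",
    _ => throw new NotSupportedException(node.NodeType.ToString())
};

// Map the few operators this toy supports to SQL keywords.
string Op(ExpressionType type) => type switch
{
    ExpressionType.GreaterThan => ">",
    ExpressionType.LessThan => "<",
    ExpressionType.Equal => "=",
    ExpressionType.AndAlso => "AND",
    ExpressionType.OrElse => "OR",
    _ => throw new NotSupportedException(type.ToString())
};

Expression<Func<DateTime, bool>> filter = d => d.Year > 2020 && d.Month == 6;
string clause = ToSql(filter);
Console.WriteLine($"SELECT * FROM Events WHERE {clause}");
// SELECT * FROM Events WHERE ((Year > 2020) AND (Month = 6))
```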
Reactive Extensions
So, now let us get into the anatomy and philosophy of observables.
IObservable<T>
As we discussed, objects of type IEnumerable<T> are pull sequences. But in the real world, at times we push things as well, not just pull. (Health Alert: when you do both together, make sure you do it safely.) In a lot of scenarios, the push pattern makes a lot of sense. For example, instead of you waiting in a queue day and night with your neighbors in front of the local post office to collect your snail mail, the post office agent just pushes the mail to your home when it arrives.
Now, one of the cool things about push and pull sequences is that they are duals. This means IObservable<T> is the dual of IEnumerable<T> (see the code below). To keep the story short, the dual interface of IEnumerable, derived using categorical duality, is IObservable. The story goes that some members of Erik's team (he was with Microsoft then) had a well-deserved, temporary megalomaniac hyperactive spike when they discovered this duality. Here is a beautiful paper from Erik on that if you are more interested; a brief summary of Erik's paper is below.
interface IEnumerable<out T>
{
IEnumerator<T> GetEnumerator();
}
interface IEnumerator<out T>: IDisposable
{
bool MoveNext();
T Current { get; }
}
interface IObservable<out T>
{
IDisposable Subscribe(IObserver<T> observer);
}
interface IObserver<in T>
{
void OnCompleted(bool done);
void OnError(Exception exception);
T OnNext { set; }
}
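Surprisingly, the IObservable implementation looks like the Observer pattern. Note that the IObserver above is a rough, mechanically dualized form; the IObserver<T> that actually ships in the System namespace has the cleaned-up members void OnNext(T value), void OnError(Exception error) and void OnCompleted().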
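To make the duality tangible without pulling in Rx, here is a minimal sketch of a push sequence built on the IObservable<T> and IObserver<T> interfaces from the System namespace. SimpleObservable and ActionObserver are hypothetical helper types written only for this example (Rx provides production-grade equivalents such as Subject<T>). The producer calls OnNext on its observers, and the IDisposable returned from Subscribe is how a consumer stops listening.

```csharp
using System;
using System.Collections.Generic;

// A push sequence: the producer calls OnNext on every subscribed observer.
var ticker = new SimpleObservable<int>();
var received = new List<int>();
bool completed = false;

IDisposable subscription = ticker.Subscribe(new ActionObserver<int>(
    onNext: value => received.Add(value),
    onCompleted: () => completed = true));

ticker.Push(1);
ticker.Push(2);
subscription.Dispose();   // unsubscribe: later values are no longer delivered to us
ticker.Push(3);
ticker.Complete();

Console.WriteLine(string.Join(", ", received)); // 1, 2
Console.WriteLine(completed);                   // False (we unsubscribed before completion)

// Hypothetical helper: keeps a list of observers and pushes values to them.
sealed class SimpleObservable<T> : IObservable<T>
{
    private readonly List<IObserver<T>> _observers = new();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        _observers.Add(observer);
        return new Unsubscriber(() => _observers.Remove(observer));
    }

    // Copy the list so an observer may unsubscribe while being notified.
    public void Push(T value)
    {
        foreach (var o in _observers.ToArray()) o.OnNext(value);
    }

    public void Complete()
    {
        foreach (var o in _observers.ToArray()) o.OnCompleted();
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly Action _onDispose;
        public Unsubscriber(Action onDispose) => _onDispose = onDispose;
        public void Dispose() => _onDispose();
    }
}

// Hypothetical helper: adapts two delegates to the IObserver<T> interface.
sealed class ActionObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    private readonly Action _onCompleted;

    public ActionObserver(Action<T> onNext, Action onCompleted)
    {
        _onNext = onNext;
        _onCompleted = onCompleted;
    }

    public void OnNext(T value) => _onNext(value);
    public void OnError(Exception error) => Console.WriteLine($"error: {error.Message}");
    public void OnCompleted() => _onCompleted();
}
```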
Now, LINQ operators are cool. They are very expressive and provide an abstraction for querying things. So the crazy guys on the Reactive team thought they should make LINQ work against event streams. Event streams are in fact push sequences rather than pull sequences. So they built IObservable. The IObservable fabric lets you write LINQ operators on top of push sequences like event streams, much the same way you query IEnumerable<T>. The LINQ operators for an object of type IObservable<T> are defined in the Observable class. So, how would you implement a LINQ operator like Where on an observable to do some filtering? Here is a simple example of the filter operator Where for an IEnumerable and an IObservable (simplified for comparison). In the case of IEnumerable, you dispose the enumerator when you are done traversing.
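using System;
using System.Collections.Generic;
using System.Reactive;        // Observer.Create
using System.Reactive.Linq;   // Observable.Create

static class FilterOperators
{
    // Pull: walk the source and yield only the matching items.
    public static IEnumerable<T> Where<T>(this IEnumerable<T> source, Func<T, bool> predicate)
    {
        using (var enumerator = source.GetEnumerator())
        {
            while (enumerator.MoveNext())
            {
                var value = enumerator.Current;
                if (predicate(value))
                {
                    yield return value;
                }
            }
        }
    }

    // Push: subscribe to the source and forward only the matching values.
    public static IObservable<T> Where<T>(this IObservable<T> source, Func<T, bool> predicate)
    {
        return Observable.Create<T>(observer =>
        {
            return source.Subscribe(Observer.Create<T>(
                value =>
                {
                    bool matches;
                    try
                    {
                        matches = predicate(value);
                    }
                    catch (Exception e)
                    {
                        observer.OnError(e);   // a throwing predicate terminates the sequence
                        return;
                    }
                    if (matches) observer.OnNext(value);
                },
                observer.OnError,              // pass errors through
                observer.OnCompleted));        // pass completion through
        });
    }
}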
Now, look at the IObservable Where implementation. In this case, the lambda passed to Observable.Create returns the IDisposable handle obtained by subscribing to the source, so disposing our subscription also stops the subscription to the source. For filtering, we simply create an inner observer that subscribes to the source and applies our filtering logic, and we wrap it in a new top-level observable that forwards the matching values to its own subscribers. Now, you can have any concrete implementation of IObservable<T> that wraps an event source, and then you can query it using Where!! Cool. The Observable class in Reactive Extensions has a few helper methods to create observables from events, like FromEvent and FromEventPattern. Let us create an observable and query the events now. Fortunately, the Rx team already provides the entire implementation of observables and the related query operators, so we don't end up writing custom query operators like this.
You can install Rx by running install-package Rx-Main in the NuGet Package Manager Console (the package is now published as System.Reactive), and try out this example that shows event filtering.
using System;
using System.Reactive.Linq;
using System.Timers;

// Fully qualified to avoid a clash with System.Threading.Timer.
var timer = new System.Timers.Timer() { Interval = 1000 };
timer.Start();
var eventStream = Observable.FromEventPattern<ElapsedEventArgs>(timer, "Elapsed");
var nowTime = DateTime.Now;
// Keep only the ticks signaled between 5 and 20 seconds from now.
var filteredEvents = from e in eventStream
                     let time = e.EventArgs.SignalTime
                     where
                         time > nowTime.AddSeconds(5) &&
                         time < nowTime.AddSeconds(20)
                     select e;
filteredEvents.Subscribe(t => Console.WriteLine(DateTime.Now));
Console.WriteLine("Let us wait..");
Console.ReadKey();
Obviously, in the above example, we could've used Observable.Timer, but I just wanted to show how to wrap an external event source with observables. Similarly, you can wrap your mouse events or WPF events. You can explore more about Rx and observables, and a few applications, here. Let us move on now to IQbservables.
IQbservable<T>
Now, let us focus on IQbservable<T>. IQbservable<T> is the counterpart to IObservable<T> for representing a query on push sequences/event sources as an expression, much like IQueryable<T> is the counterpart of IEnumerable<T>. So, what exactly does this mean? If you inspect IQbservable, you can see that:
public interface IQbservable<out T> : IQbservable, IObservable<T>
{
}
public interface IQbservable
{
Type ElementType { get; }
Expression Expression { get; }
IQbservableProvider Provider { get; }
}
You can see that it has an Expression property to represent the LINQ to Observable query, much like how IQueryable had an Expression to represent the AST of a LINQ query. The IQbservableProvider is responsible for translating the expression to the language of a remote event source (maybe a stream server in the cloud).
Interactive Extensions
Interactive Extensions (Ix), at its core, has a number of new extension methods for IEnumerable<T>, i.e., it adds a number of utility LINQ to Objects query operators. You may have hand-coded some of these utility extension methods somewhere in your helpers or utility classes, but now a lot of them are aggregated together by the Rx team. Also, this post assumes you are familiar with the cold IEnumerable model and iterators in C#. Basically, the C# compiler takes each iterator method containing yield return statements and generates a class out of it. So, in a way, each C# iterator internally holds a state machine. You can examine this using Reflector or a similar decompiler on a method that yield returns an IEnumerator<T>. Or better, there is a cool post from my friend Abhishek Sur here, or this post about the implementation of iterators in C#.
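A small sketch of that laziness, using nothing beyond the BCL: the local iterator function below does not run at all when called; each MoveNext resumes the generated state machine and runs it up to the next yield return. The log list exists only to make the execution order visible.

```csharp
using System;
using System.Collections.Generic;

var log = new List<string>();

// A local iterator function: the compiler turns its body into a state machine.
IEnumerable<int> Numbers()
{
    log.Add("started");
    yield return 1;
    log.Add("resumed after 1");
    yield return 2;
    log.Add("finished");
}

IEnumerable<int> seq = Numbers();   // nothing runs yet: the body is deferred
Console.WriteLine(log.Count);       // 0

using IEnumerator<int> e = seq.GetEnumerator();
e.MoveNext();                       // runs the body up to the first yield return
Console.WriteLine(string.Join(" | ", log)); // started
e.MoveNext();                       // resumes right after the previous yield return
Console.WriteLine(string.Join(" | ", log)); // started | resumed after 1
```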
More About Interactive Extensions
Fire up a C# console application, and install the Interactive Extensions package using install-package Ix-Main (now published as System.Interactive). You can explore the EnumerableEx class in the System.Linq namespace in System.Interactive.dll. Now, let us explore some useful extension methods that got added to IEnumerable.
Examining a Few Utility Methods in Interactive Extensions
Let us quickly examine a few useful utility methods.
Do
What the simplest version of Do does is pretty interesting. It lazily invokes an action on each element in the sequence as we enumerate it, leveraging the iterator.
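using System;
using System.Linq;   // Ix's Do extension lives in System.Linq

var numbers = new int[] { 30, 40, 20, 40 };
var result = numbers.Do(n => Console.WriteLine(n));   // nothing is printed yet
Console.WriteLine("Before Enumeration");
foreach (var item in result)
{
    // Each step of the enumeration runs the action on the current element.
}
Console.WriteLine("After Enumeration");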
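And here is the result: "Before Enumeration" is printed first, then 30, 40, 20 and 40, and finally "After Enumeration". Note that the action (in this case, our Console.WriteLine to print the values) is applied only when we enumerate.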
Now, how is the simplest version of Do implemented? If you have a quick peek at the Interactive Extensions source code here on CodePlex (the code now lives in the dotnet/reactive repository on GitHub), you can see how the Do method is actually implemented. Here is a shortened version:
public static class StolenLinqExtensions
{
public static IEnumerable<TSource> StolenDo<TSource>(
this IEnumerable<TSource> source, Action<TSource> onNext)
{
using (var e = source.GetEnumerator())
{
while (true)
{
if (!e.MoveNext())
break;
var current = e.Current;
onNext(current);
yield return current;
}
}
}
}
Cool, huh?
DoWhile
DoWhile in Ix is pretty interesting. It generates an enumerable sequence by repeating the source sequence as long as the given condition holds; the condition is checked after each pass, so the source is enumerated at least once.
public static IEnumerable<TResult> DoWhile<TResult>(this IEnumerable<TResult> source, Func<bool> condition)
Consider the following code:
var numbers = new int[] { 30, 40, 20, 40 };
var then = DateTime.Now.Add(new TimeSpan(0, 0, 10));
var results = numbers.DoWhile(() => DateTime.Now < then);
foreach (var r in results)
Console.WriteLine(r);
As expected, you'll see the foreach loop enumerating the numbers repeatedly, as long as the DateTime.Now < then condition holds, i.e., for roughly 10 seconds.
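If you'd like to see the repetition without watching a clock, here is a BCL-only sketch of the same behavior. MyDoWhile is a hypothetical stand-in, not the Ix source: it enumerates the source once and then keeps repeating while the condition holds, checking the condition after each pass like a C# do/while loop. A counter replaces DateTime.Now so the output is deterministic.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for Ix's DoWhile: one pass, then repeat while condition() is true.
IEnumerable<T> MyDoWhile<T>(IEnumerable<T> source, Func<bool> condition)
{
    do
    {
        foreach (var item in source)
            yield return item;
    } while (condition());   // checked after each pass, so at least one pass happens
}

int passes = 0;
var numbers = new[] { 30, 40, 20, 40 };

// The condition is true after passes 1 and 2, and false after pass 3.
var results = MyDoWhile(numbers, () => ++passes < 3).ToList();

Console.WriteLine(string.Join(", ", results));
// 30, 40, 20, 40, 30, 40, 20, 40, 30, 40, 20, 40
```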
Scan
Scan takes a sequence and applies an accumulator function to generate a sequence of accumulated values. For example, let us create a simple sum accumulator that takes a set of numbers and adds each number to the running total so far:
var numbers = new int[] { 10, 20, 30, 40 };
var results = numbers.Scan(0,(sum, num) => sum+num);
Enumerating results yields 10, 30, 60 and 100. You may also have a look at the actual Scan implementation in the Rx repository on CodePlex. Here is an abbreviated version:
// Lives in a static class, like StolenLinqExtensions above.
public static IEnumerable<TAccumulate> StolenScan<TSource, TAccumulate>(
    this IEnumerable<TSource> source, TAccumulate seed,
    Func<TAccumulate, TSource, TAccumulate> accumulator)
{
var acc = seed;
foreach (var item in source)
{
acc = accumulator(acc, item);
yield return acc;
}
}
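To run the abbreviated Scan without installing Ix, here is the same loop as a local function (the name Scan is local to this sketch), next to the built-in Enumerable.Aggregate for contrast: Aggregate returns only the final total, whereas Scan yields every intermediate one.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Same logic as StolenScan, written as a local function so this file runs without Ix.
IEnumerable<TAcc> Scan<TSource, TAcc>(IEnumerable<TSource> source, TAcc seed, Func<TAcc, TSource, TAcc> accumulator)
{
    var acc = seed;
    foreach (var item in source)
    {
        acc = accumulator(acc, item);
        yield return acc;   // emit every intermediate total, not just the final one
    }
}

var numbers = new[] { 10, 20, 30, 40 };
var runningTotals = Scan(numbers, 0, (sum, num) => sum + num).ToList();
int finalTotal = numbers.Aggregate(0, (sum, num) => sum + num);

Console.WriteLine(string.Join(", ", runningTotals)); // 10, 30, 60, 100
Console.WriteLine(finalTotal);                       // 100
```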
Conclusion
We just touched the tip of the iceberg, as the objective of this post was to introduce you to Ix and Rx. There is a pretty exciting talk from Bart De Smet here that you should not miss. Ix is especially interesting because of its functional roots. Have a look at the Reactive Extensions repository on CodePlex for more inspiration; it should give you a lot more ideas about a few functional patterns. You may also play with the Ix Providers and Ix Async packages.
And let me take the liberty of embedding the drawing created by Charles, a concrete representation of the abstract drawing Bart did on the whiteboard. It summarizes this post.
We'll discuss more practical scenarios where Rx and Ix come in so handy in the future, mainly device-to-cloud interaction scenarios, complex event processing, task distribution using IScheduler, etc., along with some brilliant add-on libraries others are creating on top of Rx. But this one was for a quick introduction. Happy coding!!