Introduction
If you use Rx heavily, you will undoubtedly come across gaps in the functionality provided. This article takes you through how to extend Rx using fluent extensions.
Background
Extending Rx is a lot easier than you might imagine, and can be achieved in a few lines of code. All you really need are two elements: an IObservable implementation and an extension method.
The IObservable implementation
The IObservable interface is a very simple contract with only one method, Subscribe, which takes a single IObserver argument and returns an IDisposable that ends the subscription. Before I continue, in case you don't know the difference between a hot and a cold observable, I'll quickly explain. A hot observable publishes data even if it has no observers subscribed, whereas a cold observable only publishes while an observer is subscribed. Most of the provided observables are cold, and only generate a sequence when an observer subscribes.
Why is that important for us? Well, when creating your IObservable implementation, hot and cold observables require very different implementations.
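As a rough illustration of the difference, the sketch below contrasts the two behaviours. It assumes the System.Reactive package is referenced; the class name HotVersusCold is hypothetical, and Subject is used only as a convenient stand-in for a hot source.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class HotVersusCold
{
    public static void Main()
    {
        // Cold: the delegate runs once per subscription, so every observer
        // gets its own copy of the sequence from the beginning.
        IObservable<int> cold = Observable.Create<int>(observer =>
        {
            observer.OnNext(1);
            observer.OnNext(2);
            observer.OnNext(3);
            observer.OnCompleted();
            return Disposable.Empty; // nothing to clean up in this sketch
        });
        cold.Subscribe(x => Console.WriteLine("cold A: " + x));
        cold.Subscribe(x => Console.WriteLine("cold B: " + x));

        // Hot: the subject publishes whether or not anyone is listening.
        var hot = new Subject<int>();
        hot.OnNext(1); // no observers yet, so this value is lost
        hot.Subscribe(x => Console.WriteLine("hot A: " + x));
        hot.OnNext(2); // delivered to A only
        hot.Subscribe(x => Console.WriteLine("hot B: " + x));
        hot.OnNext(3); // delivered to A and B
    }
}
```

Both cold observers receive 1, 2 and 3, while the hot observers only see what is published after they subscribe: A receives 2 and 3, and B receives only 3.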
A Hot implementation
When creating a hot implementation, every subscription shares the same generated sequence. This means that the class needs to start generating the sequence either before an observer subscribes or when the first observer subscribes. It also means that each observer must be captured and held in a collection when it subscribes, and removed from that collection when its subscription is disposed. See below:
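// The sequence in this example is a sequence of int, so the observers are IObserver<int>.
private readonly HashSet<IObserver<int>> _observers = new HashSet<IObserver<int>>();
public IDisposable Subscribe(IObserver<int> observer)
{
    lock (_observers)
    {
        // An observer that is already registered gets a no-op subscription.
        if (_observers.Contains(observer)) return Disposable.Empty;
        _observers.Add(observer);
    }
    // Disposing the subscription removes the observer from the shared set.
    return Disposable.Create(() =>
    {
        lock (_observers)
        {
            _observers.Remove(observer);
        }
    });
}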
When generating the sequence, each of the observers will need to be notified of each element generated.
public void Generate()
{
    int i = 0;
    while (true)
    {
        lock (_observers)
        {
            // Every current observer receives the same element.
            foreach (var o in _observers)
                o.OnNext(i);
        }
        i++; // advance to the next element
        Thread.Sleep(50);
    }
}
A Cold implementation
While a cold implementation doesn't need to maintain a list of observers, it does have a different requirement. As a sequence is produced only when the observer subscribes and only for that observer, the sequence needs to be generated at the time of the subscription. All functionality and objects required to create the sequence must be declared at that point and should not be shared, see the example below;
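public IDisposable Subscribe(IObserver<int> observer)
{
    // Each subscription gets its own cancellation source and its own thread.
    var cts = new CancellationTokenSource();
    var thread = new Thread(() =>
    {
        int i = 0;
        while (!cts.IsCancellationRequested)
        {
            observer.OnNext(i++);
            Thread.Sleep(50);
        }
    });
    thread.IsBackground = true;
    thread.Start();
    // Thread.Abort is not supported on .NET Core and later, so the loop is signalled to stop instead.
    return Disposable.Create(() => cts.Cancel());
}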
A Functional approach
An alternative, more functional approach is to use the Observable.Create method, which does not require you to construct a class and certainly makes more sense for simpler implementations. The method has several overloads, but probably the easiest is the one that accepts a function taking an IObserver parameter and returning an IDisposable. The same principles still apply: all functionality and objects required to create the sequence must be declared inside that function and should not be shared. See below for an example:
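// 'source' is assumed to be an existing IObservable<T> that is in scope.
Observable.Create<string>(observer =>
{
    // Forward every notification, converting each element to a string.
    return source.Subscribe(
        a => observer.OnNext(a.ToString()),
        ex => observer.OnError(ex),
        () => observer.OnCompleted());
});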
Interacting with an existing sequence
It is most likely that if you are going to create a fluent extension, you will want to interact with an existing sequence and either modify its output or add some form of behavior. This is actually very simple, and all you need is an existing observable. Pass the observable as an argument to your class constructor. For a cold observable, subscribe to it in your class's Subscribe method; for a hot observable, subscribe to it when you start generating your sequence. See the hot and cold examples below:
Hot Example
public ToStringObservable(IObservable<T> observable)
{
    _observable = observable;
}
public void Generate()
{
    // Subscribe to the source once and fan each notification out to every observer.
    _dispose = _observable.Subscribe(
        a =>
        {
            lock (_observers)
            {
                foreach (var o in _observers)
                    o.OnNext(a.ToString());
            }
        },
        ex =>
        {
            lock (_observers)
            {
                foreach (var o in _observers)
                    o.OnError(ex);
            }
        },
        () =>
        {
            lock (_observers)
            {
                foreach (var o in _observers)
                    o.OnCompleted();
            }
        });
}
Cold Example
public ToStringObservable(IObservable<T> observable)
{
    _observable = observable;
}
public IDisposable Subscribe(IObserver<string> observer)
{
    // Each observer gets its own subscription to the source.
    return _observable.Subscribe(a =>
    {
        observer.OnNext(a.ToString());
    }, e => observer.OnError(e), () => observer.OnCompleted());
}
Creating the Fluent extension
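This is probably the easiest part of the whole process: you simply need to construct and return your IObservable implementation. For a hot (shared) observable this makes less sense, because each call to the extension method constructs a new instance rather than sharing one.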
public static IObservable<string> ToStringObservable<T>(this IObservable<T> observable)
{
    return new ToStringObservable<T>(observable);
}
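To show how the extension reads in a fluent chain, here is a minimal self-contained sketch. It assumes the System.Reactive package is referenced; the cold ToStringObservable class is restated compactly so the sample runs on its own, and the names ToStringExtensions and Program are hypothetical.

```csharp
using System;
using System.Reactive.Linq;

// Compact restatement of the cold implementation so the sample is self-contained.
public sealed class ToStringObservable<T> : IObservable<string>
{
    private readonly IObservable<T> _observable;

    public ToStringObservable(IObservable<T> observable)
    {
        _observable = observable;
    }

    public IDisposable Subscribe(IObserver<string> observer)
    {
        return _observable.Subscribe(
            a => observer.OnNext(a.ToString()),
            e => observer.OnError(e),
            () => observer.OnCompleted());
    }
}

public static class ToStringExtensions
{
    public static IObservable<string> ToStringObservable<T>(this IObservable<T> observable)
    {
        return new ToStringObservable<T>(observable);
    }
}

public static class Program
{
    public static void Main()
    {
        // The custom operator sits in the chain like any built-in one.
        Observable.Range(1, 3)
            .ToStringObservable()
            .Select(s => "value " + s)
            .Subscribe(s => Console.WriteLine(s));
    }
}
```

Because the extension returns an IObservable, it composes with the built-in operators on either side of it, which is the main reason for wrapping the class in an extension method.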
A Cold Functional Fluent extension
Following the earlier functional example, below is an example of how to create a fluent extension with a functional approach. In short, you are taking the functionality from the class and moving it into the Observable.Create method.
public static IObservable<T> ToPausable<T>(this IObservable<T> observable,
    PauseState pause)
{
    return Observable.Create<T>(observer =>
    {
        // The queue and the subscriptions are created per subscriber, so nothing is shared.
        var queue = new Queue<T>();
        var disposables = new CompositeDisposable();
        // When the state changes to "not paused", flush everything that was queued.
        disposables.Add(pause.StateChanged
            .Where(p => !p)
            .Subscribe(_ =>
            {
                lock (queue)
                {
                    while (queue.Count > 0)
                    {
                        observer.OnNext(queue.Dequeue());
                    }
                }
            }));
        // While paused, hold elements back; otherwise pass them straight through.
        disposables.Add(observable.Subscribe(a =>
        {
            lock (queue)
            {
                if (pause.Paused)
                    queue.Enqueue(a);
                else
                    observer.OnNext(a);
            }
        }, e => observer.OnError(e), () => observer.OnCompleted()));
        return disposables;
    });
}
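The PauseState type is not shown in this article, so the following is only a guess at a minimal shape that would satisfy the two members the extension uses, Paused and StateChanged. The Pause and Resume methods and the use of Subject are assumptions of this sketch, which also assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical sketch: only Paused and StateChanged are implied by the extension method.
public sealed class PauseState
{
    private readonly Subject<bool> _stateChanged = new Subject<bool>();
    private volatile bool _paused;

    // True while elements should be held back.
    public bool Paused
    {
        get { return _paused; }
    }

    // Publishes the new state (true = paused) each time it changes.
    public IObservable<bool> StateChanged
    {
        get { return _stateChanged.AsObservable(); }
    }

    public void Pause()
    {
        Set(true);
    }

    public void Resume()
    {
        Set(false);
    }

    private void Set(bool paused)
    {
        if (_paused == paused) return;
        // Update the flag before notifying, so handlers observe the new state.
        _paused = paused;
        _stateChanged.OnNext(paused);
    }
}
```

With a type along these lines, elements arriving after pause.Pause() would be queued by ToPausable and released when pause.Resume() is called. The sketch does not try to make Set atomic, so concurrent calls to Pause and Resume would need extra synchronisation.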
A simple batched observable
Rx provides the SelectMany LINQ extension, which turns a single element into a sequence of elements, but what if you want to batch updates together and process them at the same time (for example, to limit the number of calls to the dispatcher)? Well, how about this: a batch observable.
Below is an IObservable implementation that observes notifications from a sequence and groups them into batches of a specified size before sending them to an observer. All objects required to perform the batching are created in the Subscribe method, because this method can be called multiple times and each observer expects its own batches rather than shared ones.
public class BatchObservable<T> : IObservable<IEnumerable<T>>
{
    private readonly IObservable<T> _observable;
    private readonly int _batchSize;
    public BatchObservable(IObservable<T> observable, int batchSize)
    {
        _observable = observable;
        _batchSize = batchSize;
    }
    public IDisposable Subscribe(IObserver<IEnumerable<T>> observer)
    {
        // One list per subscription, so observers never share batched data.
        var batch = new List<T>();
        return _observable.Subscribe(a =>
        {
            batch.Add(a);
            if (batch.Count == _batchSize)
            {
                // Send a copy: clearing the list would otherwise empty the batch the observer holds.
                observer.OnNext(batch.ToArray());
                batch.Clear();
            }
        }, e => observer.OnError(e), () => observer.OnCompleted());
    }
}
Next we need the extension method. Quite simply this takes the same parameters as the batch class and we will call the method ToBatch.
public static IObservable<IEnumerable<T>> ToBatch<T>(this IObservable<T> observable, int batchSize)
{
    return new BatchObservable<T>(observable, batchSize);
}
To use the method, we simply call it on an observable.
observable.ToBatch(100)
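For a concrete feel of the behaviour, the sketch below runs ToBatch over a short range. It assumes the System.Reactive package is referenced; the batch class is restated compactly (emitting a copy of each batch) so the sample runs on its own, and the names BatchExtensions and Program are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Compact restatement of the batch observable so the sample is self-contained.
public sealed class BatchObservable<T> : IObservable<IEnumerable<T>>
{
    private readonly IObservable<T> _observable;
    private readonly int _batchSize;

    public BatchObservable(IObservable<T> observable, int batchSize)
    {
        _observable = observable;
        _batchSize = batchSize;
    }

    public IDisposable Subscribe(IObserver<IEnumerable<T>> observer)
    {
        var batch = new List<T>(); // one list per subscription
        return _observable.Subscribe(a =>
        {
            batch.Add(a);
            if (batch.Count == _batchSize)
            {
                observer.OnNext(batch.ToArray()); // hand out a copy, then reuse the list
                batch.Clear();
            }
        }, e => observer.OnError(e), () => observer.OnCompleted());
    }
}

public static class BatchExtensions
{
    public static IObservable<IEnumerable<T>> ToBatch<T>(this IObservable<T> observable, int batchSize)
    {
        return new BatchObservable<T>(observable, batchSize);
    }
}

public static class Program
{
    public static void Main()
    {
        // Seven elements in batches of three.
        Observable.Range(1, 7)
            .ToBatch(3)
            .Subscribe(batch => Console.WriteLine(string.Join(", ", batch)));
    }
}
```

This prints "1, 2, 3" and then "4, 5, 6". The seventh element is never delivered, because the class only emits full batches and does not flush a partial batch when the source completes; if trailing elements matter, the completion handler would need to send the remainder first.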
Other examples
The zip file contains the following example observables:
LatestByKeyObservable: Provides the most recent element for each item, based on a key.
PausableObservable: An observable that can be paused.
SynchronisedObservable: Synchronises two observables based on a specified comparison.