Introduction.
This is an alternative to the Observer Pattern implementation detailed in Observer Pattern (C#). This alternative uses an event processing system based on Linq
known as Reactive Extension or Rx for short. It is very well documented here and the Rx Wiki is here.
A brief explanation of the Observer Pattern.
The Observer Pattern defines a one-to-many relationship between objects. It is also known as Publish Subscribe. When the publisher (the ‘one’) publishes data, all the subscribers (the ‘many’) receive the data simultaneously. The pattern helps to maintain consistency between related objects and shows good separation of concerns because the publisher need only know that a subscriber implements a method to receive the data and subscribers need only to know that the publisher implements a method to allow them to receive the data.
Why use Rx?
One of the advantages of using Rx with the observer pattern is that publishers in Rx run asynchronously, so that program execution does not get blocked while the publisher is busy retrieving data. Instead, callbacks are used to notify subscribers when data is available. Callbacks are also used to inform subscribers if there is an error and when the sequence has ended. This avoids the situation where subscribers are continuing to observe events that have either finished or faulted. Rx also facilitates the use of Linq for filtering, grouping and composing data and there are methods available for testing as well as carrying out time-related processing such as buffering and throttling.
Rx implementation basics.
In Rx, the subscribers to a data stream are called Observers and the publisher of a data stream is called an Observable. Observables implement the interface IObservable<T>
, it has one method, named Subscribe
, that, as the name implies, enables Observers to subscribe. The Subscribe
method returns an object that implements IDisposable
. Unsubscribing is simply a matter of calling that object's Dispose()
method.
Observers implement the interface IObserver<T>
. IObserver<T>
has three callback methods that allow the Observable
to send information to its Observers. Here is the interface.
public interface IObserver<in T>
{
void OnNext(T value);
void OnError(Exception error);
void OnCompleted();
}
An Implementation of the Observer pattern using Rx.
Rx has a helper class named Subject
. Subject
implements both IObservable<T>
and IObserver<T>
so it can be both an Observer
and an Observable
. Its OnNext(T value)
method outputs value
to all subscribed Observers. Here's a simple IObserver<int>
class that can be used to subscribe to a Subject
and output the data received to the console.
public class ObserverOfInts : IObserver<int>
{
public ObserverOfInts(string observerName)
{
this.ObserverName = observerName;
}
public string ObserverName { get; private set; }
public void OnCompleted()
{
Console.WriteLine("Sequence completed");
}
public void OnError(Exception error)
{
Console.WriteLine("Exception raised. {0}", error.Message);
}
public void OnNext(int value)
{
Console.WriteLine("{0}: Observed {1}", this.ObserverName, value);
}
}
In this example, the Subject
subscribes to an Observable
and outputs the data stream to its subscribers.
private static void SimpleDemoUsingSubject()
{
IEnumerable<int> intList = new List<int> { 2, 4, 8, 10, 14 };
IObservable<int> observableOfInts = intList.ToObservable();
var subject = new Subject<int>();
var firstObserver = new ObserverOfInts("First");
IDisposable firstSubscription = subject.Subscribe(firstObserver);
IDisposable anonymousSubscription = subject.Subscribe(
v => Console.WriteLine(" Anonymous observed " + v),
() => Console.WriteLine(" Anonymous observed the sequence completed"));
Console.WriteLine("Press Enter to End");
IDisposable subjectSubscription = observableOfInts.Subscribe(subject);
Console.ReadLine();
firstSubscription.Dispose();
anonymousSubscription.Dispose();
subjectSubscription.Dispose();
Console.ReadKey();
}
Observing Events with Rx
Events are not easy to observe. The relevant data can be buried deeply in the event arguments and events are usually firmly attached to the class they are defined in. This makes them cumbersome to pass around for processing. With Rx, Events can be converted to observable objects of a type equal to the relevant data type and can be both passed to methods and tested with ease. The following example uses the MouseMove Event
and the data is processed into an observable of Points
. Some filtering is carried out so that a point is only sent to the observer when 10 is a factor of both point.X and point.Y. The important method is Observable.FromEventPattern<MouseEventArgs>(this, "MouseMove")
where MoveEventArgs
is the event arguments type, this
is the sender and MouseMove
is the Event
name. The reason for calling the ObserveOn
extension method is detailed.later
public void EventDemo()
{
this.OutputTextBox.AppendText("\r\n Event demo");
this.OutputTextBox.AppendText("\r\n Move the mouse around this box to observe when 10 is a factor of both mouse Point.X and Point.Y ");
IObservable<EventPattern<mouseeventargs>> observableOfArgs =
Observable.FromEventPattern<mouseeventargs>(this.OutputTextBox, "MouseMove");
IObservable<point> points = observableOfArgs.Select(args => args.EventArgs.Location);
var multiplesOf10= points.Where(point => point.X % 10 == 0 && point.Y % 10 == 0);
IDisposable subscription =
multiplesOf10.ObserveOn(new ControlScheduler(this))
.Subscribe(point => this.OutputTextBox.AppendText("\r\n"+point.ToString()));
this.disposables.Add(subscription);
}</point>
Testing The Observer Pattern
Rx has numerous methods that are useful for testing purposes. The output can be time stamped, delayed, throttled and sampled. A useful method for constructing an Event proxy is the Observable.Interval(TimeSpan.span)
. This produces a stream of Longs at interval span
. The output is 0,1,2,3....It can be used like this.
public static void PointsExample()
{
var points = new List<Point> { new Point(1, 2), new Point(3, 3), new Point(5, 6), new Point(7, 7) };
IObservable<point> onePointPerSecondsObservable =
Observable.Interval(TimeSpan.FromSeconds(1))
.Take(points.Count)
.Where(i => points[(int)i].X == points[(int)i].Y)
.Select(i => points[(int)i]);
onePointPerSecondsObservable.Subscribe(
point => Console.WriteLine(point.ToString()),
() => Console.WriteLine("Completed"));
Console.ReadKey();
}
public static void PointsExampleA()
{
var points = new List<Point> { new Point(1, 2), new Point(3, 3), new Point(5, 6), new Point(7, 7) };
IObservable<point> onePointPerSecondsObservable =
Observable.Generate(0, i => i < points.Count, i => i + 1,
i => points[i], i => TimeSpan.FromSeconds(1))
.Where(point => point.X == point.Y);
onePointPerSecondsObservable.Subscribe(
point => Console.WriteLine(point.ToString()),
() => Console.WriteLine("Completed"));
Console.ReadKey();
}
The output is.
A Couple of Gotchas to watch out for
Rx has a couple of Elephant traps that are easy to fall into
It's cold when it needs to be hot.
Implementing the observer pattern like this doesn't work.
private static void DemoColdA()
{
var oneNumberPerSecondObservable = Observable.Interval(TimeSpan.FromSeconds(1));
var firstSubscriber = oneNumberPerSecondObservable.Subscribe(x=>Console.WriteLine("First observed "+x));
Thread.Sleep(TimeSpan.FromSeconds(2));
var secondSubscriber = oneNumberPerSecondObservable.Subscribe(x=>Console.WriteLine("Second observed "+x));
Console.WriteLine("Press any key to stop the demo");
Console.ReadKey();
firstSubscriber.Dispose();
secondSubscriber.Dispose();
Console.WriteLine("Press any key to end the demo");
Console.ReadKey();
}
It's clear that both observers are not observing the same values at the same time. The reason for this is that the observable is 'Cold'. Cold observables restart the data stream for each Observer and each gets its own stream. The solution is to convert a 'Cold' Observable to a 'Hot' Observable by using the Publish
extension method.
var oneNumberPerSecondObservableHot = oneNumberPerSecondObservableCold.Publish();
Observing on the wrong thread
Windows Forms, WPF and Silverlight all have thread defined operations. That is, operations that must be carried out on a specific thread. You will get an error if you try to do the following in Windows Forms
var subscription = obsLongs.Subscribe(x => form.Text = "Event generated " + x);
The form will throw a 'cross thread operation not valid' exception as it is being accessed from the Observer's thread. This problem used to apply to WPF and Silverlight projects but it seems to have been resolved in the latest version of Rx. However, there may still be occasions when it is desirable to define the thread that the Observers run on. This can be done by passing a scheduler into the ObserveOn
extension method. Here is a solution for Windows Forms.
var subscription = obsLongs.ObserveOn(new ControlScheduler(form)).Subscribe(x => form.Text = "Event generated " + x);
With WPF and Silverlight you can use the static class, Scheduler
observable.ObserveOn(Scheduler.CurrentThread).Subscribe(x => label1.Content = x.X.ToString());
observable.ObserveOn(Scheduler.Default).Subscribe(x => label1.Content = x.X.ToString());
observable.ObserveOn(Scheduler.Immediate).Subscribe(x => label1.Content = x.X.ToString());
The Observer Pattern with Buffering and Linq
The real power of Rx lies in its ability to filter data in order to control both what the observers see and when they see it. In the following example, the data stream represents the water temperature output from a temperature controller. The observers are notified if two consecutive readings are above 40C. The data stream is split into segments using buffering. Linq is then used to process the segments before the data is passed to the observers.
private static void DemoBuffered()
{
IObservable<long> oneNumberPerSecondObservable = Observable.Interval(TimeSpan.FromSeconds(1));
var random = new Random();
IConnectableObservable<long> randomNumberObservableHot =
oneNumberPerSecondObservable.Select(n => (long)random.Next(1, 100)).Publish();
IDisposable randomNumberSubscription =
randomNumberObservableHot.Subscribe(x => Console.WriteLine("Generated " + x));
var buffered = from firstLast in randomNumberObservableHot.Buffer(2, 1)
where firstLast[0] > 40 && firstLast[1] > 40
select new { First = firstLast[0], Last = firstLast[1] };
var observableWithTimeout = buffered.Timeout(TimeSpan.FromSeconds(8));
IDisposable firstSubscription =
observableWithTimeout.Subscribe(
anon => Console.WriteLine("First observed {0} and {1} ", anon.First, anon.Last),
ex => Console.WriteLine("First observed an Exception. " + ex.Message));
IDisposable secondSubscription =
observableWithTimeout.Subscribe(
anon => Console.WriteLine("Second observed {0} and {1} ", anon.First, anon.Last),
ex => Console.WriteLine("Second observed an Exception. " + ex.Message));
randomNumberObservableHot.Connect();
Console.WriteLine("Press Enter to unsubscribe the first observer");
Console.ReadLine();
firstSubscription.Dispose();
Console.WriteLine("***first subscriber has unsubscribed ***");
Console.WriteLine("Press Enter to end");
Console.ReadLine();
secondSubscription.Dispose();
randomNumberSubscription.Dispose();
Console.ReadKey();
}
Conclusion
This brief article has only just scratched the surface of what can be achieved with Rx. If you wish to dig deeper, there are some excellent videos featuring two enthusiastic Rx developers, Wes Dyer and Bart De Smet.