Reactive Extensions (Rx)

2 Nov 2015 | CPOL | 7 min read
Introduction to Reactive Extensions (Rx)

Introduction

This article gives a brief introduction to Reactive Extensions (Rx). Rx is an open source library; the project was started by Microsoft Open Technologies, Inc. in collaboration with the open source community. MSDN defines Rx as "A library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators.", which I feel is not enticing enough and does not do justice to the powerful tool that it is. I tried to come up with a better definition but failed, so I leave it as a challenge to readers of this article to define Rx better. The sections below describe Rx in detail, and the attached example programs show the simplest way to implement and use Rx.

Reactive Extensions (Rx)

Rx is difficult to explain in plain words because it involves multiple concepts that are put together to make a programmer's life easier. The Reactive Manifesto defines the attributes of a reactive program or application. I have divided the concepts into separate sections below, so you can weave them together in your own way.

Paradigm Shift in Programming

In the early days of PC programming we had a fast single-core CPU, and as programmers we had to make sure it was used optimally: we created fewer threads or processes in our applications and made sure there was a good mixture of IO-bound and CPU-bound threads. In recent years PC programming has undergone a paradigm shift with the advent of fast multi-core CPUs. This gave birth to asynchronous programming libraries and frameworks such as TPL, PLINQ and Rx, which take advantage of multi-core CPUs.

Image 1

Modelling a problem

Traditionally, asynchronous programming was hard. You had to create your own thread or process, and if you needed asynchronous operations frequently you had to create a pool of threads or processes and manage their life cycles. Handling error conditions and exceptions was a cumbersome task.

To make life easier for programmers, different programming languages and frameworks provided ways to simplify asynchronous operations.

.NET 1.0 provided the Asynchronous Programming Model (APM), in which delegates can invoke methods asynchronously using BeginInvoke(). It provided callbacks and wait handles and addressed all aspects of asynchronous programming, with the thread pool managed by .NET.

.NET 2.0 provided the Event-based Asynchronous Pattern (EAP).

.NET 4.0 provided the Task Parallel Library (TPL).

These models have a major drawback: they do not provide a consistent way to represent an asynchronous program. Code written by different people looks different, which makes it messy and makes the code flow difficult to follow. TPL has addressed this problem to a great extent and gives more readable code than handling threads through the thread pool, especially for synchronizing the various tasks in your application and for error handling in case of failure. Rx came up with a consistent way of modelling the problem in terms of subject and observer, which is the classic Observer design pattern known to most developers.

Observer design pattern

Rx treats worker threads as subjects and the main thread(s) as observers. This provides a consistent way for the main thread(s) and the worker threads to synchronize their work.

Image 2

Observable sequence

The observable sequence is what sets Rx apart from other libraries. It allows an observer to receive state or event change notifications from the subject in an orderly fashion. It also serializes the callbacks, so an observer need not use any locking mechanism while observing. Rx provides schedulers to control how callbacks are handled. It mainly provides three options.

NewThread: This option runs the scheduled work on a newly created thread rather than on a pooled one.

ThreadPool: This option allows the scheduler to use the thread pool for callbacks.

TaskPool: This option allows the scheduler to use the TPL task pool for callbacks.

Image 3
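The three scheduler options can be tried side by side with a small sketch. It assumes the System.Reactive NuGet package (Rx 2.2.5 or later) is referenced; the class name SchedulerSketch and the method Observe are hypothetical names chosen for illustration. ObserveOn moves the observer-side callbacks onto the given scheduler, and the sketch records which managed thread handled each value.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

// Hypothetical helper, not part of the article's sample programs.
public static class SchedulerSketch
{
    // Delivers 1..3 to the observer side on the given scheduler and
    // records which managed thread handled each value.
    public static IList<string> Observe(IScheduler scheduler)
    {
        return Observable.Range(1, 3)
            .ObserveOn(scheduler)   // callbacks below run on this scheduler
            .Select(x => string.Format("{0} handled on thread {1}",
                                       x, Thread.CurrentThread.ManagedThreadId))
            .ToList()
            .Wait();                // block until the sequence completes
    }

    public static void Main()
    {
        Console.WriteLine("Main thread is {0}", Thread.CurrentThread.ManagedThreadId);

        foreach (var line in Observe(NewThreadScheduler.Default))
            Console.WriteLine("NewThread:  " + line);

        foreach (var line in Observe(ThreadPoolScheduler.Instance))
            Console.WriteLine("ThreadPool: " + line);

        foreach (var line in Observe(TaskPoolScheduler.Default))
            Console.WriteLine("TaskPool:   " + line);
    }
}
```

The thread ids printed will differ from run to run; the point is only that the callbacks do not run on the main thread.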

No blocking

The observer at the receiving end of an observable sequence need not use any thread synchronization mechanism to protect the data it is processing. Traditionally, if a subject generated a sequence too quickly for an observer to process, the subject and the observer had to synchronize with each other. Rx takes care of this for you, which greatly simplifies asynchronous programming. The non-blocking nature also speeds up execution. Rx can easily take advantage of a multi-core processor environment, as programmers need not manage the parallel execution themselves.

Reactive Extensions (Rx) Architecture

The Rx architecture in .NET revolves around two main interfaces, defined in mscorlib.dll:

  1. IObservable<T> (Implemented by Subject).
  2. IObserver<T> (Implemented by Observers).

As the name Extensions itself indicates, most Rx features are provided as extension methods, so you can easily adapt your existing code to use Rx. The diagram below shows the Rx assemblies that need to be referenced in your project to use Rx.

Image 4

System.Reactive.Core: This is the heart of Rx and has the extension methods for subscribing to a subject. It has the static class Observer for creating observers. It also provides synchronization and scheduling services.

System.Reactive.Interfaces: This provides the interfaces for the event pattern, which a subject can implement to raise events when data is available. It also has the scheduler interfaces and the queryable interfaces for Rx schedulers and LINQ support.

System.Reactive.Linq: This provides the well-known static class Observable for creating in-memory observable sequences. It extends the basic LINQ features to Rx.

System.Reactive.PlatformServices: This extends the basic scheduling services provided in System.Reactive.Core and will replace all the scheduling services in future.

Using the code

To show how Rx can be used, I have provided two example programs with the article; both are .NET console applications and use Rx version 2.2.5. The BasicReactiveExtension program shows how a library can implement a subject so that an application can take advantage of it. The AdvancedReactiveExtension program shows how we can easily adapt an existing method and convert it into a reactive program.

The diagram below shows how BasicReactiveExtension works.

Image 5

The NumberGenerator class acts as a subject that can be observed by implementing IObservable<int>, which requires it to implement IDisposable Subscribe(IObserver<int> observer). This method returns an IDisposable object that an observer can use to cancel its subscription. The main role of this class is to generate sequential numbers starting from zero as an observable sequence when the GenerateNumbers() method is called.

C#
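using System;
using System.Collections.Generic;
using System.Threading;

public class NumberGenerator : IObservable<int>
{
    // Initialized up front so Subscribe() never sees a null list.
    private readonly List<IObserver<int>> observers = new List<IObserver<int>>();

    // Set by StopGeneration() to end the loop in GenerateNumbers().
    private volatile bool stopRequested;

    public IDisposable Subscribe(IObserver<int> observer)
    {
        if (!observers.Contains(observer))
        {
            observers.Add(observer);

        }//End-if (!observers.Contains(observer))

        // The returned object removes the observer from the list when disposed.
        return new Unsubscriber(observers, observer);
    }

    // Called from Main; not shown in the original listing, so this body is a minimal assumption.
    public void StopGeneration()
    {
        stopRequested = true;
    }

    public void GenerateNumbers()
    {
        for (int i = 0; !stopRequested; i++)
        {
            Thread.Sleep(250);

            // Iterate over a copy so observers can unsubscribe while being notified.
            foreach (var observer in observers.ToArray())
            {
                if (i == 10)
                {
                    // Simulated failure to demonstrate OnError().
                    observer.OnError(new NumberNotGeneratedException());
                }
                else
                {
                    observer.OnNext(i);

                }//End-if-else (i == 10)

            }//End-foreach (var observer in observers.ToArray())

        }//End-for (int i = 0; !stopRequested; i++)
    }
}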
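The listing above refers to two types that the article does not show: Unsubscriber and NumberNotGeneratedException. The following is a minimal sketch of what they might look like, assuming the usual pattern of an IDisposable token that removes the observer from the subject's list. The bodies are reconstructions for illustration, not the code from the attached sample, and UnsubscriberDemo with its SilentObserver is a hypothetical helper that only exercises the token.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical reconstruction: the subscription token returned by Subscribe().
public class Unsubscriber : IDisposable
{
    private readonly List<IObserver<int>> observers;
    private readonly IObserver<int> observer;

    public Unsubscriber(List<IObserver<int>> observers, IObserver<int> observer)
    {
        this.observers = observers;
        this.observer = observer;
    }

    // Disposing the token cancels the subscription by removing the observer.
    public void Dispose()
    {
        if (observer != null)
        {
            observers.Remove(observer); // no-op if it was already removed
        }
    }
}

// Hypothetical reconstruction: the exception type passed to OnError().
public class NumberNotGeneratedException : Exception
{
    public NumberNotGeneratedException()
        : base("The number could not be generated.")
    {
    }
}

// Hypothetical demo showing that disposing the token removes the observer.
public static class UnsubscriberDemo
{
    // Observer that ignores every notification; used only by this demo.
    private sealed class SilentObserver : IObserver<int>
    {
        public void OnNext(int value) { }
        public void OnError(Exception error) { }
        public void OnCompleted() { }
    }

    public static int CountAfterDispose()
    {
        var observers = new List<IObserver<int>>();
        var observer = new SilentObserver();
        observers.Add(observer);

        using (new Unsubscriber(observers, observer))
        {
            // The subscription is active inside this block.
        }

        return observers.Count;
    }
}
```

Note that List<T> is not thread safe, so a subject that notifies on one thread while observers unsubscribe on another would need a lock around the list in real code.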

The NumberObserver class is an observer: it implements the IObserver<int> interface and reports either the even or the odd numbers it receives, depending on how it is configured.

C#
public class NumberObserver : IObserver<int>
{
    private IDisposable unsubscriber;
    private string instName;

    private bool isEvenObserver;

    public NumberObserver(string name, bool isEvenObserver)
    {
        this.instName = name;

        this.isEvenObserver = isEvenObserver;
    }

    public string Name
    { get { return this.instName; } }


    public virtual void Subscribe(IObservable<int> provider)
    {
        if (provider != null)
        {
            unsubscriber = provider.Subscribe(this);

        }//End-if (provider != null)
    }

    public virtual void OnCompleted()
    {
        Console.WriteLine("The Number Generator has completed generation {0}.", this.Name);
        this.Unsubscribe();
    }

    public virtual void OnError(Exception e)
    {
        Console.WriteLine("{0}: Error occurred while generating number.", this.Name);
    }

    public virtual void OnNext(int value)
    {
        bool isEven = value % 2 == 0;

        if (this.isEvenObserver && isEven)
        {
            Console.WriteLine("{1}: The current number is Even. Value => {0}", value, this.Name);

        }
        else if (!this.isEvenObserver && !isEven)
        {
            Console.WriteLine("{1}: The current number is Odd. Value => {0}", value, this.Name);

        }//End-if (this.isEvenObserver && isEven)
    }

    public virtual void Unsubscribe()
    {
        Console.WriteLine("{0} unsubscribed.", this.Name);

        unsubscriber.Dispose();
    }
}

The main program creates the respective objects and asks the NumberGenerator object to generate numbers on a separate thread until the user presses any key to stop.

C#
static void Main(string[] args)
{
    //Define number provider
    NumberGenerator NumberProvider = new NumberGenerator();

    //Have two observers.
    NumberObserver reporter1 = new NumberObserver("EvenObserver", true);
    reporter1.Subscribe(NumberProvider);

    NumberObserver reporter2 = new NumberObserver("OddObserver", false);
    reporter2.Subscribe(NumberProvider);

    Console.WriteLine("Press any key to stop observing.");

    Task.Factory.StartNew(() => NumberProvider.GenerateNumbers());

    Console.ReadKey();

    NumberProvider.StopGeneration();

    Console.WriteLine("Press any key to exit.");

    Console.ReadKey();
}

To show the error handling abilities, when the number 10 is generated the subject reports an error, which observers receive via the OnError() method. This basic program does not take advantage of Rx's ability to generate observable sequences, and it does not provide thread-safe callbacks, because a user thread drives the notification logic to the observers.

The diagram below shows how AdvancedReactiveExtension works.

Image 6

The advanced program uses the static class System.Reactive.Linq.Observable to create an observable sequence from a non-observable source, in this case the NumberGenerator class. The Main program creates an observable object using the Observable static class and subscribes to it.

C#
static void Main(string[] args)
{
    NumberGenerator ng = new NumberGenerator();

    var observable = Observable.ToObservable(ng.GenerateNumbers(), Scheduler.Default);

    Console.WriteLine("Press any key to exit.");

    observable.Subscribe(x => Console.WriteLine("Number generated is => {0}", x));

    Console.ReadKey();

}

With an observable sequence created using Rx, we can take advantage of all the features mentioned below and also get non-blocking, thread-safe callbacks. This frees us from writing the complex synchronization logic otherwise needed to manage the asynchronous model of our application. The NumberGenerator class has a single method, GenerateNumbers(), that generates a sequence of numbers.

C#
public class NumberGenerator
{
    int sequencer;

    public NumberGenerator()
    {
        sequencer = 0;
    }

    public IEnumerable<int> GenerateNumbers()
    {
        for (int i = 0; ; i++)
        {
            if (i % 2 == 0)
            {
                Thread.Sleep(250);

            }//End-if (i % 2 == 0)

            yield return sequencer++;

        }//End-for (int i = 0; ; i++)

    }
}
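Because GenerateNumbers() never ends, a consumer usually wants a way to stop observing. As a hedged sketch, assuming the System.Reactive NuGet package is referenced, the Take operator completes the sequence after a fixed number of values and unsubscribes from the source, which also stops the enumeration. TakeSketch and FirstFive are hypothetical names, and the iterator is a simplified stand-in for the NumberGenerator class above.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

// Hypothetical helper, not part of the article's sample programs.
public static class TakeSketch
{
    // Simplified stand-in for NumberGenerator.GenerateNumbers(): an endless iterator.
    public static IEnumerable<int> GenerateNumbers()
    {
        for (int i = 0; ; i++)
        {
            Thread.Sleep(50);
            yield return i;
        }
    }

    public static IList<int> FirstFive()
    {
        return GenerateNumbers()
            .ToObservable(Scheduler.Default) // enumerate on a background thread
            .Take(5)                         // complete after five values and unsubscribe
            .ToList()
            .Wait();                         // block until the sequence completes
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", FirstFive())); // prints "0, 1, 2, 3, 4"
    }
}
```

The same effect can be had manually by keeping the IDisposable returned from Subscribe() and calling Dispose() on it when the observer is no longer interested.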

Rx Features

Rx lets you control an observable sequence in different ways, so applications can pick the operators that suit the requirements of their target domain.

Skip

This feature allows an observer to skip a specified number of notifications at the start of an observable sequence.
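A minimal sketch of Skip, assuming the System.Reactive NuGet package is referenced; SkipSketch and SkipFirstTwo are hypothetical names. Observable.Range produces the values 1 to 5 and Skip(2) drops the first two before the observer sees anything.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical helper, not part of the article's sample programs.
public static class SkipSketch
{
    public static IList<int> SkipFirstTwo()
    {
        return Observable.Range(1, 5) // 1, 2, 3, 4, 5
            .Skip(2)                  // drop the first two notifications
            .ToList()
            .Wait();                  // block until the sequence completes
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", SkipFirstTwo())); // prints "3, 4, 5"
    }
}
```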

Concat

This feature concatenates two observable sequences, so an observer can consume them as a single sequence. The second sequence starts only after the first one has completed.
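A minimal sketch of Concat, assuming the System.Reactive NuGet package is referenced; ConcatSketch and Combined are hypothetical names. The observer receives every value of the first sequence, then every value of the second.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical helper, not part of the article's sample programs.
public static class ConcatSketch
{
    public static IList<int> Combined()
    {
        var first = Observable.Range(1, 2);   // 1, 2
        var second = Observable.Range(10, 2); // 10, 11

        return first
            .Concat(second) // second is subscribed only after first completes
            .ToList()
            .Wait();
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Combined())); // prints "1, 2, 10, 11"
    }
}
```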

Zip

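This feature combines two observable sequences pairwise: the n-th notification of one sequence is paired with the n-th notification of the other, and a selector function turns each pair into a single notification for the observer.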
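A minimal sketch of Zip, assuming the System.Reactive NuGet package is referenced; ZipSketch and Pairs are hypothetical names. Zip waits until both sequences have produced their n-th value and then passes the pair to the selector.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical helper, not part of the article's sample programs.
public static class ZipSketch
{
    public static IList<string> Pairs()
    {
        var left = Observable.Range(1, 3);   // 1, 2, 3
        var right = Observable.Range(10, 3); // 10, 11, 12

        return left
            .Zip(right, (a, b) => string.Format("{0}-{1}", a, b)) // pair by position
            .ToList()
            .Wait();
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Pairs())); // prints "1-10, 2-11, 3-12"
    }
}
```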

Timestamp

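This attaches a timestamp to each notification in an observable sequence, so the observer knows when each value was produced. Sampling an observable sequence at a specified time interval is the job of a separate operator, Sample.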
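A minimal sketch of the Timestamp operator, which wraps each value in a Timestamped<T> that carries the value and the time it was observed. It assumes the System.Reactive NuGet package is referenced; TimestampSketch and Stamp are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;

// Hypothetical helper, not part of the article's sample programs.
public static class TimestampSketch
{
    public static IList<Timestamped<int>> Stamp()
    {
        return Observable.Range(1, 3)
            .Timestamp() // wrap each value together with a DateTimeOffset
            .ToList()
            .Wait();
    }

    public static void Main()
    {
        foreach (var item in Stamp())
        {
            // The printed times depend on when the program runs.
            Console.WriteLine("{0} observed at {1:O}", item.Value, item.Timestamp);
        }
    }
}
```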

Throttle

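This suppresses a notification if another one follows it within a specified time span, so the observer only receives values that are followed by a quiet period. Collecting a specific number of notifications before passing them to the observer is the job of a separate operator, Buffer.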
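A minimal sketch of the Throttle operator, which drops a value when a newer one arrives within the given time span. It assumes the System.Reactive NuGet package is referenced; ThrottleSketch and Run are hypothetical names, and the 100 ms and 400 ms timings are arbitrary values chosen for illustration. A Subject<int> is used so the sketch can push values by hand.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

// Hypothetical helper, not part of the article's sample programs.
public static class ThrottleSketch
{
    public static List<int> Run()
    {
        var results = new List<int>();
        var gate = new object();
        var source = new Subject<int>();

        // Throttle delivers callbacks from a timer thread, so guard the list.
        using (source.Throttle(TimeSpan.FromMilliseconds(100))
                     .Subscribe(x => { lock (gate) { results.Add(x); } }))
        {
            source.OnNext(1);     // superseded by 2 within 100 ms, so dropped
            source.OnNext(2);     // followed by a quiet period, so delivered
            Thread.Sleep(400);
            source.OnNext(3);     // still pending when the source completes
            source.OnCompleted(); // completion flushes the pending value
        }

        lock (gate) { return new List<int>(results); }
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

With these timings the observer should see 2 and 3 but never 1, since 1 is replaced before its quiet period elapses.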

Rx Language and Framework Support

Rx is available for the platforms below:

  • .NET

  • Java

  • JavaScript

  • Ruby

History

  • Added code samples in article explaining the example program.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)