
SignalR + RX (Streaming Data Demo App 1 of 2)

16 Dec 2014
Small demo app showing how to create a streaming Publisher / Resilient clients using SignalR + RX

Where Is The Code?

You can grab the code from my GitHub repository right here:
https://github.com/sachabarber/SignalrRxDemo

Introduction

I work in finance, and at the moment work in the Foreign Exchange (FX) space, where we have lots of requirements that involve streaming of various things, such as:

  • Rates
  • Trades
  • Other financial instruments

Historically what we would have done to accommodate these requirements was to use sockets and push our own data (usually JSON or XML) down the wire, and then have some quite gnarly listeners above that, where each listener would want some slightly different view of the original streaming data.

Thing is, time never stands still in the world of software development, and new things come out all the time. Literally every other day a new framework comes along that improves on what came before it in some way or another.

One particular thing that I think a lot of people have misunderstood is Reactive Extensions (RX), which in fairness has been around a while now. I think a lot of people think of RX as LINQ to events, which it does have. However the truth is that RX is a superb framework for creating streaming applications, and the more you get into RX the more you tend to see everything as a stream, from an event, down to picking a file or an ICommand.Execute in a ViewModel, say.

RX comes with many, many tools in its arsenal, such as:

  • 2 core interfaces IObservable / IObserver
  • Lots of LINQ like extension methods for IObservable
  • Time / window based operations (LINQ doesn't really have these)
  • Concurrency schedulers
  • Stream error handling controls (Catch etc etc)
  • Ability to create IObservable from a great many other things, such as
    • IEnumerable
    • Task
    • Events
    • IObservable Factories
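To make a few of the items in that list a bit more concrete, here is a small sketch. It is not code from the demo app; the class name RxToolsSketch and the sample values are made up for illustration, and it assumes the Rx NuGet package is referenced. It turns an IEnumerable into an IObservable, batches values with Buffer, and recovers from an error with Catch.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical sampler class, for illustration only.
public static class RxToolsSketch
{
    public static IList<string> Run()
    {
        var log = new List<string>();

        // IEnumerable -> IObservable, followed by LINQ-like operators.
        new[] { 1, 2, 3, 4, 5 }.ToObservable()
            .Where(x => x % 2 == 1)
            .Select(x => x * 10)
            .Subscribe(x => log.Add("odd*10:" + x));

        // Window style operation: group the values into batches of two.
        Observable.Range(1, 5)
            .Buffer(2)
            .Subscribe(batch => log.Add("batch:" + string.Join(",", batch)));

        // Stream error handling: switch to a fallback sequence on error.
        Observable.Throw<int>(new InvalidOperationException("boom"))
            .Catch(Observable.Return(-1))
            .Subscribe(x => log.Add("fallback:" + x));

        return log;
    }
}
```

Calling RxToolsSketch.Run() should give you the three odd values multiplied by ten, then the batches 1,2 / 3,4 / 5, and finally the fallback value.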

I personally feel that a lot of people could make great use of RX, if they were to put in the time to learn a bit more about it.

There is also another great library from Microsoft which facilitates push based technology, namely SignalR. Quite a lot has been written about SignalR, but you rarely see that from a desktop point of view.

Most people would probably associate a SignalR Hub with a web site, but there are various ways you can use SignalR, which also make it a good choice for desktop (i.e. non web) development. This is possible thanks to the SignalR OWIN hosting API, which lets you self host a SignalR Hub. There is also a .NET client that we can use to communicate with the hub. The .NET client also exposes quite a lot of events, and guess what we can do with those events? Yep, that's right, we can create IObservable streams out of them, which opens up the gates to use RX.
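As a rough idea of what "creating an IObservable out of events" looks like, here is a minimal sketch. FakeConnection is a hypothetical stand-in that I made up so the example runs without SignalR; it simply raises plain Action events, much like the Closed / Reconnecting / Reconnected events on the SignalR .NET client connection. Observable.FromEvent is the standard Rx factory used for the conversion.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;

// Hypothetical stand-in for a client connection that raises plain .NET events.
public sealed class FakeConnection
{
    public event Action Reconnecting;
    public event Action Reconnected;
    public event Action Closed;

    public void RaiseReconnecting() { var h = Reconnecting; if (h != null) h(); }
    public void RaiseReconnected() { var h = Reconnected; if (h != null) h(); }
    public void RaiseClosed() { var h = Closed; if (h != null) h(); }
}

public static class ConnectionEventsSketch
{
    // Turns the three events into a single stream of status names.
    public static IObservable<string> StatusStream(FakeConnection connection)
    {
        var reconnecting = Observable
            .FromEvent(h => connection.Reconnecting += h, h => connection.Reconnecting -= h)
            .Select(_ => "Reconnecting");

        var reconnected = Observable
            .FromEvent(h => connection.Reconnected += h, h => connection.Reconnected -= h)
            .Select(_ => "Reconnected");

        var closed = Observable
            .FromEvent(h => connection.Closed += h, h => connection.Closed -= h)
            .Select(_ => "Closed");

        // One stream that yields a value whenever any of the events fires.
        return Observable.Merge(reconnecting, reconnected, closed);
    }
}
```

Once the events are an IObservable<string>, every standard RX operator (Where, Merge, Timeout and so on) is available to whoever subscribes, and disposing the subscription detaches the event handlers again.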

This article will show one area where RX excels, which is streaming APIs, where we will be using RX/SignalR.

Article Series

  1. Streaming API using SignalR and RX (this article)
  2. Streaming API using NetMQ and RX

Video Showing You What We Will Learn About

I think the best way to see what this demo is about is to have a look at the video, which I have uploaded to YouTube here:

https://www.youtube.com/watch?v=TDIpPvbw6ek

Small RX Primer

There are many many RX Extension methods which you can make use of:

http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable_methods(v=vs.103).aspx

But perhaps the best place to start with RX, is to understand the underlying 2 interfaces you will use time and time again. These are IObservable and IObserver. We discuss these 2 next.

IObservable

namespace System
{
    // Summary:
    //     Defines a provider for push-based notification.
    //
    // Type parameters:
    //   T:
    //     The object that provides notification information.This type parameter is
    //     covariant. That is, you can use either the type you specified or any type
    //     that is more derived. For more information about covariance and contravariance,
    //     see Covariance and Contravariance in Generics.
    public interface IObservable<out T>
    {
        // Summary:
        //     Notifies the provider that an observer is to receive notifications.
        //
        // Parameters:
        //   observer:
        //     The object that is to receive notifications.
        //
        // Returns:
        //     A reference to an interface that allows observers to stop receiving notifications
        //     before the provider has finished sending them.
        IDisposable Subscribe(IObserver<T> observer);
    }
}

Defines a provider for push-based notification.

This type parameter is covariant. That is, you can use either the type you specified or any type that is more derived. For more information about covariance and contravariance, see Covariance and Contravariance in Generics.

  • Subscribe Notifies the provider that an observer is to receive notifications.

Further reading: http://msdn.microsoft.com/en-us/library/dd990377(v=vs.110).aspx

IObserver

namespace System
{
    // Summary:
    //     Provides a mechanism for receiving push-based notifications.
    //
    // Type parameters:
    //   T:
    //     The object that provides notification information.This type parameter is
    //     contravariant. That is, you can use either the type you specified or any
    //     type that is less derived. For more information about covariance and contravariance,
    //     see Covariance and Contravariance in Generics.
    public interface IObserver<in T>
    {
        // Summary:
        //     Notifies the observer that the provider has finished sending push-based notifications.
        void OnCompleted();
        //
        // Summary:
        //     Notifies the observer that the provider has experienced an error condition.
        //
        // Parameters:
        //   error:
        //     An object that provides additional information about the error.
        void OnError(Exception error);
        //
        // Summary:
        //     Provides the observer with new data.
        //
        // Parameters:
        //   value:
        //     The current notification information.
        void OnNext(T value);
    }
}

The IObserver<T> and IObservable<T> interfaces provide a generalized mechanism for push-based notification, also known as the observer design pattern. The IObservable<T> interface represents the class that sends notifications (the provider); the IObserver<T> interface represents the class that receives them (the observer). T represents the class that provides the notification information.

An IObserver<T> implementation arranges to receive notifications from a provider (an IObservable<T> implementation) by passing an instance of itself to the provider's IObservable<T>.Subscribe method. This method returns an IDisposable object that can be used to unsubscribe the observer before the provider finishes sending notifications.
The IObserver<T> interface defines the following three methods that the observer must implement:

  • The OnNext method, which is typically called by the provider to supply the observer with new data or state information.
  • The OnError method, which is typically called by the provider to indicate that data is unavailable, inaccessible, or corrupted, or that the provider has experienced some other error condition.
  • The OnCompleted method, which is typically called by the provider to indicate that it has finished sending notifications to observers.

Further reading : http://msdn.microsoft.com/en-us/library/dd783449(v=vs.110).aspx
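To see the two interfaces working together without any RX operators at all, here is a bare bones sketch that only uses the base class library. TickerNameSource and RecordingObserver are hypothetical names I have made up for this example; they are not part of the demo app.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical provider: pushes each name to every subscribed observer.
public sealed class TickerNameSource : IObservable<string>
{
    private readonly List<IObserver<string>> observers = new List<IObserver<string>>();

    public IDisposable Subscribe(IObserver<string> observer)
    {
        observers.Add(observer);
        // The returned IDisposable is how the observer unsubscribes.
        return new Unsubscriber(() => observers.Remove(observer));
    }

    public void Push(string name)
    {
        // Iterate over a copy so observers may unsubscribe while being notified.
        foreach (var observer in observers.ToArray()) observer.OnNext(name);
    }

    public void Complete()
    {
        foreach (var observer in observers.ToArray()) observer.OnCompleted();
        observers.Clear();
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly Action onDispose;
        public Unsubscriber(Action onDispose) { this.onDispose = onDispose; }
        public void Dispose() { onDispose(); }
    }
}

// Hypothetical observer: records every notification it receives.
public sealed class RecordingObserver : IObserver<string>
{
    public readonly List<string> Log = new List<string>();

    public void OnNext(string value) { Log.Add("next:" + value); }
    public void OnError(Exception error) { Log.Add("error:" + error.Message); }
    public void OnCompleted() { Log.Add("completed"); }
}
```

If you subscribe a RecordingObserver, push "Yahoo", dispose the subscription and then push "Google", the observer only records the first value. In practice you would rarely hand roll these classes (this sketch is not thread safe, for one thing); RX gives you Subject<T> and Observable.Create for that.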

Observable.Create

Anyone that has used RX a bit will come to the point where they would like to create their own operators. In LINQ this would be done by creating a new method (perhaps an extension method) that simply returns a new IEnumerable<T>. In RX things are slightly trickier, but not much trickier. In RX you would make use of Observable.Create to create your own operators, or as a factory for creating a new IObservable<T>. Here is a simple example:

private IObservable<string> CreateObservable()
{
  return Observable.Create<string>(
    (IObserver<string> observer) =>
    {
      observer.OnNext("a");
      observer.OnNext("b");
      observer.OnCompleted();
      return Disposable.Create(() => Console.WriteLine("Observer has unsubscribed"));
    });
}

Observable.Create creates an observable sequence from a subscribe method implementation. You would typically write an extension method on an existing IObservable<T> and then put an Observable.Create(..) as the body of the extension method.
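Here is a sketch of that extension method pattern. The operator name WhereChangedBy and its behaviour are invented purely for illustration (it is not an RX operator, nor part of the demo app): it only forwards a price when it differs from the last forwarded price by at least a given amount.

```csharp
using System;
using System.Reactive.Linq;

public static class ObservableExtensionsSketch
{
    // Hypothetical operator: forwards a value only when it differs from the
    // last forwarded value by at least minDelta. The first value always passes.
    public static IObservable<decimal> WhereChangedBy(
        this IObservable<decimal> source, decimal minDelta)
    {
        return Observable.Create<decimal>(observer =>
        {
            // State is created per subscription, so subscribers do not interfere.
            decimal? last = null;

            // Returning the inner subscription means that unsubscribing from
            // the new sequence also unsubscribes from the source.
            return source.Subscribe(
                value =>
                {
                    if (last == null || Math.Abs(value - last.Value) >= minDelta)
                    {
                        last = value;
                        observer.OnNext(value);
                    }
                },
                observer.OnError,
                observer.OnCompleted);
        });
    }
}
```

With the prices 1.00, 1.01, 1.50, 1.52 and 2.10 and a minDelta of 0.25, a subscriber would see 1.00, 1.50 and 2.10.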

Observable.Create has this method signature

public static IObservable<TSource> Create<TSource>(
	Func<IObserver<TSource>, IDisposable> subscribe
)

The interesting part of this method signature is that it takes a Func delegate that receives an IObserver<TSource>, which allows you to push values into the observer using its OnNext/OnError and OnCompleted methods. The Func delegate returns an IDisposable, which will be called when disposal happens (for example when the subscriber unsubscribes). RX comes with a great many different IDisposable types, so don't be surprised if you see some quite exotic IDisposable types being used inside an Observable.Create implementation.
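To show when that IDisposable actually gets called, here is a small sketch using two of the RX disposable helpers, Disposable.Create and CompositeDisposable. The class name and the log strings are made up for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical example class, for illustration only.
public static class CreateDisposalSketch
{
    public static IList<string> Run()
    {
        var log = new List<string>();
        var upstream = new Subject<string>();

        var source = Observable.Create<string>(observer =>
        {
            // Forward everything from upstream to the new observer.
            var subscription = upstream.Subscribe(observer);

            // Extra clean up work that should run on unsubscribe.
            var cleanup = Disposable.Create(() => log.Add("cleanup ran"));

            // CompositeDisposable disposes everything it holds in one go.
            return new CompositeDisposable(subscription, cleanup);
        });

        var handle = source.Subscribe(value => log.Add("next:" + value));
        upstream.OnNext("a");

        handle.Dispose();      // runs the disposable returned from Create
        upstream.OnNext("b");  // nobody is listening any more

        return log;
    }
}
```

Run() should return "next:a" followed by "cleanup ran"; the value "b" never arrives because the subscription to upstream was disposed along with everything else.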

One of the best descriptions of using Observable.Create is from Lee Campbell's site:

http://www.introtorx.com/content/v1.0.10621.0/04_CreatingObservableSequences.html

Standing On The Shoulders Of Giants

I had the pleasure of going on a 2 day Rx course with Lee Campbell, the author of pretty much the only Rx book on the planet, which Lee has a site dedicated to, right here: http://www.introtorx.com/

Lee now works for a consultancy called "Adaptive", who were the ones that actually ran the training course I attended. They have also released into the wild a truly great demo app which showcases how to create a fully connected, fault tolerant streaming API + client app. They have done several flavours of the demo:

  • WPF
  • Windows Store (WinRT)
  • iOS using Monotouch
  • HTML5 (using TypeScript)

You can grab the demo app here https://github.com/AdaptiveConsulting/ReactiveTrader

Thing is, this solution is reasonably large, and if you are new to Rx or are just trying to get your head around how to create a reliable streaming API, this solution is a tad overbearing, at least in my opinion.

To be fair, I think the demo app the Adaptive crew have put together serves more as a marketing tool for the collective minds of Adaptive: people would look at it and go, mmm these guys seem to know their onions, let's get them in. However if you are a mere developer trying to get your head into that space, you may be looking for something a little smaller to start with, you know, baby steps.

I am hoping that is where this demo article will fit the bill, as I have been through the pain of trying to understand the WPF version of the https://github.com/AdaptiveConsulting/ReactiveTrader demo app, and will present a very much cut down version in this article for you.

Cut Down Demo App

This section outlines a bare bones demo app, which as I say is largely based on the great work done by the guys at Adaptive, and their https://github.com/AdaptiveConsulting/ReactiveTrader demo app. What I wanted to achieve was quite simple:

  1. A single SignalR hub (the Adaptive example has a few) that would push out TickerDto objects
  2. Clients that would subscribe to a certain topic to receive push notifications from the SignalR server side hub, and push this out via RX, such that anyone interested may use the standard RX operators
  3. The ability to simulate the server dying, and coming back to life
  4. The ability for the connected clients to know when the server was up/down
  5. The ability for the connection between the client and server to be resilient, in that when the server comes back up the client should be able to recover from that

The Adaptive demo does all this (and more), but I am not Lee Campbell and do not have his depth of experience with RX, though I have to say I think I am getting it now. So that is why I wrote up this small demo app.

General Idea

This diagram illustrates the general idea:

The Publisher

The publisher is made up of a few bits and pieces, which are described below. One thing to note is that both the client and the server make use of SignalR, so it may be an idea to read about SignalR self hosting here:

http://www.asp.net/signalr/overview/guide-to-the-api/hubs-api-guide-server

IOC

There is some IOC used, and Autofac is the container. As IOC is not the main point of the article, I will not waste any more time on it. If you want to know more about the IOC/Autofac side, read the code, starting with the BootStrapper class.

MainWindow

There is not much to say about this window, except that it has the following controls:

  • Start SignalR : Which starts the SignalR Hub (useful for simulating crash/restart of hub)
  • Stop SignalR : Which stops the SignalR Hub (useful for simulating crash/restart of hub)
  • Start Auto Ticker Data : Will push random TickerDto objects out of the Hub context at fixed intervals
  • Stop Auto Ticker Data : Will suspend the sending of TickerDto objects out of the Hub
  • Send One Ticker : Will push 1 SINGLE random TickerDto object out of the Hub context

MainWindowViewModel

The MainWindowViewModel is really used to facilitate sending commands to either stop/create the SignalR Hub, or to tell the TickerHubPublisher to do things. Here is the full code for the MainWindowViewModel

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Input;
using Common.ViewModels;
using log4net;
using Microsoft.Owin.BuilderProperties;
using Microsoft.Owin.Hosting;
using SignalRSelfHost.Hubs.Ticker;

namespace SignalRSelfHost
{
    public class MainWindowViewModel : IMainWindowViewModel
    {
        private const string Address = "http://localhost:5263";
        
        private readonly ITickerHubPublisher tickerHubPublisher;
        private static readonly ILog Log = LogManager.GetLogger(typeof(MainWindowViewModel));
        private IDisposable signalr;

        public MainWindowViewModel(ITickerHubPublisher tickerHubPublisher)
        {
            this.tickerHubPublisher = tickerHubPublisher;

            AutoTickerStartCommand = new DelegateCommand(tickerHubPublisher.Start);
            AutoTickerStopCommand = new DelegateCommand(tickerHubPublisher.Stop);
            SendOneTickerCommand = new DelegateCommand(async () =>
            {
                await tickerHubPublisher.SendOneManualFakeTicker();
            });
            StartCommand = new DelegateCommand(StartServer);
            StopCommand = new DelegateCommand(StopServer);
        }

        public ICommand AutoTickerStartCommand { get; set; }
        public ICommand AutoTickerStopCommand { get; set; }
        public ICommand SendOneTickerCommand { get; set; }
        public ICommand StartCommand { get; private set; }
        public ICommand StopCommand { get; private set; }

        public void Start()
        {
            StartServer();
        }
       
        private void StartServer()
        {
            try
            {
                signalr = WebApp.Start(Address);
            }
            catch (Exception exception)
            {
                Log.Error("An error occurred while starting SignalR", exception);
            }
        }


        private void StopServer()
        {
            if (signalr != null)
            {
                signalr.Dispose();
                signalr = null;
            }

        }

    }
}

As you can see not much really goes on in the MainWindowViewModel, all the action happens elsewhere.

Starting The Publisher

In order to start the publisher, the very first consideration we need to deal with is how to host the SignalR hub. This is handled by an application scope OwinStartupAttribute, which you will find in the demo app within the App class

using System;
using System.Collections.Generic;
using System.Configuration;
using System.Data;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Windows;
using Autofac;
using Microsoft.Owin;
using SignalRSelfHost.Hubs;
using SignalRSelfHost.Hubs.Ticker;
using SignalRSelfHost.IOC;
using log4net;


[assembly: OwinStartup(typeof(Startup))]
namespace SignalRSelfHost
{
    public partial class App : Application
    {
		....
		....
		....
		....
    }
}

The OwinStartupAttribute points to a class (Startup in this case) that configures the OWIN SignalR self hosting. Let's have a look at that, shall we?

using Microsoft.AspNet.SignalR;
using Microsoft.Owin.Cors;
using Owin;
using SignalRSelfHost.IOC;

namespace SignalRSelfHost.Hubs
{
    public class Startup
    {
        public void Configuration(IAppBuilder app)
        {
            // Branch the pipeline here for requests that start with "/signalr"
            app.Map("/signalr", map =>
            {
                // Setup the CORS middleware to run before SignalR.
                // By default this will allow all origins. You can 
                // configure the set of origins and/or http verbs by
                // providing a cors options with a different policy.
                map.UseCors(CorsOptions.AllowAll);
                var hubConfiguration = new HubConfiguration
                {
                    Resolver = new AutofacSignalRDependencyResolver(App.Container),

                };
                // Run the SignalR pipeline. We're not using MapSignalR
                // since this branch already runs under the "/signalr"
                // path.
                map.RunSignalR(hubConfiguration);
            });
        }
    }
}

So that is how the hub gets started. But how does the hub know about clients, and know how to push out to them? We will look at that next

The TickerHub

The TickerHub is the only hub in the application, and it has 2 main methods: one to allow subscriptions from clients, and one to remove a client's subscription. Here is the code for the hub. Don't worry, we will be looking into these 2 methods in a bit more detail in a minute.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Common;
using log4net;
using Microsoft.AspNet.SignalR;
using Microsoft.AspNet.SignalR.Hubs;

namespace SignalRSelfHost.Hubs.Ticker
{
    [HubName(ServiceConstants.Server.TickerHub)]
    public class TickerHub : Hub
    {
        private readonly ITickerRepository tickerRepository;
        private readonly IContextHolder contextHolder;

        public const string TickerGroupName = "AllTickers";
        private static readonly ILog Log = LogManager.GetLogger(typeof(TickerHub));

        public TickerHub(ITickerRepository tickerRepository, IContextHolder contextHolder)
        {
            this.tickerRepository = tickerRepository;
            this.contextHolder = contextHolder;
        }

        [HubMethodName(ServiceConstants.Server.SubscribeTickers)]
        public async Task SubscribeTrades()
        {

            contextHolder.TickerHubClients = Clients;

            var user = ContextUtil.GetUserName(Context);
            Log.InfoFormat("Received trade subscription from user {0}", user);

            // add client to the trade notification group
            await Groups.Add(Context.ConnectionId, TickerGroupName);
            Log.InfoFormat("Connection {0} of user {1} added to group '{2}'", Context.ConnectionId, user, TickerGroupName);

            var tickers = tickerRepository.GetAllTickers();
            await Clients.Caller.SendTickers(tickers);
            Log.InfoFormat("Snapshot published to {0}", Context.ConnectionId);
        }

        [HubMethodName(ServiceConstants.Server.UnsubscribeTickers)]
        public async Task UnsubscribeTrades()
        {
            Log.InfoFormat("Received unsubscription request for trades from connection {0}", Context.ConnectionId);

            // remove client from the blotter group
            await Groups.Remove(Context.ConnectionId, TickerGroupName);
            Log.InfoFormat("Connection {0} removed from group '{1}'", Context.ConnectionId, TickerGroupName);
        }
    }
}

Subscribe Trades

This TickerHub method is called by the SignalR clients, and is used to register an interest in a topic, which SignalR calls a "group". Essentially, when a client launches, it uses its own SignalR proxy to communicate with the server side self hosted hub, and calls the server side hub's SubscribeTrades() method. When that happens the client's connection is added to the server side hub's group, and the client is sent a one-off push of all currently held tickers. This single push sends ALL the TickerDto objects currently stored in the TickerRepository, which is a simple in-memory queued repository of the most recent TickerDtos produced.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Common;

namespace SignalRSelfHost.Hubs.Ticker
{
   public class TickerRepository : ITickerRepository
    {
       private readonly Queue<TickerDto> tickers = new Queue<TickerDto>();
       private object syncLock = new object();
       private const int MaxTrades = 50;


       public TickerRepository()
       {
           tickers.Enqueue(new TickerDto() {Name="Yahoo", Price=1.2m});
           tickers.Enqueue(new TickerDto() {Name="Google", Price=1022m});
           tickers.Enqueue(new TickerDto() {Name="Apple", Price=523m});
           tickers.Enqueue(new TickerDto() {Name="Facebook", Price=49m});
           tickers.Enqueue(new TickerDto() {Name="Microsoft", Price=37m});
           tickers.Enqueue(new TickerDto() {Name="Twitter", Price=120m});
       }



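       public TickerDto GetNextTicker()
       {
           // Take the same lock as StoreTicker/GetAllTickers, as Queue<T> is not thread safe
           lock (syncLock)
           {
               return tickers.Dequeue();
           }
       }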


        public void StoreTicker(TickerDto tickerInfo)
        {
            lock (syncLock)
            {
                tickers.Enqueue(tickerInfo);

                if (tickers.Count > MaxTrades)
                {
                    tickers.Dequeue();
                }
            }
        }

        public IList<TickerDto> GetAllTickers()
        {
            IList<TickerDto> newTickers;

            lock (syncLock)
            {
                newTickers = tickers.ToList();
            }

            return newTickers;
        } 
    }
}

Any time a subsequent ticker is created (we will see how this happens soon) the client will get that pushed at them in real time. The bulk push is a one-off, done when the client first establishes comms with the server. The other thing that happens right at the start of this method is that the context is captured, so that classes outside the TickerHub have access to the TickerHub context and its associated values:

contextHolder.TickerHubClients = Clients;

Unsubscribe Trades

This TickerHub UnsubscribeTrades() method is called by the SignalR clients, and is used to un-register the client's interest in a topic (which SignalR calls a "group"). Essentially, when a client shuts down, it uses its own SignalR proxy to communicate with the server side self hosted hub, and calls the server side hub's UnsubscribeTrades() method. When that happens the client's connection is removed from the server side hub's group.

Who Does The Streaming To The Clients?

Ok, so we have now seen how SignalR clients register/unregister an interest in a topic (at least from a server side perspective), but how does the client get push notifications from the TickerHub? Who does that? Well, that job falls to another component, namely the TickerHubPublisher. The job of the TickerHubPublisher in this demo app is quite simple: produce a random TickerDto and push it to any connected clients who are interested in a particular topic. The entire contents of the TickerHubPublisher are shown below. I think the code is pretty self explanatory.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Autofac;
using Common;
using log4net;
using Microsoft.AspNet.SignalR;

namespace SignalRSelfHost.Hubs.Ticker
{
    public class TickerHubPublisher : ITickerHubPublisher
    {
        private readonly IContextHolder contextHolder;
        private readonly ITickerRepository tickerRepository;
        Random rand = new Random();
        private static readonly ILog Log = LogManager.GetLogger(typeof(TickerHubPublisher));
        private CancellationTokenSource autoRunningCancellationToken;
        private Task autoRunningTask;


        public TickerHubPublisher(IContextHolder contextHolder,
            ITickerRepository tickerRepository)
        {
            this.contextHolder = contextHolder;
            this.tickerRepository = tickerRepository;
        }


        public void Stop()
        {
            if (autoRunningCancellationToken != null)
            {
                autoRunningCancellationToken.Cancel();

                // Publisher is not thread safe, so while the auto ticker is running only the autoticker is 
                // allowed to access the publisher. Therefore before we can stop the publisher we have to 
                // wait for the autoticker task to complete
                autoRunningTask.Wait();
                autoRunningCancellationToken = null;
            }
        }

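        // Not async: nothing is awaited here, the work runs on the background task
        public void Start()
        {
            autoRunningCancellationToken = new CancellationTokenSource();
            // Capture the token so the loop does not depend on the mutable field
            var token = autoRunningCancellationToken.Token;
            autoRunningTask = Task.Run(async () =>
            {
                while (!token.IsCancellationRequested)
                {
                    await SendOneManualFakeTicker();
                    await Task.Delay(20);
                }
            });
        }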


        public async Task SendOneManualFakeTicker()
        {
            var currentTicker = tickerRepository.GetNextTicker();
            var flipPoint = rand.Next(0, 100);
            if (flipPoint > 50)
            {
                currentTicker.Price += currentTicker.Price/ 30;
            }
            else
            {
                currentTicker.Price -= currentTicker.Price / 30;
            }
            tickerRepository.StoreTicker(currentTicker);
            await SendRandomTicker(currentTicker);
        }


        private Task SendRandomTicker(TickerDto tickerInfo)
        {
            if (contextHolder.TickerHubClients == null) return Task.FromResult(false);

            Log.InfoFormat("Broadcast new trade to blotters: {0}", tickerInfo);
            return contextHolder.TickerHubClients.Group(TickerHub.TickerGroupName).SendTickers(new[] { tickerInfo });
        }
    }
}

The only part of this code that may need some explanation is the SendRandomTicker(TickerDto tickerInfo) method.

private Task SendRandomTicker(TickerDto tickerInfo)
{
    if (contextHolder.TickerHubClients == null) return Task.FromResult(false);

    Log.InfoFormat("Broadcast new trade to blotters: {0}", tickerInfo);
    return contextHolder.TickerHubClients.Group(TickerHub.TickerGroupName).SendTickers(new[] { tickerInfo });
}

This method makes use of the TickerHub context that was captured within the TickerHub itself. Capturing the context allows classes outside the TickerHub to make use of its properties/values. That is precisely what the TickerHubPublisher does. It simply uses the captured TickerHub context to push out new values to the clients subscribed to a particular topic, which in this case is the value of the TickerHub.TickerGroupName ("AllTickers") string. So that is how the SignalR server is able to push notifications to the clients.

Simulating A Crash In The Publisher

This bit is easy, just use the "Stop SignalR" button in the MainWindow. This will simply run the following code, which disposes the SignalR disposable returned from the SignalR WebApp.Start(..) method

private void StopServer()
{
    if (signalr != null)
    {
        tickerHubPublisher.Stop();
        signalr.Dispose();
        signalr = null;
    }

}

What should happen is that if you had any connected clients, they should (after a short period) see the server as unavailable and display "DISCONNECTED" tiles.

Restarting The Publisher

This bit is easy, just use the "Start SignalR" button in the MainWindow. This will simply run the following code

private void StartServer()
{
    try
    {
        signalr = WebApp.Start(Address);
        tickerHubPublisher.Start();
    }
    catch (Exception exception)
    {
        Log.Error("An error occurred while starting SignalR", exception);
    }
}

What should happen is that if you had any connected clients, they should (after a short period) see the server as available again and no longer display the "DISCONNECTED" tiles.

SignalR .NET Client

The client is a standard WPF app (I am using WPF just because I know it, it is not really that important, you could use Winforms using the MVP pattern just as well). The client also makes use of a SignalR proxy class, and as such a good starting place would be to examine the SignalR .NET client APIs, which you can read more about here:

http://www.asp.net/signalr/overview/guide-to-the-api/hubs-api-guide-net-client

IOC

There is some IOC used, and Autofac is the container. As IOC is not the main point of the article, I will not waste any more time on it. If you want to know more about the IOC/Autofac side, read the code, starting with the BootStrapper class.

Resilient Connection

At the core of the client is a class called ServiceClientBase. This class is the base class for any client side Hub proxy connection. So what exactly does this class give you?

Well, let's have a look at its code for a start, shall we:

using System;
using System.Reactive;
using System.Reactive.Linq;
using Client.Hub.Transport;
using Common.Extensions;


namespace Client.Hub
{
    internal class ServiceClientBase
    {
        private readonly IConnectionProvider _connectionProvider;

        protected ServiceClientBase(IConnectionProvider connectionProvider)
        {
            _connectionProvider = connectionProvider;
        }

        protected IObservable<T> GetResilientStream<T>(Func<IConnection, IObservable<T>> streamFactory, 
            TimeSpan connectionTimeout)
        {
            var activeConnections = (from connection in _connectionProvider.GetActiveConnection()
                                     from status in connection.StatusStream
                                     where status.ConnectionStatus == ConnectionStatus.Connected || 
                                        status.ConnectionStatus == ConnectionStatus.Reconnected
                                     select connection)
                .Publish()
                .RefCount();

            // get the first connection
            var firstConnection = activeConnections.Take(1).Timeout(connectionTimeout);

            // 1 - notifies when the first connection gets disconnected
            var firstDisconnection = from connection in firstConnection
                                     from status in connection.StatusStream
                                     where status.ConnectionStatus == ConnectionStatus.Reconnecting || 
                                        status.ConnectionStatus == ConnectionStatus.Closed
                                     select Unit.Default;

            // 2 - if the connection provider created a new connection, the active one has dropped
            var subsequentConnection = activeConnections.Skip(1).Select(_ => Unit.Default).Take(1);

            // OnError when we get 1 or 2
            var disconnected = firstDisconnection.Merge(subsequentConnection)
                .Select(_ => Notification.CreateOnError<T>(new Exception("Connection was closed.")))
                .Dematerialize();

            // create a stream which will OnError as soon as the connection drops
            return (from connection in firstConnection
                    from t in streamFactory(connection)
                    select t)
                .Merge(disconnected)
                .Publish()
                .RefCount();
        }
    }

}

There is an awful lot going on in this class. As I say THIS class is a core class. So what exactly does it do?

Well, it does a few things:

  1. First and foremost, it exposes an IObservable<T> built from the supplied factory. The factory produces the source stream (IObservable<T>) that needs to be made resilient.
  2. It merges a connection status stream (which is built internally from the SignalR connection events, such as reconnecting/closed) into the overall stream. When a "Reconnecting" or "Closed" status (OnNext value) is seen, we consider that to be a disconnection, and we synthesize an OnError notification (by using the standard Rx Dematerialize operator). That is merged with the values of the factory stream, so the overall stream can now yield OnNext or OnError to its consumers. A second connection arriving from the connection provider is treated the same way, since it means the first one has dropped.
  3. It uses the standard RX Publish operator, to share the same underlying IObservable<T> sequence between subscribers
  4. It uses the standard RX RefCount operator, such that when there are no subscribers left the underlying subscription will be disposed
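
To make point 2 a bit more concrete, here is a minimal sketch of the "turn a disconnection signal into an OnError" trick on its own. The class and method names (ResilientStreamSketch, FailOnDisconnect) and the two Subjects are made up for this illustration and are not part of the demo app. Only Notification.CreateOnError, Dematerialize and Merge are the real Rx calls that ServiceClientBase also uses.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ResilientStreamSketch
{
    // Hypothetical helper: merges a data stream with a disconnection signal,
    // where every signal value is converted into an OnError for the consumer.
    public static IObservable<T> FailOnDisconnect<T>(
        IObservable<T> data, IObservable<Unit> disconnectSignal)
    {
        var disconnected = disconnectSignal
            .Select(_ => Notification.CreateOnError<T>(new Exception("Connection was closed.")))
            .Dematerialize(); // turns the OnError notification value into a real OnError

        return data.Merge(disconnected);
    }

    public static List<string> Demo()
    {
        var log = new List<string>();
        var prices = new Subject<int>();       // stands in for the factory stream
        var disconnects = new Subject<Unit>(); // stands in for the status stream filter

        FailOnDisconnect(prices, disconnects).Subscribe(
            p => log.Add("OnNext " + p),
            ex => log.Add("OnError " + ex.Message),
            () => log.Add("OnCompleted"));

        prices.OnNext(1);
        prices.OnNext(2);
        disconnects.OnNext(Unit.Default); // simulate the connection dropping
        prices.OnNext(3);                 // never seen, the merged stream has already errored

        return log;
    }
}
```

The consumer sees two values followed by an OnError, which is exactly the shape that a Catch/Repeat further up the chain can react to.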

This class makes use of a couple of extra utility classes, which we shall look at next.

ConnectionProvider

The ConnectionProvider class has but one job to do, which is to return an IObservable<IConnection>, where an IConnection looks like this:

internal interface IConnection
{
    IObservable<ConnectionInfo> StatusStream { get; }
    IObservable<Unit> Initialize();
    string Address { get; }
    void SetAuthToken(string authToken);
    IHubProxy TickerHubProxy { get; }

}

The more eagle-eyed amongst you will have spotted the use of Unit. Unit is Rx's way of saying that we do not care what the value is, just that something happened.

Here is the relevant code for the ConnectionProvider class:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;
using log4net;
using Common.Extensions;


namespace Client.Hub.Transport
{
    /// <summary>
    /// The connection provider always provides the same connection until it fails, then 
    /// creates a new one and yields it. The connection provider randomizes the list of servers 
    /// specified in configuration and then round-robins through the list (this demo uses a single server)
    /// </summary>
    internal class ConnectionProvider : IConnectionProvider, IDisposable
    {
        private readonly SingleAssignmentDisposable disposable = new SingleAssignmentDisposable();
        private readonly string username;
        private readonly IObservable<IConnection> connectionSequence;
        private readonly string server;
        private int _currentIndex;
        private static readonly ILog log = LogManager.GetLogger(typeof(ConnectionProvider));

        public ConnectionProvider(string username, string server)
        {
            this.username = username;
            this.server = server;
            connectionSequence = CreateConnectionSequence();
        }

        public IObservable<IConnection> GetActiveConnection()
        {
            return connectionSequence;
        }

        public void Dispose()
        {
            disposable.Dispose();
        }

        private IObservable<IConnection> CreateConnectionSequence()
        {
            return Observable.Create<IConnection>(o =>
            {
                log.Info("Creating new connection...");
                var connection = GetNextConnection();

                var statusSubscription = connection.StatusStream.Subscribe(
                    _ => { },
                    ex => o.OnCompleted(),
                    () =>
                    {
                        log.Info("Status subscription completed");
                        o.OnCompleted();
                    });

                var connectionSubscription =
                    connection.Initialize().Subscribe(
                        _ => o.OnNext(connection),
                        ex => o.OnCompleted(),
                        o.OnCompleted);

                return new CompositeDisposable { statusSubscription, connectionSubscription };
            })
            .Repeat()
            .Replay(1)
            .LazilyConnect(disposable);
        }

        private IConnection GetNextConnection()
        {
            return new Client.Hub.Transport.Connection(server, username);
        }
    }

}
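
One thing to point out is that LazilyConnect is not a standard Rx operator. It lives in Common.Extensions in the demo code, and its source is not shown in this article. What follows is only my sketch of how such an operator could be written, assuming its job is to call Connect() on the Replay(1) sequence the first time anyone subscribes, and to park the resulting connection in the SingleAssignmentDisposable. The names LazyConnectSketch and LazilyConnectSketch are made up for the illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public static class LazyConnectSketch
{
    // Hypothetical stand-in for the project's LazilyConnect extension.
    // Connects on the first subscription only, and stores the connection
    // so that disposing futureDisposable tears the shared sequence down.
    public static IObservable<T> LazilyConnectSketch<T>(
        this IConnectableObservable<T> connectable,
        SingleAssignmentDisposable futureDisposable)
    {
        var connected = 0;
        return Observable.Create<T>(observer =>
        {
            // subscribe first, so this observer does not miss the first values
            var subscription = connectable.Subscribe(observer);
            if (Interlocked.CompareExchange(ref connected, 1, 0) == 0
                && !futureDisposable.IsDisposed)
            {
                futureDisposable.Disposable = connectable.Connect();
            }
            return subscription;
        });
    }

    // Returns { number of times the source was created, number of values received }.
    public static int[] Demo()
    {
        var creations = 0;
        var received = new List<int>();
        var connectionHandle = new SingleAssignmentDisposable();

        var shared = Observable.Create<int>(o =>
            {
                creations++;   // counts how often the "connection" is really created
                o.OnNext(42);
                return Disposable.Empty;
            })
            .Replay(1)
            .LazilyConnectSketch(connectionHandle);

        shared.Subscribe(x => received.Add(x)); // first subscriber triggers Connect()
        shared.Subscribe(x => received.Add(x)); // late subscriber gets the replayed value

        connectionHandle.Dispose();
        return new[] { creations, received.Count };
    }
}
```

With this sketch both subscribers see the value, but the source is only created once, which is the "always the same connection" behaviour the ConnectionProvider is after.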

Connection

The Connection itself exposes a SignalR proxy and also an IObservable<ConnectionInfo>, which is created by examining the available SignalR .NET client events. Consumers of this class now have access to a status stream for the connection, such that they are able to find out when a connection is lost or reconnected.

Here is this class:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading.Tasks;
using Common;
using Common.Extensions;
using log4net;
using Microsoft.AspNet.SignalR.Client;

namespace Client.Hub.Transport
{
    internal class Connection : IConnection
    {
        private readonly ISubject<ConnectionInfo> _statusStream;
        private readonly HubConnection hubConnection;

        private bool _initialized;
        private static readonly ILog log = LogManager.GetLogger(typeof(Connection));

        public Connection(string address, string username)
        {
            _statusStream = new BehaviorSubject<ConnectionInfo>(
                new ConnectionInfo(ConnectionStatus.Uninitialized, address));
            Address = address;
            hubConnection = new HubConnection(address);
            //hubConnection.Headers.Add(ServiceConstants.Server.UsernameHeader, username);
            CreateStatus().Subscribe(
                s => _statusStream.OnNext(new ConnectionInfo(s, address)),
                _statusStream.OnError,
                _statusStream.OnCompleted);
            hubConnection.Error += exception => log.Error("There was a connection error with " 
                + address, exception);

            TickerHubProxy = hubConnection.CreateHubProxy(ServiceConstants.Server.TickerHub);

        }

        public IObservable<Unit> Initialize()
        {
            if (_initialized)
            {
                throw new InvalidOperationException("Connection has already been initialized");
            }
            _initialized = true;

            return Observable.Create<Unit>(async observer =>
            {
                _statusStream.OnNext(new ConnectionInfo(ConnectionStatus.Connecting, Address));

                try
                {
                    log.InfoFormat("Connecting to {0}", Address);
                    await hubConnection.Start();
                    _statusStream.OnNext(new ConnectionInfo(ConnectionStatus.Connected, Address));
                    observer.OnNext(Unit.Default);
                }
                catch (Exception e)
                {
                    log.Error("An error occurred when starting SignalR connection", e);
                    observer.OnError(e);
                }

                return Disposable.Create(() =>
                {
                    try
                    {
                        log.Info("Stopping connection...");
                        hubConnection.Stop();
                        log.Info("Connection stopped");
                    }
                    catch (Exception e)
                    {
                        // we must never throw in a disposable
                        log.Error("An error occurred while stopping connection", e);
                    }
                });
            })
            .Publish()
            .RefCount();
        }

        private IObservable<ConnectionStatus> CreateStatus()
        {
            var closed = Observable.FromEvent(h => hubConnection.Closed += h, 
                h => hubConnection.Closed -= h).Select(_ => ConnectionStatus.Closed);
            var connectionSlow = Observable.FromEvent(h => hubConnection.ConnectionSlow += h, 
                h => hubConnection.ConnectionSlow -= h).Select(_ => ConnectionStatus.ConnectionSlow);
            var reconnected = Observable.FromEvent(h => hubConnection.Reconnected += h, 
                h => hubConnection.Reconnected -= h).Select(_ => ConnectionStatus.Reconnected);
            var reconnecting = Observable.FromEvent(h => hubConnection.Reconnecting += h, 
                h => hubConnection.Reconnecting -= h).Select(_ => ConnectionStatus.Reconnecting);
            return Observable.Merge(closed, connectionSlow, reconnected, reconnecting)
                .TakeUntilInclusive(status => status == ConnectionStatus.Closed); 
            // complete when the connection is closed (it's terminal, SignalR will not attempt to reconnect anymore)
        }

        public IObservable<ConnectionInfo> StatusStream
        {
            get { return _statusStream; }
        }

        public string Address { get; private set; }

        public IHubProxy TickerHubProxy { get; private set; }

        public void SetAuthToken(string authToken)
        {
            //hubConnection.Headers[AuthTokenProvider.AuthTokenKey] = authToken;
        }

        public override string ToString()
        {
            return string.Format("Address: {0}", Address);
        }
    }

}
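
The TakeUntilInclusive operator used in CreateStatus is another helper from Common.Extensions rather than a standard Rx operator, and its source is not listed here. As a rough sketch, assuming it passes every value through and completes straight after the first value that matches the predicate (so the "Closed" status is still delivered), it could look something like this. StatusOperators and TakeUntilInclusiveSketch are names I made up for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class StatusOperators
{
    // Hypothetical stand-in for the project's TakeUntilInclusive extension:
    // forwards every value and completes right after the first match.
    public static IObservable<T> TakeUntilInclusiveSketch<T>(
        this IObservable<T> source, Func<T, bool> predicate)
    {
        return Observable.Create<T>(observer => source.Subscribe(
            value =>
            {
                observer.OnNext(value);            // the matching value is included
                if (predicate(value)) observer.OnCompleted();
            },
            observer.OnError,
            observer.OnCompleted));
    }

    public static List<string> Demo()
    {
        var seen = new List<string>();
        var statuses = new Subject<string>(); // stands in for the merged SignalR events

        statuses
            .TakeUntilInclusiveSketch(s => s == "Closed")
            .Subscribe(s => seen.Add(s), () => seen.Add("<completed>"));

        statuses.OnNext("Reconnecting");
        statuses.OnNext("Reconnected");
        statuses.OnNext("Closed");       // delivered, then the stream completes
        statuses.OnNext("Reconnecting"); // ignored, we are already completed

        return seen;
    }
}
```

The standard TakeWhile would drop the "Closed" value itself, which is presumably why a custom operator was needed.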

SignalR Subscriptions / Tickers Stream

As we saw in the publisher, a client of the server side SignalR hub will call 2 things, namely SubscribeTickers and UnsubscribeTickers, and in return receives a stream of TickerDto (at least it does for this demo app, which only contains a single hub). The following is true:

  • The client will call the server side hub SubscribeTickers on connection
  • The client will call the server side hub UnsubscribeTickers on disposal, and only on disposal. See the following code, where we use the power of Observable.Create(..) along with a CompositeDisposable to dispose of a few things. One of those is a disposable which we create using Disposable.Create(..), whose Action will call UnsubscribeTickers on disposal.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;
using Client.Hub.Transport;
using Common;
using log4net;
using Microsoft.AspNet.SignalR.Client;

namespace Client.Hub
{
    internal class TickerHubClient : ServiceClientBase, ITickerHubClient
    {

        private static readonly ILog log = LogManager.GetLogger(typeof(TickerHubClient));

        public TickerHubClient(IConnectionProvider connectionProvider)
            : base(connectionProvider)
        {
        }

        public IObservable<IEnumerable<TickerDto>> GetTickerStream()
        {
            return GetResilientStream(connection => GetTradesForConnection(connection.TickerHubProxy), 
                TimeSpan.FromSeconds(5));
        }

        private IObservable<IEnumerable<TickerDto>> GetTradesForConnection(IHubProxy tickerHubProxy)
        {
            return Observable.Create<IEnumerable<TickerDto>>(observer =>
            {
                // subscribe to trade feed first, otherwise there is a race condition 
                var spotTradeSubscription = tickerHubProxy.On<IEnumerable<TickerDto>>(
                    ServiceConstants.Client.SendTickers, observer.OnNext);

                var spotTradeSubscriptionRaceDisposable = 
                    tickerHubProxy.On<IEnumerable<TickerDto>>(
                    ServiceConstants.Client.SendTickers, 
                    (x) => 
                    {
                            Console.WriteLine("Got a new trade" + x.First().Name);
                    });



                log.Info("Sending ticker subscription...");
                var sendSubscriptionDisposable = SendSubscription(tickerHubProxy)
                    .Subscribe(
                        _ => log.InfoFormat("Subscribed to ticker."),
                        observer.OnError);

                var unsubscriptionDisposable = Disposable.Create(() =>
                {
                    // send unsubscription when the observable gets disposed
                    log.Info("Sending ticker unsubscription...");
                    SendUnsubscription(tickerHubProxy)
                        .Subscribe(
                            _ => log.InfoFormat("Unsubscribed from ticker."),
                            ex => log.WarnFormat(
                            	"An error occurred while unsubscribing from ticker: {0}", 
                            	ex.Message));
                });
                return new CompositeDisposable { 
                    spotTradeSubscription, unsubscriptionDisposable, 
                    sendSubscriptionDisposable,spotTradeSubscriptionRaceDisposable 
                };
            })
            .Publish()
            .RefCount();
        }

        private static IObservable<Unit> SendSubscription(IHubProxy tickerHubProxy)
        {
            return Observable.FromAsync(() => tickerHubProxy.Invoke(
                ServiceConstants.Server.SubscribeTickers));
        }

        private static IObservable<Unit> SendUnsubscription(IHubProxy tickerHubProxy)
        {
            return Observable.FromAsync(() => tickerHubProxy.Invoke(
                ServiceConstants.Server.UnsubscribeTickers));

        }
    }
}

The other thing to note about the TickerHubClient code is that it inherits from ServiceClientBase, and as such has the resilient connection functionality that we discussed above. We feed the GetTradesForConnection(IHubProxy tickerHubProxy) method into the ServiceClientBase class to act as the IObservable<T> factory, and the result is the final GetTickerStream stream (now resilient, thanks to the logic in the ServiceClientBase base class).
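
Going back to the disposal logic in GetTradesForConnection for a moment, the pattern can be shown in isolation. This is only a sketch: the Subject stands in for the hub proxy's incoming messages, the "sent" log entries stand in for the SubscribeTickers / UnsubscribeTickers hub calls, and SubscriptionLifetimeSketch is a made up name. Observable.Create, CompositeDisposable and Disposable.Create are the real Rx pieces.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SubscriptionLifetimeSketch
{
    public static List<string> Demo()
    {
        var calls = new List<string>();
        var feed = new Subject<int>(); // stands in for messages arriving on the hub proxy

        var stream = Observable.Create<int>(observer =>
        {
            // 1. start listening first, so no message can slip past us
            var feedSubscription = feed.Subscribe(observer);

            // 2. tell the (imaginary) server that we want data
            calls.Add("Subscribe sent");

            // 3. this Action only runs when the subscription is disposed
            var unsubscribe = Disposable.Create(() => calls.Add("Unsubscribe sent"));

            // disposing the composite disposes everything inside it
            return new CompositeDisposable { feedSubscription, unsubscribe };
        });

        var subscription = stream.Subscribe(x => calls.Add("Got " + x));
        feed.OnNext(1);
        subscription.Dispose(); // triggers the unsubscribe Action
        feed.OnNext(2);         // nobody is listening any more

        return calls;
    }
}
```

Nothing is sent to the server until someone subscribes, and the unsubscribe is sent when that subscription is disposed, so the consumer never has to remember to do it.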

From here the GetTickerStream stream is free for anyone to use. So let's see who uses it; that is discussed just below.

TickerRepository

The TickerRepository is the next rung up the Observable chain. So what does that look like? Well, it is actually surprisingly simple, but don't be fooled by that, there is a LOT going on here.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using Client.Factory;
using Client.Hub;

namespace Client.Repositories
{
    class TickerRepository : ITickerRepository
    {
        private readonly ITickerHubClient tickerHubClient;
        private readonly ITickerFactory tickerFactory;

        public TickerRepository(ITickerHubClient tickerHubClient, ITickerFactory tickerFactory)
        {
            this.tickerHubClient = tickerHubClient;
            this.tickerFactory = tickerFactory;
        }

        public IObservable<IEnumerable<Ticker>> GetTickerStream()
        {
            return Observable.Defer(() => tickerHubClient.GetTickerStream())
                .Select(trades => trades.Select(tickerFactory.Create))
                .Catch(Observable.Return(new Ticker[0]))
                .Repeat()
                .Publish()
                .RefCount();
        }
    }
}

So what is going on here exactly?

  • We use Observable.Defer such that the underlying stream is not created until someone subscribes to the IObservable that Observable.Defer returns. The factory also runs again for every new subscription, which is what allows Repeat (see below) to rebuild the whole stream
  • We use Select to transform the stream data from TickerDto to Ticker
  • We use Catch to catch any Exceptions (OnError) in the stream. In that case we yield a default value, an empty Ticker array, which the viewmodel treats as "we are disconnected"
  • We use Repeat. Now this one is VERY VERY important. This is the one that allows us to repeat the whole stream, including connecting to the server side hub again. It is this mechanism that allows the client to recover from a server side loss of the SignalR hub. This along with the resilient stream logic are the MOST important bits of the app (at least in my opinion)
  • We use Publish to share the underlying stream
  • We use RefCount to get automatic disposal when there are no longer any subscribers
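
The Defer / Catch / Repeat combination is easier to believe when you see it run. Here is a minimal sketch, assuming a made up source where every "connection attempt" delivers one batch and then fails. The names (RepeatSketch, attempts) and the int arrays are purely illustrative, I have left Publish/RefCount out, and I have added a Take(4) only so that the demo terminates.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class RepeatSketch
{
    public static List<string> Demo()
    {
        var attempts = 0;
        var seen = new List<string>();

        var resilient = Observable.Defer(() =>
            {
                // runs once per (re)subscription, like reconnecting to the hub
                attempts++;
                var batch = Observable.Return(new[] { attempts });
                // every attempt delivers one batch and then loses the connection
                return batch.Concat(Observable.Throw<int[]>(new Exception("hub lost")));
            })
            .Catch(Observable.Return(new int[0])) // error becomes an empty batch, then completes
            .Repeat()                             // completion triggers a fresh subscription
            .Take(4);                             // demo only, the real stream never ends

        resilient.Subscribe(b => seen.Add(b.Length == 0 ? "<empty>" : string.Join(",", b)));

        return seen;
    }
}
```

Each failure shows up as an empty batch (the "disconnected" marker), and the very next value comes from a brand new attempt, which is the recovery behaviour described above.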

So now that we have seen this repository, we only have one more hop to continue the IObservable journey for the TickerStream. Let's see that next.

TickersViewModel

The TickersViewModel represents ALL the Ticker(s) you see on the screen. It is this viewmodel that makes use of the lazy / repeatable / resilient IObservable that the TickerRepository provides. Let's see the code, which is pretty self explanatory, I think:

using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Input;
using Client.Factory;
using Client.Repositories;
using Client.Services;
using Common;
using Common.ViewModels;
using log4net;


namespace Client.ViewModels
{
    public class TickersViewModel : INPCBase
    {
        private readonly ITickerRepository tickerRepository;
        private readonly IConcurrencyService concurrencyService;
        private bool stale = false;
        private static readonly ILog log = LogManager.GetLogger(typeof(TickersViewModel));

        public TickersViewModel(IReactiveTrader reactiveTrader,
                                IConcurrencyService concurrencyService,
            TickerViewModelFactory tickerViewModelFactory)
        {
            Tickers = new ObservableCollection<TickerViewModel>();
            Tickers.Add(tickerViewModelFactory.Create("Yahoo"));
            Tickers.Add(tickerViewModelFactory.Create("Google"));
            Tickers.Add(tickerViewModelFactory.Create("Apple"));
            Tickers.Add(tickerViewModelFactory.Create("Facebook"));
            Tickers.Add(tickerViewModelFactory.Create("Microsoft"));
            Tickers.Add(tickerViewModelFactory.Create("Twitter"));
            this.tickerRepository = reactiveTrader.TickerRepository;
            this.concurrencyService = concurrencyService;
            LoadTrades();
        }


        public ObservableCollection<TickerViewModel> Tickers { get; private set; }

        private void LoadTrades()
        {
            tickerRepository.GetTickerStream()
                            .ObserveOn(concurrencyService.Dispatcher)
                            .SubscribeOn(concurrencyService.TaskPool)
                            .Subscribe(
                                AddTickers,
                                ex => log.Error("An error occurred within the trade stream", ex));
        }

        private void AddTickers(IEnumerable<Ticker> incomingTickers)
        {
            var allTickers = incomingTickers as IList<Ticker> ?? incomingTickers.ToList();
            if (!allTickers.Any())
            {
                // empty list of trades means we are disconnected
                stale = true;
            }
            else
            {
                if (stale)
                {
                    stale = false;
                }
            }

            foreach (var ticker in allTickers)
            {
                Tickers.Single(x => x.Name == ticker.Name)
                    .AcceptNewPrice(ticker.Price);
            }
        }
    }
}

Each Ticker is represented by a single TickerViewModel, which is shown below. It can be seen that this class also makes use of the ConnectionStatusStream IObservable we discussed earlier. This is used to change the TickerViewModel's view to show a red box with "DISCONNECTED" in it. We will talk through that in just a minute.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;
using Client.Hub.Transport;
using Client.Services;
using log4net;

namespace Client.ViewModels
{
    public class TickerViewModel : INPCBase
    {
        private decimal price;
        private bool isUp;
        private bool stale;
        private bool disconnected;
        private static readonly ILog log = LogManager.GetLogger(typeof(TickerViewModel));


        public TickerViewModel(
            IReactiveTrader reactiveTrader,
            IConcurrencyService concurrencyService,
            string name)
        {

            this.Name = name;

            reactiveTrader.ConnectionStatusStream
                .ObserveOn(concurrencyService.Dispatcher)
                .SubscribeOn(concurrencyService.TaskPool)
                .Subscribe(
                    OnStatusChange,
                    ex => log.Error("An error occurred within the connection status stream.", ex));
        }


        public string Name { get; private set; }


        public void AcceptNewPrice(decimal newPrice)
        {
            IsUp = newPrice > price;
            Price = newPrice;
        }


        public decimal Price
        {
            get { return this.price; }
            private set
            {
                this.price = value;
                base.OnPropertyChanged("Price");
            }
        }

        public bool IsUp
        {
            get { return this.isUp; }
            private set
            {
                this.isUp = value;
                base.OnPropertyChanged("IsUp");
            }
        }

        public bool Stale
        {
            get { return this.stale; }
            set
            {
                this.stale = value;
                base.OnPropertyChanged("Stale");
            }
        } 
        
        public bool Disconnected
        {
            get { return this.disconnected; }
            set
            {
                this.disconnected = value;
                base.OnPropertyChanged("Disconnected");
            }
        }


        private void OnStatusChange(ConnectionInfo connectionInfo)
        {

            switch (connectionInfo.ConnectionStatus)
            {
                case ConnectionStatus.Uninitialized:
                case ConnectionStatus.Connecting:
                    Disconnected = true;
                    break;
                case ConnectionStatus.Reconnected:
                case ConnectionStatus.Connected:
                    Disconnected = false;
                    break;
                case ConnectionStatus.ConnectionSlow:
                    Disconnected = false;
                    break;
                case ConnectionStatus.Reconnecting:
                    Disconnected = true;
                    break;
                case ConnectionStatus.Closed:
                    Disconnected = true;
                    break;
                default:
                    throw new ArgumentOutOfRangeException();
            }
        }

    }
}

Connection Status Stream

The last thing I wanted to show you was how the ConnectionStatusStream is used. This stream OnNexts whenever the connection status changes, so we see things like Connecting, Connected, ConnectionSlow, Reconnecting, Reconnected, Closed, Uninitialized. ConnectionSlow, Reconnecting, Reconnected and Closed all started out life as events on the SignalR HubConnection, and are turned into an IObservable stream using one of the many RX factories, in this case Observable.FromEvent. Uninitialized, Connecting and Connected are pushed into the status stream by the Connection class itself.
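
If you have not used Observable.FromEvent before, here is a small sketch of the idea. FakeConnection is a made up class with plain Action events that stands in for the SignalR HubConnection, and the status values are just strings here. The FromEvent overload taking an add handler and a remove handler is the same one the Connection class uses.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical stand-in for SignalR's HubConnection, for illustration only.
public class FakeConnection
{
    public event Action Closed;
    public event Action Reconnecting;

    public void RaiseClosed() { var h = Closed; if (h != null) h(); }
    public void RaiseReconnecting() { var h = Reconnecting; if (h != null) h(); }
}

public static class FromEventSketch
{
    // Each event becomes an IObservable<Unit>, which we map to a status value.
    public static IObservable<string> CreateStatus(FakeConnection connection)
    {
        var closed = Observable.FromEvent(
                h => connection.Closed += h,
                h => connection.Closed -= h)
            .Select(_ => "Closed");
        var reconnecting = Observable.FromEvent(
                h => connection.Reconnecting += h,
                h => connection.Reconnecting -= h)
            .Select(_ => "Reconnecting");

        return closed.Merge(reconnecting);
    }

    public static List<string> Demo()
    {
        var seen = new List<string>();
        var connection = new FakeConnection();

        var subscription = CreateStatus(connection).Subscribe(s => seen.Add(s));
        connection.RaiseReconnecting();
        connection.RaiseClosed();

        subscription.Dispose();   // removes the event handlers again
        connection.RaiseClosed(); // no longer observed

        return seen;
    }
}
```

The nice part is that unhooking the event handlers becomes a simple Dispose, which composes with everything else in the chain.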

Anyway, here is the overall ConnectivityStatusViewModel that we use to show the information in the bottom status bar of the app.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;
using Client.Hub.Transport;
using Client.Services;
using log4net;

namespace Client.ViewModels
{
    public class ConnectivityStatusViewModel : INPCBase
    {
        private static readonly ILog log = LogManager.GetLogger(typeof(ConnectivityStatusViewModel));
        private string server;
        private string status;
        private bool disconnected;

        public ConnectivityStatusViewModel(
            IReactiveTrader reactiveTrader,
            IConcurrencyService concurrencyService)
        {
            reactiveTrader.ConnectionStatusStream
                .ObserveOn(concurrencyService.Dispatcher)
                .SubscribeOn(concurrencyService.TaskPool)
                .Subscribe(
                OnStatusChange,
                ex => log.Error("An error occurred within the connection status stream.", ex));
        }


        private void OnStatusChange(ConnectionInfo connectionInfo)
        {
            Server = connectionInfo.Server;

            switch (connectionInfo.ConnectionStatus)
            {
                case ConnectionStatus.Uninitialized:
                case ConnectionStatus.Connecting:
                    Status = "Connecting...";
                    Disconnected = true;
                    break;
                case ConnectionStatus.Reconnected:
                case ConnectionStatus.Connected:
                    Status = "Connected";
                    Disconnected = false;
                    break;
                case ConnectionStatus.ConnectionSlow:
                    Status = "Slow connection detected";
                    Disconnected = false;
                    break;
                case ConnectionStatus.Reconnecting:
                    Status = "Reconnecting...";
                    Disconnected = true;
                    break;
                case ConnectionStatus.Closed:
                    Status = "Disconnected";
                    Disconnected = true;
                    break;
                default:
                    throw new ArgumentOutOfRangeException();
            }
        }

        public string Server
        {
            get { return this.server; }
            set
            {
                this.server = value;
                base.OnPropertyChanged("Server");
            }
        }

        public string Status
        {
            get { return this.status; }
            set
            {
                this.status = value;
                base.OnPropertyChanged("Status");
            }
        }

        public bool Disconnected
        {
            get { return this.disconnected; }
            set
            {
                this.disconnected = value;
                base.OnPropertyChanged("Disconnected");
            }
        }
    }

}

That's It For Now

Well there we go, I hope I have shown you all that RX is actually way more than just LINQ to events. It contains a great many classes for dealing with concurrency, LINQ, events, streams, factories, retries, error handling and disposing of resources.

Some of you may be pleased to know this is a 2 part article. In the next part I will be teaming up with the author of NetMQ, the .NET port of the highly regarded socket library ZeroMQ (OK, I did contribute a bit to that library too), and together we will be showing you how you can build this sort of app using NetMQ. So until then, if you have enjoyed this article, please feel free to leave a comment/vote, they are most welcome.

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.
