You can grab the code from my github repository right here
https://github.com/sachabarber/SignalrRxDemo
I work in finance, and at the moment work in the Foreign Exchange (FX) space, where we have lots of requirements that involve streaming of various things, such as:
- Rates
- Trades
- Other financial instruments
Historically what we would have done to accomodate these requirements was to use sockets and push our own data (usually JSON or XML) down the wire, and then had some quite knarly listeners above that, where each listener would want some slightly different view of the original streaming data.
Thing is, time has never stands still in the world of software development, and new things come out all the time, literally every other day a new framework comes along that helps improve what came before it in some way or another.
One particular thing, that I think a lot of people actually misunderstood was Reactive Extensions RX, which in fairness has been around a while now. I think a lot of people think of RX as LINQ to events, which it does have. However the truth is that RX is a superb framework for creating streaming applications and the more you get into RX the more you tend to see everything as a stream, from and event, down to picking a file or a ICommand.Execute
in a ViewModel say.
RX comes with many many tools in it's arsenal, such as
- 2 core interfaces
IObservable
/ IObserver
- Lots of LINQ like extension methods for
IObservable
- Time / window based operations (LINQ doesn't really have these)
- Concurrency schedulers
- Stream error handling controls (Catch etc etc)
- Ability to create
IObservable
from a great many other things, such as
IEnumerable
Task
- Events
IObservable
Factories
I personally feel that a lot of people could make use of RX to great gain, if they were to put in the time to learn about it, a bit more.
There is also another great library from Microsoft which facilitates push based technology, namely SignalR. Quite a lot has been written about SignalR, but you rarely see that from a desktop point of view.
Most people would probably associate a SignalR Hub with a web site, but there are various ways you can use SignalR, which also make it a good choice for desktop (ie non web) development. This is possible thanks to the SignalR OWIN hosting API, such that you can self host a SignalR Hub. There is also a .NET client, we can use to communicate with the hub. SignalR also has quite a lot of events within the .NET client, and guess what we can do with those events, yep that's right we can create IObservable
(s) streams out of them, which opens up the gates to use RX.
This article will show one area where RX excels, which is in the area of streaming APIs, where we will be using RX/SignalR
- Streaming API using SignalR and RX (this article)
- Streaming API using NetMQ and RX
I think the best way to see what this demo is about is to have a look at the video, which I have uploaded to YouTube here:
https://www.youtube.com/watch?v=TDIpPvbw6ek
There are many many RX Extension methods which you can make use of:
http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable_methods(v=vs.103).aspx
But perhaps the best place to start with RX, is to understand the underlying 2 interfaces you will use time and time again. These are IObservable
and IObserver
. We discuss these 2 next.
namespace System
{
public interface IObservable<out T>
{
IDisposable Subscribe(IObserver<T> observer);
}
}
Defines a provider for push-based notification.
This type parameter is covariant. That is, you can use either the type you specified or any type that is more derived. For more information about covariance and contravariance, see Covariance and Contravariance in Generics.
Subscribe
Notifies the provider that an observer is to receive notifications.
Further reading: http://msdn.microsoft.com/en-us/library/dd990377(v=vs.110).aspx
namespace System
{
public interface IObserver<in T>
{
void OnCompleted();
void OnError(Exception error);
void OnNext(T value);
}
}
The IObserver<T>
and IObservable<T>
interfaces provide a generalized mechanism for push-based notification, also known as the observer design pattern. The IObservable<T>
interface represents the class that sends notifications (the provider); the IObserver<T>
interface represents the class that receives them (the observer). T
represents the class that provides the notification information.
An IObserver<T>
implementation arranges to receive notifications from a provider (an IObservable<T>
implementation) by passing an instance of itself to the provider's IObservable<T>.Subscribe
method. This method returns an IDisposable
object that can be used to unsubscribe the observer before the provider finishes sending notifications.
The IObserver<T>
interface defines the following three methods that the observer must implement:
- The
OnNext
method, which is typically called by the provider to supply the observer with new data or state information.
- The
OnError
method, which is typically called by the provider to indicate that data is unavailable, inaccessible, or corrupted, or that the provider has experienced some other error condition.
- The
OnCompleted
method, which is typically called by the provider to indicate that it has finished sending notifications to observers.
Further reading : http://msdn.microsoft.com/en-us/library/dd783449(v=vs.110).aspx
Anyone that has used RX a bit, will come to the point that they may like to create their own operators. In LINQ this would be done by creating a new method (perhaps an extension method) that simple returns a new IEnumerable<T>
. In RX things are slightly trickier but not much trickier. In RX you would make use of Observable.Create
to create your own operators, or as a factory for creating new a Observable<T>
. Here is a simple example:
private IObservable<string> CreateObservable()
{
return Observable.Create<string>(
(IObserver<string> observer) =>
{
observer.OnNext("a");
observer.OnNext("b");
observer.OnCompleted();
return Disposable.Create(() => Console.WriteLine("Observer has unsubscribed"));
});
}
Observable.Create
creates an observable sequence from a subscribe method implementation. You would typically use an extension method to an existing IObservable<T>
and then put an Observable.Create(..)
as the body of the extension method.
Observable.Create
has this method signature
public static IObservable<TSource> Create<TSource>(
Func<IObserver<TSource>, IDisposable> subscribe
)
The interesting part of this method signature is that you can see that it returns a Func
delegate hat takes a IObserver<TSource>
which allows you to push values into the observer using its OnNext/OnError
and OnCompleted
methods. The Func
delegate returns a IDisposable
, which will called when disposal happens. Now RX comes with a great many different IDisposable
types, don't be suprised if you see some quite exotic IDisposable
types being used inside of an Observable.Create
implementation.
One of the best descriptions of using Observable.Create
is from Lee Campbells site :
http://www.introtorx.com/content/v1.0.10621.0/04_CreatingObservableSequences.html
I had the pleasure of going on a 2 day Rx course with Lee Campbell the author of like the only Rx book on the planet, which Lee has a blog dedicated to, right here : http://www.introtorx.com/
Lee now works for a consultancy called "Adaptive" who were the ones that actually ran the training course I attended. They have also released into the wild a truly great demo app which showcases how to create a full connected fault tolerant streaming API + client app. They have done several flavours of the demo:
- WPF
- Windows Store (WinRT)
- iOS using Monotouch
- HTML5 (using TypeScript)
You can grab the demo app here https://github.com/AdaptiveConsulting/ReactiveTrader
Thing is this solution is reasonably large, and if you are new to Rx or are just trying to get your head around how to create a reliable streaming API, this solution is a tad over bearing, at least in my opinion.
To be fair I think the demo app the Adaptive crew have put together serves as more of a marketing tool for the collective minds of Adaptive, people would look at it and go, mmm these guys seem to know their onions let's get them in. However if you are a mere developer trying to get your head into that space, you may be looking for something a little smaller to start with, you know kind of baby steps.
I am hoping that is where this demo article will fit the bill, as I have been through the pain of trying to understand the WPF version of the https://github.com/AdaptiveConsulting/ReactiveTrader demo app, and will present a very much cut down version in this article for you.
This section outlines a bare bones demo app, which as I say is largely based on the great work done by the guys at Adaptive, and their https://github.com/AdaptiveConsulting/ReactiveTrader demo app. What I wanted to achieve was quite simple:
- A single SignalR hub (the Adative example has a few) that would push out
TickerDto
objects
- Clients that would subscribe to a certain topic to receive push notifications from the SignalR server side hub, and push this out via RX, such that anyone interested may use the standard RX operators
- The ability to simulate the server dying, and coming back to life
- The ability for the connected clients to know when the server was up/down
- The ability for the connection between the client and server to be resilient, in that when the server comes back up the client should be able to recover from that
The Adaptive demo does all this (and more), but I am not Lee Campbell and do not have his depth of experience with RX, though I have to say I think I am getting it now. So that is why I wrote up this small demo app
This diagram illustrates the general idea:
The publisher is made up of a few bits and pieces, which are described below, one thing to note is that both the client and the server make use of SignalR, so it may be an idea to read about the SignalR self hosting, here:
http://www.asp.net/signalr/overview/guide-to-the-api/hubs-api-guide-server
There is some IOC used, AutoFac is the container used. As IOC is not the main point of the article, I will not waste any more time on it. If you want to know more about the IOC/AutoFac read the code, looking into the BootStrapper
class
There is not much to say about this window, except that is has the following controls
- Start SignalR : Which starts the SignalR Hub (useful for simulating crash/restart of hub)
- Stop SignalR : Which stops the SignalR Hub (useful for simulating crash/restart of hub)
- Start Auto Ticker Data : Will push out random
TickerDto
objects out of the Hub context at fixed intervals
- Stop Auto Ticker Data : Will suspend the sending of
TickerDto
objects out of the Hub
- Send One Ticker : Will push 1 SINGLE random
TickerDto
object out of the Hub context
The MainWindowViewModel
is really used to facilitate sending commands to either stop/create the SignalR Hub, or to tell the TickerHubPublisher
to do things. Here is the full code for the MainWindowViewModel
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Input;
using Common.ViewModels;
using log4net;
using Microsoft.Owin.BuilderProperties;
using Microsoft.Owin.Hosting;
using SignalRSelfHost.Hubs.Ticker;
namespace SignalRSelfHost
{
public class MainWindowViewModel : IMainWindowViewModel
{
private const string Address = "http://localhost:5263";
private readonly ITickerHubPublisher tickerHubPublisher;
private static readonly ILog Log = LogManager.GetLogger(typeof(MainWindowViewModel));
private IDisposable signalr;
public MainWindowViewModel(ITickerHubPublisher tickerHubPublisher)
{
this.tickerHubPublisher = tickerHubPublisher;
AutoTickerStartCommand = new DelegateCommand(tickerHubPublisher.Start);
AutoTickerStopCommand = new DelegateCommand(tickerHubPublisher.Stop);
SendOneTickerCommand = new DelegateCommand(async () =>
{
await tickerHubPublisher.SendOneManualFakeTicker();
});
StartCommand = new DelegateCommand(StartServer);
StopCommand = new DelegateCommand(StopServer);
}
public ICommand AutoTickerStartCommand { get; set; }
public ICommand AutoTickerStopCommand { get; set; }
public ICommand SendOneTickerCommand { get; set; }
public ICommand StartCommand { get; private set; }
public ICommand StopCommand { get; private set; }
public void Start()
{
StartServer();
}
private void StartServer()
{
try
{
signalr = WebApp.Start(Address);
}
catch (Exception exception)
{
Log.Error("An error occurred while starting SignalR", exception);
}
}
private void StopServer()
{
if (signalr != null)
{
signalr.Dispose();
signalr = null;
}
}
}
}
As you can see not much really goes on in the MainWindowViewModel
, all the action happens elsewhere.
In order to start the publisher, the very first consideration we need to deal with is how to host the SignalR hub. This is handled by an application scope OwinStartupAttribute
, which you will find in the demo app within the App
class
using System;
using System.Collections.Generic;
using System.Configuration;
using System.Data;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Windows;
using Autofac;
using Microsoft.Owin;
using SignalRSelfHost.Hubs;
using SignalRSelfHost.Hubs.Ticker;
using SignalRSelfHost.IOC;
using log4net;
[assembly: OwinStartup(typeof(Startup))]
namespace SignalRSelfHost
{
public partial class App : Application
{
....
....
....
....
}
}
The OwinStartupAttribute
points to a class (Startup
in this case) that configures the Owin SignalR self hosting. Lets have a look at that shall we.
using Microsoft.AspNet.SignalR;
using Microsoft.Owin.Cors;
using Owin;
using SignalRSelfHost.IOC;
namespace SignalRSelfHost.Hubs
{
public class Startup
{
public void Configuration(IAppBuilder app)
{
app.Map("/signalr", map =>
{
map.UseCors(CorsOptions.AllowAll);
var hubConfiguration = new HubConfiguration
{
Resolver = new AutofacSignalRDependencyResolver(App.Container),
};
map.RunSignalR(hubConfiguration);
});
}
}
}
So that is how the hub gets started. But how does the hub know about clients, and know how to push out to them? We will look at that next
The TickerHub
is the only hub in the application, and it has 2 main methods. One to allow subscriptions from clients, and one to remove a clients subscription. Here is the code for the hub, Don't worry we will be looking into these 2 methods in a bit more detail in a minute
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Common;
using log4net;
using Microsoft.AspNet.SignalR;
using Microsoft.AspNet.SignalR.Hubs;
namespace SignalRSelfHost.Hubs.Ticker
{
[HubName(ServiceConstants.Server.TickerHub)]
public class TickerHub : Hub
{
private readonly ITickerRepository tickerRepository;
private readonly IContextHolder contextHolder;
public const string TickerGroupName = "AllTickers";
private static readonly ILog Log = LogManager.GetLogger(typeof(TickerHub));
public TickerHub(ITickerRepository tickerRepository, IContextHolder contextHolder)
{
this.tickerRepository = tickerRepository;
this.contextHolder = contextHolder;
}
[HubMethodName(ServiceConstants.Server.SubscribeTickers)]
public async Task SubscribeTrades()
{
contextHolder.TickerHubClients = Clients;
var user = ContextUtil.GetUserName(Context);
Log.InfoFormat("Received trade subscription from user {0}", user);
await Groups.Add(Context.ConnectionId, TickerGroupName);
Log.InfoFormat("Connection {0} of user {1} added to group '{2}'", Context.ConnectionId, user, TickerGroupName);
var tickers = tickerRepository.GetAllTickers();
await Clients.Caller.SendTickers(tickers);
Log.InfoFormat("Snapshot published to {0}", Context.ConnectionId);
}
[HubMethodName(ServiceConstants.Server.UnsubscribeTickers)]
public async Task UnsubscribeTrades()
{
Log.InfoFormat("Received unsubscription request for trades from connection {0}", Context.ConnectionId);
await Groups.Remove(Context.ConnectionId, TickerGroupName);
Log.InfoFormat("Connection {0} removed from group '{1}'", Context.ConnectionId, TickerGroupName);
}
}
}
This TickerHub
method is called via the SignalR clients, and will be used to register an interest in a topic, which SignalR calls "groups". Essentially what happens is, when a client launches, it uses it own SignalR proxy to communicate with the server side self hosted hub, and will call the server side hubs SubscribeTrades()
method. When that happens the clients connection is added to the server side hubs group, and the client is sent a one of push of all currently held tickers. This single push of all the current tickers, is done by pushing the entire contents of ALL the TickerDto
objects currently stored in the TickerRepository
, which is a simple in memory queued repository of the most recent ticker TickerDto
s produced.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Common;
namespace SignalRSelfHost.Hubs.Ticker
{
public class TickerRepository : ITickerRepository
{
private readonly Queue<TickerDto> tickers = new Queue<TickerDto>();
private object syncLock = new object();
private const int MaxTrades = 50;
public TickerRepository()
{
tickers.Enqueue(new TickerDto() {Name="Yahoo", Price=1.2m});
tickers.Enqueue(new TickerDto() {Name="Google", Price=1022m});
tickers.Enqueue(new TickerDto() {Name="Apple", Price=523m});
tickers.Enqueue(new TickerDto() {Name="Facebook", Price=49m});
tickers.Enqueue(new TickerDto() {Name="Microsoft", Price=37m});
tickers.Enqueue(new TickerDto() {Name="Twitter", Price=120m});
}
public TickerDto GetNextTicker()
{
return tickers.Dequeue();
}
public void StoreTicker(TickerDto tickerInfo)
{
lock (syncLock)
{
tickers.Enqueue(tickerInfo);
if (tickers.Count > MaxTrades)
{
tickers.Dequeue();
}
}
}
public IList<TickerDto> GetAllTickers()
{
IList<TickerDto> newTickers;
lock (syncLock)
{
newTickers = tickers.ToList();
}
return newTickers;
}
}
}
Any time a subsequent ticker is created (we will see how this happens soon) the client will get that pushed at them in real time. This bulk push is a one of, when the client 1st establishes comms with the server. The other thing that happens right at the start of this method is that the context is captured such that external classes from the TickerHub
, will have access to the TickerHub
context and its associated values
contextHolder.TickerHubClients = Clients;
This TickerHub
UnsubscribeTrades()
method is called via the SignalR clients, and will be used to un-register the clients interest in a topic (which SignalR calls "groups". Essentially what happens is, when a client shuts down it uses it own SignalR proxy to communicate with the server side self hosted hub, and will call the server side hubs UnsubscribeTrades()
method. When that happens the clients connection is removed from the server side hubs group.
Ok so we have now seen how SignalR clients register/unregister an interest in a topic (at least from a server side perspective), but how does the client get push notifications from the TickerHub
? Who does that? Well that job falls to another component. Namely the TickerHubPublisher
. The job of the TickerHubPublisher
in this demo app is quite simple, produce a random TickerDto
and push it to any connected clients, who are interested in a particular topic. The entire contents of the TickerHubPublisher
is shown below. I think the code is pretty self explanatory.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Autofac;
using Common;
using log4net;
using Microsoft.AspNet.SignalR;
namespace SignalRSelfHost.Hubs.Ticker
{
public class TickerHubPublisher : ITickerHubPublisher
{
private readonly IContextHolder contextHolder;
private readonly ITickerRepository tickerRepository;
Random rand = new Random();
private static readonly ILog Log = LogManager.GetLogger(typeof(TickerHubPublisher));
private CancellationTokenSource autoRunningCancellationToken;
private Task autoRunningTask;
public TickerHubPublisher(IContextHolder contextHolder,
ITickerRepository tickerRepository)
{
this.contextHolder = contextHolder;
this.tickerRepository = tickerRepository;
}
public void Stop()
{
if (autoRunningCancellationToken != null)
{
autoRunningCancellationToken.Cancel();
autoRunningTask.Wait();
autoRunningCancellationToken = null;
}
}
public async void Start()
{
autoRunningCancellationToken = new CancellationTokenSource();
autoRunningTask = Task.Run(async () =>
{
while (!autoRunningCancellationToken.IsCancellationRequested)
{
await SendOneManualFakeTicker();
await Task.Delay(20);
}
});
}
public async Task SendOneManualFakeTicker()
{
var currentTicker = tickerRepository.GetNextTicker();
var flipPoint = rand.Next(0, 100);
if (flipPoint > 50)
{
currentTicker.Price += currentTicker.Price/ 30;
}
else
{
currentTicker.Price -= currentTicker.Price / 30;
}
tickerRepository.StoreTicker(currentTicker);
await SendRandomTicker(currentTicker);
}
private Task SendRandomTicker(TickerDto tickerInfo)
{
if (contextHolder.TickerHubClients == null) return Task.FromResult(false);
Log.InfoFormat("Broadcast new trade to blotters: {0}", tickerInfo);
return contextHolder.TickerHubClients.Group(TickerHub.TickerGroupName).SendTickers(new[] { tickerInfo });
}
}
}
The only part of this code that may need some explaination is the SendRandomTicker(TickerDto tickerInfo)
method.
private Task SendRandomTicker(TickerDto tickerInfo)
{
if (contextHolder.TickerHubClients == null) return Task.FromResult(false);
Log.InfoFormat("Broadcast new trade to blotters: {0}", tickerInfo);
return contextHolder.TickerHubClients.Group(TickerHub.TickerGroupName).SendTickers(new[] { tickerInfo });
}
This method makes use the TickerHub
context that was captured within the TickerHub
itself. By capturing the context, it allows external classes from the TickerHub
to make use of its properties/values. That is precisely what the TickerHubPublisher
does. It simply uses the captured TickerHub
context to push out new values to subscribed clients for a particular topic. Which in this case is the value of the TickerHub.TickerGroupName
("AllTickers") string. So that is how the SignalR server is able to push notifications to the clients
This bit is easy, just use the "Stop SignalR" button in the MainWindow. This will simply run the following code, which disposes the SignalR disposable returned from the SignalR WebApp.Start(..)
method
private void StopServer()
{
if (signalr != null)
{
tickerHubPublisher.Stop();
signalr.Dispose();
signalr = null;
}
}
What should happen, is that if you had any connected clients they should (after a short period) see the server as unavailable and should display "DISCONNECTED" tiles
This bit is easy, just use the "Start SignalR" button in the MainWindow. This will simply run the following code
private void StartServer()
{
try
{
signalr = WebApp.Start(Address);
tickerHubPublisher.Start();
}
catch (Exception exception)
{
Log.Error("An error occurred while starting SignalR", exception);
}
}
What should happen is that if you had any connected clients they should (after a short period) see the server as available again and no longer display the "DISCONNECTED" tiles
The client is a standard WPF app (I am using WPF just because I know it, it is not really that important, you could use Winforms using the MVP pattern just as well). The client also makes use of a SignalR proxy class, and as such a good starting place would be to examine the SignalR .NET client APIs, which you can read more about here:
http://www.asp.net/signalr/overview/guide-to-the-api/hubs-api-guide-net-client
There is some IOC used, AutoFac is the container used. As IOC is not the main point of the article, I will not waste any more time on it. If you want to know more about the IOC/AutoFac read the code, looking into the BootStrapper
class
At the core of the client is a class called ServiceClientBase.
This class is the base class for any client side Hub proxy connection. So what exactly does this class give you?
Well lets have a look at its code for a start shall we:
using System;
using System.Reactive;
using System.Reactive.Linq;
using Client.Hub.Transport;
using Common.Extensions;
namespace Client.Hub
{
internal class ServiceClientBase
{
private readonly IConnectionProvider _connectionProvider;
protected ServiceClientBase(IConnectionProvider connectionProvider)
{
_connectionProvider = connectionProvider;
}
protected IObservable<T> GetResilientStream<T>(Func<IConnection, IObservable<T>> streamFactory,
TimeSpan connectionTimeout)
{
var activeConnections = (from connection in _connectionProvider.GetActiveConnection()
from status in connection.StatusStream
where status.ConnectionStatus == ConnectionStatus.Connected ||
status.ConnectionStatus == ConnectionStatus.Reconnected
select connection)
.Publish()
.RefCount();
var firstConnection = activeConnections.Take(1).Timeout(connectionTimeout);
var firstDisconnection = from connection in firstConnection
from status in connection.StatusStream
where status.ConnectionStatus == ConnectionStatus.Reconnecting ||
status.ConnectionStatus == ConnectionStatus.Closed
select Unit.Default;
var subsequentConnection = activeConnections.Skip(1).Select(_ => Unit.Default).Take(1);
var disconnected = firstDisconnection.Merge(subsequentConnection)
.Select(_ => Notification.CreateOnError<T>(new Exception("Connection was closed.")))
.Dematerialize();
return (from connection in firstConnection
from t in streamFactory(connection)
select t)
.Merge(disconnected)
.Publish()
.RefCount();
}
}
}
There is an awful lot going on in this class. As I say THIS class is a core class. So what exactly does it do?
Well it does a few things
- First and foremost it provides an
IObservable<T>
based on the factory which produces the final IObservable<T>
that will be yielded. This factory will be the unique source stream (IObservable<T>
) that needs to be made resilient.
- It uses a connection status stream (which internally makes use of the SignalR events (such as connected/disconnected) to create the status stream) to merge into the overall stream. When a "
Reconnection
" or "Closed
" status stream result (OnNext
value) is seen, we consider that to be a disconnection, and we synthesize an OnError
notifcation (by using the standard Rx Dematerialize
operator) that is merged with the actual facory streams value, to give an overall stream result, which could potentially now yield OnNext
/OnError
to its consumers
- We also use the standard RX
Publish
operator, we do this to share the same underlying IObservable<T>
sequence
- We also use the standard RX
RefCount
operator, such that when there is no subscribers the sequence will be disposed
This class makes use of a couple of extra uiltity classes, which we shall look at next
The ConnectionProvider
class has but one job to do, which is to return a IObservable<IConnection>
, where an IConnection
looks like this.:
internal interface IConnection
{
IObservable<ConnectionInfo> StatusStream { get; }
IObservable<Unit> Initialize();
string Address { get; }
void SetAuthToken(string authToken);
IHubProxy TickerHubProxy { get; }
}
The more eagle eyed amongst you will spot the use of Unit
, that is a kind of way of saying we do not care what the result is just that there is one.
Here the relevant code for the ConnectionProvider
class:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;
using log4net;
using Common.Extensions;
namespace Client.Hub.Transport
{
internal class ConnectionProvider : IConnectionProvider, IDisposable
{
private readonly SingleAssignmentDisposable disposable = new SingleAssignmentDisposable();
private readonly string username;
private readonly IObservable<IConnection> connectionSequence;
private readonly string server;
private int _currentIndex;
private static readonly ILog log = LogManager.GetLogger(typeof(ConnectionProvider));
public ConnectionProvider(string username, string server)
{
this.username = username;
this.server = server;
connectionSequence = CreateConnectionSequence();
}
public IObservable<IConnection> GetActiveConnection()
{
return connectionSequence;
}
public void Dispose()
{
disposable.Dispose();
}
private IObservable<IConnection> CreateConnectionSequence()
{
return Observable.Create<IConnection>(o =>
{
log.Info("Creating new connection...");
var connection = GetNextConnection();
var statusSubscription = connection.StatusStream.Subscribe(
_ => { },
ex => o.OnCompleted(),
() =>
{
log.Info("Status subscription completed");
o.OnCompleted();
});
var connectionSubscription =
connection.Initialize().Subscribe(
_ => o.OnNext(connection),
ex => o.OnCompleted(),
o.OnCompleted);
return new CompositeDisposable { statusSubscription, connectionSubscription };
})
.Repeat()
.Replay(1)
.LazilyConnect(disposable);
}
private IConnection GetNextConnection()
{
return new Client.Hub.Transport.Connection(server, username);
}
}
}
The Connection itself exposes a SignalR proxy and also a IObservable<ConnectInfo>
which is creating by examining all the available SignalR .NET client events. Consumers of this class will now have acess to a stream for the connections, such that they may be able to find out when a connection is lost or reconnected.
Here is this class:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading.Tasks;
using Common;
using Common.Extensions;
using log4net;
using Microsoft.AspNet.SignalR.Client;
namespace Client.Hub.Transport
{
internal class Connection : IConnection
{
private readonly ISubject<ConnectionInfo> _statusStream;
private readonly HubConnection hubConnection;
private bool _initialized;
private static readonly ILog log = LogManager.GetLogger(typeof(Connection));
public Connection(string address, string username)
{
_statusStream = new BehaviorSubject<ConnectionInfo>(
new ConnectionInfo(ConnectionStatus.Uninitialized, address));
Address = address;
hubConnection = new HubConnection(address);
CreateStatus().Subscribe(
s => _statusStream.OnNext(new ConnectionInfo(s, address)),
_statusStream.OnError,
_statusStream.OnCompleted);
hubConnection.Error += exception => log.Error("There was a connection error with "
+ address, exception);
TickerHubProxy = hubConnection.CreateHubProxy(ServiceConstants.Server.TickerHub);
}
public IObservable<Unit> Initialize()
{
if (_initialized)
{
throw new InvalidOperationException("Connection has already been initialized");
}
_initialized = true;
return Observable.Create<Unit>(async observer =>
{
_statusStream.OnNext(new ConnectionInfo(ConnectionStatus.Connecting, Address));
try
{
log.InfoFormat("Connecting to {0}", Address);
await hubConnection.Start();
_statusStream.OnNext(new ConnectionInfo(ConnectionStatus.Connected, Address));
observer.OnNext(Unit.Default);
}
catch (Exception e)
{
log.Error("An error occurred when starting SignalR connection", e);
observer.OnError(e);
}
return Disposable.Create(() =>
{
try
{
log.Info("Stoping connection...");
hubConnection.Stop();
log.Info("Connection stopped");
}
catch (Exception e)
{
log.Error("An error occurred while stoping connection", e);
}
});
})
.Publish()
.RefCount();
}
private IObservable<ConnectionStatus> CreateStatus()
{
var closed = Observable.FromEvent(h => hubConnection.Closed += h,
h => hubConnection.Closed -= h).Select(_ => ConnectionStatus.Closed);
var connectionSlow = Observable.FromEvent(h => hubConnection.ConnectionSlow += h,
h => hubConnection.ConnectionSlow -= h).Select(_ => ConnectionStatus.ConnectionSlow);
var reconnected = Observable.FromEvent(h => hubConnection.Reconnected += h,
h => hubConnection.Reconnected -= h).Select(_ => ConnectionStatus.Reconnected);
var reconnecting = Observable.FromEvent(h => hubConnection.Reconnecting += h,
h => hubConnection.Reconnecting -= h).Select(_ => ConnectionStatus.Reconnecting);
return Observable.Merge(closed, connectionSlow, reconnected, reconnecting)
.TakeUntilInclusive(status => status == ConnectionStatus.Closed);
}
public IObservable<ConnectionInfo> StatusStream
{
get { return _statusStream; }
}
public string Address { get; private set; }
public IHubProxy TickerHubProxy { get; private set; }
public void SetAuthToken(string authToken)
{
}
public override string ToString()
{
return string.Format("Address: {0}", Address);
}
}
}
As we saw in the publisher, we know that a client to a particular server side SignalR hub will call 2 things, namely SubscribeTickers
and UnsubscribeTickers
. And we also know there is a streaming API of TickerDto
(At least there is for this demo app, which only contains a single hub). Where the following is true:
- The client will call the server side hub
SubscribeTickers
on connection
- The client will call the server side hub
UnsubscribeTickers
on disposal (this is done by only calling this method on disposal, see the following code where we use the power of Observable.Create(..)
along with some CompositeDisposable
to dispose of a few things, one of which is an Action
which we create using Disposable.Create(..)
where that Action
will call the UnsubscribeTickers
on disposal.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;
using Client.Hub.Transport;
using Common;
using log4net;
using Microsoft.AspNet.SignalR.Client;
namespace Client.Hub
{
internal class TickerHubClient : ServiceClientBase, ITickerHubClient
{
private static readonly ILog log = LogManager.GetLogger(typeof(TickerHubClient));
public TickerHubClient(IConnectionProvider connectionProvider)
: base(connectionProvider)
{
}
public IObservable<IEnumerable<TickerDto>> GetTickerStream()
{
return GetResilientStream(connection => GetTradesForConnection(connection.TickerHubProxy),
TimeSpan.FromSeconds(5));
}
private IObservable<IEnumerable<TickerDto>> GetTradesForConnection(IHubProxy tickerHubProxy)
{
return Observable.Create<IEnumerable<TickerDto>>(observer =>
{
var spotTradeSubscription = tickerHubProxy.On<IEnumerable<TickerDto>>(
ServiceConstants.Client.SendTickers, observer.OnNext);
var spotTradeSubscriptionRaceDisposable =
tickerHubProxy.On<IEnumerable<TickerDto>>(
ServiceConstants.Client.SendTickers,
(x) =>
{
Console.WriteLine("Got a new trade" + x.First().Name);
});
log.Info("Sending ticker subscription...");
var sendSubscriptionDisposable = SendSubscription(tickerHubProxy)
.Subscribe(
_ => log.InfoFormat("Subscribed to ticker."),
observer.OnError);
var unsubscriptionDisposable = Disposable.Create(() =>
{
log.Info("Sending ticker unsubscription...");
SendUnsubscription(tickerHubProxy)
.Subscribe(
_ => log.InfoFormat("Unsubscribed from ticker."),
ex => log.WarnFormat(
"An error occurred while unsubscribing from ticker: {0}",
ex.Message));
});
return new CompositeDisposable {
spotTradeSubscription, unsubscriptionDisposable,
sendSubscriptionDisposable,spotTradeSubscriptionRaceDisposable
};
})
.Publish()
.RefCount();
}
private static IObservable<Unit> SendSubscription(IHubProxy tickerHubProxy)
{
return Observable.FromAsync(() => tickerHubProxy.Invoke(
ServiceConstants.Server.SubscribeTickers));
}
private static IObservable<Unit> SendUnsubscription(IHubProxy tickerrHubProxy)
{
return Observable.FromAsync(() => tickerrHubProxy.Invoke(
ServiceConstants.Server.UnsubscribeTickers));
}
}
}
The other thing to note about this code, is that it inherits from ServiceClientBase
, and as such has the resilient connection functionality that we discussed above. Where we feed into the ServiceClientBase
class the GetTradesForConnection(IHubProxy tickerHubProxy)
method to act as the IObservable<T>
factory that is used in the final (now resilient thanks to the logic in the ServiceClientBase
base class) GetTickerStream
stream.
From here the GetTickerStream
stream is free for anyone to use. So lets see who uses it, that is discussed just below.
The TickerRepository is the next rung up the Observable chain. So what does that look like. Well it is actually suprisingly simple, but don't be fooled by that, there is a LOT going on here.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using Client.Factory;
using Client.Hub;
namespace Client.Repositories
{
class TickerRepository : ITickerRepository
{
private readonly ITickerHubClient tickerHubClient;
private readonly ITickerFactory tickerFactory;
public TickerRepository(ITickerHubClient tickerHubClient, ITickerFactory tickerFactory)
{
this.tickerHubClient = tickerHubClient;
this.tickerFactory = tickerFactory;
}
public IObservable<IEnumerable<Ticker>> GetTickerStream()
{
return Observable.Defer(() => tickerHubClient.GetTickerStream())
.Select(trades => trades.Select(tickerFactory.Create))
.Catch(Observable.Return(new Ticker[0]))
.Repeat()
.Publish()
.RefCount();
}
}
}
So what is going on here exactly?
- We use
Observable.Defer
such that we do not actually make use of the underlying stream, until someone subscribes to the IObservable
created by using Observable.Defer
. It is a way of making a hot stream cold.
- We use select to transform the strem data from
TickerDto
to Ticker
- We use
Catch
to catch any Exception
s (OnError
) in the stream. Where we use a default value for that case
- We use
Repeat
. Now this one is VERY VERY important. This is the one that allows us to repeat the whole stream, including connecting to the server side hub again. It is this mechanism that allows the client to recover from a server side loss of SignalR hub. This along with the resilient stream logic are the MOST important bits to the app (at least in my opinion)
- We use
Publish
to share the underlying stream
- We use
RefCount
to get automatic disposal when there are no longer an subscribers
So now that we have seen this repository, we only have one more hop to continue the IObservable
journey for the TickerStream
. Let see that next.
The TickersViewModel
represents ALL the Ticker(s) you see on the screen. It is this viewmodel that makes use of the lazy / repeatable / resilient IObservable
that the TickerRepository
provides. Lets see the code it is pretty self explanatory, I think:
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Input;
using Client.Factory;
using Client.Repositories;
using Client.Services;
using Common;
using Common.ViewModels;
using log4net;
namespace Client.ViewModels
{
public class TickersViewModel : INPCBase
{
private readonly ITickerRepository tickerRepository;
private readonly IConcurrencyService concurrencyService;
private bool stale = false;
private static readonly ILog log = LogManager.GetLogger(typeof(TickersViewModel));
public TickersViewModel(IReactiveTrader reactiveTrader,
IConcurrencyService concurrencyService,
TickerViewModelFactory tickerViewModelFactory)
{
Tickers = new ObservableCollection<TickerViewModel>();
Tickers.Add(tickerViewModelFactory.Create("Yahoo"));
Tickers.Add(tickerViewModelFactory.Create("Google"));
Tickers.Add(tickerViewModelFactory.Create("Apple"));
Tickers.Add(tickerViewModelFactory.Create("Facebook"));
Tickers.Add(tickerViewModelFactory.Create("Microsoft"));
Tickers.Add(tickerViewModelFactory.Create("Twitter"));
this.tickerRepository = reactiveTrader.TickerRepository;
this.concurrencyService = concurrencyService;
LoadTrades();
}
public ObservableCollection<TickerViewModel> Tickers { get; private set; }
private void LoadTrades()
{
tickerRepository.GetTickerStream()
.ObserveOn(concurrencyService.Dispatcher)
.SubscribeOn(concurrencyService.TaskPool)
.Subscribe(
AddTickers,
ex => log.Error("An error occurred within the trade stream", ex));
}
private void AddTickers(IEnumerable<Ticker> incomingTickers)
{
var allTickers = incomingTickers as IList<Ticker> ?? incomingTickers.ToList();
if (!allTickers.Any())
{
stale = true;
}
else
{
if (stale)
{
stale = false;
}
}
foreach (var ticker in allTickers)
{
Tickers.Single(x => x.Name == ticker.Name)
.AcceptNewPrice(ticker.Price);
}
}
}
}}
Where each Ticker
is represented by a single TickerViewModel
which is as shown below. It can be seen that this class also makes use of the ConnectionStatusStream IObservable
we discussed earlier. This is used to change the TickerViewModel
s view to show a red box with "DISCONNECTED" in it. We will talk through that in just a minute.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;
using Client.Hub.Transport;
using Client.Services;
using log4net;
namespace Client.ViewModels
{
public class TickerViewModel : INPCBase
{
private decimal price;
private bool isUp;
private bool stale;
private bool disconnected;
private static readonly ILog log = LogManager.GetLogger(typeof(TickerViewModel));
public TickerViewModel(
IReactiveTrader reactiveTrader,
IConcurrencyService concurrencyService,
string name)
{
this.Name = name;
reactiveTrader.ConnectionStatusStream
.ObserveOn(concurrencyService.Dispatcher)
.SubscribeOn(concurrencyService.TaskPool)
.Subscribe(
OnStatusChange,
ex => log.Error("An error occurred within the connection status stream.", ex));
}
public string Name { get; private set; }
public void AcceptNewPrice(decimal newPrice)
{
IsUp = newPrice > price;
Price = newPrice;
}
public decimal Price
{
get { return this.price; }
private set
{
this.price = value;
base.OnPropertyChanged("Price");
}
}
public bool IsUp
{
get { return this.isUp; }
private set
{
this.isUp = value;
base.OnPropertyChanged("IsUp");
}
}
public bool Stale
{
get { return this.stale; }
set
{
this.stale = value;
base.OnPropertyChanged("Stale");
}
}
public bool Disconnected
{
get { return this.disconnected; }
set
{
this.disconnected = value;
base.OnPropertyChanged("Disconnected");
}
}
private void OnStatusChange(ConnectionInfo connectionInfo)
{
switch (connectionInfo.ConnectionStatus)
{
case ConnectionStatus.Uninitialized:
case ConnectionStatus.Connecting:
Disconnected = true;
break;
case ConnectionStatus.Reconnected:
case ConnectionStatus.Connected:
Disconnected = false;
break;
case ConnectionStatus.ConnectionSlow:
Disconnected = false;
break;
case ConnectionStatus.Reconnecting:
Disconnected = true;
break;
case ConnectionStatus.Closed:
Disconnected = true;
break;
default:
throw new ArgumentOutOfRangeException();
}
}
}
}
The last thing I wanted to show you was how the ConnectionStatusStream
was used. This stream OnNext
s when the client side SignalR hub proxy events are raised. So we see thing like Connecting, Connected,ConnectionSlow, Reconnecting,Reconnected,Closed,Uninitialized
. All of which started out life as events on the SignalR hub proxy, and are turned in to an IObservable
stream using one of the many RX factories. In this case IObservable.FromEvent
.
Anyway here is the overall ConnectivityStatusViewModel
that we use to show the information in the bottom status bar of the app.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;
using Client.Hub.Transport;
using Client.Services;
using log4net;
namespace Client.ViewModels
{
public class ConnectivityStatusViewModel : INPCBase
{
private static readonly ILog log = LogManager.GetLogger(typeof(ConnectivityStatusViewModel));
private string server;
private string status;
private bool disconnected;
public ConnectivityStatusViewModel(
IReactiveTrader reactiveTrader,
IConcurrencyService concurrencyService)
{
reactiveTrader.ConnectionStatusStream
.ObserveOn(concurrencyService.Dispatcher)
.SubscribeOn(concurrencyService.TaskPool)
.Subscribe(
OnStatusChange,
ex => log.Error("An error occurred within the connection status stream.", ex));
}
private void OnStatusChange(ConnectionInfo connectionInfo)
{
Server = connectionInfo.Server;
switch (connectionInfo.ConnectionStatus)
{
case ConnectionStatus.Uninitialized:
case ConnectionStatus.Connecting:
Status = "Connecting...";
Disconnected = true;
break;
case ConnectionStatus.Reconnected:
case ConnectionStatus.Connected:
Status = "Connected";
Disconnected = false;
break;
case ConnectionStatus.ConnectionSlow:
Status = "Slow connection detected";
Disconnected = false;
break;
case ConnectionStatus.Reconnecting:
Status = "Reconnecting...";
Disconnected = true;
break;
case ConnectionStatus.Closed:
Status = "Disconnected";
Disconnected = true;
break;
default:
throw new ArgumentOutOfRangeException();
}
}
public string Server
{
get { return this.server; }
set
{
this.server = value;
base.OnPropertyChanged("Server");
}
}
public string Status
{
get { return this.status; }
set
{
this.status = value;
base.OnPropertyChanged("Status");
}
}
public bool Disconnected
{
get { return this.disconnected; }
set
{
this.disconnected = value;
base.OnPropertyChanged("Disconnected");
}
}
}
}
Well there we go, I hope I have shown you all that RX is actually way more than just LINQ to events, it contains a great many classes for dealing with concurrency, LINQ, events, streams, factories, retries, error handling, disposing of resources.
Some of you may be pleased to know this is a 2 part article, in the next part I will be teaming up with the author of the .NET port of the highly regarded socket library ZeroMQ. The .NET port is called NetMQ, and together with its author (Ok I did contribute a bit to that library too) we will be showing you how you can build this sort of app using NetMQ. So until then, if you have enjoyed this article, please feel free to leave a comment/vote they are most welcome