You can grab the code from my github repository right here
https://github.com/sachabarber/NetMQRxDemo
This is the 2nd part of this series of articles. If you missed part one this is main points from the introduction section from the last article.
I work in finance, and at the moment work in the Foreign Exchange (FX) space, where we have lots of requirements that involve streaming of various things, such as:
- Rates
- Trades
- Other financial instruments
Historically what we would have done to accomodate these requirements was to use sockets and push our own data (usually JSON or XML) down the wire, and then had some quite knarly listeners above that, where each listener would want some slightly different view of the original streaming data.
Thing is, time has never stands still in the world of software development, and new things come out all the time, literally every other day a new framework comes along that helps improve what came before it in some way or another.
One particular thing, that I think a lot of people actually misunderstood was Reactive Extensions RX, which in fairness has been around a while now. I think a lot of people think of RX as LINQ to events, which it does have. However the truth is that RX is a superb framework for creating streaming applications and the more you get into RX the more you tend to see everything as a stream, from and event, down to picking a file or a ICommand.Execute
in a ViewModel say.
RX comes with many many tools in it's arsenal, such as
- 2 core interfaces
IObservable
/ IObserver
- Lots of LINQ like extension methods for
IObservable
- Time / window based operations (LINQ doesn't really have these)
- Concurrency schedulers
- Stream error handling controls (Catch etc etc)
- Ability to create
IObservable
from a great many other things, such as
IEnumerable
Task
- Events
IObservable
Factories
I personally feel that a lot of people could make use of RX to great gain, if they were to put in the time to learn about it, a bit more.
Last time we talked about SignalR working with RX. Thing is, sometimes people will want to use sockets for one reason or another. Now I like sockets, what I am not a fan off is how much boiler plate code one has to do when using standard .NET socket types. Luckily you don't have to, as there is this totally awesome project called ZeroMQ, which has a .NET port by way of NetMQ, which I am actually pretty pleased to say I had a very small (minute) hand in. I wrote the Actor part of NetMQ. When I first started disecting the SignalR demo (which if you have read the 1st article you will know was based on the great work by Lee Campbell (Intro To RX author and all round top chap, and his collegues at Adaptive)), I contacted the author of NetMQ, and asked if he would be willing to give me a hand with a NetMQ port of that code, and he kindly agreed, you should see him here as my co-author, thanks Doron, you rock.
So in this article Doron and I outline what you would need to do to create a similiar app using NetMQ and RX.
- Streaming API using SignalR and RX
- Streaming API using NetMQ and RX (this article)
I think the best way to see what this demo is about is to have a look at the video, which I have uploaded to YouTube here:
https://www.youtube.com/watch?v=uTqCA1cN16k
Now I covered this in the 1st article, but just in case some of you missed that I think it is important ground to cover again, so here we go:
There are many many RX Extension methods which you can make use of:
http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable_methods(v=vs.103).aspx
But perhaps the best place to start with RX, is to understand the underlying 2 interfaces you will use time and time again. These are IObservable
and IObserver
. We discuss these 2 next.
namespace System
{
public interface IObservable<out T>
{
IDisposable Subscribe(IObserver<T> observer);
}
}
Defines a provider for push-based notification.
This type parameter is covariant. That is, you can use either the type you specified or any type that is more derived. For more information about covariance and contravariance, see Covariance and Contravariance in Generics.
Subscribe
Notifies the provider that an observer is to receive notifications.
Further reading : http://msdn.microsoft.com/en-us/library/dd990377%28v=vs.110%29.aspx
namespace System
{
public interface IObserver<in T>
{
void OnCompleted();
void OnError(Exception error);
void OnNext(T value);
}
}
The IObserver<T>
and IObservable<T>
interfaces provide a generalized mechanism for push-based notification, also known as the observer design pattern. The IObservable<T>
interface represents the class that sends notifications (the provider); the IObserver<T>
interface represents the class that receives them (the observer). T
represents the class that provides the notification information.
An IObserver<T>
implementation arranges to receive notifications from a provider (an IObservable<T>
implementation) by passing an instance of itself to the provider's IObservable<T>.Subscribe
method. This method returns an IDisposable
object that can be used to unsubscribe the observer before the provider finishes sending notifications.
The IObserver<T>
interface defines the following three methods that the observer must implement:
- The
OnNext
method, which is typically called by the provider to supply the observer with new data or state information.
- The
OnError
method, which is typically called by the provider to indicate that data is unavailable, inaccessible, or corrupted, or that the provider has experienced some other error condition.
- The
OnCompleted
method, which is typically called by the provider to indicate that it has finished sending notifications to observers.
Further reading : http://msdn.microsoft.com/en-us/library/dd783449%28v=vs.110%29.aspx
Anyone that has used RX a bit, will come to the point that they may like to create their own operators. In LINQ this would be done by creating a new method (perhaps an extension method) that simple returns a new IEnumerable<T>
. In RX things are slightly trickier but not much trickier. In RX you would make use of Observable.Create
to create your own operators, or you could also use Observable.Create
as a factory for creating new a Observable<T>
. Here is a simple example:
private IObservable<string> CreateObservable()
{
return Observable.Create<string>(
(IObserver<string> observer) =>
{
observer.OnNext("a");
observer.OnNext("b");
observer.OnCompleted();
return Disposable.Create(() => Console.WriteLine("Observer has unsubscribed"));
});
}
Observable.Create
creates an observable sequence from a subscribe method implementation. You would typically use an extension method to an existing IObservable<T>
and then put an Observable.Create(..)
as the body of the extension method.
Observable.Create
has this method signature
public static IObservable<TSource> Create<TSource>(
Func<IObserver<TSource>, IDisposable> subscribe
)
The interesting part of this method signature is that you can see that it returns a Func
delegate hat takes a IObserver<TSource>
which allows you to push values into the observer using its OnNext/OnError
and OnCompleted
methods. The Func
delegate returns a IDisposable
, which will called when disposal happens. Now RX comes with a great many different IDisposable
types, don't be suprised if you see some quite exotic IDisposable
types being used inside of an Observable.Create
implementation.
One of the best descriptions of using Observable.Create
is from Lee Campbells site :
http://www.introtorx.com/content/v1.0.10621.0/04_CreatingObservableSequences.html
This diagram illustrates the general idea:
The publisher is made up of a few bits and pieces, which are described below, one thing to note is that both the client and the server make use of NetMQ, so it may be an idea to read a little bit about NetMQ before you start. Somdoron (NetMQ author) has a few posts on NetMQ on his blog, and there are a few more posts here and there, oh I myself have done a few, which are shown below. During the course of writing this article, Somdoron asked me if I wanted to get more involved with NetMQ, and I do, so after XMAS, I have agreed to help out a lot more with NetMQ, the first step of that will be to properly document it, which is one area where it does (to be fair) need a bit more of a push.
There is some IOC used, AutoFac is the container used. As IOC is not the main point of the article, I will not waste any more time on it. If you want to know more about the IOC/AutoFac read the code, looking into the BootStrapper
class
There is not much to say about this window, except that is has the following controls
- Start NetMQ : Which starts the NetMQ
NetMQPublisher Actor PublisherSocket
(useful for simulating crash/restart of NetMQ server)
- Stop NetMQ : Which disposes the NetMQ
NetMQPublisher Actor
(useful for simulating crash/restart of NetMQ server)
- Start Auto Ticker Data : Will push out random
TickerDto
objects out of the NetMQ NetMQPublisher Actor
at fixed intervals
- Stop Auto Ticker Data : Will suspend the sending of
TickerDto
objects out of the NetMQ NetMQPublisher Actor
- Send One Ticker : Will push 1 SINGLE random
TickerDto
object out of the NetMQ NetMQPublisher Actor
Don't be too worried if this term "Actor" doesn't mean anything to you yet, we will get on to that soon.
The MainWindowViewModel
is really used to facilitate sending commands to either stop/create the NetMQ NetMQPublisher Actor
, or to tell the NetMQ NetMQPublisher Actor
to do things. Here is the full code for the MainWindowViewModel
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;
using System.Windows.Input;
using Common.ViewModels;
using log4net;
using NetMQServer.Ticker;
namespace NetMQServer
{
public class MainWindowViewModel
{
private readonly ITickerPublisher tickerPublisher;
private readonly ITickerRepository tickerRepository;
private Random rand;
private static readonly ILog Log = LogManager.GetLogger(typeof(MainWindowViewModel));
private CancellationTokenSource autoRunningCancellationToken;
private Task autoRunningTask;
private bool serverStarted;
private bool autoTickerStarted;
public MainWindowViewModel(ITickerPublisher tickerPublisher, ITickerRepository tickerRepository)
{
this.tickerPublisher = tickerPublisher;
this.tickerRepository = tickerRepository;
this.rand = new Random();
serverStarted = false;
autoTickerStarted = false;
AutoTickerStartCommand = new DelegateCommand(
AutoRunning,
() => serverStarted && !autoTickerStarted);
AutoTickerStopCommand = new DelegateCommand(
() =>
{
if (autoRunningCancellationToken != null)
{
autoRunningCancellationToken.Cancel();
autoRunningTask.Wait();
autoTickerStarted = false;
}
},
() => serverStarted && autoTickerStarted);
SendOneTickerCommand = new DelegateCommand(
SendOneManualFakeTicker,
() => serverStarted && !autoTickerStarted);
StartCommand = new DelegateCommand(
StartServer,
() => !serverStarted);
StopCommand = new DelegateCommand(
StopServer,
() => serverStarted);
}
public DelegateCommand AutoTickerStartCommand { get; set; }
public DelegateCommand AutoTickerStopCommand { get; set; }
public DelegateCommand SendOneTickerCommand { get; set; }
public DelegateCommand StartCommand { get; set; }
public DelegateCommand StopCommand { get; set; }
public void Start()
{
StartServer();
}
private void AutoRunning()
{
autoTickerStarted = true;
autoRunningCancellationToken = new CancellationTokenSource();
autoRunningTask = Task.Run(async () =>
{
while (!autoRunningCancellationToken.IsCancellationRequested)
{
SendOneManualFakeTicker();
await Task.Delay(20);
}
});
}
private void SendOneManualFakeTicker()
{
var currentTicker = tickerRepository.GetNextTicker();
var flipPoint = rand.Next(0, 100);
if (flipPoint > 50)
{
currentTicker.Price += currentTicker.Price / 30;
}
else
{
currentTicker.Price -= currentTicker.Price / 30;
}
tickerRepository.StoreTicker(currentTicker);
tickerPublisher.PublishTrade(currentTicker);
}
private void StartServer()
{
serverStarted = true;
tickerPublisher.Start();
AutoRunning();
}
private void StopServer()
{
if (autoTickerStarted)
{
autoRunningCancellationToken.Cancel();
autoRunningTask.Wait();
autoTickerStarted = false;
autoRunningCancellationToken = null;
autoRunningTask = null;
}
tickerPublisher.Stop();
serverStarted = false;
}
}
}
As you can see not much really goes on in the MainWindowViewModel
, all the action happens elsewhere.
Before we get into the nitty gritty of how the NetMQPublisher
works, it is worth noting one thing. Which is that the server and the client use an Actor
model for NetMQ, which I am actually very pleased to see being used, as it was me that wrote that part, and it was that peice that introduced me to Somdoron.
You can essentially think of the Actor
as a socket that you can send messages (commands) to. Internally the Actor
uses a special PairSocket
, where you are writing to one half of the pair when you send messages (commands) to the Actor
, and internally the Actor
is receiving from on the other end of the PairSocket
, where it will carry out some work in response to the commands it receieves of the PairSocket
.
What you really do is work out a simple protocol that you will use between the source of the command and the Actor
and stick to that protocol. This protocol may include commands to tell the Actor
to do things, or it may be a command to signal the Actor
to stop doing anything (NetMQ has a special command ActorKnownMessages.END_PIPE
, for this case since it is such a common requirement)
Now you may be wondering why we chose to use this Actor
model at all? Well it is quite simple, using the Actor
model ensures there is never any contention for locks as there simply is no shared data at all, as the data is sent over a socket, so the Actor
is guarenteed to have its own copy of the data, and as such no locking is needed, and as such there is no waiting for locks, which helps keeps things nice and fast and thread safe.
You can read a quite in depth blog post I did on this here : #7 : Simple Actor Model
As we saw above the MainWindowViewModel
contains commands to Start/Stop the NetMQPublisher
. But what do they do, how does that start/stop the NetMQPublisher
?
In the case of the Start what happens if actually quite simple, internally the NetMQPublisher
will create a new Actor
that is then ready to receive commands.
public void Start()
{
actor = new Actor<object>(context, new ShimHandler(context, tickerRepository), null);
}
That is all done by the NetMQPublisher
which we just discussed a bit above. Here is the code for the NetMQPublisher:
The NetMQPublisher:
actually does a few things:
- Sends out a one time snapshot in response (
ResponseSocket
) to the a client initiated request (RequestSocket
)
- Publishes (
PublisherSocket
) "Trades" out to all connected clients that have subscribed (SubscriberSocket
) for the "Trades"
- Publishes (
PublisherSocket
) heartbeat "HB" out to all connected clients that have subscribed (SubscriberSocket
) for the "HB"
using System;
using System.Collections.Generic;using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Navigation;
using Common;
using NetMQ;
using NetMQ.Actors;
using NetMQ.InProcActors;
using NetMQ.Sockets;
using NetMQ.zmq;
using Newtonsoft.Json;
using Poller = NetMQ.Poller;
namespace NetMQServer.Ticker
{
public class NetMQPublisher : ITickerPublisher
{
private const string PublishTicker = "P";
public class ShimHandler : IShimHandler<object>
{
private readonly NetMQContext context;
private PublisherSocket publisherSocket;
private ResponseSocket snapshotSocket;
private ITickerRepository tickerRepository;
private Poller poller;
private NetMQTimer heartbeatTimer;
public ShimHandler(NetMQContext context, ITickerRepository tickerRepository)
{
this.context = context;
this.tickerRepository = tickerRepository;
}
public void Initialise(object state)
{
}
public void RunPipeline(PairSocket shim)
{
publisherSocket = context.CreatePublisherSocket();
publisherSocket.Bind("tcp://*:" + StreamingProtocol.Port);
snapshotSocket = context.CreateResponseSocket();
snapshotSocket.Bind("tcp://*:" + SnapshotProtocol.Port);
snapshotSocket.ReceiveReady += OnSnapshotReady;
shim.ReceiveReady += OnShimReady;
heartbeatTimer = new NetMQTimer(StreamingProtocol.HeartbeatInterval);
heartbeatTimer.Elapsed += OnHeartbeatTimerElapsed;
shim.SignalOK();
poller = new Poller();
poller.AddSocket(shim);
poller.AddSocket(snapshotSocket);
poller.AddTimer(heartbeatTimer);
poller.Start();
publisherSocket.Dispose();
snapshotSocket.Dispose();
}
private void OnHeartbeatTimerElapsed(object sender, NetMQTimerEventArgs e)
{
publisherSocket.Send(StreamingProtocol.HeartbeatTopic);
}
private void OnSnapshotReady(object sender, NetMQSocketEventArgs e)
{
string command = snapshotSocket.ReceiveString();
if (command == SnapshotProtocol.GetTradessCommand)
{
var tickers = tickerRepository.GetAllTickers();
foreach (var ticker in tickers)
{
snapshotSocket.SendMore(JsonConvert.SerializeObject(ticker));
}
snapshotSocket.Send(SnapshotProtocol.EndOfTickers);
}
}
private void OnShimReady(object sender, NetMQSocketEventArgs e)
{
string command = e.Socket.ReceiveString();
switch (command)
{
case ActorKnownMessages.END_PIPE:
poller.Stop(false);
break;
case PublishTicker:
string topic = e.Socket.ReceiveString();
string json = e.Socket.ReceiveString();
publisherSocket.
SendMore(topic).
Send(json);
break;
}
}
}
private Actor<object> actor;
private readonly NetMQContext context;
private readonly ITickerRepository tickerRepository;
public NetMQPublisher(NetMQContext context, ITickerRepository tickerRepository)
{
this.context = context;
this.tickerRepository = tickerRepository;
}
public void Start()
{
if (actor != null)
return;
actor = new Actor<object>(context, new ShimHandler(context, tickerRepository), null);
}
public void Stop()
{
if (actor != null)
{
actor.Dispose();
actor = null;
}
}
public void PublishTrade(TickerDto ticker)
{
if (actor == null)
return;
actor.
SendMore(PublishTicker).
SendMore(StreamingProtocol.TradesTopic).
Send(JsonConvert.SerializeObject(ticker));
}
}
}
}
What happens internally within the Actor
pipeline of the NetMQPublisher
is that a couple of extra sockets are created.
These are NOT the sockets the Actor
uses internally, those are a dedicated PairSocket
pair that you will not really see, they are part of the NetMQ codebase. The sockets we are talking about here are the ones that are used for the application logic, which is this demo application example are as follows:
- A
PublisherSocket
: This socket is used to publish out to clients. The way that NetMQ works is by using message frame(s) such that the 1st frame may be used as a topic say, and the next message frame may be the actual payload. That way the client(s) (SubscriberSocket
(s)) are able to see if the message is one they are interested in, before they go through the work of dealing with the message payload. This one PublisherSocket
should be enough to serve many different topics, you would simply provide the following things to the publisher
- Message topic
- Message payload
An example of sending a specific message through the NetMQPublisher
is as follows:
public void PublishTrade(TickerDto ticker)
{
actor.
SendMore(PublishTicker).
SendMore(StreamingProtocol.TradesTopic).
Send(JsonConvert.SerializeObject(ticker));
}
There are actually 2 topics being used in the demo app which are as follows:
-
TradesTopic ("Trades")
: This uses the single NetMQPublisher
held PublisherSocket
to stream TickerDto
objects at the connected clients. On the client side they use a SubscriberSocket
where the topic is set to "Trades
" such that they will only receive items from the publisher that have come from the NetMQPublisher PublisherSocket
which have a topic matching "Trades
"
HeartbeatTopic ("HB")
: This also uses the single NetMQPublisher
held PublisherSocket
to stream a single message frame containing just the topic name "HB
" (the message content is not important, just the topic name, such that client can see a new "HB
" topic message) to all connected clients. On the client side a SubscriberSocket
is used where the topic is set to "HB
" such that they will only receive items from the publisher that have come from the NetMQPublisher PublisherSocket
which have a topic matching "HB
". So what happens is that the server will initiate the publishing of a message, and the client contains a subscriber for the heartbeat topic and in the clients subscriber code this arrangement works as follows:
- If the server (
NetMQPublisher
) responds in a timely manner, the communication between the client and the server is considered ok.
- If the server (
NetMQPublisher
) DOES NOT respond in a timely manner, the communication between the client and the server is considered broken/disconnected/bad/fubar/wrecked/ruined/bogus.
- A
ResponseSocket
: This socket is used for the sending of a snapshot of all trades that are stored in the in memory publisher process held TickerRespository
(which is really nothing more than a queue of the last x many trades) between the clients and the server. The client(s) would contain a RequestSocket
, whilst the server (the NetMQPublisher
) contains the ResponseSocket
. So what happens is that the client will initiate the sending of a message (the message content itself is not important), and shall expect a response which would be the current x many trades serialized as JSON from the server side TickerRespository
. This is done as a one of within the client at startup
Simulating A Crash In The Publisher
This bit is easy, just use the "Stop NetMQ" button in the MainWindow. This will simply run the following code, which disposes the Actor
in the NetMQPublisher
class
public void Stop()
{
actor.Dispose();
}
What should happen, is that if you had any connected clients they should (after a short period) see the server as unavailable and should display "DISCONNECTED" tiles
This bit is easy, just use the "Start NetMQ" button in the MainWindow
. This will simply run the following code
private void StartServer()
{
serverStarted = true;
tickerPublisher.Start();
AutoRunning();
}
What should happen is that if you had any connected clients they should (after a short period) see the server as available again and no longer display the "DISCONNECTED" tiles
A client is a standard WPF app (I am using WPF just because I know it, it is not really that important, you could use Winforms using the MVP pattern just as well). The client also makes use of NetMQ, and as such a good starting place would be to examine what NetMQ has to offer
Here is screen shot of a couple of clients running at the same time, which is something I did not show in the 1st article
And this is what they look like after we click the "Stop NetMQ" button on the server
There is some IOC used, AutoFac is the container used. As IOC is not the main point of the article, I will not waste any more time on it. If you want to know more about the IOC/AutoFac read the code, looking into the BootStrapper
class
The way we decided to structure things was to have a Rx'y type client which exposes a IObservable
stream which the rest of the app can make use of. Within that IObservable
we would use the power of Observable.Create(..)
to create a factory that would create the IObservable
stream for that scenario by calling into/creating the NetMQ classes required to forfill the stream requirements.
So the gerneral pattern in this demo would be something like
XXXXClient
will be the thing that has the IObservable
stream that the rest of the app uses, which will internally use a XXXXNetMQClient
to do this nitty gritty comms to the NetMQ server.
We took the decision that for each non "heartbeat (HB)" topic we would have a dedicated client between the client and the server. This is kind of similiar to what we had in part 1 where there was a dedicated client proxy per Hub type.
The NetMQTickerClient
is where all the client side NetMQ shennanigans is. It is within the NetMQTickerClient
that the client will use the NetMQ SubscriberSocket
to subscribe to the "Trades
" topic. As before we will make use of the Actor
framework within NetMQ. The NetMQTickerClient
also does the initial snapshot with the server using a RequestSocket
, where the NetMQ server will have a ResponseSocket
. The server will send the initial snapshot of TickerDto
s that are then made available on the ticker stream for the app to use.
The NetMQTickerClient
also does a bit of error handling OnError
'ing, but the main way to detect failure issues is by using the HeartBeatClient
.
Here is the NetMQTickerClient
code:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading.Tasks;
using Client.Factory;
using Client.Comms.Transport;
using Common;
using NetMQ;
using NetMQ.Actors;
using NetMQ.InProcActors;
using NetMQ.Sockets;
using NetMQ.zmq;
using Newtonsoft.Json;
using Poller = NetMQ.Poller;
namespace Client.Comms
{
public class NetMQTickerClient : IDisposable
{
private Actor<object> actor;
private Subject<TickerDto> subject;
private CompositeDisposable disposables = new CompositeDisposable();
class ShimHandler : IShimHandler<object>
{
private NetMQContext context;
private SubscriberSocket subscriberSocket;
private Subject<TickerDto> subject;
private string address;
private Poller poller;
private NetMQTimer timeoutTimer;
public ShimHandler(NetMQContext context, Subject<TickerDto> subject, string address)
{
this.context = context;
this.address = address;
this.subject = subject;
}
public void Initialise(object state)
{
}
public void RunPipeline(PairSocket shim)
{
shim.SignalOK();
this.poller = new Poller();
shim.ReceiveReady += OnShimReady;
poller.AddSocket(shim);
timeoutTimer = new NetMQTimer(StreamingProtocol.Timeout);
timeoutTimer.Elapsed += TimeoutElapsed;
poller.AddTimer(timeoutTimer);
Connect();
poller.Start();
if (subscriberSocket != null)
{
subscriberSocket.Dispose();
}
}
private void Connect()
{
using (RequestSocket requestSocket = context.CreateRequestSocket())
{
requestSocket.Connect(string.Format("tcp://{0}:{1}", address, SnapshotProtocol.Port));
requestSocket.Send(SnapshotProtocol.GetTradessCommand);
string json;
requestSocket.Options.ReceiveTimeout = SnapshotProtocol.RequestTimeout;
try
{
json = requestSocket.ReceiveString();
}
catch (AgainException ex)
{
Task.Run(() => subject.OnError(new Exception("No response from server")));
return;
}
while (json != SnapshotProtocol.EndOfTickers)
{
PublishTicker(json);
json = requestSocket.ReceiveString();
}
}
subscriberSocket = context.CreateSubscriberSocket();
subscriberSocket.Subscribe(StreamingProtocol.TradesTopic);
subscriberSocket.Subscribe(StreamingProtocol.HeartbeatTopic);
subscriberSocket.Connect(string.Format("tcp://{0}:{1}", address, StreamingProtocol.Port));
subscriberSocket.ReceiveReady += OnSubscriberReady;
poller.AddSocket(subscriberSocket);
timeoutTimer.Enable = false;
timeoutTimer.Enable = true;
}
private void TimeoutElapsed(object sender, NetMQTimerEventArgs e)
{
Task.Run(() => subject.OnError(new Exception("Disconnected from server")));
}
private void OnShimReady(object sender, NetMQSocketEventArgs e)
{
string command = e.Socket.ReceiveString();
if (command == ActorKnownMessages.END_PIPE)
{
poller.Stop(false);
}
}
private void OnSubscriberReady(object sender, NetMQSocketEventArgs e)
{
string topic = subscriberSocket.ReceiveString();
if (topic == StreamingProtocol.TradesTopic)
{
string json = subscriberSocket.ReceiveString();
PublishTicker(json);
timeoutTimer.Enable = false;
timeoutTimer.Enable = true;
}
else if (topic == StreamingProtocol.HeartbeatTopic)
{
timeoutTimer.Enable = false;
timeoutTimer.Enable = true;
}
}
private void PublishTicker(string json)
{
TickerDto tickerDto = JsonConvert.DeserializeObject<TickerDto>(json);
subject.OnNext(tickerDto);
}
}
public NetMQTickerClient(NetMQContext context, string address)
{
subject = new Subject<TickerDto>();
this.actor = new Actor<object>(context, new ShimHandler(context, subject, address), null);
this.disposables.Add(this.actor);
this.disposables.Add(NetMQHeartBeatClient.Instance.GetConnectionStatusStream()
.Where(x => x.ConnectionStatus == ConnectionStatus.Closed)
.Subscribe(x =>
this.subject.OnError(new InvalidOperationException("Connection to server has been lost"))));
}
public IObservable<TickerDto> GetTickerStream()
{
return subject.AsObservable();
}
public void Dispose()
{
this.disposables.Dispose();
}
}
}
The TickerClient
can be used to throughout the app to stream TickerDto
objects, where it simply wraps another stream from the NetMQTickerClient
. The important part being that when an error occurs and the Repeat
that is inside the TickerRepository
kicks in, that the TickerClient IObservable
subscription will recreate the NetMQHeartBeatClient.
Which will ensure that the NetMQHeartBeatClient
will attempt to communicate with the server again. As before it all comes down to good housekeeping and lifestyle management.
Here is the code for the TickerClient
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;
using Client.Comms.Transport;
using Common;
using NetMQ;
namespace Client.Comms
{
public class TickerClient : ITickerClient
{
private readonly NetMQContext context;
private readonly string address;
public TickerClient(NetMQContext context, string address)
{
this.context = context;
this.address = address;
}
public IObservable<TickerDto> GetTickerStream()
{
return Observable.Create<TickerDto>(observer =>
{
NetMQTickerClient client = new NetMQTickerClient(context, address);
var disposable = client.GetTickerStream().Subscribe(observer);
return new CompositeDisposable { client, disposable };
})
.Publish()
.RefCount();
}
public IObservable<ConnectionInfo> ConnectionStatusStream()
{
return Observable.Create<ConnectionInfo>(observer =>
{
NetMQHeartBeatClient.Instance.InitialiseComms();
var disposable = NetMQHeartBeatClient.Instance.
GetConnectionStatusStream().Subscribe(observer);
return new CompositeDisposable { disposable };
})
.Publish()
.RefCount();
}
}
}
We took the decision that the heartbeat between a single client and the server is a global concern in the context of that client.
As such there is only ever expected to be a single HeartBeatClient
(this is achieved through IOC registration), and there is only ever one instance (a hard coded designed) singleton of the NetMQHeartBeatClient
The NetMQHeartBeatClient
is where all the client side NetMQ shennanigans is. It is within the NetMQHeartBeatClient
that the client will use the NetMQ SubscriberSocket
to subscribe to the "HeartBeat (HB)
" topic. As before we will make use of the Actor
framework within NetMQ. This is also where we would expect a response back from the server side PublisherSocket
within x amount of time. If we do not get that response we consider the comms to be broken, and we use an internal RX Subject<T>
to OnNext
the relevant ConnectionInfo/ConnectionStatus
.
Here is the NetMQHeartBeatClient
code:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Client.Factory;
using Client.Comms.Transport;
using Common;
using NetMQ;
using NetMQ.Actors;
using NetMQ.InProcActors;
using NetMQ.Sockets;
using NetMQ.zmq;
using Newtonsoft.Json;
using Poller = NetMQ.Poller;
namespace Client.Comms
{
public class NetMQHeartBeatClient
{
private readonly NetMQContext context;
private readonly string address;
private Actor<object> actor;
private Subject<ConnectionInfo> subject;
private static NetMQHeartBeatClient instance = null;
private static object syncLock = new object();
protected int requiresInitialisation = 1;
class ShimHandler : IShimHandler<object>
{
private NetMQContext context;
private SubscriberSocket subscriberSocket;
private Subject<ConnectionInfo> subject;
private string address;
private Poller poller;
private NetMQTimer timeoutTimer;
private NetMQHeartBeatClient parent;
public ShimHandler(NetMQContext context, Subject<ConnectionInfo> subject, string address)
{
this.context = context;
this.address = address;
this.subject = subject;
}
public void Initialise(object state)
{
parent = (NetMQHeartBeatClient) state;
}
public void RunPipeline(PairSocket shim)
{
shim.SignalOK();
this.poller = new Poller();
shim.ReceiveReady += OnShimReady;
poller.AddSocket(shim);
timeoutTimer = new NetMQTimer(StreamingProtocol.Timeout);
timeoutTimer.Elapsed += TimeoutElapsed;
poller.AddTimer(timeoutTimer);
Connect();
poller.Start();
if (subscriberSocket != null)
{
subscriberSocket.Dispose();
}
}
private void Connect()
{
subscriberSocket = context.CreateSubscriberSocket();
subscriberSocket.Subscribe(StreamingProtocol.HeartbeatTopic);
subscriberSocket.Connect(string.Format("tcp://{0}:{1}", address, StreamingProtocol.Port));
subject.OnNext(new ConnectionInfo(ConnectionStatus.Connecting, this.address));
subscriberSocket.ReceiveReady += OnSubscriberReady;
poller.AddSocket(subscriberSocket);
timeoutTimer.Enable = false;
timeoutTimer.Enable = true;
}
private void TimeoutElapsed(object sender, NetMQTimerEventArgs e)
{
Task.Run(() =>
{
parent.requiresInitialisation = 1;
subject.OnNext(new ConnectionInfo(ConnectionStatus.Closed, this.address));
});
}
private void OnShimReady(object sender, NetMQSocketEventArgs e)
{
string command = e.Socket.ReceiveString();
if (command == ActorKnownMessages.END_PIPE)
{
poller.Stop(false);
}
}
private void OnSubscriberReady(object sender, NetMQSocketEventArgs e)
{
string topic = subscriberSocket.ReceiveString();
if (topic == StreamingProtocol.HeartbeatTopic)
{
subject.OnNext(new ConnectionInfo(ConnectionStatus.Connected, this.address));
timeoutTimer.Enable = false;
timeoutTimer.Enable = true;
}
}
}
private NetMQHeartBeatClient(NetMQContext context, string address)
{
this.context = context;
this.address = address;
InitialiseComms();
}
public static NetMQHeartBeatClient CreateInstance(NetMQContext context, string address)
{
if (instance == null)
{
lock (syncLock)
{
if (instance == null)
{
instance = new NetMQHeartBeatClient(context,address);
}
}
}
return instance;
}
public void InitialiseComms()
{
if (Interlocked.CompareExchange(ref requiresInitialisation, 0, 1) == 1)
{
if (actor != null)
{
this.actor.Dispose();
}
subject = new Subject<ConnectionInfo>();
this.actor = new Actor<object>(context, new ShimHandler(context, subject, address), this);
}
}
public IObservable<ConnectionInfo> GetConnectionStatusStream()
{
return subject.AsObservable();
}
public static NetMQHeartBeatClient Instance
{
get { return instance; }
}
}
}
It is the HeartBeatClient
that is exposed throughout the app, where it simply wraps another stream from the NetMQHeartBeatClient
, which may be used to infer the connectivity status of the comms beteween the client (as a whole) and the server. The important part being that when an error occurs and the Repeat
happens, that the HeartBeatClient
is recreated. Which will ensure that the NetMQHeartBeatClient
will attempt to communicate with the server again. As before it all comes down to good housekeeping and lifestyle management.
Here is the code for the HeartBeatClient
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;
using Client.Comms;
using Client.Comms.Transport;
using NetMQ;
namespace Client.Comms
{
public class HeartBeatClient : IHeartBeatClient
{
public IObservable<ConnectionInfo> ConnectionStatusStream()
{
return Observable.Create<ConnectionInfo>(observer =>
{
NetMQHeartBeatClient.Instance.InitialiseComms();
var disposable = NetMQHeartBeatClient.Instance
.GetConnectionStatusStream().Subscribe(observer);
return new CompositeDisposable { disposable };
})
.Publish()
.RefCount();
}
}
}
The TickerRepository
is the next rung up the Observable chain. So what does that look like. Well it is actually suprisingly simple, but don't be fooled by that, there is a LOT going on here.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using Client.Factory;
using Client.Comms;
namespace Client.Repositories
{
class TickerRepository : ITickerRepository
{
private readonly ITickerClient tickerClient;
private readonly ITickerFactory tickerFactory;
public TickerRepository(ITickerClient tickerClient, ITickerFactory tickerFactory)
{
this.tickerClient = tickerClient;
this.tickerFactory = tickerFactory;
}
public IObservable<Ticker> GetTickerStream()
{
return Observable.Defer(() => tickerClient.GetTickerStream())
.Select(tickerFactory.Create)
.Catch<Ticker>(Observable.Empty<Ticker>())
.Repeat()
.Publish()
.RefCount();
}
}
}
So what is going on here exactly?
- We use
Observable.Defer
such that we do not actually make use of the underlying stream, until someone subscribes to the IObservable
created by using Observable.Defer
. It is a way of making a hot stream cold.
- We use select to transform the strem data from
TickerDto
to Ticker
- We use
Catch
to catch any Exception
s (OnError
) in the stream. Where we use a default value for that case
- We use
Repeat
. Now this one is VERY VERY important. This is the one that allows us to repeat the whole stream, including connecting to the server side hub again. It is this mechanism that allows the client to recover from a server side loss of SignalR hub. This along with the resilient stream logic are the MOST important bits to the app (at least in my opinion)
- We use
Publish
to share the underlying stream
- We use
RefCount
to get automatic disposal when there are no longer an subscribers
So now that we have seen this repository, we only have one more hop to continue the IObservable
journey for the TickerStream
. Let see that next.
The TickersViewModel
represents ALL the Ticker(s) you see on the screen. It is this viewmodel that makes use of the lazy / repeatable / resilient IObservable
that the TickerRepository
provides. Lets see the code it is pretty self explanatory, I think:
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Input;
using Client.Factory;
using Client.Repositories;
using Client.Services;
using Common;
using Common.ViewModels;
using log4net;
namespace Client.ViewModels
{
public class TickersViewModel : INPCBase
{
private readonly ITickerRepository tickerRepository;
private readonly IConcurrencyService concurrencyService;
private bool stale = false;
private static readonly ILog log = LogManager.GetLogger(typeof(TickersViewModel));
public TickersViewModel(IReactiveTrader reactiveTrader,
IConcurrencyService concurrencyService,
TickerViewModelFactory tickerViewModelFactory)
{
Tickers = new ObservableCollection<TickerViewModel>();
Tickers.Add(tickerViewModelFactory.Create("Yahoo"));
Tickers.Add(tickerViewModelFactory.Create("Google"));
Tickers.Add(tickerViewModelFactory.Create("Apple"));
Tickers.Add(tickerViewModelFactory.Create("Facebook"));
Tickers.Add(tickerViewModelFactory.Create("Microsoft"));
Tickers.Add(tickerViewModelFactory.Create("Twitter"));
this.tickerRepository = reactiveTrader.TickerRepository;
this.concurrencyService = concurrencyService;
LoadTrades();
}
public ObservableCollection<TickerViewModel> Tickers { get; private set; }
private void LoadTrades()
{
tickerRepository.GetTickerStream()
.ObserveOn(concurrencyService.Dispatcher)
.SubscribeOn(concurrencyService.TaskPool)
.Subscribe(
AddTicker,
ex => log.Error("An error occurred within the trade stream", ex));
}
private void AddTicker(Ticker ticker)
{
Tickers.Single(x => x.Name == ticker.Name)
.AcceptNewPrice(ticker.Price);
}
}
}
Where each Ticker
is represented by a single TickerViewModel
which is as shown below. It can be seen that this class also makes use of the ConnectionStatusStream IObservable
we discussed earlier. This is used to change the TickerViewModel
s view to show a red box with "DISCONNECTED" in it. We will talk through that in just a minute.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;
using Client.Comms.Transport;
using Client.Services;
using log4net;
namespace Client.ViewModels
{
public class TickerViewModel : INPCBase
{
private decimal price;
private bool isUp;
private bool stale;
private bool disconnected;
private static readonly ILog log = LogManager.GetLogger(typeof(TickerViewModel));
public TickerViewModel(
IReactiveTrader reactiveTrader,
IConcurrencyService concurrencyService,
string name)
{
this.Name = name;
reactiveTrader.ConnectionStatusStream
.ObserveOn(concurrencyService.Dispatcher)
.SubscribeOn(concurrencyService.TaskPool)
.Subscribe(
OnStatusChange,
ex => log.Error("An error occurred within the connection status stream.", ex));
}
public string Name { get; private set; }
public void AcceptNewPrice(decimal newPrice)
{
IsUp = newPrice > price;
Price = newPrice;
}
public decimal Price
{
get { return this.price; }
private set
{
this.price = value;
base.OnPropertyChanged("Price");
}
}
public bool IsUp
{
get { return this.isUp; }
private set
{
this.isUp = value;
base.OnPropertyChanged("IsUp");
}
}
public bool Stale
{
get { return this.stale; }
set
{
this.stale = value;
base.OnPropertyChanged("Stale");
}
}
public bool Disconnected
{
get { return this.disconnected; }
set
{
this.disconnected = value;
base.OnPropertyChanged("Disconnected");
}
}
private void OnStatusChange(ConnectionInfo connectionInfo)
{
switch (connectionInfo.ConnectionStatus)
{
case ConnectionStatus.Connecting:
Disconnected = true;
break;
case ConnectionStatus.Connected:
Disconnected = false;
break;
case ConnectionStatus.Closed:
Disconnected = true;
break;
default:
throw new ArgumentOutOfRangeException();
}
}
}
}
It can be seen that this ViewModel makes use of the IReactiveTrader.ConnectionStatusStream
to monitor the status of the connection to the NetMQPublisher
. It is this code that is responsible for changing the appearance of the tile from one that shows ticking prices to a big red "DISCONNECTED" tile. Which is done by the use of the Disconnected
property
The last thing I wanted to show you was how the ConnectionStatusStream
was used. This stream OnNext
s when the client side HeartBeatClient
pushes out new values. So we see thing like Connecting, Connected,Closed
. All of which are created using the logic we looked at earlier within the NetMQHeartBeatClient
, and are turned in to an IObservable
stream using a standard RX Subject<T>
Anyway here is the overall ConnectivityStatusViewModel
that we use to show the information in the bottom status bar of the app.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;
using Client.Comms.Transport;
using Client.Services;
using log4net;
namespace Client.ViewModels
{
public class ConnectivityStatusViewModel : INPCBase
{
private static readonly ILog log = LogManager.GetLogger(typeof(ConnectivityStatusViewModel));
private string server;
private string status;
private bool disconnected;
public ConnectivityStatusViewModel(
IReactiveTrader reactiveTrader,
IConcurrencyService concurrencyService)
{
reactiveTrader.ConnectionStatusStream
.ObserveOn(concurrencyService.Dispatcher)
.SubscribeOn(concurrencyService.TaskPool)
.Subscribe(
OnStatusChange,
ex => log.Error("An error occurred within the connection status stream.", ex));
}
private void OnStatusChange(ConnectionInfo connectionInfo)
{
Server = connectionInfo.Server;
switch (connectionInfo.ConnectionStatus)
{
case ConnectionStatus.Connecting:
Status = "Connecting...";
Disconnected = true;
break;
case ConnectionStatus.Connected:
Status = "Connected";
Disconnected = false;
break;
case ConnectionStatus.Closed:
Status = "Disconnected";
Disconnected = true;
break;
default:
throw new ArgumentOutOfRangeException();
}
}
public string Server
{
get { return this.server; }
set
{
this.server = value;
base.OnPropertyChanged("Server");
}
}
public string Status
{
get { return this.status; }
set
{
this.status = value;
base.OnPropertyChanged("Status");
}
}
public bool Disconnected
{
get { return this.disconnected; }
set
{
this.disconnected = value;
base.OnPropertyChanged("Disconnected");
}
}
}
}
Anyway that is all I wanted to say for now, I hope you have enjoyed this very mini RX/SignalR/NetMQ series, and that this has maybe made you want to go away and have a play with RX/SignalR/NetMQ. If you have enjoyed it, please feel to free to leave a vote or a comment, Doron and I have worked quite hard on these demos, to try and iron out any bugs and make them as real world as possible, so comments/votes are always nice to receive.