NetMQ + RX (Streaming Data Demo App 2 of 2)

14 Dec 2014
Small demo app showing how to create a streaming Publisher / Resilient clients using NetMQ + RX

 

Where Is The Code?

You can grab the code from my github repository right here

https://github.com/sachabarber/NetMQRxDemo

 

Introduction

This is the 2nd part of this series of articles. If you missed part one, here are the main points from the introduction section of the last article.

I work in finance, and at the moment work in the Foreign Exchange (FX) space, where we have lots of requirements that involve streaming of various things, such as:

  • Rates
  • Trades
  • Other financial instruments

Historically what we would have done to accommodate these requirements was to use sockets and push our own data (usually JSON or XML) down the wire, and then have some quite gnarly listeners above that, where each listener would want some slightly different view of the original streaming data.

Thing is, time never stands still in the world of software development, and new things come out all the time; literally every other day a new framework comes along that helps improve what came before it in some way or another.

One particular thing that I think a lot of people have misunderstood is Reactive Extensions (RX), which in fairness has been around a while now. I think a lot of people think of RX as LINQ to events, which it does have. However, the truth is that RX is a superb framework for creating streaming applications, and the more you get into RX the more you tend to see everything as a stream, from an event, down to picking a file or an ICommand.Execute in a ViewModel, say.

RX comes with many many tools in its arsenal, such as:

  • 2 core interfaces IObservable / IObserver
  • Lots of LINQ like extension methods for IObservable
  • Time / window based operations (LINQ doesn't really have these)
  • Concurrency schedulers
  • Stream error handling controls (Catch etc etc)
  • Ability to create IObservable from a great many other things, such as
    • IEnumerable
    • Task
    • Events
    • IObservable Factories

I personally feel that a lot of people could make use of RX to great gain, if they were to put in a bit more time to learn about it.

Last time we talked about SignalR working with RX. Thing is, sometimes people will want to use sockets for one reason or another. Now I like sockets; what I am not a fan of is how much boilerplate code one has to write when using the standard .NET socket types. Luckily you don't have to, as there is this totally awesome project called ZeroMQ, which has a .NET port by way of NetMQ, which I am actually pretty pleased to say I had a very small (minute) hand in: I wrote the Actor part of NetMQ. When I first started dissecting the SignalR demo (which, if you have read the 1st article, you will know was based on the great work by Lee Campbell, Intro To RX author and all round top chap, and his colleagues at Adaptive), I contacted the author of NetMQ and asked if he would be willing to give me a hand with a NetMQ port of that code. He kindly agreed, and you should see him here as my co-author. Thanks Doron, you rock.

So in this article Doron and I outline what you would need to do to create a similar app using NetMQ and RX.

 

Article Series

  1. Streaming API using SignalR and RX
  2. Streaming API using NetMQ and RX (this article)

 

Video Showing You What We Will Learn About

I think the best way to see what this demo is about is to have a look at the video, which I have uploaded to YouTube here:

https://www.youtube.com/watch?v=uTqCA1cN16k

 

Small RX Primer

Now I covered this in the 1st article, but just in case some of you missed that I think it is important ground to cover again, so here we go:

There are many many RX Extension methods which you can make use of:

http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable_methods(v=vs.103).aspx

But perhaps the best place to start with RX, is to understand the underlying 2 interfaces you will use time and time again. These are IObservable and IObserver. We discuss these 2 next.

 

IObservable

namespace System
{
    // Summary:
    //     Defines a provider for push-based notification.
    //
    // Type parameters:
    //   T:
    //     The object that provides notification information. This type parameter is
    //     covariant. That is, you can use either the type you specified or any type
    //     that is more derived. For more information about covariance and contravariance,
    //     see Covariance and Contravariance in Generics.
    public interface IObservable<out T>
    {
        // Summary:
        //     Notifies the provider that an observer is to receive notifications.
        //
        // Parameters:
        //   observer:
        //     The object that is to receive notifications.
        //
        // Returns:
        //     A reference to an interface that allows observers to stop receiving notifications
        //     before the provider has finished sending them.
        IDisposable Subscribe(IObserver<T> observer);
    }
}

Defines a provider for push-based notification.

This type parameter is covariant. That is, you can use either the type you specified or any type that is more derived. For more information about covariance and contravariance, see Covariance and Contravariance in Generics.

  • Subscribe Notifies the provider that an observer is to receive notifications.

Further reading : http://msdn.microsoft.com/en-us/library/dd990377%28v=vs.110%29.aspx

 

IObserver

namespace System
{
    // Summary:
    //     Provides a mechanism for receiving push-based notifications.
    //
    // Type parameters:
    //   T:
    //     The object that provides notification information. This type parameter is
    //     contravariant. That is, you can use either the type you specified or any
    //     type that is less derived. For more information about covariance and contravariance,
    //     see Covariance and Contravariance in Generics.
    public interface IObserver<in T>
    {
        // Summary:
        //     Notifies the observer that the provider has finished sending push-based notifications.
        void OnCompleted();
        //
        // Summary:
        //     Notifies the observer that the provider has experienced an error condition.
        //
        // Parameters:
        //   error:
        //     An object that provides additional information about the error.
        void OnError(Exception error);
        //
        // Summary:
        //     Provides the observer with new data.
        //
        // Parameters:
        //   value:
        //     The current notification information.
        void OnNext(T value);
    }
}

The IObserver<T> and IObservable<T> interfaces provide a generalized mechanism for push-based notification, also known as the observer design pattern. The IObservable<T> interface represents the class that sends notifications (the provider); the IObserver<T> interface represents the class that receives them (the observer). T represents the class that provides the notification information.

An IObserver<T> implementation arranges to receive notifications from a provider (an IObservable<T> implementation) by passing an instance of itself to the provider's IObservable<T>.Subscribe method. This method returns an IDisposable object that can be used to unsubscribe the observer before the provider finishes sending notifications.

The IObserver<T> interface defines the following three methods that the observer must implement:

  • The OnNext method, which is typically called by the provider to supply the observer with new data or state information.
  • The OnError method, which is typically called by the provider to indicate that data is unavailable, inaccessible, or corrupted, or that the provider has experienced some other error condition.
  • The OnCompleted method, which is typically called by the provider to indicate that it has finished sending notifications to observers.

Further reading : http://msdn.microsoft.com/en-us/library/dd783449%28v=vs.110%29.aspx
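
To make the two interfaces a little more concrete, here is a minimal sketch that uses nothing but the base class library (no RX package). The type names RecordingObserver, FixedValuesObservable and ObserverDemo are made up for this illustration and are not part of the demo app. It simply shows a provider calling OnNext / OnCompleted on whatever observer subscribes to it.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical observer that records every notification it receives.
public sealed class RecordingObserver<T> : IObserver<T>
{
    public List<string> Log { get; } = new List<string>();

    public void OnNext(T value) { Log.Add("OnNext:" + value); }
    public void OnError(Exception error) { Log.Add("OnError:" + error.Message); }
    public void OnCompleted() { Log.Add("OnCompleted"); }
}

// Hypothetical provider that pushes a fixed set of values to each subscriber.
public sealed class FixedValuesObservable<T> : IObservable<T>
{
    private readonly T[] values;

    public FixedValuesObservable(params T[] values) { this.values = values; }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        foreach (var value in values)
        {
            observer.OnNext(value);
        }
        observer.OnCompleted();

        // Everything was sent synchronously, so there is nothing left to unsubscribe from.
        return new NoOpDisposable();
    }

    private sealed class NoOpDisposable : IDisposable
    {
        public void Dispose() { }
    }
}

public static class ObserverDemo
{
    // Subscribes a recording observer and returns what it saw.
    public static List<string> Run()
    {
        var observer = new RecordingObserver<string>();
        var source = new FixedValuesObservable<string>("a", "b");

        using (source.Subscribe(observer))
        {
            // The notifications have already been delivered by the time Subscribe returns.
        }

        return observer.Log;
    }
}
```

In real RX code you would rarely implement IObservable by hand like this; the point is only to show which side calls which method.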

 

Observable.Create

Anyone who has used RX a bit will come to the point where they would like to create their own operators. In LINQ this would be done by creating a new method (perhaps an extension method) that simply returns a new IEnumerable<T>. In RX things are slightly trickier, but not much. In RX you would make use of Observable.Create to create your own operators, or you could use Observable.Create as a factory for creating a new IObservable<T>. Here is a simple example:

private IObservable<string> CreateObservable()
{
  return Observable.Create<string>(
    (IObserver<string> observer) =>
    {
      observer.OnNext("a");
      observer.OnNext("b");
      observer.OnCompleted();
      return Disposable.Create(() => Console.WriteLine("Observer has unsubscribed"));
    });
}

 

Observable.Create creates an observable sequence from a subscribe method implementation. You would typically write an extension method on an existing IObservable<T> and then put an Observable.Create(..) as the body of the extension method.

Observable.Create has this method signature:

public static IObservable<TSource> Create<TSource>(
	Func<IObserver<TSource>, IDisposable> subscribe
)

The interesting part of this method signature is that it takes a Func delegate that receives an IObserver<TSource>, which allows you to push values into the observer using its OnNext, OnError and OnCompleted methods. The Func delegate returns an IDisposable, whose Dispose method will be called when the subscription is disposed. Now RX comes with a great many different IDisposable types, so don't be surprised if you see some quite exotic IDisposable types being used inside an Observable.Create implementation.

One of the best descriptions of using Observable.Create is from Lee Campbell's site:

http://www.introtorx.com/content/v1.0.10621.0/04_CreatingObservableSequences.html
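
If the shape of that signature feels odd, it can help to see how little is needed to turn a subscribe delegate into an IObservable<T>. The following is only a sketch using the base class library: SimpleObservable, ActionDisposable, LoggingObserver and CreateDemo are hypothetical names, and this is NOT how RX implements Observable.Create (the real one does a good deal more, such as guarding against notifications after completion).

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for Observable.Create: wraps a subscribe delegate in an IObservable<T>.
public static class SimpleObservable
{
    public static IObservable<T> Create<T>(Func<IObserver<T>, IDisposable> subscribe)
    {
        if (subscribe == null) throw new ArgumentNullException("subscribe");
        return new DelegateObservable<T>(subscribe);
    }

    private sealed class DelegateObservable<T> : IObservable<T>
    {
        private readonly Func<IObserver<T>, IDisposable> subscribe;

        public DelegateObservable(Func<IObserver<T>, IDisposable> subscribe)
        {
            this.subscribe = subscribe;
        }

        // Every Subscribe call runs the delegate again for that particular observer.
        public IDisposable Subscribe(IObserver<T> observer)
        {
            return subscribe(observer);
        }
    }
}

// Hypothetical stand-in for Disposable.Create: runs an action once on Dispose.
public sealed class ActionDisposable : IDisposable
{
    private Action onDispose;

    public ActionDisposable(Action onDispose) { this.onDispose = onDispose; }

    public void Dispose()
    {
        var action = onDispose;
        onDispose = null;
        if (action != null) action();
    }
}

// Hypothetical observer that writes what it sees into a shared log.
public sealed class LoggingObserver<T> : IObserver<T>
{
    private readonly List<string> log;

    public LoggingObserver(List<string> log) { this.log = log; }

    public void OnNext(T value) { log.Add("OnNext:" + value); }
    public void OnError(Exception error) { log.Add("OnError:" + error.Message); }
    public void OnCompleted() { log.Add("OnCompleted"); }
}

public static class CreateDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();

        IObservable<string> source = SimpleObservable.Create<string>(o =>
        {
            o.OnNext("a");
            o.OnNext("b");
            o.OnCompleted();
            return new ActionDisposable(() => log.Add("disposed"));
        });

        IDisposable subscription = source.Subscribe(new LoggingObserver<string>(log));
        subscription.Dispose(); // triggers the action returned by the subscribe delegate

        return log;
    }
}
```

The thing to take away is the direction of flow: the delegate is handed the observer, pushes into it, and hands back the IDisposable that represents "stop".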

 

 

General Idea

This diagram illustrates the general idea:

 

The Publisher

 

The publisher is made up of a few bits and pieces, which are described below. One thing to note is that both the client and the server make use of NetMQ, so it may be an idea to read a little bit about NetMQ before you start. Somdoron (NetMQ author) has a few posts on NetMQ on his blog, and there are a few more posts here and there; oh, and I myself have done a few. During the course of writing this article, Somdoron asked me if I wanted to get more involved with NetMQ, and I do, so after XMAS I have agreed to help out a lot more with NetMQ. The first step of that will be to properly document it, which is one area where it does (to be fair) need a bit more of a push.

 

IOC

There is some IOC used; Autofac is the container. As IOC is not the main point of the article, I will not waste any more time on it. If you want to know more about the IOC/Autofac setup, read the code, looking into the BootStrapper class.

MainWindow

There is not much to say about this window, except that it has the following controls:

  • Start NetMQ : Which starts the NetMQ NetMQPublisher Actor PublisherSocket (useful for simulating crash/restart of NetMQ server)
  • Stop NetMQ : Which disposes the NetMQ NetMQPublisher Actor (useful for simulating crash/restart of NetMQ server)
  • Start Auto Ticker Data : Will push random TickerDto objects out of the NetMQ NetMQPublisher Actor at fixed intervals
  • Stop Auto Ticker Data : Will suspend the sending of TickerDto objects out of the NetMQ NetMQPublisher Actor
  • Send One Ticker : Will push 1 SINGLE random TickerDto object out of the NetMQ NetMQPublisher Actor

Don't be too worried if this term "Actor" doesn't mean anything to you yet, we will get on to that soon.

 

MainWindowViewModel

The MainWindowViewModel is really used to facilitate sending commands to either stop/create the NetMQ NetMQPublisher Actor, or to tell the NetMQ NetMQPublisher Actor to do things. Here is the full code for the MainWindowViewModel

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;
using System.Windows.Input;
using Common.ViewModels;
using log4net;
using NetMQServer.Ticker;

namespace NetMQServer
{
    public class MainWindowViewModel
    {
        private readonly ITickerPublisher tickerPublisher;
        private readonly ITickerRepository tickerRepository;
        private Random rand;
        private static readonly ILog Log = LogManager.GetLogger(typeof(MainWindowViewModel));
        private CancellationTokenSource autoRunningCancellationToken;
        private Task autoRunningTask;
        private bool serverStarted;
        private bool autoTickerStarted;

        public MainWindowViewModel(ITickerPublisher tickerPublisher, ITickerRepository tickerRepository)
        {
            this.tickerPublisher = tickerPublisher;
            this.tickerRepository = tickerRepository;
            this.rand = new Random();

            serverStarted = false;
            autoTickerStarted = false;

            AutoTickerStartCommand = new DelegateCommand(
                AutoRunning, 
                () => serverStarted && !autoTickerStarted);
            
            AutoTickerStopCommand = new DelegateCommand(
                () =>
                {
                    if (autoRunningCancellationToken != null)
                    {
                        autoRunningCancellationToken.Cancel();
                        autoRunningTask.Wait();
                        autoTickerStarted = false;
                    }
                }, 
                () => serverStarted && autoTickerStarted);

            SendOneTickerCommand = new DelegateCommand(
                SendOneManualFakeTicker, 
                () => serverStarted && !autoTickerStarted);

            StartCommand = new DelegateCommand(
                StartServer, 
                () => !serverStarted);
            
            StopCommand = new DelegateCommand(
                StopServer, 
                () => serverStarted);
        }

        public DelegateCommand AutoTickerStartCommand { get; set; }
        public DelegateCommand AutoTickerStopCommand { get; set; }
        public DelegateCommand SendOneTickerCommand { get; set; }
        public DelegateCommand StartCommand { get; set; }
        public DelegateCommand StopCommand { get; set; }

        public void Start()
        {
            StartServer();
        }


        private void AutoRunning()
        {
            autoTickerStarted = true;
            autoRunningCancellationToken = new CancellationTokenSource();
            autoRunningTask = Task.Run(async () =>
            {
                //Publisher is not thread safe, so while the auto ticker is 
                //running only the autoticker is allowed to access the publisher
                while (!autoRunningCancellationToken.IsCancellationRequested)
                {
                    SendOneManualFakeTicker();

                    await Task.Delay(20);
                }
            });
        }

        private void SendOneManualFakeTicker()
        {
            var currentTicker = tickerRepository.GetNextTicker();

            var flipPoint = rand.Next(0, 100);

            if (flipPoint > 50)
            {
                currentTicker.Price += currentTicker.Price / 30;
            }
            else
            {
                currentTicker.Price -= currentTicker.Price / 30;
            }

            tickerRepository.StoreTicker(currentTicker);

            tickerPublisher.PublishTrade(currentTicker);
        }

        private void StartServer()
        {
            serverStarted = true;
            tickerPublisher.Start();
            AutoRunning();
        }

        private void StopServer()
        {
            if (autoTickerStarted)
            {
                autoRunningCancellationToken.Cancel();

                // Publisher is not thread safe, so while the auto ticker is 
                // running only the autoticker is allowed to access the publisher. 
                //Therefore before we can stop the publisher we have to 
                // wait for the autoticker task to complete
                autoRunningTask.Wait();
                autoTickerStarted = false;

                autoRunningCancellationToken = null;
                autoRunningTask = null;
            }
            tickerPublisher.Stop();

            serverStarted = false;
        }
    }
}

As you can see not much really goes on in the MainWindowViewModel, all the action happens elsewhere.

 

NetMQ Actors

Before we get into the nitty gritty of how the NetMQPublisher works, it is worth noting one thing: the server and the client use an Actor model for NetMQ, which I am actually very pleased to see being used, as it was me that wrote that part, and it was that piece that introduced me to Somdoron.

You can essentially think of the Actor as a socket that you can send messages (commands) to. Internally the Actor uses a special PairSocket: you write to one half of the pair when you send messages (commands) to the Actor, and the Actor receives them on the other end of the PairSocket, where it carries out some work in response to the commands it receives.

What you really do is work out a simple protocol that you will use between the source of the command and the Actor and stick to that protocol. This protocol may include commands to tell the Actor to do things, or it may be a command to signal the Actor to stop doing anything (NetMQ has a special command ActorKnownMessages.END_PIPE, for this case since it is such a common requirement)

Now you may be wondering why we chose to use this Actor model at all. Well, it is quite simple: using the Actor model ensures there is never any contention for locks, as there simply is no shared data at all. The data is sent over a socket, so the Actor is guaranteed to have its own copy of the data. As such no locking is needed and there is no waiting for locks, which helps keep things nice and fast and thread safe.

You can read a quite in depth blog post I did on this here : #7 : Simple Actor Model
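
If you have not come across the idea before, the following sketch may help. It is NOT NetMQ code: a BlockingCollection stands in for the PairSocket, and the names MailboxActor, ActorDemo, the "P" command and the "END" command (playing the role of ActorKnownMessages.END_PIPE) are all assumptions made for this illustration. What it shows is the essence of the approach: callers only ever enqueue messages, and the actor's own thread is the only thing that touches the actor's state, so no locks are needed around that state.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical mailbox-style actor; a queue replaces NetMQ's internal PairSocket.
public sealed class MailboxActor : IDisposable
{
    private const string PublishCommand = "P";  // assumed command name
    private const string EndCommand = "END";    // plays the role of END_PIPE

    private readonly BlockingCollection<string[]> mailbox = new BlockingCollection<string[]>();
    private readonly List<string> published = new List<string>(); // touched only by the actor thread
    private readonly Thread worker;
    private bool disposed;

    public MailboxActor()
    {
        worker = new Thread(RunPipeline) { IsBackground = true };
        worker.Start();
    }

    // Callers never touch the actor's state; they only send it a multi-part message.
    public void Publish(string topic, string payload)
    {
        mailbox.Add(new[] { PublishCommand, topic, payload });
    }

    // The actor loop: take one message at a time and act on its first part (the command).
    private void RunPipeline()
    {
        foreach (var frames in mailbox.GetConsumingEnumerable())
        {
            switch (frames[0])
            {
                case EndCommand:
                    return;
                case PublishCommand:
                    published.Add(frames[1] + ":" + frames[2]);
                    break;
            }
        }
    }

    // Sends the end command and waits for the actor thread to finish.
    public void Dispose()
    {
        if (disposed) return;
        disposed = true;

        mailbox.Add(new[] { EndCommand });
        worker.Join();
        mailbox.Dispose();
    }

    // Only read this after Dispose has returned, i.e. once the actor thread has exited.
    public IReadOnlyList<string> Published { get { return published; } }
}

public static class ActorDemo
{
    public static IReadOnlyList<string> Run()
    {
        var actor = new MailboxActor();
        actor.Publish("Trades", "first");
        actor.Publish("Trades", "second");
        actor.Dispose();
        return actor.Published;
    }
}
```

The real NetMQ Actor gives you the same shape, but with the messages travelling over a socket pair rather than an in-process queue.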

 

Starting The Publisher

As we saw above the MainWindowViewModel contains commands to Start/Stop the NetMQPublisher. But what do they do, how does that start/stop the NetMQPublisher?

In the case of Start, what happens is actually quite simple: internally the NetMQPublisher will create a new Actor that is then ready to receive commands.

public void Start()
{
    actor = new Actor<object>(context, new ShimHandler(context, tickerRepository), null);
}

 

Who Does The Streaming To The Clients?

That is all done by the NetMQPublisher, which we just discussed a bit above. The code for the NetMQPublisher is shown after the following list.

The NetMQPublisher actually does a few things:

  1. Sends out a one time snapshot in response (ResponseSocket) to a client initiated request (RequestSocket)
  2. Publishes (PublisherSocket) "Trades" out to all connected clients that have subscribed (SubscriberSocket) for the "Trades"
  3. Publishes (PublisherSocket) heartbeat "HB" out to all connected clients that have subscribed (SubscriberSocket) for the "HB"

 

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Navigation;
using Common;
using NetMQ;
using NetMQ.Actors;
using NetMQ.InProcActors;
using NetMQ.Sockets;
using NetMQ.zmq;
using Newtonsoft.Json;
using Poller = NetMQ.Poller;

namespace NetMQServer.Ticker
{
    public class NetMQPublisher : ITickerPublisher
    {
        private const string PublishTicker = "P";        

        public class ShimHandler : IShimHandler<object>
        {
            private readonly NetMQContext context;
            private PublisherSocket publisherSocket;
            private ResponseSocket snapshotSocket;
            private ITickerRepository tickerRepository;
            private Poller poller;
            private NetMQTimer heartbeatTimer;

            public ShimHandler(NetMQContext context, ITickerRepository tickerRepository)
            {
                this.context = context;
                this.tickerRepository = tickerRepository;                
            }

            public void Initialise(object state)
            {

            }

            public void RunPipeline(PairSocket shim)
            {
                publisherSocket = context.CreatePublisherSocket();
                publisherSocket.Bind("tcp://*:" + StreamingProtocol.Port);

                snapshotSocket = context.CreateResponseSocket();
                snapshotSocket.Bind("tcp://*:" + SnapshotProtocol.Port);
                snapshotSocket.ReceiveReady += OnSnapshotReady;
                
                shim.ReceiveReady += OnShimReady;

                heartbeatTimer = new NetMQTimer(StreamingProtocol.HeartbeatInterval);
                heartbeatTimer.Elapsed += OnHeartbeatTimerElapsed;

                shim.SignalOK();

                poller = new Poller();
                poller.AddSocket(shim);
                poller.AddSocket(snapshotSocket);
                poller.AddTimer(heartbeatTimer);
                poller.Start();

                publisherSocket.Dispose();
                snapshotSocket.Dispose();
            }

            private void OnHeartbeatTimerElapsed(object sender, NetMQTimerEventArgs e)
            {
                publisherSocket.Send(StreamingProtocol.HeartbeatTopic);
            }

            private void OnSnapshotReady(object sender, NetMQSocketEventArgs e)
            {                
                string command = snapshotSocket.ReceiveString();

                // Currently we only have one type of events
                if (command == SnapshotProtocol.GetTradessCommand)
                {
                    var tickers = tickerRepository.GetAllTickers();

                    // we will send all the tickers in one message
                    foreach (var ticker in tickers)
                    {
                        snapshotSocket.SendMore(JsonConvert.SerializeObject(ticker));
                    }

                    snapshotSocket.Send(SnapshotProtocol.EndOfTickers);
                }
            }

            private void OnShimReady(object sender, NetMQSocketEventArgs e)
            {
   
                string command = e.Socket.ReceiveString();

                switch (command)
                {
                    case ActorKnownMessages.END_PIPE:
                        poller.Stop(false);
                        break;
                    case PublishTicker:
                        string topic = e.Socket.ReceiveString();
                        string json = e.Socket.ReceiveString();
                        publisherSocket.
                            SendMore(topic).
                            Send(json);
                        break;
                }

            }
        }

        private Actor<object> actor;
        private readonly NetMQContext context;
        private readonly ITickerRepository tickerRepository;
                    
        public NetMQPublisher(NetMQContext context, ITickerRepository tickerRepository)
        {
            this.context = context;
            this.tickerRepository = tickerRepository;        
        }

        public void Start()
        {
            if (actor != null)
                return;

            actor = new Actor<object>(context, new ShimHandler(context, tickerRepository), null);
        }

        public void Stop()
        {
            if (actor != null)
            {
                actor.Dispose();
                actor = null;
            }
        }        

        public void PublishTrade(TickerDto ticker)
        {
            if (actor == null)
                return;

            actor.
                SendMore(PublishTicker).
                SendMore(StreamingProtocol.TradesTopic).
                Send(JsonConvert.SerializeObject(ticker));                
        }

    }
}

What happens internally within the Actor pipeline of the NetMQPublisher is that a couple of extra sockets are created.

These are NOT the sockets the Actor uses internally; those are a dedicated PairSocket pair that you will not really see, as they are part of the NetMQ codebase. The sockets we are talking about here are the ones that are used for the application logic, which in this demo application are as follows:

  1. A PublisherSocket : This socket is used to publish out to clients. The way that NetMQ works is by using message frame(s) such that the 1st frame may be used as a topic say, and the next message frame may be the actual payload. That way the client(s) (SubscriberSocket(s)) are able to see if the message is one they are interested in, before they go through the work of dealing with the message payload. This one PublisherSocket should be enough to serve many different topics, you would simply provide the following things to the publisher
    • Message topic
    • Message payload
    An example of sending a specific message through the NetMQPublisher is as follows:
    	public void PublishTrade(TickerDto ticker)
    	{
    	    actor.
    	        SendMore(PublishTicker).
    	        SendMore(StreamingProtocol.TradesTopic).
    	        Send(JsonConvert.SerializeObject(ticker));                
    	}
    
    There are actually 2 topics being used in the demo app which are as follows:
    •  TradesTopic ("Trades") : This uses the single NetMQPublisher held PublisherSocket to stream TickerDto objects at the connected clients. On the client side they use a SubscriberSocket where the topic is set to "Trades" such that they will only receive items from the publisher that have come from the NetMQPublisher PublisherSocket which have a topic matching "Trades"
    • HeartbeatTopic ("HB") : This also uses the single PublisherSocket held by the NetMQPublisher, this time to stream a single message frame containing just the topic name "HB" to all connected clients (the message content is not important, only the topic name, so that the client can see a new "HB" topic message). On the client side a SubscriberSocket is used where the topic is set to "HB", such that it will only receive messages from the NetMQPublisher PublisherSocket which have a topic matching "HB". So the server initiates the publishing of each heartbeat message, the client contains a subscriber for the heartbeat topic, and in the client's subscriber code this arrangement works as follows:
      • If the server (NetMQPublisher) responds in a timely manner, the communication between the client and the server is considered ok.
      • If the server (NetMQPublisher) DOES NOT respond in a timely manner, the communication between the client and the server is considered broken/disconnected/bad/fubar/wrecked/ruined/bogus.
  2. A ResponseSocket : This socket is used for sending a snapshot of all trades stored in the TickerRepository, which is held in memory in the publisher process (and is really nothing more than a queue of the last x many trades), from the server to the clients. The client(s) contain a RequestSocket, whilst the server (the NetMQPublisher) contains the ResponseSocket. So what happens is that the client initiates the exchange by sending a request (in the code shown, the SnapshotProtocol.GetTradessCommand string), and expects a response containing the current x many trades, serialized as JSON, from the server side TickerRepository. This is done as a one-off within the client at startup.
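
The topic-frame-then-payload-frame idea described above can be sketched without any sockets at all. In the snippet below a message is just an array of strings (frame 0 is the topic, frame 1 the payload), and a subscription is a prefix that is compared against the topic frame, which is how ZeroMQ subscriber filtering behaves. TopicFilter and TopicDemo are hypothetical names for this illustration only, and the JSON payload is made up.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory model of subscriber-side topic filtering; no sockets involved.
public sealed class TopicFilter
{
    private readonly List<string> subscriptions = new List<string>();

    public void Subscribe(string topicPrefix)
    {
        subscriptions.Add(topicPrefix);
    }

    // A message is a list of frames: frame 0 is the topic, frame 1 (if present) the payload.
    public bool Accepts(IReadOnlyList<string> frames)
    {
        if (frames.Count == 0) return false;

        return subscriptions.Any(
            prefix => frames[0].StartsWith(prefix, StringComparison.Ordinal));
    }
}

public static class TopicDemo
{
    // Returns the payloads that a "Trades"-only subscriber would get to see.
    public static List<string> Run()
    {
        var tradesOnly = new TopicFilter();
        tradesOnly.Subscribe("Trades");

        var messages = new List<string[]>
        {
            new[] { "Trades", "{\"Price\":1.5}" }, // topic frame + JSON payload frame
            new[] { "HB" },                         // heartbeat: topic frame only
        };

        return messages
            .Where(m => tradesOnly.Accepts(m))
            .Select(m => m.Length > 1 ? m[1] : string.Empty)
            .ToList();
    }
}
```

Because the check only looks at the first frame, a subscriber can discard a message it does not care about without ever deserializing the JSON payload.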

 

 

 

Simulating A Crash In The Publisher

This bit is easy, just use the "Stop NetMQ" button in the MainWindow. This will simply run the following code, which disposes the Actor in the NetMQPublisher class

public void Stop()
{
    actor.Dispose();
}   

What should happen, is that if you had any connected clients they should (after a short period) see the server as unavailable and should display "DISCONNECTED" tiles
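
The "after a short period" part comes from the heartbeat: the client only decides the server has gone once no "HB" message has arrived for some timeout. As a rough sketch of that decision (not the demo's actual HeartBeatClient, which is not shown in this section), the logic boils down to something like the following. HeartbeatMonitor and HeartbeatDemo are hypothetical names, the 5 second timeout is an assumed value, and the clock is passed in explicitly so the logic can be followed without real waiting.

```csharp
using System;

// Hypothetical client-side heartbeat monitor; time is supplied by the caller.
public sealed class HeartbeatMonitor
{
    private readonly TimeSpan timeout;
    private DateTime lastHeartbeatUtc;

    public HeartbeatMonitor(TimeSpan timeout, DateTime startUtc)
    {
        this.timeout = timeout;
        lastHeartbeatUtc = startUtc;
    }

    // Call whenever an "HB" message arrives from the publisher.
    public void OnHeartbeat(DateTime nowUtc)
    {
        lastHeartbeatUtc = nowUtc;
    }

    // The server counts as connected while the last heartbeat is recent enough.
    public bool IsConnected(DateTime nowUtc)
    {
        return nowUtc - lastHeartbeatUtc <= timeout;
    }
}

public static class HeartbeatDemo
{
    // Returns the connection state: while healthy, after the publisher stops, after it restarts.
    public static bool[] Run()
    {
        var start = new DateTime(2014, 12, 14, 0, 0, 0, DateTimeKind.Utc);
        var monitor = new HeartbeatMonitor(TimeSpan.FromSeconds(5), start); // assumed timeout

        monitor.OnHeartbeat(start.AddSeconds(2));
        bool whileHealthy = monitor.IsConnected(start.AddSeconds(4));   // 2s since last HB

        // "Stop NetMQ" clicked: no more heartbeats arrive.
        bool afterStop = monitor.IsConnected(start.AddSeconds(10));     // 8s since last HB

        // "Start NetMQ" clicked: heartbeats resume.
        monitor.OnHeartbeat(start.AddSeconds(11));
        bool afterRestart = monitor.IsConnected(start.AddSeconds(12));  // 1s since last HB

        return new[] { whileHealthy, afterStop, afterRestart };
    }
}
```

In the demo the equivalent check is driven by a timer on the client, so the "DISCONNECTED" tiles appear once the timeout has elapsed without a heartbeat.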

 

Restarting The Publisher

This bit is easy, just use the "Start NetMQ" button in the MainWindow. This will simply run the following code

private void StartServer()
{
    serverStarted = true;
    tickerPublisher.Start();
    AutoRunning();
}

What should happen is that if you had any connected clients they should (after a short period) see the server as available again and no longer display the "DISCONNECTED" tiles

 

 

 

NetMQ Client

A client is a standard WPF app (I am using WPF just because I know it, it is not really that important, you could use Winforms using the MVP pattern just as well). The client also makes use of NetMQ, and as such a good starting place would be to examine what NetMQ has to offer

Here is a screenshot of a couple of clients running at the same time, which is something I did not show in the 1st article

And this is what they look like after we click the "Stop NetMQ" button on the server

 

IOC

There is some IOC used; Autofac is the container. As IOC is not the main point of the article, I will not waste any more time on it. If you want to know more about the IOC/Autofac setup, read the code, looking into the BootStrapper class.

Clients

The way we decided to structure things was to have an Rx'y type client which exposes an IObservable stream that the rest of the app can make use of. Within that IObservable we would use the power of Observable.Create(..) to create a factory that would create the IObservable stream for that scenario by calling into/creating the NetMQ classes required to fulfil the stream requirements.

So the general pattern in this demo would be something like:

XXXXClient will be the thing that has the IObservable stream that the rest of the app uses, which will internally use an XXXXNetMQClient to do the nitty gritty comms to the NetMQ server.

 

TickerClient / NetMQTickerClient

We took the decision that for each non "heartbeat (HB)" topic we would have a dedicated client between the client and the server. This is kind of similar to what we had in part 1, where there was a dedicated client proxy per Hub type.

NetMQTickerClient

The NetMQTickerClient is where all the client side NetMQ shenanigans are. It is within the NetMQTickerClient that the client will use the NetMQ SubscriberSocket to subscribe to the "Trades" topic. As before we will make use of the Actor framework within NetMQ. The NetMQTickerClient also does the initial snapshot with the server using a RequestSocket, where the NetMQ server will have a ResponseSocket. The server will send the initial snapshot of TickerDtos that are then made available on the ticker stream for the app to use.

The NetMQTickerClient also does a bit of error handling (by calling OnError on the stream), but the main way to detect failures is by using the HeartBeatClient.

Here is the NetMQTickerClient code:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading.Tasks;
using Client.Factory;
using Client.Comms.Transport;
using Common;
using NetMQ;
using NetMQ.Actors;
using NetMQ.InProcActors;
using NetMQ.Sockets;
using NetMQ.zmq;
using Newtonsoft.Json;
using Poller = NetMQ.Poller;

namespace Client.Comms
{
    public class NetMQTickerClient : IDisposable
    {
        private Actor<object> actor;
        private Subject<TickerDto> subject;
        private CompositeDisposable disposables = new CompositeDisposable();

        class ShimHandler : IShimHandler<object>
        {
            private NetMQContext context;
            private SubscriberSocket subscriberSocket;
            private Subject<TickerDto> subject;
            private string address;
            private Poller poller;
            private NetMQTimer timeoutTimer;

            public ShimHandler(NetMQContext context, Subject<TickerDto> subject, string address)
            {
                this.context = context;
                this.address = address;
                this.subject = subject;
            }

            public void Initialise(object state)
            {

            }

            public void RunPipeline(PairSocket shim)
            {
                // we should signal before running the poller but this will block the application
                shim.SignalOK();

                this.poller = new Poller();

                shim.ReceiveReady += OnShimReady;
                poller.AddSocket(shim);

                timeoutTimer = new NetMQTimer(StreamingProtocol.Timeout);
                timeoutTimer.Elapsed += TimeoutElapsed;
                poller.AddTimer(timeoutTimer);

                Connect();

                poller.Start();

                if (subscriberSocket != null)
                {
                    subscriberSocket.Dispose();
                }
            }

            private void Connect()
            {
                // getting the snapshot
                using (RequestSocket requestSocket = context.CreateRequestSocket())
                {

                    requestSocket.Connect(string.Format("tcp://{0}:{1}", address, SnapshotProtocol.Port));

                    requestSocket.Send(SnapshotProtocol.GetTradessCommand);

                    string json;

                    requestSocket.Options.ReceiveTimeout = SnapshotProtocol.RequestTimeout;

                    try
                    {
                        json = requestSocket.ReceiveString();
                    }
                    catch (AgainException ex)
                    {
                        // Failed to receive the trades, so we call OnError and don't do anything with the subscriber.
                        // Calling OnError from the poller thread would block the application, hence Task.Run.
                        Task.Run(() => subject.OnError(new Exception("No response from server")));
                        return;
                    }

                    while (json != SnapshotProtocol.EndOfTickers)
                    {
                        PublishTicker(json);

                        json = requestSocket.ReceiveString();
                    }
                }

                subscriberSocket = context.CreateSubscriberSocket();
                subscriberSocket.Subscribe(StreamingProtocol.TradesTopic);
                subscriberSocket.Subscribe(StreamingProtocol.HeartbeatTopic);
                subscriberSocket.Connect(string.Format("tcp://{0}:{1}", address, StreamingProtocol.Port));
                subscriberSocket.ReceiveReady += OnSubscriberReady;

                poller.AddSocket(subscriberSocket);

                // reset timeout timer
                timeoutTimer.Enable = false;
                timeoutTimer.Enable = true;
            }

            private void TimeoutElapsed(object sender, NetMQTimerEventArgs e)
            {
                // No need to reconnect here: the client will be recreated by the RX pipeline (Repeat).

                // Because of RX internals, invoking OnError on the poller thread would block
                // the entire application, so we call it on the thread pool instead.
                Task.Run(() => subject.OnError(new Exception("Disconnected from server")));
            }

            private void OnShimReady(object sender, NetMQSocketEventArgs e)
            {
                string command = e.Socket.ReceiveString();

                if (command == ActorKnownMessages.END_PIPE)
                {
                    poller.Stop(false);
                }
            }

            private void OnSubscriberReady(object sender, NetMQSocketEventArgs e)
            {
                string topic = subscriberSocket.ReceiveString();

                if (topic == StreamingProtocol.TradesTopic)
                {
                    string json = subscriberSocket.ReceiveString();
                    PublishTicker(json);

                    // reset timeout timer also when a quote is received
                    timeoutTimer.Enable = false;
                    timeoutTimer.Enable = true;
                }
                else if (topic == StreamingProtocol.HeartbeatTopic)
                {
                    // reset timeout timer
                    timeoutTimer.Enable = false;
                    timeoutTimer.Enable = true;
                }
            }

            private void PublishTicker(string json)
            {
                TickerDto tickerDto = JsonConvert.DeserializeObject<TickerDto>(json);
                subject.OnNext(tickerDto);
            }
        }

        public NetMQTickerClient(NetMQContext context, string address)
        {
            subject = new Subject<TickerDto>();

            this.actor = new Actor<object>(context, new ShimHandler(context, subject, address), null);
            this.disposables.Add(this.actor);

            this.disposables.Add(NetMQHeartBeatClient.Instance.GetConnectionStatusStream()
                .Where(x => x.ConnectionStatus == ConnectionStatus.Closed)
                .Subscribe(x =>
                    this.subject.OnError(new InvalidOperationException("Connection to server has been lost"))));
        }

        public IObservable<TickerDto> GetTickerStream()
        {
            return subject.AsObservable();
        }

        public void Dispose()
        {
            this.disposables.Dispose();
        }
    }
}
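
The snapshot exchange inside Connect can be pictured in isolation. The following is only a sketch, not the project's code: it swaps the RequestSocket for a plain Func<string> (a hypothetical stand-in for requestSocket.ReceiveString()), and the end marker value is made up, since the real value of SnapshotProtocol.EndOfTickers is not shown in this article.

```csharp
using System;
using System.Collections.Generic;

public static class SnapshotSketch
{
    // Hypothetical marker value; the real one lives in SnapshotProtocol.EndOfTickers.
    public const string EndOfTickers = "EndOfTickers";

    // 'receive' stands in for requestSocket.ReceiveString().
    // Frames are collected until the server sends the end marker.
    public static List<string> ReadSnapshot(Func<string> receive)
    {
        var tickers = new List<string>();
        for (string frame = receive(); frame != EndOfTickers; frame = receive())
        {
            tickers.Add(frame);
        }
        return tickers;
    }
}
```

Feeding it a queue that holds two JSON frames followed by the marker (for example via new Queue<string>(...).Dequeue) would return just the two JSON frames. In the real client each frame is passed to PublishTicker as it arrives rather than being collected into a list.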

TickerClient

The TickerClient can be used throughout the app to stream TickerDto objects; it simply wraps the stream exposed by the NetMQTickerClient. The important part is that when an error occurs and the Repeat inside the TickerRepository kicks in, the new subscription to the TickerClient's IObservable creates a fresh NetMQTickerClient, which ensures that the client attempts to communicate with the server again. As before, it all comes down to good housekeeping and lifetime management.

Here is the code for the TickerClient

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;
using Client.Comms.Transport;
using Common;
using NetMQ;

namespace Client.Comms
{
    public class TickerClient : ITickerClient
    {
        private readonly NetMQContext context;
        private readonly string address;

        public TickerClient(NetMQContext context, string address)
        {
            this.context = context;
            this.address = address;
        }

        public IObservable<TickerDto> GetTickerStream()
        {
            return Observable.Create<TickerDto>(observer =>
            {
                NetMQTickerClient client = new NetMQTickerClient(context, address);
               
                var disposable = client.GetTickerStream().Subscribe(observer);
                return new CompositeDisposable { client, disposable };
            })
            .Publish()
            .RefCount();
        }


        public IObservable<ConnectionInfo> ConnectionStatusStream()
        {
            return Observable.Create<ConnectionInfo>(observer =>
            {
                NetMQHeartBeatClient.Instance.InitialiseComms();

                var disposable = NetMQHeartBeatClient.Instance.
                    GetConnectionStatusStream().Subscribe(observer);

                return new CompositeDisposable { disposable };
            })
            .Publish()
            .RefCount();
        }
    }
}
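
To see what Observable.Create combined with Publish and RefCount buys us, here is a small sketch under some assumptions: a Subject<int> stands in for the NetMQ-backed stream, and the counters are purely illustrative (none of these names exist in the demo app). It needs the System.Reactive package, like the rest of the client.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SharedStreamSketch
{
    // Returns { resources created while two subscribers were attached,
    //           resources disposed after the first unsubscribed,
    //           resources disposed after the last unsubscribed }.
    public static int[] Run()
    {
        int created = 0, disposed = 0;
        var source = new Subject<int>();   // stands in for the NetMQ-backed stream

        IObservable<int> shared = Observable.Create<int>(observer =>
        {
            created++;   // this is where the article creates a NetMQTickerClient
            IDisposable subscription = source.Subscribe(observer);
            return new CompositeDisposable { subscription, Disposable.Create(() => disposed++) };
        })
        .Publish()
        .RefCount();

        IDisposable first = shared.Subscribe(_ => { });
        IDisposable second = shared.Subscribe(_ => { });
        int createdWhileShared = created;    // one underlying resource for both subscribers

        first.Dispose();
        int disposedAfterFirst = disposed;   // one subscriber is still attached
        second.Dispose();                    // last subscriber gone, so the resource is disposed

        return new[] { createdWhileShared, disposedAfterFirst, disposed };
    }
}
```

Note that the sharing only applies to subscribers of the same returned IObservable. In the TickerClient above each call to GetTickerStream() builds a new Publish/RefCount pipeline, so it is the TickerRepository holding on to one stream that makes the sharing effective.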

 

HeartBeatClient / NetMQHeartBeatClient

We took the decision that the heartbeat between a single client and the server is a global concern in the context of that client.

As such there is only ever expected to be a single HeartBeatClient (this is achieved through IOC registration), and there is only ever one instance of the NetMQHeartBeatClient, which is a hard-coded singleton by design.

NetMQHeartBeatClient

The NetMQHeartBeatClient is where all the client side NetMQ heartbeat shenanigans live. It is within the NetMQHeartBeatClient that the client uses the NetMQ SubscriberSocket to subscribe to the "HeartBeat (HB)" topic. As before we make use of the Actor framework within NetMQ. This is also where we expect a heartbeat message from the server side PublisherSocket within a set amount of time (StreamingProtocol.Timeout). If we do not get that message in time we consider the comms to be broken, and we use an internal RX Subject<T> to OnNext the relevant ConnectionInfo/ConnectionStatus.

Here is the NetMQHeartBeatClient code:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Client.Factory;
using Client.Comms.Transport;
using Common;
using NetMQ;
using NetMQ.Actors;
using NetMQ.InProcActors;
using NetMQ.Sockets;
using NetMQ.zmq;
using Newtonsoft.Json;
using Poller = NetMQ.Poller;

namespace Client.Comms
{
    public class NetMQHeartBeatClient 
    {
        private readonly NetMQContext context;
        private readonly string address;
        private Actor<object> actor;
        private Subject<ConnectionInfo> subject;
        private static NetMQHeartBeatClient instance = null;
        private static object syncLock = new object();
        protected int requiresInitialisation = 1;

        class ShimHandler : IShimHandler<object>
        {
            private NetMQContext context;
            private SubscriberSocket subscriberSocket;
            private Subject<ConnectionInfo> subject;
            private string address;
            private Poller poller;
            private NetMQTimer timeoutTimer;
            private NetMQHeartBeatClient parent;

            public ShimHandler(NetMQContext context, Subject<ConnectionInfo> subject, string address)
            {
                this.context = context;
                this.address = address;
                this.subject = subject;
            }

            public void Initialise(object state)
            {
                parent = (NetMQHeartBeatClient) state;
            }

            public void RunPipeline(PairSocket shim)
            {
                // we should signal before running the poller but this will block the application
                shim.SignalOK();

                this.poller = new Poller();

                shim.ReceiveReady += OnShimReady;
                poller.AddSocket(shim);

                timeoutTimer = new NetMQTimer(StreamingProtocol.Timeout);
                timeoutTimer.Elapsed += TimeoutElapsed;
                poller.AddTimer(timeoutTimer);

                Connect();

                poller.Start();

                if (subscriberSocket != null)
                {
                    subscriberSocket.Dispose();
                }                
            }

            private void Connect()
            {
                subscriberSocket = context.CreateSubscriberSocket();
                subscriberSocket.Subscribe(StreamingProtocol.HeartbeatTopic);
                subscriberSocket.Connect(string.Format("tcp://{0}:{1}", address, StreamingProtocol.Port));

                subject.OnNext(new ConnectionInfo(ConnectionStatus.Connecting, this.address));
                subscriberSocket.ReceiveReady += OnSubscriberReady;
                poller.AddSocket(subscriberSocket);

                // reset timeout timer
                timeoutTimer.Enable = false;
                timeoutTimer.Enable = true;
            }

            private void TimeoutElapsed(object sender, NetMQTimerEventArgs e)
            {
                // No need to reconnect here: comms are re-initialised when the stream is subscribed to again.

                // Because of RX internals, invoking OnNext on the poller thread would block
                // the entire application, so we call it on the thread pool instead.
                Task.Run(() =>
                {
                    parent.requiresInitialisation = 1;
                    subject.OnNext(new ConnectionInfo(ConnectionStatus.Closed, this.address));
                });
            }

            private void OnShimReady(object sender, NetMQSocketEventArgs e)
            {
                string command = e.Socket.ReceiveString();

                if (command == ActorKnownMessages.END_PIPE)
                {
                    poller.Stop(false);
                }
            }

            private void OnSubscriberReady(object sender, NetMQSocketEventArgs e)
            {
                string topic = subscriberSocket.ReceiveString();

                if (topic == StreamingProtocol.HeartbeatTopic)
                {
                    subject.OnNext(new ConnectionInfo(ConnectionStatus.Connected, this.address));

                    // reset timeout timer
                    timeoutTimer.Enable = false;
                    timeoutTimer.Enable = true;
                }
            }
        }

        private NetMQHeartBeatClient(NetMQContext context, string address)
        {
            this.context = context;
            this.address = address;
            InitialiseComms();
        }

        public static NetMQHeartBeatClient CreateInstance(NetMQContext context, string address)
        {
            if (instance == null)
            {
                lock (syncLock)
                {
                    if (instance == null)
                    {
                        instance = new NetMQHeartBeatClient(context,address);
                    }
                }
            }
            return instance;
        }

        public void InitialiseComms()
        {
            if (Interlocked.CompareExchange(ref requiresInitialisation, 0, 1) == 1)
            {
                if (actor != null)
                {
                    this.actor.Dispose();
                }

                subject = new Subject<ConnectionInfo>();
                this.actor = new Actor<object>(context, new ShimHandler(context, subject, address), this);
            }
        }

        public IObservable<ConnectionInfo> GetConnectionStatusStream()
        {
            return subject.AsObservable();
        }

        public static NetMQHeartBeatClient Instance
        {
            get { return instance; }
        }
    }
}
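
The requiresInitialisation flag used by InitialiseComms is worth a closer look, as it is what stops several subscribers from each creating their own Actor. Below is a minimal sketch of that gate on its own. The class and member names are hypothetical and it uses nothing but System.Threading.

```csharp
using System.Threading;

public class InitialisationGate
{
    // 1 = initialisation required, 0 = already initialised
    private int requiresInitialisation = 1;

    public int InitialiseCount { get; private set; }

    public bool TryInitialise()
    {
        // Atomically: if the flag is 1, set it to 0. Only the caller that
        // observed the old value 1 gets to run the initialisation.
        if (Interlocked.CompareExchange(ref requiresInitialisation, 0, 1) == 1)
        {
            InitialiseCount++;   // the article disposes the old Actor and creates a new one here
            return true;
        }
        return false;
    }

    // Mirrors what the timeout handler does: request a rebuild on the next call.
    public void MarkBroken()
    {
        Interlocked.Exchange(ref requiresInitialisation, 1);
    }
}
```

Calling TryInitialise twice in a row initialises only once. After MarkBroken the next call initialises again, which is the behaviour the heartbeat client relies on when the stream is resubscribed after a timeout.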

 

HeartBeatClient

It is the HeartBeatClient that is exposed throughout the app. It simply wraps the stream from the NetMQHeartBeatClient, which may be used to infer the connectivity status of the comms between the client (as a whole) and the server. The important part is that whenever the HeartBeatClient's stream is subscribed to again, it calls InitialiseComms, which ensures that the NetMQHeartBeatClient attempts to communicate with the server again if the connection had been marked as broken. As before, it all comes down to good housekeeping and lifetime management.

 

Here is the code for the HeartBeatClient

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;
using Client.Comms;
using Client.Comms.Transport;
using NetMQ;

namespace Client.Comms
{
    public class HeartBeatClient : IHeartBeatClient
    {
        public IObservable<ConnectionInfo> ConnectionStatusStream()
        {
            return Observable.Create<ConnectionInfo>(observer =>
            {
                NetMQHeartBeatClient.Instance.InitialiseComms();

                var disposable = NetMQHeartBeatClient.Instance
                    .GetConnectionStatusStream().Subscribe(observer);

                return new CompositeDisposable { disposable };
            })
            .Publish()
            .RefCount();
        }
    }
}

 

TickerRepository

The TickerRepository is the next rung up the Observable chain. So what does that look like? Well, it is actually surprisingly simple, but don't be fooled by that; there is a LOT going on here.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using Client.Factory;
using Client.Comms;

namespace Client.Repositories
{
    class TickerRepository : ITickerRepository
    {
        private readonly ITickerClient tickerClient;
        private readonly ITickerFactory tickerFactory;

        public TickerRepository(ITickerClient tickerClient, ITickerFactory tickerFactory)
        {
            this.tickerClient = tickerClient;
            this.tickerFactory = tickerFactory;
        }

        public IObservable<Ticker> GetTickerStream()
        {
            return Observable.Defer(() => tickerClient.GetTickerStream())
                .Select(tickerFactory.Create)                
                .Catch<Ticker>(Observable.Empty<Ticker>())
                .Repeat()
                .Publish()
                .RefCount();
        }
    }
}

So what is going on here exactly?

  • We use Observable.Defer so that we do not actually touch the underlying stream until someone subscribes to the IObservable created by Observable.Defer. It is a way of making a hot stream cold.
  • We use Select to transform the stream data from TickerDto to Ticker
  • We use Catch to catch any Exceptions (OnError) in the stream. In that case we substitute an empty stream, so the sequence completes rather than errors
  • We use Repeat. Now this one is VERY VERY important. This is the one that allows us to repeat the whole stream, including connecting to the server again. It is this mechanism that allows the client to recover from the loss of the NetMQ server. This along with the resilient stream logic are the MOST important bits of the app (at least in my opinion)
  • We use Publish to share the underlying stream
  • We use RefCount to get automatic disposal when there are no longer any subscribers
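
The Defer / Catch / Repeat combination described above can be tried out without any networking at all. The sketch below makes a few assumptions: the "connection" is a hypothetical in-memory stream that fails on the first attempt and completes on the second, and it uses Repeat(2) rather than the unbounded Repeat() from the repository so that the example terminates.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class ResilientStreamSketch
{
    public static List<string> Run()
    {
        var log = new List<string>();
        int connectionAttempts = 0;

        IObservable<int> resilient = Observable
            .Defer(() =>
            {
                // Runs once per subscription, so each Repeat "reconnects".
                connectionAttempts++;
                log.Add("connect #" + connectionAttempts);
                return connectionAttempts == 1
                    ? Observable.Return(1).Concat(
                          Observable.Throw<int>(new Exception("Disconnected from server")))
                    : Observable.Return(2);
            })
            .Catch(Observable.Empty<int>())   // swallow the error, complete instead
            .Repeat(2);                       // the article uses Repeat() with no limit

        resilient.Subscribe(
            x => log.Add("value " + x),
            () => log.Add("completed"));

        return log;
    }
}
```

With the default schedulers this should log a first connection, the value 1, a second connection, the value 2 and finally the completion. The subscriber never sees the error; it only sees the stream carry on after a reconnect, which is exactly what the view models further up the chain rely on.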

So now that we have seen this repository, we only have one more hop to continue the IObservable journey for the TickerStream. Let's see that next.

 

TickersViewModel

The TickersViewModel represents ALL the Ticker(s) you see on the screen. It is this viewmodel that makes use of the lazy / repeatable / resilient IObservable that the TickerRepository provides. Let's see the code; it is pretty self-explanatory, I think:

using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Input;
using Client.Factory;
using Client.Repositories;
using Client.Services;
using Common;
using Common.ViewModels;
using log4net;


namespace Client.ViewModels
{
    public class TickersViewModel : INPCBase
    {
        private readonly ITickerRepository tickerRepository;
        private readonly IConcurrencyService concurrencyService;
        private bool stale = false;
        private static readonly ILog log = LogManager.GetLogger(typeof(TickersViewModel));

        public TickersViewModel(IReactiveTrader reactiveTrader,
                                IConcurrencyService concurrencyService,
            TickerViewModelFactory tickerViewModelFactory)
        {
            Tickers = new ObservableCollection<TickerViewModel>();
            Tickers.Add(tickerViewModelFactory.Create("Yahoo"));
            Tickers.Add(tickerViewModelFactory.Create("Google"));
            Tickers.Add(tickerViewModelFactory.Create("Apple"));
            Tickers.Add(tickerViewModelFactory.Create("Facebook"));
            Tickers.Add(tickerViewModelFactory.Create("Microsoft"));
            Tickers.Add(tickerViewModelFactory.Create("Twitter"));
            this.tickerRepository = reactiveTrader.TickerRepository;
            this.concurrencyService = concurrencyService;
            LoadTrades();
        }

        public ObservableCollection<TickerViewModel> Tickers { get; private set; }

        private void LoadTrades()
        {
            tickerRepository.GetTickerStream()
                            .ObserveOn(concurrencyService.Dispatcher)
                            .SubscribeOn(concurrencyService.TaskPool)
                            .Subscribe(
                                AddTicker,
                                ex => log.Error("An error occurred within the trade stream", ex));
        }

        private void AddTicker(Ticker ticker)
        {
            Tickers.Single(x => x.Name == ticker.Name)
                 .AcceptNewPrice(ticker.Price);
        }
    }
}

Each Ticker is represented by a single TickerViewModel, which is shown below. It can be seen that this class also makes use of the ConnectionStatusStream IObservable we discussed earlier. This is used to change the TickerViewModel's view to show a red box with "DISCONNECTED" in it. We will talk through that in just a minute.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;
using Client.Comms.Transport;
using Client.Services;
using log4net;

namespace Client.ViewModels
{
    public class TickerViewModel : INPCBase
    {
        private decimal price;
        private bool isUp;
        private bool stale;
        private bool disconnected;
        private static readonly ILog log = LogManager.GetLogger(typeof(TickerViewModel));


        public TickerViewModel(
            IReactiveTrader reactiveTrader,
            IConcurrencyService concurrencyService,
            string name)
        {

            this.Name = name;

            reactiveTrader.ConnectionStatusStream
                .ObserveOn(concurrencyService.Dispatcher)
                .SubscribeOn(concurrencyService.TaskPool)
                .Subscribe(
                    OnStatusChange,
                    ex => log.Error("An error occurred within the connection status stream.", ex));
        }


        public string Name { get; private set; }


        public void AcceptNewPrice(decimal newPrice)
        {
            IsUp = newPrice > price;
            Price = newPrice;
        }


        public decimal Price
        {
            get { return this.price; }
            private set
            {
                this.price = value;
                base.OnPropertyChanged("Price");
            }
        }

        public bool IsUp
        {
            get { return this.isUp; }
            private set
            {
                this.isUp = value;
                base.OnPropertyChanged("IsUp");
            }
        }

        public bool Stale
        {
            get { return this.stale; }
            set
            {
                this.stale = value;
                base.OnPropertyChanged("Stale");
            }
        } 
        
        public bool Disconnected
        {
            get { return this.disconnected; }
            set
            {
                this.disconnected = value;
                base.OnPropertyChanged("Disconnected");
            }
        }


        private void OnStatusChange(ConnectionInfo connectionInfo)
        {

            switch (connectionInfo.ConnectionStatus)
            {
                case ConnectionStatus.Connecting:
                    Disconnected = true;
                    break;
                case ConnectionStatus.Connected:
                    Disconnected = false;
                    break;
                case ConnectionStatus.Closed:
                    Disconnected = true;
                    break;
                default:
                    throw new ArgumentOutOfRangeException();
            }
        }

    }
}

It can be seen that this ViewModel makes use of the IReactiveTrader.ConnectionStatusStream to monitor the status of the connection to the NetMQPublisher. It is this code that is responsible for changing the appearance of the tile from one that shows ticking prices to a big red "DISCONNECTED" tile. This is done via the Disconnected property.

ConnectivityStatusViewModel

The last thing I wanted to show you was how the ConnectionStatusStream is used. This stream OnNexts whenever the client side HeartBeatClient pushes out new values. So we see things like Connecting, Connected and Closed, all of which are created using the logic we looked at earlier within the NetMQHeartBeatClient, and are turned into an IObservable stream using a standard RX Subject<T>.
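
As a reminder of how a Subject<T> turns those status changes into a stream, here is a stripped-down sketch. The LinkStatus enum and the method names are hypothetical stand-ins for ConnectionStatus and the socket / timer callbacks in the NetMQHeartBeatClient; there is no NetMQ involved.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical mirror of the app's ConnectionStatus enum.
public enum LinkStatus { Connecting, Connected, Closed }

public class HeartbeatMonitorSketch
{
    private readonly Subject<LinkStatus> subject = new Subject<LinkStatus>();

    // AsObservable hides the Subject so that callers cannot push values themselves.
    public IObservable<LinkStatus> StatusStream
    {
        get { return subject.AsObservable(); }
    }

    public void Connect() { subject.OnNext(LinkStatus.Connecting); }
    public void HeartbeatReceived() { subject.OnNext(LinkStatus.Connected); }
    public void TimeoutElapsed() { subject.OnNext(LinkStatus.Closed); }
}

public static class HeartbeatMonitorDemo
{
    public static List<LinkStatus> Run()
    {
        var seen = new List<LinkStatus>();
        var monitor = new HeartbeatMonitorSketch();

        using (monitor.StatusStream.Subscribe(status => seen.Add(status)))
        {
            monitor.Connect();            // socket created and connected
            monitor.HeartbeatReceived();  // heartbeat arrived in time
            monitor.TimeoutElapsed();     // no heartbeat within the timeout
        }
        return seen;
    }
}
```

A subscriber attached before the three calls sees Connecting, Connected and Closed in that order, which is the same sequence the status bar view model switches on.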

Anyway here is the overall ConnectivityStatusViewModel that we use to show the information in the bottom status bar of the app.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;
using Client.Comms.Transport;
using Client.Services;
using log4net;

namespace Client.ViewModels
{
    public class ConnectivityStatusViewModel : INPCBase
    {
        private static readonly ILog log = LogManager.GetLogger(typeof(ConnectivityStatusViewModel));
        private string server;
        private string status;
        private bool disconnected;

        public ConnectivityStatusViewModel(
            IReactiveTrader reactiveTrader,
            IConcurrencyService concurrencyService)
        {
            reactiveTrader.ConnectionStatusStream
                .ObserveOn(concurrencyService.Dispatcher)
                .SubscribeOn(concurrencyService.TaskPool)
                .Subscribe(
                OnStatusChange,
                ex => log.Error("An error occurred within the connection status stream.", ex));
        }


        private void OnStatusChange(ConnectionInfo connectionInfo)
        {
            Server = connectionInfo.Server;

            switch (connectionInfo.ConnectionStatus)
            {
                case ConnectionStatus.Connecting:
                    Status = "Connecting...";
                    Disconnected = true;
                    break;
                case ConnectionStatus.Connected:
                    Status = "Connected";
                    Disconnected = false;
                    break;
                case ConnectionStatus.Closed:
                    Status = "Disconnected";
                    Disconnected = true;
                    break;
                default:
                    throw new ArgumentOutOfRangeException();
            }
        }

        public string Server
        {
            get { return this.server; }
            set
            {
                this.server = value;
                base.OnPropertyChanged("Server");
            }
        }

        public string Status
        {
            get { return this.status; }
            set
            {
                this.status = value;
                base.OnPropertyChanged("Status");
            }
        }

        public bool Disconnected
        {
            get { return this.disconnected; }
            set
            {
                this.disconnected = value;
                base.OnPropertyChanged("Disconnected");
            }
        }
    }
}

 

 

That's It For Now

Anyway, that is all I wanted to say for now. I hope you have enjoyed this very mini RX/SignalR/NetMQ series, and that it has maybe made you want to go away and have a play with RX/SignalR/NetMQ. If you have enjoyed it, please feel free to leave a vote or a comment. Doron and I have worked quite hard on these demos to try and iron out any bugs and make them as real world as possible, so comments/votes are always nice to receive.

 

 

 

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.
