Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles / desktop / WPF

Dynamic Aggregated Order Books with Rx

4.82/5 (5 votes)
7 Jun 2015CPOL7 min read 15.8K   316  
Using Reactive Extensions to build and consume aggregated order books

Visit the Bitbucket repository to browse through the accompanying code. Feel free to fork and make pull requests...

Introduction

For a given asset, an aggregated order book combines data from multiple exchanges and presents two sorted tables called Bids and Asks; the Bids contain the orders to buy and the Asks contain the orders to sell. In the eyes of somebody looking at the order book, the top row in the Bids represents the highest price at which they could sell and the top row in the asks represents the cheapest price at which they could buy.

At a given time and for a given asset, an aggregated order book might look something like this:

Image 1

In this example, a market participant wishing to purchase 10 units of this asset would see that, for a volume of 10, the cheapest prices at which participants are willing to sell could be found on exchanges B and A at prices of 101 and 101.5 respectively. Other things being equal, the optimal way to split the order would be to purchase 5 units on exchange B and 5 units on exchange A for a total of 1012.5. Sticking to a single venue would have cost 1030 (5*101+ 5*105) at B or 1027.5 (101.5*5 +104*5) at A. Splitting the order across multiple venues helped save on the overall execution price.

Having established that aggregated order books are not completely devoid of interest, how to we go about building them in real time? It will be necessary to fetch data from multiple venues concurrently and rearrange the aggregated order book each time an update arrives. If approached with an Object Oriented mindset, this exercise is not at all trivial. However, adopting a more functional approach and relying on the Reactive Extensions for .NET (Rx), proves to be a rewarding path. For those not familiar with Rx, the goto source of information is www.introtorx.com.

Streams of Immutable Objects

The problem can be viewed as assembling streams of immutable objects being pushed down to consumers. Each exchange exposes a stream of order books containing orders from that exchange only. We merge these sources into a single observable sequence and process items sequentially. Each time a new order book comes in, we pass it through an accumulator function which incorporates it with the latest version of the aggregated order book and returns the intermediary value. The result of running the merged stream through the accumulator function (a process called scanning) is an observable sequence of aggregated order books. This sequence can be processed to perform such actions as updating a dynamic display, looking for arbitrage opportunities or just displaying to the console.

Image 2

Skipping straight to what might have been the conclusion to this article, here is what the above diagram translates to in code having defined the appropriate abstractions:

C#
IExchangeClient exchangeA = new MockClient(0);
IExchangeClient exchangeB = new MockClient(1);
IExchangeClient exchangeC = new MockClient(2);

var mergedOrderBookStream = Observable.Merge(exchangeA.OrderBookStream,
                                             exchangeB.OrderBookStream,
                                             exchangeC.OrderBookStream);

var aggregatedOrderBookStream = mergedOrderBookStream.Scan(
                                  new AggregatedOrderBook(),
                                  (aob, orderBook) => aob.InsertBook(orderBook));

var consoleSubscription = aggregatedOrderBookStream.Subscribe(Console.WriteLine);

IExchangeClient

The code that communicates with individual exchanges is abstracted behind the IExchangeClient interface.

C#
public interface IExchangeClient
{
    int ExchangeID
    {
        get;
    }

    IObservable<ExchangeOrder> OrderStream
    {
        get;
    }

    IObservable<IOrderBook> OrderBookStream
    {
        get;
    }

    Task<ExchangeOrder> SubmitOrder(OrderSide oSide, OrderType oType, decimal price, decimal size);

    Task<ExchangeOrder> CancelOrder(ExchangeOrder order);

    void Connect();

    void Disconnect();
}

Each implementation, containing specific logic to convey information to and from the underlying venue, be it through an HTTP Web API, WebSockets or the FIX protocol, exposes the same properties and methods known to the rest of the application. Queryable data, such as orders and order books are presented in the form of streams which push data down to the consumer. In Rx, such objects are defined by the IObservable interface.

As an example, let us assume that an exchange exposes an http method to get the latest order book and that we have written a HttpGetOrderBook() method to perform the request. We will want to call this method every 5 seconds, wait for a reply and push the result down the stream to our consumers. If a request takes more than 10 seconds, we want it to timeout but continue retrying. The sequence of order books exposed must be the same for every consumer. Here is how we would go about doing this with Rx:

C#
var _orderBookStream = Observable.Create<IOrderBook>(obs =>
           {
               obs.OnNext(HttpGetOrderBook());
               return Disposable.Empty;
           })
           .Concat(Observable.Empty<IOrderBook>().Delay(5))
           .Repeat()
           .Timeout(TimeSpan.FromSeconds(10))
           .Retry()
           .Publish();

This will call HttpGetOrderBook(), push the values to the subscriber, wait for the result sequence to complete, and then concatenate the empty sequence that is delayed by 5 seconds. After the delay, the process is repeated, sending off the request again, waiting for the response and then holding off for another 5 seconds before repeating again. We introduce a timeout if any request takes too long and just retry if a timeout occurs. The Publish keyword guarantees that all subscribers consume the same stream by converting the sequence into an IConnectableObservable. Once we call Connect() on this sequence, data will start being pushed down and all subscribers will perceive the same stream of orderbooks. To disconnect, we simply dispose the object returned by Connect(). The IExchangeClient interface exposes two methods (Connect and Disconnect) allowing to start and stop receiving data from individual exchanges.

In the first snippet, I used three MockClients. This is a type of IExchangeClient which simulates the activity of an exchange and can be used to perform tests. It was actually the hardest thing to build in this application and probably deserves a separate article as there is some pretty cool Rx code in there too.

IOrderBook, OrderBook and OrderComparer

OrderBooks are simply wrappers around two sorted sets of Bids and Asks.

C#
public interface IOrderBook
{
    int ExchangeID
    {
        get;
    }

    ImmutableSortedSet<ExchangeOrder> Bids
    {
        get;
    }

    ImmutableSortedSet<ExchangeOrder> Asks
    {
        get;
    }
}

The sorted sets are defined with the type ImmutableSortedSet, a .NET class which handles all the sorting logic based on a comparer that can be provided explicitly.

C#
public class OrderBook : IOrderBook
{
    public OrderBook(int exchnangeID, IEnumerable<ExchangeOrder> bids, IEnumerable<ExchangeOrder> asks)
    {
        ExchangeID = exchnangeID;
        Bids = bids.ToImmutableSortedSet(comparer: OrderComparer.DescBidComparer());
        Asks = asks.ToImmutableSortedSet(comparer: OrderComparer.DescAskComparer());
    }

    public int ExchangeID { get; private set; }
    public ImmutableSortedSet<ExchangeOrder> Bids { get; private set; }
    public ImmutableSortedSet<ExchangeOrder> Asks { get; private set; }
}

In this case, we have implemented an order comparer which takes in two orders, and returns -1, 0 or 1 if the first order is respectively better, equivalent or worse than the second.

C#
public class OrderComparer : Comparer<ExchangeOrder>
{
    //-1 for Bid comparer
    //1 for Ask comparer
    readonly int _priceComparisonCoeff;

    OrderComparer(int priceComparisonCoeff)
    {
        _priceComparisonCoeff = priceComparisonCoeff;
    }

    //!! counterintuitive but we need SortedSets to be in descending order. Instead of calling reverse     
    //all the time, we implement the behavior in the comparer
    //Returns: -1 if x is better than y
    //          0 if x is equivalent to y
    //          1 if x is worse than y
    //
    //         bids and asks:> market order always wins over limit order
    //            if both market orders and same timestamp, most remaining volume wins         
    //        
    //         bids:> most expensive wins  
    //         asks:> cheapest wins,
    //            if same price, oldest wins,
    //                if same price and timestamp, most remaining volume wins
    //
    //ASSUMPTION: x and y are on the same side (BID or ASK).
    //We do not look at ID or ExchangeID
    //The order of comparisons is: price, timestamp, remainingvolume (note that size does not matter)
    public override int Compare(ExchangeOrder x, ExchangeOrder y)
    {

        //one market order and one limit order
        if (x.OType == OrderType.Market && y.OType == OrderType.Limit)
        {
            return -1;
        }
        if (x.OType == OrderType.Limit && y.OType == OrderType.Market)
        {
            return 1;
        }

        //two limit orders
        if (x.Price.CompareTo(y.Price) != 0)
        {
            return _priceComparisonCoeff * x.Price.CompareTo(y.Price);
        }
        if (x.UTCTimestamp.CompareTo(y.UTCTimestamp) != 0)
        {
            return x.UTCTimestamp.CompareTo(y.UTCTimestamp);
        }

        //two market orders or two limit orders with the same prices
        if (x.RemainingVolume.CompareTo(y.RemainingVolume) != 0)
        {
            return -x.RemainingVolume.CompareTo(y.RemainingVolume);
        }

        //they have the same characteristics. not necessary same ID
        //not good because we are not supposed to have two equivalent orders in the orderbook
        return 0;
    }

    public static OrderComparer DescBidComparer()
    {
         return new OrderComparer(-1);
    }

    public static OrderComparer DescAskComparer()
    {
        return new OrderComparer(1);
    }
}

AggregatedOrderBook

The aggregated order book looks just like its standard counterpart but combines orders from multiple venues. It contains two ImmutableSortedSets instantiated with the appropriate comparers for Bids and Asks as well as the accumulator function which rearranges orders among them each time an update is received in the form of a new order book. The InsertBook method, which takes a regular order book as parameter (one with orders belonging to a single exchange) returns a new copy of the aggregated order book where all the orders from the exchange affected by the updated are replaced by the incoming ones. The combination of ImmutableSortedSet and OrderComparer work together to guarantee that the Bids and Asks remain arranged in the appropriate order.

C#
public AggregatedOrderBook InsertBook(IOrderBook orderBook)
{
   int exchangeID = orderBook.ExchangeID;
   var modifiedAggregatedBids = Bids;
   var modifiedAggregatedAsks = Asks;

   var correspondingBids = Bids.Where(a => a.ExchangeID == exchangeID);
   modifiedAggregatedBids = Bids.Except(correspondingBids).Union(orderBook.Bids);

   var correspondingAsks = Asks.Where(a => a.ExchangeID == exchangeID);
   modifiedAggregatedAsks = Asks.Except(correspondingAsks).Union(orderBook.Asks);

   return new AggregatedOrderBook(modifiedAggregatedBids, modifiedAggregatedAsks);
}

Using the Code

Having set up a stream of aggregated order books by merging multiple sources, we can start consuming this data. In Rx terms, we can Subscribe to the observable sequence. Subscribing to a sequence means setting up an action to be performed each time a new item is pushed through.

WPF Application

The first thing we might want to do is to update a dynamic display. The WPF application allows visualizing the data flow in motion. In the left column are the controls for the three individual exchanges whose order books we want to combine. Here, I have used 3 MockClients again so the orders are generated randomly. There is a Start and Stop button for each venue which simply control the flow of orders being pushed down the pipeline.

Image 3

The action that we want performed for each new incoming order book is to replace the collection of Bids and Asks which are defined as simple Lists in the main view model. The WPF binding mechanism and INotifyPropertyChanged take care of updating the UI. Here, it is important to note that we need to subscribe on a background thread and observe on the UI thread. Rx makes that very easy with the SubscribeOn and ObserveOnDispatcher methods.

C#
//subscribe to the aggregatedOrderBookStream. 
//each time a new aggregated orderbook is received, the list of bids and asks is updated in the UI.
//ATTENTION: no elements will be observed until at least one exchange is connected to
aggregateStreamSubscription = aggregatedOrderBookStream.SubscribeOn(NewThreadScheduler.Default)
                                                .ObserveOnDispatcher()
                                                .Subscribe((aob) =>
                                                {
                                                    Bids = aob.Bids.ToList();
                                                    Asks = aob.Asks.ToList();
                                                });

Identifying Arbitrage Opportunities

After running the WPF app for a while, one might notice that sometimes the best Bid is more expensive than the best Ask:

Image 4

This introduces the opportunity to buy low on one exchange and sell high on another. This type of free lunch is called an Arbitrage.

In our framework, we shall represent an arbitrage opportunity as a wrapper around two dictionaries containing the amounts to buy and sell per exchange:

C#
public class ArbitrageOpportunity
{
    public ArbitrageOpportunity(Dictionary<int, decimal> buyDico, Dictionary<int, decimal> sellDico)
    {
        BuyDico = buyDico;
        SellDico = sellDico;
    }

    /// <summary>
    /// dictionary of [exchangeID, buyVolume] representing the amounts that should be bought on each exchange
    /// </summary>
    public Dictionary<int, decimal> BuyDico { get; private set; }

    /// <summary>
    /// dictionary of [exchangeID, sellVolume] representing the amounts that should be sold on each exchange
    /// </summary>
    public Dictionary<int, decimal> SellDico { get; private set; }
}

Then, we can extend the AggregatedOrderBook class with a method to identify arbitrage opportunities and project the observable sequence of aggregated order books onto a sequence of ArbitrageOpportunities with a simple Rx Select statement:

C#
//project the aggregated orderbook stream onto a stream of arbitrages
var arbitrageStream = aggregatedOrderBookStream.Select(aob => aob.LookForArbitrage()); 

Finally, we could subscribe to the arbitrage sequence and dispatch the appropriate orders to take advantage of the opportunity:

C#
//pass each non-null arbitrage through the orderDispatcher which handles concurrency and race conditions internally
arbitrageStreamSubscription = arbitrageStream.Where(a => a != null).Subscribe(a => orderDispatcher.Put(a));

The last snippet is just to give an idea of what might be done but the corresponding code is not provided because it goes out of the scope of this article.

Conclusion

The reactive programming model helped solve a complicated concurrency problem with code that is both testable and readable. The abstractions introduced in this article make it possible to plug in any kind of exchange API and compose an aggregated order book for a real market.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)