Visit the Bitbucket repository to browse through the accompanying code. Feel free to fork and make pull requests...
Introduction
For a given asset, an aggregated order book combines data from multiple exchanges and presents two sorted tables called Bids and Asks; the Bids contain the orders to buy and the Asks contain the orders to sell. To somebody looking at the order book, the top row of the Bids represents the highest price at which they could sell and the top row of the Asks represents the cheapest price at which they could buy.
At a given time and for a given asset, an aggregated order book might look something like this:
In this example, a market participant wishing to purchase 10 units of this asset would see that, for a volume of 10, the cheapest prices at which participants are willing to sell could be found on exchanges B and A at prices of 101 and 101.5 respectively. Other things being equal, the optimal way to split the order would be to purchase 5 units on exchange B and 5 units on exchange A for a total of 1012.5. Sticking to a single venue would have cost 1030 (5*101+ 5*105) at B or 1027.5 (101.5*5 +104*5) at A. Splitting the order across multiple venues helped save on the overall execution price.
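The arithmetic above can be reproduced with a short sketch. It is only an illustration: the `Ask` record and the `SplitOrder` class are hypothetical names that do not appear in the accompanying code, and the depth of each level (5 units) is assumed from the single-venue costs quoted in the text.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical, simplified ask level; not the article's ExchangeOrder type.
public record Ask(string Exchange, decimal Price, decimal Volume);

public static class SplitOrder
{
    // Ask levels assumed from the figures quoted in the text.
    public static readonly Ask[] Book =
    {
        new Ask("A", 101.5m, 5m), new Ask("A", 104m, 5m),
        new Ask("B", 101m, 5m),   new Ask("B", 105m, 5m),
    };

    // Buys `quantity` units from the cheapest asks first and returns the total cost.
    // Throws if the supplied asks do not hold enough volume.
    public static decimal Cost(IEnumerable<Ask> asks, decimal quantity)
    {
        decimal remaining = quantity, total = 0m;
        foreach (var ask in asks.OrderBy(a => a.Price))
        {
            if (remaining <= 0m) break;
            decimal taken = Math.Min(remaining, ask.Volume);
            total += taken * ask.Price;
            remaining -= taken;
        }
        if (remaining > 0m)
            throw new InvalidOperationException("Not enough volume in the book.");
        return total;
    }
}
```

Calling `SplitOrder.Cost(SplitOrder.Book, 10m)` yields 1012.5, while restricting the input to the asks of exchange B or exchange A alone yields 1030 and 1027.5 respectively, matching the figures above.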
Having established that aggregated order books are not completely devoid of interest, how do we go about building them in real time? It will be necessary to fetch data from multiple venues concurrently and rearrange the aggregated order book each time an update arrives. If approached with an object-oriented mindset, this exercise is not at all trivial. However, adopting a more functional approach and relying on the Reactive Extensions for .NET (Rx) proves to be a rewarding path. For those not familiar with Rx, the go-to source of information is www.introtorx.com.
Streams of Immutable Objects
The problem can be viewed as assembling streams of immutable objects being pushed down to consumers. Each exchange exposes a stream of order books containing orders from that exchange only. We merge these sources into a single observable sequence and process items sequentially. Each time a new order book comes in, we pass it through an accumulator function which incorporates it with the latest version of the aggregated order book and returns the intermediary value. The result of running the merged stream through the accumulator function (a process called scanning) is an observable sequence of aggregated order books. This sequence can be processed to perform such actions as updating a dynamic display, looking for arbitrage opportunities or just displaying to the console.
Skipping straight to what might have been the conclusion to this article, here is what the above diagram translates to in code having defined the appropriate abstractions:
IExchangeClient exchangeA = new MockClient(0);
IExchangeClient exchangeB = new MockClient(1);
IExchangeClient exchangeC = new MockClient(2);

var mergedOrderBookStream = Observable.Merge(exchangeA.OrderBookStream,
                                             exchangeB.OrderBookStream,
                                             exchangeC.OrderBookStream);

var aggregatedOrderBookStream = mergedOrderBookStream.Scan(
    new AggregatedOrderBook(),
    (aob, orderBook) => aob.InsertBook(orderBook));

var consoleSubscription = aggregatedOrderBookStream.Subscribe(Console.WriteLine);
IExchangeClient
The code that communicates with individual exchanges is abstracted behind the IExchangeClient interface.
public interface IExchangeClient
{
    int ExchangeID { get; }
    IObservable<ExchangeOrder> OrderStream { get; }
    IObservable<IOrderBook> OrderBookStream { get; }

    Task<ExchangeOrder> SubmitOrder(OrderSide oSide, OrderType oType, decimal price, decimal size);
    Task<ExchangeOrder> CancelOrder(ExchangeOrder order);

    void Connect();
    void Disconnect();
}
Each implementation contains the specific logic needed to convey information to and from the underlying venue, be it through an HTTP Web API, WebSockets or the FIX protocol, and exposes the same properties and methods to the rest of the application. Queryable data, such as orders and order books, is presented in the form of streams which push data down to the consumer. In Rx, such objects are defined by the IObservable<T> interface.
As an example, let us assume that an exchange exposes an HTTP method to get the latest order book and that we have written an HttpGetOrderBook() method to perform the request. We will want to call this method every 5 seconds, wait for a reply and push the result down the stream to our consumers. If a request takes more than 10 seconds, we want it to time out but continue retrying. The sequence of order books exposed must be the same for every consumer. Here is how we would go about doing this with Rx:
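var _orderBookStream = Observable.Create<IOrderBook>(obs =>
    {
        obs.OnNext(HttpGetOrderBook());
        // Complete the inner sequence so that Concat can move on to the delay.
        obs.OnCompleted();
        return Disposable.Empty;
    })
    // Delay takes a TimeSpan: hold off for 5 seconds before Repeat resubscribes.
    .Concat(Observable.Empty<IOrderBook>().Delay(TimeSpan.FromSeconds(5)))
    .Repeat()
    .Timeout(TimeSpan.FromSeconds(10))
    .Retry()
    .Publish();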
This will call HttpGetOrderBook(), push the value to the subscriber, wait for the resulting sequence to complete, and then concatenate an empty sequence that is delayed by 5 seconds. After the delay, the process is repeated: the request is sent again, the response is awaited, and the stream holds off for another 5 seconds before repeating. We introduce a timeout in case a request takes too long and simply retry if a timeout occurs. The Publish operator guarantees that all subscribers consume the same stream by converting the sequence into an IConnectableObservable<T>. Once we call Connect() on this sequence, data will start being pushed down and all subscribers will see the same stream of order books. To disconnect, we simply dispose of the object returned by Connect(). The IExchangeClient interface exposes two methods (Connect and Disconnect) that start and stop the flow of data from an individual exchange.
In the first snippet, I used three MockClients. This is a type of IExchangeClient which simulates the activity of an exchange and can be used to perform tests. It was actually the hardest thing to build in this application and probably deserves a separate article, as there is some pretty cool Rx code in there too.
IOrderBook, OrderBook and OrderComparer
OrderBooks are simply wrappers around two sorted sets of Bids and Asks.
public interface IOrderBook
{
    int ExchangeID { get; }
    ImmutableSortedSet<ExchangeOrder> Bids { get; }
    ImmutableSortedSet<ExchangeOrder> Asks { get; }
}
The sorted sets are defined with the type ImmutableSortedSet<T>, a .NET class which handles all the sorting logic based on a comparer that can be provided explicitly.
public class OrderBook : IOrderBook
{
    public OrderBook(int exchangeID, IEnumerable<ExchangeOrder> bids, IEnumerable<ExchangeOrder> asks)
    {
        ExchangeID = exchangeID;
        Bids = bids.ToImmutableSortedSet(comparer: OrderComparer.DescBidComparer());
        Asks = asks.ToImmutableSortedSet(comparer: OrderComparer.DescAskComparer());
    }

    public int ExchangeID { get; private set; }
    public ImmutableSortedSet<ExchangeOrder> Bids { get; private set; }
    public ImmutableSortedSet<ExchangeOrder> Asks { get; private set; }
}
In this case, we have implemented an order comparer which takes in two orders and returns a negative value, zero or a positive value when the first order is respectively better than, equivalent to or worse than the second.
public class OrderComparer : Comparer<ExchangeOrder>
{
    // -1 for bids (higher prices rank first), +1 for asks (lower prices rank first).
    readonly int _priceComparisonCoeff;

    OrderComparer(int priceComparisonCoeff)
    {
        _priceComparisonCoeff = priceComparisonCoeff;
    }

    public override int Compare(ExchangeOrder x, ExchangeOrder y)
    {
        // Market orders rank ahead of limit orders.
        if (x.OType == OrderType.Market && y.OType == OrderType.Limit)
        {
            return -1;
        }
        if (x.OType == OrderType.Limit && y.OType == OrderType.Market)
        {
            return 1;
        }
        // Then compare on price, in the direction set by the coefficient.
        if (x.Price.CompareTo(y.Price) != 0)
        {
            return _priceComparisonCoeff * x.Price.CompareTo(y.Price);
        }
        // At equal prices, earlier orders rank first.
        if (x.UTCTimestamp.CompareTo(y.UTCTimestamp) != 0)
        {
            return x.UTCTimestamp.CompareTo(y.UTCTimestamp);
        }
        // Finally, larger remaining volumes rank first.
        if (x.RemainingVolume.CompareTo(y.RemainingVolume) != 0)
        {
            return -x.RemainingVolume.CompareTo(y.RemainingVolume);
        }
        return 0;
    }

    public static OrderComparer DescBidComparer()
    {
        return new OrderComparer(-1);
    }

    public static OrderComparer DescAskComparer()
    {
        return new OrderComparer(1);
    }
}
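To see the effect of the price coefficient in isolation, here is a cut-down sketch. The `SimpleOrder` record, `SimpleOrderComparer` and `ComparerDemo` are hypothetical names introduced for illustration; they keep only the price and an arrival sequence number and are not the types used in the accompanying code.

```csharp
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;

// Hypothetical, reduced order: price and arrival sequence number only.
public record SimpleOrder(decimal Price, long Sequence);

// Same idea as OrderComparer: a coefficient of -1 ranks higher prices first (bids),
// +1 ranks lower prices first (asks); earlier orders win ties on price.
public sealed class SimpleOrderComparer : Comparer<SimpleOrder>
{
    private readonly int _priceCoeff;

    private SimpleOrderComparer(int priceCoeff) => _priceCoeff = priceCoeff;

    public override int Compare(SimpleOrder x, SimpleOrder y)
    {
        int byPrice = x.Price.CompareTo(y.Price);
        return byPrice != 0 ? _priceCoeff * byPrice : x.Sequence.CompareTo(y.Sequence);
    }

    public static SimpleOrderComparer ForBids() => new SimpleOrderComparer(-1);
    public static SimpleOrderComparer ForAsks() => new SimpleOrderComparer(1);
}

public static class ComparerDemo
{
    // Builds a sorted set with the given comparer and returns the prices in set order.
    public static decimal[] SortedPrices(IEnumerable<SimpleOrder> orders, IComparer<SimpleOrder> comparer) =>
        orders.ToImmutableSortedSet(comparer).Select(o => o.Price).ToArray();
}
```

With orders priced at 100, 102 and 101, `SortedPrices` returns 102, 101, 100 under `ForBids()` and 100, 101, 102 under `ForAsks()`. Note that a sorted set treats two elements that compare as 0 as duplicates and keeps only one of them, so the comparer needs enough tie-breaking criteria to distinguish orders that should coexist.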
AggregatedOrderBook
The aggregated order book looks just like its standard counterpart but combines orders from multiple venues. It contains two ImmutableSortedSets instantiated with the appropriate comparers for Bids and Asks, as well as the accumulator function which rearranges orders among them each time an update is received in the form of a new order book. The InsertBook method takes a regular order book as a parameter (one with orders belonging to a single exchange) and returns a new copy of the aggregated order book in which all the orders from the exchange affected by the update are replaced by the incoming ones. ImmutableSortedSet and OrderComparer work together to guarantee that the Bids and Asks remain arranged in the appropriate order.
public AggregatedOrderBook InsertBook(IOrderBook orderBook)
{
    int exchangeID = orderBook.ExchangeID;

    // Drop the bids previously received from this exchange, then add the incoming ones.
    var correspondingBids = Bids.Where(b => b.ExchangeID == exchangeID);
    var modifiedAggregatedBids = Bids.Except(correspondingBids).Union(orderBook.Bids);

    // Same treatment for the asks.
    var correspondingAsks = Asks.Where(a => a.ExchangeID == exchangeID);
    var modifiedAggregatedAsks = Asks.Except(correspondingAsks).Union(orderBook.Asks);

    return new AggregatedOrderBook(modifiedAggregatedBids, modifiedAggregatedAsks);
}
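The replace-by-exchange step can be tried out on its own with the following sketch. It assumes a reduced `Quote` record and a hypothetical `AggregationSketch` helper, neither of which exists in the accompanying code; only the `Except` and `Union` calls on `ImmutableSortedSet<T>` mirror the method above.

```csharp
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;

// Hypothetical, reduced ask: the article's ExchangeOrder carries more fields.
public record Quote(int ExchangeID, decimal Price);

public static class AggregationSketch
{
    // Asks sorted by ascending price; the exchange ID breaks ties so that equal prices
    // from different venues are kept as distinct elements.
    public static readonly IComparer<Quote> AskOrder = Comparer<Quote>.Create((x, y) =>
    {
        int byPrice = x.Price.CompareTo(y.Price);
        return byPrice != 0 ? byPrice : x.ExchangeID.CompareTo(y.ExchangeID);
    });

    // Returns a new set in which every quote from `exchangeID` is replaced by `incoming`.
    // The input set is never modified.
    public static ImmutableSortedSet<Quote> ReplaceExchange(
        ImmutableSortedSet<Quote> aggregated, int exchangeID, IEnumerable<Quote> incoming)
    {
        var stale = aggregated.Where(q => q.ExchangeID == exchangeID);
        return aggregated.Except(stale).Union(incoming);
    }
}
```

Starting from a set holding one quote from exchange 0 and two from exchange 1, calling `ReplaceExchange` with exchange 1 and a single new quote returns a two-element set, while the original set still holds its three quotes. This immutability is what allows the accumulator to be used safely inside Scan.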
Using the Code
Having set up a stream of aggregated order books by merging multiple sources, we can start consuming this data. In Rx terms, we can Subscribe to the observable sequence. Subscribing to a sequence means setting up an action to be performed each time a new item is pushed through.
WPF Application
The first thing we might want to do is to update a dynamic display. The WPF application allows visualizing the data flow in motion. In the left column are the controls for the three individual exchanges whose order books we want to combine. Here, I have used three MockClients again, so the orders are generated randomly. Each venue has a Start and a Stop button which simply control the flow of orders being pushed down the pipeline.
The action that we want performed for each new incoming order book is to replace the collections of Bids and Asks, which are defined as simple Lists in the main view model. The WPF binding mechanism and INotifyPropertyChanged take care of updating the UI. Here, it is important to note that we need to subscribe on a background thread and observe on the UI thread. Rx makes that very easy with the SubscribeOn and ObserveOnDispatcher methods.
aggregateStreamSubscription = aggregatedOrderBookStream
    .SubscribeOn(NewThreadScheduler.Default)
    .ObserveOnDispatcher()
    .Subscribe((aob) =>
    {
        Bids = aob.Bids.ToList();
        Asks = aob.Asks.ToList();
    });
Identifying Arbitrage Opportunities
After running the WPF app for a while, one might notice that sometimes the best Bid is more expensive than the best Ask:
This introduces the opportunity to buy low on one exchange and sell high on another. This type of free lunch is called arbitrage.
In our framework, we shall represent an arbitrage opportunity as a wrapper around two dictionaries containing the amounts to buy and sell per exchange:
public class ArbitrageOpportunity
{
    public ArbitrageOpportunity(Dictionary<int, decimal> buyDico, Dictionary<int, decimal> sellDico)
    {
        BuyDico = buyDico;
        SellDico = sellDico;
    }

    public Dictionary<int, decimal> BuyDico { get; private set; }
    public Dictionary<int, decimal> SellDico { get; private set; }
}
Then, we can extend the AggregatedOrderBook class with a method to identify arbitrage opportunities and project the observable sequence of aggregated order books onto a sequence of ArbitrageOpportunities with a simple Rx Select statement:
var arbitrageStream = aggregatedOrderBookStream.Select(aob => aob.LookForArbitrage());
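The body of LookForArbitrage is not shown in this article. As a rough idea of what such a check might involve, the sketch below looks only at the top of each side of the book. The `Level` record and `ArbitrageSketch.TopOfBook` are hypothetical names, and the logic is an assumption for illustration; the repository's implementation may well differ, for instance by walking deeper into the book.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, reduced order used only for this sketch.
public record Level(int ExchangeID, decimal Price, decimal Volume);

public static class ArbitrageSketch
{
    // Returns the volume to buy and to sell per exchange when the best bid is priced
    // above the best ask, or null when the book is not crossed.
    // Only the top level of each side is examined.
    public static (Dictionary<int, decimal> Buy, Dictionary<int, decimal> Sell)? TopOfBook(
        Level bestBid, Level bestAsk)
    {
        if (bestBid.Price <= bestAsk.Price)
        {
            return null; // no crossed market, hence no opportunity
        }

        // The tradable amount is limited by the smaller of the two resting volumes.
        decimal volume = Math.Min(bestBid.Volume, bestAsk.Volume);

        var buy = new Dictionary<int, decimal> { [bestAsk.ExchangeID] = volume };  // buy at the ask
        var sell = new Dictionary<int, decimal> { [bestBid.ExchangeID] = volume }; // sell at the bid
        return (buy, sell);
    }
}
```

The two dictionaries have the same shape as BuyDico and SellDico above, so a result of this kind could be wrapped in an ArbitrageOpportunity, with null standing for the absence of an opportunity.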
Finally, we could subscribe to the arbitrage sequence and dispatch the appropriate orders to take advantage of the opportunity:
arbitrageStreamSubscription = arbitrageStream.Where(a => a != null).Subscribe(a => orderDispatcher.Put(a));
The last snippet is only meant to give an idea of what might be done; the corresponding code is not provided because it is beyond the scope of this article.
Conclusion
The reactive programming model helped solve a complicated concurrency problem with code that is both testable and readable. The abstractions introduced in this article make it possible to plug in any kind of exchange API and compose an aggregated order book for a real market.