Introduction
If you haven't heard of Complex Event Processing (CEP), or want to know what it is and why you might use it, Wikipedia is a good place to start. The introduction to Microsoft's StreamInsight is perhaps better still.
There are quite a few free and paid-for solutions available. For .NET developers the main two seem to be:
StreamInsight - a joy to program, but requires a SQL Server license
NEsper - slightly arcane at times, but a free .NET port of Esper and slightly more versatile.
Suppose we don't want to learn a new API, are already comfortable with Reactive Extensions (Rx), and aren't inclined to pay extra for licensing. Can we build it ourselves, cheaply, on top of Rx?
Background
Rather than just present some code that illustrates the theory, I wanted to provide a practical example. So I set myself these goals:
1) Create something that queries a free online source for stock tick data.
2) Periodically retrieve stock tick data and push it to subscribers.
3) Separate those updates into streams or windows over that real-time data.
4) Perform time series calculations on those windows.
To address 1 and 2, I've used a combination of the Task Parallel Library (TPL) and a NuGet package called Yahoo! Managed. Although Y! delays its quotes, we can poll it continuously to push updates into our system. Unfortunately Google Finance no longer provides a free API, so it isn't an option.
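The polling half of goals 1 and 2 can be sketched with nothing more than TPL and the BCL's IObservable<T>. This is a minimal sketch under stated assumptions, not the attached code: PollingPublisher<T> is a hypothetical name, and the fetch delegate stands in for whatever Yahoo! Managed call downloads the quotes, since that library's API is not reproduced here.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical polling publisher: a TPL loop repeatedly calls a fetch delegate
// (a placeholder for the Yahoo! Managed download) and pushes each result to observers.
public sealed class PollingPublisher<T> : IObservable<T>
{
    private readonly Func<IEnumerable<T>> fetch;
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();
    private readonly object gate = new object();

    public PollingPublisher(Func<IEnumerable<T>> fetch)
    {
        this.fetch = fetch;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        lock (gate) { if (!observers.Contains(observer)) observers.Add(observer); }
        return new Unsubscriber(this, observer);
    }

    // Fetch once and push every item to a snapshot of the current observers.
    public void PollOnce()
    {
        IObserver<T>[] snapshot;
        lock (gate) { snapshot = observers.ToArray(); }
        foreach (var item in fetch())
            foreach (var observer in snapshot)
                observer.OnNext(item);
    }

    // Poll on a background task until cancelled, pausing between requests.
    public Task RunAsync(TimeSpan interval, CancellationToken token)
    {
        return Task.Run(async () =>
        {
            while (!token.IsCancellationRequested)
            {
                PollOnce();
                try { await Task.Delay(interval, token); }
                catch (TaskCanceledException) { break; }
            }
        });
    }

    // Disposing the token returned by Subscribe removes that observer.
    private sealed class Unsubscriber : IDisposable
    {
        private readonly PollingPublisher<T> owner;
        private readonly IObserver<T> observer;

        public Unsubscriber(PollingPublisher<T> owner, IObserver<T> observer)
        {
            this.owner = owner;
            this.observer = observer;
        }

        public void Dispose()
        {
            lock (owner.gate) { owner.observers.Remove(observer); }
        }
    }
}
```

In use you would start RunAsync with an interval of a few seconds and cancel the token on shutdown. PollOnce is split out so that the push logic can be exercised without a network connection.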
For 3 and 4, I've adapted the examples on MSDN and taken inspiration from Intro to Rx, 101 Rx Samples and the numerous other articles on Rx here at CodeProject.
Using the code
The attached code was developed using Visual Studio Express 2012 for Windows Desktop, so it should build and run by pressing F5. With other versions your mileage may vary, but all being well you should see something like this:
What's happening here is that the program first creates a QuoteStream to encapsulate the price quotes of all the NASDAQ-100 components, i.e. prices for Apple, Microsoft, Intel, etc. Second, because we are only interested in Apple, we create a QuoteView that subscribes to updates from the QuoteStream through a filter:
appleView.SubscribeTo(allQuotes.Where(x => x.ID == "AAPL"));
Third, we create a QuoteSlidingWindow that holds historical data for a window defined by a TimeSpan and the QuotesData.LastTradeTime property. Again, the sliding window subscribes to updates published by the view, and it is itself observable, so we can subscribe to it and use Rx to perform calculations such as a 5-minute moving average of the price:
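static void CalculateAverage(IList<QuotesData> quotes)
{
    // Average() throws on an empty sequence, so skip empty windows
    if (quotes.Count == 0)
        return;
    double average = quotes.Average(x => x.LastTradePriceOnly);
    Console.WriteLine("{0} moving average: {1:F2}", quotes[0].ID, average);
}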
Points of Interest
I use the word Stream in roughly the sense that StreamInsight uses it. A Stream implements IObservable<T>: it holds data that is updated from an external source and pushes those updates to its observers.
A View is any filtered derivative of a Stream. It implements both IObserver<T> and IObservable<T>, so that updates can be piped and filtered through intermediate containers.
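To make the Stream and View roles concrete, here is a minimal generic sketch. SimpleStream<T> and FilterView<T> are hypothetical names, not the article's QuoteStream and QuoteView; in particular the article's view receives an already filtered observable through SubscribeTo and Rx's Where, whereas this sketch takes a predicate so that it needs nothing beyond the BCL.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical Stream: an IObservable<T> that pushes values published from an external source.
public class SimpleStream<T> : IObservable<T>
{
    protected readonly List<IObserver<T>> Observers = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (!Observers.Contains(observer))
            Observers.Add(observer);
        return new Unsubscriber(Observers, observer);
    }

    // Push a value to a copy of the observer list, so callbacks may unsubscribe safely.
    public void Publish(T value)
    {
        foreach (var observer in Observers.ToArray())
            observer.OnNext(value);
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly List<IObserver<T>> observers;
        private readonly IObserver<T> observer;

        public Unsubscriber(List<IObserver<T>> observers, IObserver<T> observer)
        {
            this.observers = observers;
            this.observer = observer;
        }

        public void Dispose() { observers.Remove(observer); }
    }
}

// Hypothetical View: observes an upstream stream or view and republishes only
// the values that satisfy its predicate, so views can be chained.
public class FilterView<T> : SimpleStream<T>, IObserver<T>
{
    private readonly Func<T, bool> predicate;

    public FilterView(Func<T, bool> predicate) { this.predicate = predicate; }

    public void OnNext(T value)
    {
        if (predicate(value))
            Publish(value);
    }

    public void OnError(Exception error)
    {
        foreach (var observer in Observers.ToArray())
            observer.OnError(error);
    }

    public void OnCompleted()
    {
        foreach (var observer in Observers.ToArray())
            observer.OnCompleted();
    }
}
```

For example, a FilterView<string> built with id => id == "AAPL" and subscribed to a SimpleStream<string> of ticker IDs passes on only the Apple updates, and a window could in turn subscribe to that view.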
A SlidingWindow is a discrete window on a portion of a data sequence that moves forward incrementally with each update. It is similar to a SQL partition/window and closely resembles the sliding windows in other CEP implementations. When a value falls outside the time period extending back from the latest update, the window removes that value and then publishes its remaining contents.
To implement this in C# we have to do two things. First, override the OnNext method (from IObserver<T>) so that updates pushed to the window modify the underlying data and are published when appropriate:
public override void OnNext(QuotesData value)
{
    DataStream.Enqueue(value);

    // The newest trade time seen so far marks the leading edge of the window
    var latestQuoteTime = DataStream.Max(x => x.LastTradeTime);
    bool publish = false;

    // Evict quotes that have fallen WindowLength or more behind the leading edge
    while (DataStream.Count > 0 &&
           latestQuoteTime - DataStream.Peek().LastTradeTime >= WindowLength)
    {
        DataStream.Dequeue();
        publish = true;
    }

    // Publish only when the window has actually slid forward;
    // iterate a copy so an observer can unsubscribe during its callback
    if (publish)
    {
        foreach (var observer in Observers.ToList())
            observer.OnNext(DataStream.ToList());
    }
}
Second, we have to implement IObservable<IList<T>> so that we can pipe the current window data to its observers. I do this on the base class, QuoteWindow, like so:
public IDisposable Subscribe(IObserver<IList<QuotesData>> observer)
{
    if (!Observers.Contains(observer))
        Observers.Add(observer);
    return new WindowUnsubscriber(Observers, observer);
}
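The WindowUnsubscriber referred to above isn't listed in the article. A minimal sketch in the style of the MSDN IObservable<T> example might look like the following; it is written as a generic class for self-containment, and it assumes Observers is a List<IObserver<...>>, which the Contains/Add calls suggest but the article doesn't confirm.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch: the IDisposable token returned by Subscribe.
// Disposing it removes the observer from the window's shared observer list.
public sealed class WindowUnsubscriber<T> : IDisposable
{
    private readonly List<IObserver<T>> observers;
    private IObserver<T> observer;

    public WindowUnsubscriber(List<IObserver<T>> observers, IObserver<T> observer)
    {
        this.observers = observers;
        this.observer = observer;
    }

    public void Dispose()
    {
        // Idempotent: after the first call the observer reference is cleared
        if (observer != null)
        {
            observers.Remove(observer);
            observer = null;
        }
    }
}
```

With Observers declared as List<IObserver<IList<QuotesData>>>, the call in Subscribe would become new WindowUnsubscriber<IList<QuotesData>>(Observers, observer).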
A TumblingWindow is similar to a sliding window, except that it begins when the first value of a sequence arrives and ends when a value arrives beyond the window's length. For example, if the window is 5 minutes long and the first value arrives at 13:00:00, then a value at 13:05:00 or later completes the window: the window publishes its results, clears its underlying data, and that later value starts the next window.
We can reuse much of the code we've already written for SlidingWindow in the implementation of TumblingWindow. In fact, QuoteTumblingWindow merely overrides OnNext:
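public override void OnNext(QuotesData value)
{
    // A value at or beyond WindowStart + WindowLength closes the current window
    if (DataStream.Count > 0 && value.LastTradeTime - WindowStart >= WindowLength)
    {
        // Iterate a copy so an observer can unsubscribe during its callback
        foreach (var observer in Observers.ToList())
            observer.OnNext(DataStream.ToList());
        DataStream.Clear();
    }

    // The first value into an empty window marks the start of the next window
    if (DataStream.Count == 0)
        WindowStart = value.LastTradeTime;
    DataStream.Enqueue(value);
}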
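To check the 13:00:00 / 13:05:00 boundary behaviour without a live feed, the same tumbling rule can be run on plain (timestamp, price) pairs. TumblingBuffer and its WindowClosed event are hypothetical names for this sketch; they stand in for QuoteTumblingWindow and its observers.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, self-contained version of the tumbling rule, using
// (timestamp, price) pairs instead of QuotesData so it can run on its own.
public sealed class TumblingBuffer
{
    private readonly TimeSpan windowLength;
    private readonly List<KeyValuePair<DateTime, double>> items = new List<KeyValuePair<DateTime, double>>();
    private DateTime windowStart;

    // Raised with a copy of the window's contents each time a window closes.
    public event Action<IList<KeyValuePair<DateTime, double>>> WindowClosed;

    public TumblingBuffer(TimeSpan windowLength) { this.windowLength = windowLength; }

    public void Add(DateTime time, double price)
    {
        // A value at or beyond windowStart + windowLength closes the current window...
        if (items.Count > 0 && time - windowStart >= windowLength)
        {
            var handler = WindowClosed;
            if (handler != null)
                handler(new List<KeyValuePair<DateTime, double>>(items));
            items.Clear();
        }

        // ...and the first value into an empty buffer starts the next one.
        if (items.Count == 0)
            windowStart = time;
        items.Add(new KeyValuePair<DateTime, double>(time, price));
    }
}
```

With a 5-minute window, values at 13:00:00, 13:02:30 and 13:04:59 stay in the first window; a value at 13:05:00 closes it with three items and starts the next window, which in turn stays open until a value arrives at 13:10:00 or later.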
As you can see, we can implement the required interfaces relatively easily and, with a little customisation, build a pipe-and-filter chain that ends in a function performing whatever time series calculation we need. We should also be able to add and remove subscriptions to each of these entities whenever we want.
Adding Value
With the plumbing done, we can now begin to add value to the data we're receiving. Two common indicators in the world of trading are Time Weighted Average Price (TWAP) and Volume Weighted Average Price (VWAP).
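TWAP is traditionally calculated by taking the mean of the open, high, low and close prices for each bar (sub-interval) within a time window, and then taking the mean of those values. I've taken a simpler, and possibly better, approach outlined at TradeStation: the mean of the last trade price of every tick in the window. Because each tick counts equally, this approximates a time weighting when ticks arrive at roughly regular intervals, as they tend to when we poll at a fixed rate. I calculate a TWAP value for each time window: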
private static double twap(IList<QuotesData> quotes)
{
    // An empty window has no meaningful TWAP
    if (quotes.Count == 0)
        return double.NaN;

    // Sum the last trade price of every tick in the window
    double totalPrice = 0.0;
    foreach (var q in quotes)
        totalPrice += q.LastTradePriceOnly;

    // Simple mean; a non-positive total means no usable prices were received
    return totalPrice > 0.0 ? totalPrice / quotes.Count : double.NaN;
}
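For comparison, the traditional bar-based TWAP described above can be sketched as follows. The Bar struct and TraditionalTwap class are hypothetical; building bars from raw ticks (choosing the bar length and grouping ticks by LastTradeTime) is left out and would be an extra step.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical bar type: one open/high/low/close summary per sub-interval of the window.
public struct Bar
{
    public double Open, High, Low, Close;

    public Bar(double open, double high, double low, double close)
    {
        Open = open; High = high; Low = low; Close = close;
    }
}

public static class TraditionalTwap
{
    // Mean over the window's bars of each bar's (O + H + L + C) / 4; NaN for an empty window.
    public static double Calculate(IList<Bar> bars)
    {
        if (bars.Count == 0)
            return double.NaN;
        return bars.Average(b => (b.Open + b.High + b.Low + b.Close) / 4.0);
    }
}
```

For two bars of (10, 12, 9, 11) and (11, 13, 10, 12), the per-bar means are 10.5 and 11.5, giving a TWAP of 11.0.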
VWAP is similar to TWAP, except that each tick is weighted by the volume traded. Needless to say, a million shares sold at $1.00 are far more relevant than one share sold at $100. Again, I calculate VWAP for each published time window:
private static double vwap(IList<QuotesData> quotes)
{
    // VWAP = sum(price * size) / sum(size) across every tick in the window
    double totalWeightedPrice = 0.0;
    double totalQuantity = 0.0;
    foreach (var q in quotes)
    {
        // Convert.ToDouble accepts whatever numeric type Values() boxes;
        // an unboxing (double) cast would throw if it were an int or long
        double quantity = Convert.ToDouble(q.Values(QuoteProperty.LastTradeSize));
        totalWeightedPrice += q.LastTradePriceOnly * quantity;
        totalQuantity += quantity;
    }

    // NaN when the window is empty or no volume was traded
    if (totalQuantity > 0.0 && totalWeightedPrice > 0.0)
        return totalWeightedPrice / totalQuantity;
    return double.NaN;
}
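To see how much the weighting matters, here is the article's own example of a million shares at $1.00 and one share at $100, worked through the formula that vwap() implements (with p_i the last trade price of tick i and q_i its trade size):

```latex
\mathrm{VWAP} = \frac{\sum_i p_i q_i}{\sum_i q_i}
             = \frac{1{,}000{,}000 \times 1.00 + 1 \times 100}{1{,}000{,}000 + 1}
             = \frac{1{,}000{,}100}{1{,}000{,}001} \approx 1.0001
```

A simple mean of the same two ticks, which is what twap() computes, would instead give (1.00 + 100) / 2 = $50.50.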
To produce a running version of TWAP or VWAP over a longer period, we can always pipe the results into another window. Likewise, if we want to do something more involved, such as executing new trades based on our calculations, we would need to wrap our execution API in an IObserver<T> and pipe it the data it needs to construct an order. That, however, is a subject for another day.
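One caveat when chaining windows for a running VWAP: averaging the per-window VWAP values is not the same as the VWAP of the combined period, because windows with more volume should count for more. A minimal sketch, assuming each window publishes its totals rather than only its ratio (VwapPartial and RunningVwap are hypothetical types, not part of the attached code):

```csharp
using System;

// Hypothetical summary a window could publish instead of (or alongside) its VWAP value.
public struct VwapPartial
{
    public readonly double WeightedPrice;  // sum of price * size in the window
    public readonly double Quantity;       // sum of size in the window

    public VwapPartial(double weightedPrice, double quantity)
    {
        WeightedPrice = weightedPrice;
        Quantity = quantity;
    }
}

// Observer that accumulates per-window partials into a running VWAP.
public sealed class RunningVwap : IObserver<VwapPartial>
{
    private double totalWeightedPrice;
    private double totalQuantity;

    // NaN until some volume has been observed.
    public double Value
    {
        get { return totalQuantity > 0.0 ? totalWeightedPrice / totalQuantity : double.NaN; }
    }

    public void OnNext(VwapPartial partial)
    {
        totalWeightedPrice += partial.WeightedPrice;
        totalQuantity += partial.Quantity;
    }

    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

For example, a window of 1,000 shares at $10 followed by a window of 3,000 shares at $20 gives a running VWAP of 70,000 / 4,000 = $17.50, whereas the mean of the two window VWAPs would be $15.00.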
Conclusions
I had worked with StreamInsight and Esper (in Java) but not with Rx, and this article took roughly two man-days to research and write. In the UK, IT managers often cost development projects at £600 per man-day, so in my case the cost was about 2 × £600 = £1,200, comfortably under £1,500. Compare this with SQL Server Standard Edition, which retails at about £2,500 before the cost of implementation, and on the face of it we've made a reasonable saving.
The limitations, though, are that this solution is not particularly scalable, nor does it support many of the other extensive features that a proper CEP solution provides.
Troubleshooting
You may find that the default implementation retrieves only one stock quote and then pauses. If so, the exchange is most likely outside its trading hours. For that case I've provided the FTSE-100 symbols so you can easily switch over; if you want anything else, you'll have to copy and paste the symbols from Y! for now.
Possible Future Updates
More complex standard calculations, such as portfolio Value at Risk (VaR)
How does this compare to the equivalent in StreamInsight or NEsper (C#) or Esper (Java)?
Any and all feedback is greatly appreciated.
History
Revision #1 - The bare bones of an article and code
Revision #2 - Added TumblingWindow and the TWAP and VWAP calculators.