A primitive CEP engine to analyse stock market data in real time

10 Apr 2014
Can we use LinqRx as a poor-man's complex event processor?

Introduction

If you haven't come across Complex Event Processing (CEP), what it is, or why you might want to use it, then Wikipedia is a good place to start, or perhaps better still, the introduction to Microsoft's StreamInsight.

There are quite a few free and paid-for solutions out there. For .NET developers the main two seem to be:

StreamInsight - a joy to program but requires a SQL Server license

NEsper - slightly arcane at times, but a free port of Esper and slightly more versatile.

Assume we don't want to learn a new API, are already comfortable with Rx, and aren't inclined to pay extra for licensing. Can we do it ourselves cheaply by leveraging Rx?

Background

Rather than just present some code that illustrates the theory, I wanted to provide a practical example. So I set myself the following goals:

1) Create something that queries a free online source for stock tick data.

2) Periodically retrieve stock tick data and push it to subscribers.

3) Separate those updates to create streams or windows for that real time data.

4) Perform time series calculations on those windows.

I've used a combination of the TPL and a NuGet package called Yahoo! Managed to address 1 and 2. Whilst Y! delays updates, we can poll it continuously in order to push updates into our system. Unfortunately Google Finance no longer provides a free API, so we can't use that.

For 3 and 4 I've adapted the examples from MSDN, and taken inspiration from Intro To Rx, 101 Rx Samples and the numerous other articles on Rx here at CodeProject.

Using the code

The attached code was developed using Visual Studio Express 2012 for Windows Desktop, so it should build and run with F5. For other versions your mileage may vary, but all being well you should see something like this:

What's happening here is that first the program creates a QuoteStream to encapsulate all the component price quotes of the NASDAQ-100, i.e. prices for Apple, Microsoft, Intel etc. Second, because we are only interested in Apple, we create a QuoteView that subscribes to updates from the QuoteStream using a filter:

 appleView.SubscribeTo(allQuotes.Where(x => x.ID == "AAPL"));

Third, we create a QuoteSlidingWindow, which holds historical data for a window defined by a TimeSpan object and the QuotesData.LastTradeTime property. Again, the sliding window subscribes to updates published by the View and is itself observable, so we can subscribe to it and perform calculations using Rx, such as a 5 minute moving price average:

    static void CalculateAverage(IList<QuotesData> quotes)
    {
        double average = quotes.Average(x => x.LastTradePriceOnly);
    }
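To show how a method such as CalculateAverage might be attached to a window, here is a minimal sketch of an observer that forwards each published value to a delegate. The class name DelegateObserver is hypothetical and is not taken from the attached code. With the Rx libraries referenced, the Subscribe extension overload that accepts an Action<T> serves the same purpose.

```csharp
using System;

// Hypothetical helper: adapts a plain delegate to IObserver<T> so that it can
// be passed to any IObservable<T>.Subscribe call.
public sealed class DelegateObserver<T> : IObserver<T>
{
    private readonly Action<T> onNext;

    public DelegateObserver(Action<T> onNext)
    {
        if (onNext == null) throw new ArgumentNullException("onNext");
        this.onNext = onNext;
    }

    // Forward every published value to the wrapped delegate.
    public void OnNext(T value) { onNext(value); }

    // Errors and completion are ignored in this sketch; real code should log
    // or propagate them.
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

Assuming a window instance called slidingWindow (an illustrative name), the wiring would then read slidingWindow.Subscribe(new DelegateObserver<IList<QuotesData>>(CalculateAverage)).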

Points of Interest

I use the word Stream in roughly the same sense as StreamInsight does. Here a Stream object implements IObservable<T>: it holds data that is updated from an external source and pushes updates to observers.

A View is any filtered derivative of a Stream. It implements both IObserver<T> and IObservable<T>, so that updates can be piped and filtered through intermediate containers.

A SlidingWindow is a discrete window on a portion of a data sequence that moves forward incrementally with each update. It is similar to a SQL partition/window and closely resembles sliding windows in other CEP implementations. When a value falls outside the time period extending back from the latest update, the window removes that value and then publishes its remaining contents.

To implement this in C# we have to do two things. First, override the OnNext method of the IObserver<T> interface so that updates pushed to it modify the underlying data and publish when appropriate:

    public override void OnNext(QuotesData value)
    {
        DataStream.Enqueue(value);

        // Quotes may arrive out of order, so scan the whole window for the
        // newest trade time rather than trusting the value just enqueued.
        var latestQuoteTime = DataStream.Max(x => x.LastTradeTime);
        bool publish = false;

        // Find and remove quotes that are now outside of the sliding window.
        while (DataStream.Count > 0)
        {
            var first = DataStream.Peek();
            if (latestQuoteTime - first.LastTradeTime >= WindowLength)
            {
                DataStream.Dequeue();
                publish = true;
            }
            else
                break;
        }

        // We publish to subscribers only if a quote has left the window period.
        if (publish)
        {
            foreach (var observer in Observers)
                observer.OnNext(DataStream.ToList());
        }
    }
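The eviction rule in OnNext can be traced in isolation with a stripped-down sketch that works on bare timestamps. SlidingWindowDemo and Push are hypothetical names introduced for illustration, and unlike the method above the sketch assumes timestamps arrive in non-decreasing order.

```csharp
using System;
using System.Collections.Generic;

public static class SlidingWindowDemo
{
    // Adds a timestamp to the window and evicts every entry that is at least
    // `length` older than it. Returns true when something was evicted, which
    // is the condition under which the article's window publishes.
    // Assumption: timestamps arrive in non-decreasing order.
    public static bool Push(Queue<DateTime> window, DateTime time, TimeSpan length)
    {
        window.Enqueue(time);
        bool evicted = false;
        while (window.Count > 0 && time - window.Peek() >= length)
        {
            window.Dequeue();
            evicted = true;
        }
        return evicted;
    }
}
```

With a 5 minute window, pushing 13:00, 13:02 and 13:04 evicts nothing. Pushing 13:05 evicts 13:00, because the gap has reached the full window length, and leaves 13:02, 13:04 and 13:05 in the queue.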

Second, we have to implement IObservable<IList<T>> so that we can pipe the current window data to its observers. I do this on the base class QuoteWindow like so:

        public IDisposable Subscribe(IObserver<IList<QuotesData>> observer)
        {
            if (!Observers.Contains(observer))
                Observers.Add(observer);
            return new WindowUnsubscriber(Observers, observer);
        } 
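WindowUnsubscriber is not shown in the article text. A plausible shape, assuming it follows the Unsubscriber class from the MSDN observer design pattern walkthrough, is sketched below. The type is made generic here only so that the sketch compiles without the Yahoo! Managed types; the version in the attached code may well differ.

```csharp
using System;
using System.Collections.Generic;

// Assumed shape of WindowUnsubscriber, modelled on the MSDN Unsubscriber
// pattern: the token returned by Subscribe removes the observer on Dispose.
public sealed class WindowUnsubscriber<T> : IDisposable
{
    private readonly ICollection<IObserver<IList<T>>> observers;
    private readonly IObserver<IList<T>> observer;

    public WindowUnsubscriber(ICollection<IObserver<IList<T>>> observers,
                              IObserver<IList<T>> observer)
    {
        this.observers = observers;
        this.observer = observer;
    }

    // Disposing twice is harmless: Remove simply returns false the second time.
    public void Dispose()
    {
        observers.Remove(observer);
    }
}
```

Returning a disposable token means a subscriber never needs a reference to the window's observer list in order to detach itself.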

A TumblingWindow is similar to a sliding window, except that it begins when the first value of a sequence enters and ends on entry of a value beyond the window's length. For example, if the window is 5 minutes long and the first value is at 13:00:00, then a value at 13:05:00 or later will complete the window, publish the results and then clear the underlying data.

We can reuse much of the code we've already created for SlidingWindow in the implementation of TumblingWindow. In fact, QuoteTumblingWindow merely overrides OnNext:

    public override void OnNext(QuotesData value)
    {
        // Does this new value complete the current window? If so, publish and then reset.
        if (DataStream.Count > 0)
        {
            if (value.LastTradeTime - WindowStart >= WindowLength)
            {
                foreach (var observer in Observers)
                    observer.OnNext(DataStream.ToList());
                DataStream.Clear();
            }
        }
        // Start a new window.
        if (DataStream.Count == 0)
            WindowStart = value.LastTradeTime;
        DataStream.Enqueue(value);
    }
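To see where the window boundaries fall, the same rule can be applied to a plain list of timestamps. TumblingWindowDemo and Split are hypothetical names used only for this sketch, which returns the completed windows instead of publishing them to observers.

```csharp
using System;
using System.Collections.Generic;

public static class TumblingWindowDemo
{
    // Feeds timestamps through a tumbling window of the given length and
    // returns every completed window in order. The final, still-open window
    // is not returned, mirroring a window that has not yet published.
    public static List<List<DateTime>> Split(IEnumerable<DateTime> times, TimeSpan length)
    {
        var completed = new List<List<DateTime>>();
        var current = new List<DateTime>();
        DateTime windowStart = DateTime.MinValue;

        foreach (var time in times)
        {
            // A value at or beyond the window length closes the current window.
            if (current.Count > 0 && time - windowStart >= length)
            {
                completed.Add(current);
                current = new List<DateTime>();
            }
            // The first value of a fresh window defines its start time.
            if (current.Count == 0)
                windowStart = time;
            current.Add(time);
        }
        return completed;
    }
}
```

For ticks at 13:00, 13:02, 13:04, 13:05, 13:07 and 13:10 with a 5 minute length, the first window closes when 13:05 arrives and holds three values, the second closes when 13:10 arrives and holds 13:05 and 13:07, and 13:10 sits in a third window that is still open.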

As you can see, we can (relatively) easily implement the required interfaces and with a little customization achieve a pipe and filter pattern to an end function that will perform any time series calculation we need. We should also be able to add and remove subscriptions to each of the above entities as and when we want.

Adding Value

With the plumbing done, we can now begin to add value to the data we're receiving. Two common indicators in the world of trading are Time Weighted Average Price (TWAP) and Volume Weighted Average Price (VWAP).

TWAP is traditionally calculated by finding the mean of the Open, Close, High and Low prices for each tick within a time window and then taking the mean of those values. I've taken a simpler and possibly better approach, outlined at TradeStation, to calculate a TWAP value for each time window:

    private static double twap(IList<QuotesData> quotes)
    {
        if (quotes.Count == 0)
            return double.NaN;

        // Sum the last traded price of every quote in the window.
        double totalPrice = 0.0;
        foreach (var q in quotes)
            totalPrice += q.LastTradePriceOnly;

        // Simple mean of the prices; NaN signals that no usable price was seen.
        if (totalPrice > 0.0)
            return totalPrice / quotes.Count;
        return double.NaN;
    }
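Written out, the function above reduces to the arithmetic mean of the last traded prices in the window. The symbols are shorthand introduced here rather than names from the attached code: p_i is the LastTradePriceOnly of the i-th quote and n is the number of quotes in the window.

```latex
\mathrm{TWAP} = \frac{1}{n} \sum_{i=1}^{n} p_i
```

Each quote counts equally, so the result is time weighted only to the extent that quotes arrive at regular intervals, which seems a reasonable assumption when they are polled periodically.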

VWAP is similar to TWAP, except that we weight each tick by the volume of stock traded. Needless to say, a million shares sold at $1.00 are far more relevant than 1 share sold at $100. Again, I calculate VWAP for each published time window:

    private static double vwap(IList<QuotesData> quotes)
    {
        if (quotes.Count == 0)
            return double.NaN;

        double totalWeightedPrice = 0.0;
        double totalQuantity = 0.0;
        foreach (var q in quotes)
        {
            // Assumption: Values may hand back a boxed number that is not a double,
            // in which case a direct (double) cast would throw. Convert.ToDouble
            // (from the System namespace) copes with any numeric type.
            double quantity = Convert.ToDouble(q.Values(QuoteProperty.LastTradeSize));
            totalWeightedPrice += q.LastTradePriceOnly * quantity;
            totalQuantity += quantity;
        }

        // Volume-weighted mean; NaN signals that no usable volume or price was seen.
        if (totalQuantity > 0 && totalWeightedPrice > 0.0)
            return totalWeightedPrice / totalQuantity;
        return double.NaN;
    }
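As a formula, with p_i the last traded price and q_i the last trade size of the i-th quote (shorthand introduced here, not names from the attached code), the calculation and the million-share example from the text work out as follows:

```latex
\mathrm{VWAP} = \frac{\sum_{i=1}^{n} p_i q_i}{\sum_{i=1}^{n} q_i}
\qquad
\frac{1\,000\,000 \times 1.00 + 1 \times 100}{1\,000\,000 + 1}
  = \frac{1\,000\,100}{1\,000\,001} \approx 1.0001
```

An unweighted mean of the same two trades would give $50.50, which shows how far a single small trade could otherwise drag the figure.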

To produce a running version of TWAP or VWAP over a longer period we can always pipe the results into another Window. Likewise, if we are aiming to do something a bit more involved, such as executing new trades based on our calculations, then we would need to wrap our execution API with IObserver<T> and pipe to it the relevant data it needs to construct an order. That, however, is a subject for another day.

Conclusions

Having worked with StreamInsight and Esper (in Java) but not Rx, this article took roughly 2 man days to research and write. In the UK IT managers often cost development projects at £600 per man day, so we can estimate that in my case the cost was less than £1500. Compare this to SQL Server Standard Edition which retails at about £2500 and then add on the cost of implementation and on the face of it we've made a reasonable cost saving.

The limitations, though, are that this solution is not particularly scalable, nor does it support many of the other extensive features a proper CEP solution provides.

Troubleshooting

You may find that the default implementation only retrieves one stock quote before pausing. If so, it is likely that the exchange is outside trading hours. In that case I've provided FTSE-100 symbols so you can easily switch over, but if you want anything else you'll have to copy and paste from Y! for now.

Possible Future Updates

More complex standard calculations such as portfolio VaR

How does this compare to the equivalent in StreamInsight or NEsper (C#) or Esper (Java)?

All and any feedback is greatly appreciated.

History

Revision #1 - The bare bones of an article and code

Revision #2 - Added TumblingWindows, calculators for TWAP and VWAP.

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.
