Real-time statistics with Rx (Statistical Demo App 1 of 2)

10 Jan 2015
Know how an event in your app is performing by using Reactive Extensions

Where is the code?

You can download the code from my GitHub repository:

https://github.com/jlVidal/BasicStatisticsWithRx

Introduction

Reactive Extensions (Rx) is a powerful library for handling events. In this text you'll see how easily it can be integrated into your application to get real-time statistics about a specific event.

Tip Series

This is tip 1 of 2, which I hope people will like. Shown below is the outline of what I'm going to cover:

  1. Using basic statistics with Rx statistical operators (this page)
  2. Adding time to measure how an event is performing

Background

In the real world you often need to measure how many times something happens, and it is more useful to have that information available in real time.

Usually the first thing that comes to mind when you need some kind of statistic is to query your database. Why is that a problem in some situations?

When you query the database you are looking at the past: it tells you everything that has already happened. That becomes a problem when you need the information in real time, and it gets worse when you have a big or complex SQL query that must be refreshed frequently, even if an automated process does the refreshing.

The main point here is to measure how an event is performing in real time.

Current Features

Rx 2.2.5 already ships some operators that compute statistics over your events, such as:

C#
public static class Observable
{ 
  // ...
  public static IObservable<TSource> Min<TSource>(this IObservable<TSource> source);
  public static IObservable<TSource> Max<TSource>(this IObservable<TSource> source);
  public static IObservable<double> Average(this IObservable<int> source);
  public static IObservable<int> Sum(this IObservable<int> source);
  public static IObservable<int> Count<TSource>(this IObservable<TSource> source);
  // ...
}

The problem with them (maybe not a problem, just a design decision) is that they were implemented to give you only the final result, not to show changes in real time. Technically, the result is emitted only when OnCompleted() reaches the observer in the IObservable<T> composition. Looking at the implementation, the reason is that all of them use a mechanism like this:

C#
public static IObservable<TSource> Final<TSource>(this IObservable<TSource> source)
{
    return new AnonymousObservable<TSource>(observer =>
    {
        var value = default(TSource);
        var hasValue = false;

        return source.Subscribe(
            x =>
            {
                hasValue = true;
                value = x;
            },
            observer.OnError,
            () =>
            {
                if (!hasValue)
                    observer.OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
                else
                {
                    observer.OnNext(value);
                    observer.OnCompleted();
                }
            });
    });
}
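
The effect is easy to observe with a Subject<int>. The sketch below is only an illustration: the class and method names are made up for this example and are not part of the download, and it assumes the Rx package is referenced. It pushes two values into the built-in Sum operator and records what the subscriber sees.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical demo class, not part of the article's download.
public static class CompletionDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var source = new Subject<int>();

        // The built-in Sum holds its result until the source completes.
        source.Sum().Subscribe(total => log.Add("sum=" + total));

        source.OnNext(1);
        source.OnNext(2);
        log.Add("no result yet");   // nothing has been emitted so far

        source.OnCompleted();       // only now does Sum emit its single value
        return log;
    }
}
```

Run() returns "no result yet" followed by "sum=3": the subscriber hears nothing until the source completes.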

So if you want real-time statistics about changes in the IObservable<T>, you basically need to choose between two options:

  1. Re-implement all the operators: Min/Max/Sum/Count/Average.
  2. Find a way to complete your subscription regularly. You will probably do this anyway to relate data to time, for example with the Window and GroupByUntil operators.

I'll try to cover both in the tip series, starting with the re-implementation.
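
As a rough preview of the second option, the sketch below uses a count-based Window rather than a time-based one, purely so the result is deterministic. The names are illustrative and not taken from the demo app. Each window completes after three values, which is enough for the built-in Sum to emit once per window.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical demo class, not part of the article's download.
public static class WindowDemo
{
    public static List<int> Run()
    {
        var sums = new List<int>();
        var source = new Subject<int>();

        source.Window(3)                  // every window completes after 3 values
              .SelectMany(w => w.Sum())   // so the standard Sum emits once per window
              .Subscribe(s => sums.Add(s));

        for (var i = 1; i <= 6; i++)
            source.OnNext(i);

        return sums;  // sums of (1,2,3) and (4,5,6)
    }
}
```

Replacing Window(3) with a TimeSpan-based overload would give one result per time slice instead of one per three values.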

Demo App

For now I will cover just the first tab of it. 

(Screenshot: simple usage example)

After you click Add, all the ListBox components update with the new information.

For demonstration purposes I opted NOT to filter consecutive repeated values, but you can easily add the Observable.DistinctUntilChanged<TSource>() operator to the pipeline to see the result.
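
A minimal sketch of what that filter does, assuming a plain Subject<int> as the source (the class name is made up for this example):

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical demo class, not part of the article's download.
public static class DistinctDemo
{
    public static List<int> Run()
    {
        var seen = new List<int>();
        var source = new Subject<int>();

        // Drops a value only when it equals the one immediately before it.
        source.DistinctUntilChanged().Subscribe(x => seen.Add(x));

        foreach (var v in new[] { 5, 5, 7, 7, 5 })
            source.OnNext(v);

        return seen;  // 5, 7, 5
    }
}
```

Note that the last 5 still gets through: only consecutive repeats are removed, so the statistics would still count a value that comes back later.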

Using the code

The usage is really simple: once you have an IObservable<T>, you just compose operations. If you are not familiar with Reactive Extensions or with how to convert an event to an IObservable<T>, I suggest this reading: http://stackoverflow.com/questions/1596158/good-introduction-to-the-net-reactive-framework
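
For readers who have never converted an event, here is one possible sketch using Observable.FromEventPattern. The Ticker class, its PriceChanged event and PriceChangedEventArgs are hypothetical types invented for this example; they are not part of the demo app.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical event args and event source, for illustration only.
public class PriceChangedEventArgs : EventArgs
{
    public PriceChangedEventArgs(decimal price) { Price = price; }
    public decimal Price { get; private set; }
}

public class Ticker
{
    public event EventHandler<PriceChangedEventArgs> PriceChanged;

    public void Publish(decimal price)
    {
        var handler = PriceChanged;
        if (handler != null) handler(this, new PriceChangedEventArgs(price));
    }
}

public static class EventDemo
{
    public static List<decimal> Run()
    {
        var received = new List<decimal>();
        var ticker = new Ticker();

        // Wrap the .NET event: Rx attaches the handler on Subscribe
        // and detaches it when the subscription is disposed.
        IObservable<decimal> prices = Observable
            .FromEventPattern<PriceChangedEventArgs>(
                h => ticker.PriceChanged += h,
                h => ticker.PriceChanged -= h)
            .Select(e => e.EventArgs.Price);

        using (prices.Subscribe(p => received.Add(p)))
        {
            ticker.Publish(10m);
            ticker.Publish(12.5m);
        }

        ticker.Publish(99m);  // raised after disposal, so it is not observed
        return received;
    }
}
```

From this point on, prices is an ordinary IObservable<decimal> and can be fed to any of the operators in this tip.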

Another small detail: the view model does not bother to dispose the subscriptions it creates. In a real app you need to take care of that.
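
One common way to handle it, shown here only as a sketch, is to collect every subscription in a CompositeDisposable and dispose them together. The view model below is a stripped-down, hypothetical stand-in, not the SimpleUsageViewModel from the demo.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Subjects;

// Hypothetical, reduced view model used only to illustrate disposal.
public sealed class DisposableViewModelSketch : IDisposable
{
    private readonly CompositeDisposable _subscriptions = new CompositeDisposable();
    private readonly List<int> _received = new List<int>();

    public DisposableViewModelSketch(IObservable<int> source)
    {
        // Keep the IDisposable returned by Subscribe instead of discarding it.
        _subscriptions.Add(source.Subscribe(x => _received.Add(x)));
    }

    public IList<int> Received
    {
        get { return _received; }
    }

    public void Dispose()
    {
        // Disposes every subscription that was added.
        _subscriptions.Dispose();
    }
}

public static class DisposalDemo
{
    public static IList<int> Run()
    {
        var source = new Subject<int>();
        var vm = new DisposableViewModelSketch(source);

        source.OnNext(1);
        vm.Dispose();
        source.OnNext(2);   // ignored: the subscription is gone

        return vm.Received;
    }
}
```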

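About external libraries in the app: the view model below is built on ReactiveUI types (ReactiveObject, ReactiveList, ReactiveCommand), and the operators use immutable collections (ImmutableList, ImmutableStack).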

SimpleUsageViewModel.cs code:

public class SimpleUsageViewModel : ReactiveObject
{
    private readonly ReactiveList<decimal> _sum = new ReactiveList<decimal>();

    private readonly ReactiveList<IGrouping<int, decimal>> _modaValue = new ReactiveList<IGrouping<int, decimal>>();
    private readonly ReactiveList<string> _modaText = new ReactiveList<string>();

    private readonly ReactiveList<decimal> _median = new ReactiveList<decimal>();
    private readonly ReactiveList<decimal> _average = new ReactiveList<decimal>();
    private readonly ReactiveList<decimal> _max = new ReactiveList<decimal>();
    private readonly ReactiveList<decimal> _min = new ReactiveList<decimal>();
    private readonly ReactiveList<int> _counter = new ReactiveList<int>();

    private decimal _value;
    private readonly ReactiveList<KeyValuePair<int, decimal>> _history = new ReactiveList<KeyValuePair<int, decimal>>();

    public SimpleUsageViewModel()
    {
        Initialize();
    }

    private void Initialize()
    {
        InsertValueCommand = ReactiveCommand.Create();
        IObservable<decimal> addCommand = InsertValueCommand.Select(a => Value)
                                            .Publish().RefCount();

        ClearStatisticsCommand = ReactiveCommand.Create();
        IObservable<object> cleanMethod = ClearStatisticsCommand;

        addCommand.Select((a, b) => new KeyValuePair<int, decimal>(b + 1, a))
                    .TakeUntil(cleanMethod).Repeat()
                    .Do(a => _history.Insert(0, a))
                    .Subscribe();

        var liveCount = addCommand.LiveCount().TakeUntil(cleanMethod).Repeat();
        liveCount.Subscribe(a => _counter.Insert(0, a));

        var liveAverage = addCommand.LiveAverage().TakeUntil(cleanMethod).Repeat();
        liveAverage.Subscribe(a => _average.Insert(0, a));

        var liveSum = addCommand.LiveSum().TakeUntil(cleanMethod).Repeat();
        liveSum.Subscribe(a => _sum.Insert(0, a));

        var liveMax = addCommand.LiveMax().TakeUntil(cleanMethod).Repeat();
        liveMax.Subscribe(a => _max.Insert(0, a));

        var liveMin = addCommand.LiveMin().TakeUntil(cleanMethod).Repeat();
        liveMin.Subscribe(a => _min.Insert(0, a));

        var liveMedian = addCommand.LiveMedian().TakeUntil(cleanMethod).Repeat();
        liveMedian.Subscribe(a => _median.Insert(0, a));

        var liveModa = addCommand.LiveModa().TakeUntil(cleanMethod).Repeat();
        liveModa.Subscribe(a => _modaValue.Insert(0, a));

        var textModa = _modaValue.ItemsAdded.Select(a => a.Key + "x = " +
                                                     string.Join(" & ",
                                                         a.Select(b => string.Format("{0:C2}", b))));
        textModa.Subscribe(a => _modaText.Insert(0, a));

        cleanMethod.Do(a => _counter.Clear())
                    .Do(a => _average.Clear())
                    .Do(a => _sum.Clear())
                    .Do(a => _max.Clear())
                    .Do(a => _min.Clear())
                    .Do(a => _modaText.Clear())
                    .Do(a => _median.Clear())
                    .Do(a => _history.Clear())
                    .Subscribe();
    }

    public ReactiveCommand<object> InsertValueCommand { get; set; }

    public ReactiveCommand<object> ClearStatisticsCommand { get; set; }

    public decimal Value
    {
        get { return _value; }
        set { this.RaiseAndSetIfChanged(ref _value, value); }
    }

    public ReactiveList<int> Counter
    {
        get { return _counter; }
    }

    public ReactiveList<decimal> Min
    {
        get { return _min; }
    }

    public ReactiveList<decimal> Max
    {
        get { return _max; }
    }

    public ReactiveList<decimal> Average
    {
        get { return _average; }
    }

    public ReactiveList<decimal> Sum
    {
        get { return _sum; }
    }

    public ReactiveList<string> ModaText
    {
        get { return _modaText; }
    }

    public ReactiveList<decimal> Median
    {
        get { return _median; }
    }
    public ReactiveList<KeyValuePair<int, decimal>> Pairs
    {
        get { return _history; }
    }
}

The implementation of the operators follows:

C#
    public static class RxExt
    {
        // Emits 1, 2, 3... as values arrive (the index given by Select is zero-based).
        public static IObservable<int> LiveCount<T>(this IObservable<T> source)
        {
            return source.Select((a, b) => b + 1);
        }

        // Running minimum: Scan emits the accumulated value after every element.
        public static IObservable<int> LiveMin(this IObservable<int> source)
        {
            return source.Scan(Int32.MaxValue, Math.Min);
        }

        // Running maximum.
        public static IObservable<int> LiveMax(this IObservable<int> source)
        {
            return source.Scan(Int32.MinValue, Math.Max);
        }

        // Running total.
        public static IObservable<int> LiveSum(this IObservable<int> source)
        {
            return source.Scan(0, (a, b) => a + b);
        }

        // Running average: Publish shares one subscription between count and sum,
        // and Zip pairs them element by element.
        public static IObservable<double> LiveAverage(this IObservable<int> source)
        {
            return source.Publish(p => p.LiveCount()
                                        .Zip(p.LiveSum(), (a, b) => (double) b / a));
        }
        //..
    }
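
To see the difference from the built-in operators, the sketch below feeds three values into a Scan-based running average. It is a variation, not the author's code: instead of Publish and Zip it carries the count and the sum together in a single Scan state, and all names are made up for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical demo class, not part of the article's download.
public static class RunningDemo
{
    // Alternative to Publish + Zip: keep (count, sum) in one Scan state.
    public static IObservable<double> RunningAverage(IObservable<int> source)
    {
        return source
            .Scan(Tuple.Create(0, 0L),
                  (acc, x) => Tuple.Create(acc.Item1 + 1, acc.Item2 + x))
            .Select(acc => (double)acc.Item2 / acc.Item1);
    }

    public static List<double> Run()
    {
        var result = new List<double>();
        var source = new Subject<int>();

        RunningAverage(source).Subscribe(a => result.Add(a));

        source.OnNext(3);   // average of (3)       = 3
        source.OnNext(1);   // average of (3, 1)    = 2
        source.OnNext(5);   // average of (3, 1, 5) = 3

        return result;      // one value per OnNext, no completion needed
    }
}
```

A result is produced for every incoming value, without the source ever completing.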

Other basic statistical techniques:

C#
        public static IObservable<int> LiveRange(this IObservable<int> source)
        {
            return source.Publish(p => p.LiveMax().Zip(p.LiveMin(), (a, b) => a - b).Skip(1));
        }

        public static IObservable<double> LiveMedian(this IObservable<int> source)
        {
            return source.AccumulateAllOrdered()
                         .Select(a => (a.Count % 2) == 0
                                        ? (double)(a[(a.Count / 2) - 1] + a[a.Count / 2]) / 2
                                        : (double)(a[(a.Count / 2)])
                                );
        }

        public static IObservable<IReadOnlyList<T>> AccumulateAll<T>(this IObservable<T> next)
        {
            return next.Scan(ImmutableList<T>.Empty, (a, b) => a.Add(b));
        }

        public static IObservable<IReadOnlyList<int>> AccumulateAllOrdered(this IObservable<int> source)
        {
            return source.Scan(ImmutableList<int>.Empty, (a, b) =>
            {
                var res = a.BinarySearch(b);
                if (res <= -1)
                    res = ~res;

                return a.Insert(res, b);
            });
        }

        public static IObservable<IGrouping<int, int>> LiveModa(this IObservable<int> source)
        {
            return source.GroupBy(a => a)
                      .Select(a => a.LiveCount().Select(b => Tuple.Create(a.Key, b)))
                      .Merge()
                      .Scan(default(KeyValuePair<int,ImmutableStack<int>>), (a, b) =>
                      {
                          if (a.Key > b.Item2)
                              return a;

                          if (a.Key == b.Item2)
                              return new KeyValuePair<int,ImmutableStack<int>>(b.Item2, a.Value.Push(b.Item1));

                          return new KeyValuePair<int, ImmutableStack<int>>(b.Item2, ImmutableStack<int>.Empty.Push(b.Item1));
                      })
                      .Select(a => a.Value.AsGroup(a.Key));
        }

AsGroup extension method:

C#
public static class LinqExt
{
    public static IGrouping<K,V> AsGroup<K,V>(this IEnumerable<V> source, K key)
    {
        return SimpleGroupWrapper.Create(key, source);
    }
}

internal static class SimpleGroupWrapper
{
    public static SimpleGroupWrapper<K,V> Create<K,V>(K key, IEnumerable<V> source)
    {
        return new SimpleGroupWrapper<K, V>(key, source);
    }
}

internal class SimpleGroupWrapper<K,V> : IGrouping<K,V>
{
    private readonly IEnumerable<V> _source;
    private readonly K _key;
    public SimpleGroupWrapper(K key, IEnumerable<V> source)
    {
        if (source == null)
            throw new ArgumentNullException("source");

        _source = source;
        _key = key;
    }
    public K Key
    {
        get { return _key; }
    }

    public IEnumerator<V> GetEnumerator()
    {
        return _source.GetEnumerator();
    }

    System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
    {
        return _source.GetEnumerator();
    }
}

Be careful with the LiveMedian and LiveModa methods: you can get an OutOfMemoryException on a long-running subscription, because LiveMedian stores every value seen in the subscription and LiveModa keeps a group for every distinct value.
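
If an approximation over recent values is acceptable, one way to bound the memory is to compute the median over a sliding buffer of the last N values. This is a different technique from the LiveMedian above (it forgets older values), and the names below are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical demo class, not part of the article's download.
public static class SlidingMedianDemo
{
    // Median of the last 'size' values only, so memory stays bounded.
    public static IObservable<double> SlidingMedian(IObservable<int> source, int size)
    {
        return source.Buffer(size, 1)               // overlapping buffers, shifted by one value
                     .Where(b => b.Count == size)   // ignore partial buffers flushed on completion
                     .Select(b =>
                     {
                         var sorted = b.OrderBy(v => v).ToList();
                         var mid = sorted.Count / 2;
                         return sorted.Count % 2 == 0
                             ? (sorted[mid - 1] + sorted[mid]) / 2.0
                             : sorted[mid];
                     });
    }

    public static List<double> Run()
    {
        var medians = new List<double>();
        var source = new Subject<int>();

        SlidingMedian(source, 3).Subscribe(m => medians.Add(m));

        foreach (var v in new[] { 5, 1, 4, 2 })
            source.OnNext(v);

        return medians;  // medians of (5,1,4) and (1,4,2)
    }
}
```

The first result appears only after N values have arrived, which is the price of the bounded window.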

Points of Interest

A slightly boring part is that there is no common "numeric" base type among the scalar types. There is IComparable<T>, but it only helps with comparisons (Min/Max), not with arithmetic such as Sum or Average. Unfortunately it is necessary to implement the same operators, with the same code, for each numeric type. For now I have done it only for int, double and decimal.
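
The sketch below illustrates the limitation. A generic running maximum works with IComparable<T> alone, while a generic running sum needs the addition passed in by the caller. The method names LiveMaxOf and LiveSumOf are invented for this example and do not exist in the download.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical generic variants, for illustration only.
public static class GenericLiveOps
{
    // Comparison is available through IComparable<T>, so one method covers all types.
    public static IObservable<T> LiveMaxOf<T>(this IObservable<T> source)
        where T : IComparable<T>
    {
        return source.Scan((best, x) => x.CompareTo(best) > 0 ? x : best);
    }

    // There is no generic "+" constraint, so the caller supplies zero and the add function.
    public static IObservable<T> LiveSumOf<T>(this IObservable<T> source, T zero, Func<T, T, T> add)
    {
        return source.Scan(zero, add);
    }
}

public static class GenericDemo
{
    public static List<decimal>[] Run()
    {
        var maxes = new List<decimal>();
        var sums = new List<decimal>();
        var source = new Subject<decimal>();

        source.LiveMaxOf().Subscribe(m => maxes.Add(m));
        source.LiveSumOf(0m, (a, b) => a + b).Subscribe(s => sums.Add(s));

        source.OnNext(1.5m);
        source.OnNext(4m);
        source.OnNext(2m);

        return new[] { maxes, sums };  // maxes: 1.5, 4, 4   sums: 1.5, 5.5, 7.5
    }
}
```

Passing the add function avoids duplicating the operator per type, at the cost of a slightly noisier call site.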

What's next?

How can this code become useful in real life? Basically by turning it into a real metric: getting the values per second, minute, hour or day, or over a specific interval that completes the subscription regularly, which lets you use the standard operator implementations. That is what I expect to cover in the next tip.

In the real world the events would be sent to a separate application for monitoring purposes, maybe using a web service with WCF, or a real-time technology like SignalR, which would work great with a web dashboard.

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.
