WPF / MVVM Real-Time Trading Application

10 Feb 2012 1  
This project is a real-time, multi-threaded trading application framework developed with WPF / MVVM.
Source code is available at the WpfRealTime CodePlex site.

Prelude

On July 1st, 2009 I received a phone call from a recruitment agent promising to make me “an offer I couldn't refuse”. I had no clue how he got hold of my name, phone number or place of work, but he seemed to have assumed a very strong connection between me and the WPF/MVVM that he kept repeating, slightly raising his voice on the second M. At the time I was quite happily tucked away on a “not so easily reachable floor” of the middle office IT in some bank. The project I was working on was strategic but utterly meaningless, which was almost perfect, so I decided I was not going to be easily budged.

Trying to avert his suspicions about my MVVM connections, I agreed to meet the client, where after a quick cross-examination my MVVM involvement became apparent. The verdict was delivered shortly after that, and I was contracted to build a “real-time low-latency multi-threaded front-office trading application using WPF/MVVM”.

Requirements

My new manager was precise, concise and wasted no time in furnishing me with a set of requirements. Here they are:

  • The application needs to be responsive all the time, display trading data “very fast”, never hang, freeze, crash or run out of memory.

I moved desks several times trying to hide (a technique I had successfully employed before), but my manager kept finding me every time and chasing me up for an update. I had no choice but to get on with my task. So I “googled”, and apparently “responsive and fast” had something to do with real-time or low latency (we will look at this in detail in a minute), “hang, freeze and crash” is a field of study in complex adaptive systems, and I decided to throw in composite UI on top since it is a must in modern UI systems.

So my technical requirements were:

  • real-time reliability
  • adaptability
  • extensibility, scalability and modularity (which is all the same thing but sounds better put together in one sentence)

Crash Course in Real-Time Systems Development

The term “real-time systems” can refer to a lot of different things. One property of RT systems is that they are aware of the hardware they run on and use very low-level programming languages to make hardware-specific calls. They often run directly on the hardware (no operating system) or with the help of real-time executives or real-time operating systems. Because they are designed to run on specific hardware they are not easily scalable, portable, etc. That is why general design patterns won't apply to real-time systems – they are “too generic”.

However, the main property of RT systems is that they are designed around a “real-time constraint”. That does not mean that they run fast (that would be just low latency) but that all tasks are guaranteed to complete execution within the defined constraint, e.g. “I guarantee to display any newly arrived data within 50 ms”.

So at the heart of each RT system there is a real-time scheduling algorithm. The general idea is that all tasks in the system can be split into periodic and sporadic. Periodic tasks are known well in advance: we know when they start, how long they run for (budget or deadline) and their priorities. Sporadic tasks can start at any time, and we can only allocate an estimated budget (processor time) before they need to be interrupted. Some of the better-known algorithms are “priority-based pre-emptive schedulers” and “earliest deadline first”. Earliest deadline first, as the name suggests, organizes tasks in order of their deadlines (the time by which the task must have been executed). Priority-based algorithms organize tasks in order of their priorities and interrupt the running task if a task with a higher priority arrives. One of the priority-based algorithms is the rate-monotonic scheduler. Here is how it works:

The rate-monotonic scheduler puts all periodic tasks into the waiting queue. At the start of a new period the scheduler examines the waiting queue and moves the selected tasks into the “ready to run” queue. At this point the scheduler performs a context switch if there are any tasks in the “ready to run” queue with a deadline earlier than the deadline of the task currently scheduled. The rate-monotonic algorithm accumulates sporadic tasks and runs them as a batch on a designated periodic task, since they are considered lower priority.
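To make the difference between the two orderings concrete, here is a minimal sketch. The WorkItem and SchedulingOrder names are hypothetical and are not part of the WpfRealTime source; the sketch only shows how the same tasks sort differently under “earliest deadline first” and under the textbook rate-monotonic rule (static priority, shortest period first).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical description of one task instance (not a type from the project).
public sealed class WorkItem
{
    public string Name { get; }
    public TimeSpan Period { get; }     // how often the task recurs
    public DateTime Deadline { get; }   // when this instance must be finished

    public WorkItem(string name, TimeSpan period, DateTime deadline)
    {
        Name = name;
        Period = period;
        Deadline = deadline;
    }
}

public static class SchedulingOrder
{
    // Earliest deadline first: the item whose deadline is closest runs first.
    public static IEnumerable<WorkItem> EarliestDeadlineFirst(IEnumerable<WorkItem> items)
    {
        return items.OrderBy(i => i.Deadline);
    }

    // Rate-monotonic: static priorities, the shortest period gets the highest priority.
    public static IEnumerable<WorkItem> RateMonotonic(IEnumerable<WorkItem> items)
    {
        return items.OrderBy(i => i.Period);
    }
}
```

With a 50 ms “prices” task due in 40 ms and a 200 ms “positions” task due in 20 ms, earliest deadline first picks “positions” while rate-monotonic picks “prices”.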

Starting with .NET 4.0, Microsoft ships the Task Parallel Library (TPL), which allows developers to define tasks and write custom task schedulers to run them. This is a perfect place for us to implement our real-time framework. We will define PeriodicTask and SporadicTask classes that inherit from the System.Threading.Tasks.Task class, and we will implement our own PriorityTaskScheduler that inherits from the TaskScheduler class. Since we can buffer the arriving market data (with the help of Rx) and dispatch it to the GUI at regular intervals, we call these tasks periodic. Any user interaction (clicking buttons etc.) cannot be predicted, so we call these tasks sporadic. We will use the rate-monotonic algorithm as a base, with the only difference being the way we treat sporadic tasks. My requirement is that the “GUI needs to be responsive all the time”, and that is why I consider sporadic tasks a higher priority than any periodic task and run them first.
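As a rough idea of what such task types could look like, here is a minimal sketch. The TimeCreated and Deadline property names are taken from the scheduler listing later in the article; the constructor shape, the budget being an int number of milliseconds and the IsLate helper are my assumptions, so the real classes in the project may well differ.

```csharp
using System;
using System.Threading.Tasks;

// Sketch only: a Task that remembers when it was created and when it is due.
public class PeriodicTask : Task
{
    public DateTime TimeCreated { get; private set; }
    public DateTime Deadline { get; private set; }

    // budgetMs is assumed to be the real-time constraint in milliseconds.
    public PeriodicTask(Action action, int budgetMs) : base(action)
    {
        TimeCreated = DateTime.UtcNow;
        Deadline = TimeCreated.AddMilliseconds(budgetMs);
    }

    // Hypothetical helper: true when the given time is past the deadline.
    public bool IsLate(DateTime nowUtc)
    {
        return nowUtc > Deadline;
    }
}

// Sporadic tasks (user interaction) carry the same data but are a distinct
// type, so a scheduler can recognise them and let them jump the queue.
public class SporadicTask : Task
{
    public DateTime TimeCreated { get; private set; }
    public DateTime Deadline { get; private set; }

    public SporadicTask(Action action, int budgetMs) : base(action)
    {
        TimeCreated = DateTime.UtcNow;
        Deadline = TimeCreated.AddMilliseconds(budgetMs);
    }
}
```

Because both types are ordinary Tasks, they can be started on any TaskScheduler with task.Start(scheduler), and the scheduler can use a simple type check to tell them apart.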

Complex Adaptive Systems

Although I have seen a lot of research material on how complex systems should adapt to different conditions, monitor resources, self-validate and restore components, I have not seen a lot of implementations of this in the UI world. So I am going to build some of this functionality into the framework, and hopefully that will be just enough to meet my “never hang, freeze, or crash” requirement.

Since I am using composite or modular UI it is very easy to implement some component monitoring functionality. The main shell establishes a connection with its modules and sends regular heartbeats to make sure all parts of the system are responsive. Should any of the modules fail to respond, we will need to unload this screen from memory and, where possible, reload a new instance of it. I am going to run each view as a new Window class so it can be assigned its own dispatcher. This way if a particular view “hangs” the rest of the UI is still “live and kicking”. Our main shell “never hangs or freezes” simply because we are not doing any work on its dispatcher. Its job is to load the system's components, monitor the heartbeats and reload other modules/views. And because each view has its own dispatcher we can process a lot more updates without slowing down the whole application.

Each view will also be assigned a MaintenanceEngineer class whose job is to monitor the view's memory usage, send heartbeats and monitor the dispatcher queue. It can decide whether we are falling behind with periodic updates or user interactions (sporadic tasks).
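The shell side of the heartbeat idea can be sketched in a few lines. HeartbeatMonitor, Beat and FindUnresponsive are hypothetical names, not classes from the project; the sketch only shows the bookkeeping (last heartbeat per view, plus a timeout) and leaves the actual unloading and reloading of a view out.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

// Sketch only: remembers the last heartbeat of each view and reports the stale ones.
public sealed class HeartbeatMonitor
{
    private readonly ConcurrentDictionary<string, DateTime> _lastSeen =
        new ConcurrentDictionary<string, DateTime>();
    private readonly TimeSpan _timeout;

    public HeartbeatMonitor(TimeSpan timeout)
    {
        _timeout = timeout;
    }

    // Called whenever a view reports that it is still alive.
    public void Beat(string viewName, DateTime nowUtc)
    {
        _lastSeen[viewName] = nowUtc;
    }

    // Views whose last heartbeat is older than the timeout, sorted by name.
    public IList<string> FindUnresponsive(DateTime nowUtc)
    {
        return _lastSeen
            .Where(kv => nowUtc - kv.Value > _timeout)
            .Select(kv => kv.Key)
            .OrderBy(name => name)
            .ToList();
    }
}
```

Passing the current time in as a parameter, rather than reading the clock inside the class, keeps the check easy to test.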

Composite UI

When it comes to composite UI WPF developers are spoilt for choice. For my purposes I narrowed my choice down to three frameworks namely Prism, MeffedMVVM by Marlon Grech and Cinch by Sacha Barber. I was only guided by the principle that using one source is plagiarism but more than one is research work. That suits me because I am just going to pinch some useful features for my real-time framework.

Prism

The best thing about Prism is ...eh ...well, modularity. That is achieved with the help of the Bootstrapper class that loads modules. I much prefer the way Marlon does it with MEF, so I think we'll pinch it from his MeffedMVVM. The second best thing is … decoupling … achieved with the EventAggregator class. Again I much prefer the way Sacha does it with his Mediator class, which he lifted from Karl Shifflett's work, who lifted it from Marlon, who lifted it from Josh Smith (but not necessarily in that order). I am going to make some changes to it as well that we can discuss later. There are also Region Adapters, which I am not going to use (because they are way over-engineered and pretty useless in my opinion), and Unity, which I am not going to use either because I can use MEF to instantiate my objects and I don't want to build long dependency chains as Unity does (I have this idea that ViewModels and Services should be completely decoupled and only communicate via the Mediator, where they subscribe to topics and broadcast async messages to these topics – sort of a GUI enterprise bus; bear with me, perhaps I can explain it better a bit later). So in fact from Prism we borrow ...er ...nothing.

MeffedMVVM

In MeffedMVVM Marlon uses MEF to link Views and ViewModels. I am going to use his idea, but I will use MEF to resolve ViewModels and Services. There is no need for them to reference each other because they both have a reference to the singleton Mediator class, which provides the publish / subscribe mechanics and chooses the TaskScheduler to run the tasks. There is also a strong “one View – one ViewModel” connection that is defined at design time, so there is no need to link them dynamically at run time. Marlon also defines “design time” behaviour for his views so each control has some dummy data that can be presented in Blend. I am going to use this idea but apply it to the Services rather than the Views. So … each Service has a MockService version of itself, and with a simple config change you should be able to run the GUI with real or mock services. This allows you to run and work on the GUI when the Server is down (as they do) or the Server-side component is not ready (I don't think this is what Marlon meant by his “design time data” but anyhow...). So from MeffedMVVM we are going to borrow ...well, MEF and MVVM.

Cinch

Now this is where we are going to lift a lot of stuff from. So ... thank you Sacha. We start with WeakEventAction, WeakAction, EventToCommandTrigger and a slightly modified Mediator. They are brilliant and there is no need for me to repeat how they work since you can just refer to the original post on Cinch by Sacha Barber.

MVVM and whatnot

As you have probably guessed I am going to use MVVM so there are some SimpleCommand and AttachedProperties classes lying around. It is probably impossible to trace the author of these by now so I will attribute them to Sacha Barber anyway.

Another class worth mentioning is the Entity. It is an old .NET trick to bind data to the grid with a custom PropertyDescriptor and ITypedList. All data is passed around as Entity objects; google “property bag .NET” if you need more details on how it works.

I am also using Reactive Extensions (Rx) to throttle and conflate arriving data.

Design

Well … I have to say that the restaurant in this bank was absolutely dire so I decided to crack on with my design.

So with all the usual MVVM suspects present - Views, ViewModels and Services - we also have the Shell, which just loads / unloads modules and monitors heartbeat rates, resources, etc. And we have modules that implement business-specific logic. Each module has Views, a RibbonView, ViewModels and ServiceObservers. When the Shell starts, the Bootstrapper discovers the modules on disk using MEF. It adds RibbonViews to the Shell's content, loads Views (each on its own dispatcher) in separate Windows and creates Services. It also instantiates the Mediator, where ViewModels and Services “register their interest”. So what happens is that the Mediator has a list of topics that define all possible flows in the system, and for each topic it keeps a list of delegates to run when a message is broadcast on that topic.

So the ViewModels and Services are all loaded in memory chatting away via the Mediator message bus, the Mediator picks the right TaskScheduler to guarantee real-time updates, ViewModels update Views, and the Shell is talking to MaintenanceEngineers to make sure everybody is playing nicely.
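Stripped of weak references and task schedulers, the “topics and delegates” idea boils down to something like the sketch below. SimpleMediator is a hypothetical, simplified stand-in and not the project's Mediator: it keeps strong references and runs handlers synchronously on the caller's thread.

```csharp
using System;
using System.Collections.Generic;

// Sketch only: topic -> list of handlers, with broadcast to everyone registered.
public sealed class SimpleMediator
{
    private readonly Dictionary<string, List<Action<object[]>>> _handlers =
        new Dictionary<string, List<Action<object[]>>>();

    // A ViewModel or Service "registers its interest" in a topic.
    public void Register(string topic, Action<object[]> handler)
    {
        lock (_handlers)
        {
            List<Action<object[]>> list;
            if (!_handlers.TryGetValue(topic, out list))
            {
                list = new List<Action<object[]>>();
                _handlers[topic] = list;
            }
            list.Add(handler);
        }
    }

    // Returns false when nobody has registered for the topic.
    public bool Broadcast(string topic, params object[] message)
    {
        Action<object[]>[] snapshot;
        lock (_handlers)
        {
            List<Action<object[]>> list;
            if (!_handlers.TryGetValue(topic, out list)) return false;
            snapshot = list.ToArray();   // copy so handlers run outside the lock
        }
        foreach (var handler in snapshot) handler(message);
        return true;
    }
}
```

Taking a snapshot under the lock means a handler can register further handlers without deadlocking or invalidating the loop.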

Spoon? There is no spoon (xaml.cs)

I was debating (with myself) whether the Bootstrapper should resolve the View or the ViewModel, which brings us to the old argument of whether the V should reference the VM or the VM should reference the V. To be honest I don't think there is much of a difference as long as you pick your pattern and stick to it. MVVM, MVC and MVP are all quite similar with some subtle differences (refer to my previous MVVM post). Anyway, the point is that the main goal of MVVM is to allow “Blend-ability”, and that to me means one thing only – no xaml.cs. There are a lot of good frameworks where people still write code in the xaml.cs, so you need to switch between the VM and the xaml.cs to find where things are, which results in a bit of a mess.

So I took “no xaml.cs” quite literally and ... removed xaml.cs. If you look at the project you will only find xaml files with no corresponding xaml.cs. This is possible because the only things you really need from the .cs file are the call to Initialise() and the DataContext assignment, and I do both in the BaseViewModel. So effectively each ViewModel creates its View, initialises its content and assigns the DataContext, which makes sense to me since it's the VM's job to control the View.

After another “update meeting” my manager convinced me that I still didn't see the bigger picture. So here it is – the bigger picture:

And 'cause I think drawing “bigger picture” diagrams is kids play – I got my four year old and three year old to do it for me (well.. actually no I did it myself :( )

Picture 1 - Bigger Picture

So if we go from back to front, the first point of data entry is the Services. All services have real and mock implementation classes that can be discovered and loaded by MEF. In my case real services are loaded in a separate process and communicate via WCF named pipes, whereas mock services are just referenced by the main application. All you need to do is change the Solution Configuration in VS to build the real or mock version. This is achieved with a post-build script that changes the config file values after the build. This is a great way to test/develop your application without connecting to a server. Services receive the data and build key-value pair data contracts. These events are “exploded” and observed by Rx. They are then throttled and thrown at the GUI at regular, observed intervals (this is the first part of our real-time constraint). Normally there is a one-to-one connection between a service and a service observer, but not necessarily. One ServiceObserver can observe more than one service and use the Rx Merge extension to present these events as if they came from the same source (take a look at the BaseServiceObserver class). The ServiceObserver conflates the data if a newer version arrives before the previous version has been dispatched. It builds the Entity with an EntityBuilder and uses the Mediator to broadcast these Tasks to whoever is interested.

And so Entities enter the Mediator, which finds which ViewModels have registered their interest and which delegates it needs to run. The Mediator creates a task that encapsulates the delegate to run and the newly arrived data for that delegate, and passes that task to the appropriate TaskScheduler. LongRunningTaskScheduler runs all tasks on a separate thread using processor affinity, so all services process their data on just one processor, leaving the other processors to the PriorityTaskScheduler. Do you still remember what we said about real-time systems being aware of the hardware they run on? In my case I know I will use a four-core machine, and using a series of tests I determined that this application runs best when all services run on one processor and the other three are used by PriorityTaskScheduler to run periodic and sporadic tasks. It's up to you to performance-tune your application for the hardware that you use. (Because the TaskScheduler is the single point of entry for all data flows in the system, it is very easy to log all the stats that you need there.)
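Conflation itself is a small idea: if two updates for the same key arrive before the next dispatch, only the latest one survives. Here is a minimal sketch without Rx; the Conflator class and its Post / Drain methods are hypothetical names, and the project does this inside its ServiceObserver with Rx buffering instead.

```csharp
using System.Collections.Generic;

// Sketch only: keeps the most recent value per key until the next Drain().
public sealed class Conflator<TKey, TValue>
{
    private readonly object _gate = new object();
    private Dictionary<TKey, TValue> _pending = new Dictionary<TKey, TValue>();

    // Called on any thread as data arrives; a newer value overwrites an older one.
    public void Post(TKey key, TValue value)
    {
        lock (_gate)
        {
            _pending[key] = value;
        }
    }

    // Called once per dispatch interval; hands over the batch and starts a new one.
    public IReadOnlyDictionary<TKey, TValue> Drain()
    {
        lock (_gate)
        {
            var batch = _pending;
            _pending = new Dictionary<TKey, TValue>();
            return batch;
        }
    }
}
```

A timer (or an Rx buffer, as in the project) would call Drain() at the chosen interval and turn the batch into Entities.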

Picture 2 - Processor Affinity

PriorityTaskScheduler will start three threads (one for each core) that process the incoming tasks. Tasks are queued in the waiting queue and, every defined period, moved to the ready to run queue (performing a context switch if necessary). The delegates that we run are methods defined on the ViewModels that registered their interest in the specific topic. Each delegate notifies the View that the Entity has been updated.

Traditionally it would have to raise INotifyPropertyChanged.PropertyChanged for that. There is a problem with this approach. Any Windows old-timer knows that “UI elements can only be updated on the thread that owns them”. This is actually not true, or shall we say not exactly true … It should be rephrased as “GUI framework developers want you to update UI elements on the thread that owns them”. WPF developers are no different, and for that reason they force us to raise PropertyChanged from the property setter and do Dispatcher.BeginInvoke every time we need to update a property. This is clearly not going to work for us, since the Entities are being updated constantly in real time on many different threads, so imagine if we called Dispatcher.BeginInvoke all the time? The GUI would not be “very responsive” at all. So instead we are going to let the Entity be updated on multiple threads and move the INotifyPropertyChanged notification into a separate method that can be called whenever we are ready to notify the View. I can see that some of you are starting to think about a potential race condition, and that is true (to an extent): it is possible that the value of the property we display is different from the current value of the property (another thread came in and changed it) … However, since we guarantee to raise the notification within the specified real-time constraint (let's say 50 ms), we guarantee that the user will see the most recent data as it arrived to us with a maximum 50 ms delay. The actual call to INPC is done within NotifyCollection (we look at it in more detail in a sec), so all a VM developer needs to do is call the AddOrUpdate method and Bob's your uncle.

Now remember what I said about generic design patterns not always working for real-time applications? This is certainly the case here, 'cause in a pure WPF UI architecture the Entity would have no knowledge of UI-specific elements, i.e. background colour etc. But since we don't care about patterns but care about real-time, we can save a lot of UI thread time by doing a lot of work in advance. That's why the Entity knows about its cell content, colour and size, and assigns them on a background thread just to notify the UI thread later. So the Entity becomes a pulsing, constantly updated cache of UI-bound elements, and the GUI is guaranteed to be notified about the latest state within the real-time constraint.

This is how the data flows in the WPF Real-Time framework: Events (coming from the server) turn into Entities (conflated and throttled), which turn into Tasks (guaranteed real-time execution).
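The “update now, notify later” idea can be sketched with nothing but the base class library. DeferredEntity, Set, Get and Flush are hypothetical names and not the project's Entity; the sketch lets any thread write values, and raises PropertyChanged only when Flush() is called (which in the real application would happen within the real-time constraint).

```csharp
using System.Collections.Concurrent;
using System.ComponentModel;

// Sketch only: values change on any thread, notifications are raised in one batch.
public sealed class DeferredEntity : INotifyPropertyChanged
{
    private readonly ConcurrentDictionary<string, object> _values =
        new ConcurrentDictionary<string, object>();
    private readonly ConcurrentDictionary<string, byte> _dirty =
        new ConcurrentDictionary<string, byte>();

    public event PropertyChangedEventHandler PropertyChanged;

    // Safe to call from any thread; no notification is raised here.
    public void Set(string name, object value)
    {
        _values[name] = value;
        _dirty[name] = 0;   // remember that this property needs a notification
    }

    public object Get(string name)
    {
        object value;
        return _values.TryGetValue(name, out value) ? value : null;
    }

    // Raises one PropertyChanged per changed property; returns how many were raised.
    public int Flush()
    {
        int raised = 0;
        var handler = PropertyChanged;
        foreach (var name in _dirty.Keys)   // Keys is a snapshot, safe to iterate
        {
            byte ignored;
            if (_dirty.TryRemove(name, out ignored))
            {
                if (handler != null) handler(this, new PropertyChangedEventArgs(name));
                raised++;
            }
        }
        return raised;
    }
}
```

However many times a property is written between two flushes, the View hears about it once and reads the latest value.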

Picture 3 - WPF Real-Time App

By allowing data to flow freely, not worrying about locks and guaranteeing the RT constraint instead, we are able to achieve a “very responsive GUI” and “very fast updates”. This pretty much is the bigger picture. I just want to highlight some of the main classes that are worth lifting for your future projects. Here they are:

Key Classes

BlockingCircularBuffer

I lifted this one from the C++ Boost library's CircularBuffer class but use it like BlockingCollection from .NET. The only difference is that, unlike BlockingCollection, it uses a fixed-size array, which is more memory efficient.

...
        public void Enqueue(T item)
        {
            lock (_lock)
            {
                while (IsFull() || _writingSuspended)
                {
                    Monitor.Wait(_lock);
                }
                _buffer[_writerIndex] = item;
                _writerIndex++;
                _writerIndex %= _size;
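                // PulseAll: producers and consumers wait on the same lock, so a single Pulse can wake the wrong kind of waiter
                Monitor.PulseAll(_lock);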
            }
        }
...
        public T Dequeue()
        {
            T item;
            lock (_lock)
            {
                while (IsEmpty())
                {
                    Monitor.Wait(_lock);
                }
                item = _buffer[_readerIndex];
                _readerIndex++;
                _readerIndex %= _size;
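                // PulseAll: producers and consumers wait on the same lock, so a single Pulse can wake the wrong kind of waiter
                Monitor.PulseAll(_lock);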
            }
            return item;
        }
...
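
The listing above leaves out IsFull() and IsEmpty(). One common way to tell a full ring from an empty one (in both cases the reader and writer indexes are equal) is to keep a count. Below is a minimal, self-contained sketch under that assumption; BoundedRing is a hypothetical name, and the project's BlockingCircularBuffer may track this differently and also supports suspending writers and Peek, which are omitted here.

```csharp
using System.Threading;

// Sketch only: fixed-size blocking ring buffer that tracks the item count.
public sealed class BoundedRing<T>
{
    private readonly T[] _buffer;
    private readonly object _lock = new object();
    private int _readerIndex, _writerIndex, _count;

    public BoundedRing(int size)
    {
        _buffer = new T[size];
    }

    public int Count
    {
        get { lock (_lock) { return _count; } }
    }

    // Blocks while the buffer is full.
    public void Enqueue(T item)
    {
        lock (_lock)
        {
            while (_count == _buffer.Length) Monitor.Wait(_lock);
            _buffer[_writerIndex] = item;
            _writerIndex = (_writerIndex + 1) % _buffer.Length;
            _count++;
            Monitor.PulseAll(_lock);   // wake any waiting readers
        }
    }

    // Blocks while the buffer is empty.
    public T Dequeue()
    {
        lock (_lock)
        {
            while (_count == 0) Monitor.Wait(_lock);
            T item = _buffer[_readerIndex];
            _buffer[_readerIndex] = default(T);   // drop the reference so it can be collected
            _readerIndex = (_readerIndex + 1) % _buffer.Length;
            _count--;
            Monitor.PulseAll(_lock);   // wake any waiting writers
            return item;
        }
    }
}
```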

NotifyCollection

This class implements INotifyCollectionChanged, so in AddOrUpdate we can raise OnCollectionChanged with NotifyCollectionChangedEventArgs ourselves and keep the number of updates to a minimum. This allows us to store the data in a Dictionary (so it is easier and quicker to search) and just raise OnCollectionChanged, without searching for and updating the elements of the collection.

    public class NotifyCollection<T> : Collection<T>, ITypedList, INotifyCollectionChanged where T : SelfExplained
    {
         ...
        public void AddOrUpdate(IEnumerable<T> items, bool isReplace)
        {
            foreach (var item in items)
            {
                if (!Contains(item))
                {
                    Add(item);
                    OnCollectionChanged(new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Add, item));
                }
                else
                {
                    if (isReplace)
                    {
                        OnCollectionChanged(new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Replace, item, item));
                    }
                    else
                    {
                        foreach (var property in item.GetPropertyNames())
                        {
                            item.OnPropertyChanged(property);
                        }
                    }
                }
            }
        }
         ...
    }

WpfGridColumn

This is where the binding magic happens

...
        protected override FrameworkElement GenerateEditingElement(DataGridCell cell, object dataItem)
        {
            TextBox editElement = new TextBox {BorderThickness = new Thickness(0.0), Padding = new Thickness(0.0)};

            System.Windows.Data.Binding textBinding = new System.Windows.Data.Binding(cell.Column.Header + ".DisplayValue")
                                                          {Source = dataItem};
            editElement.SetBinding(TextBox.TextProperty, textBinding);

            System.Windows.Data.Binding backgroundBinding = new System.Windows.Data.Binding(cell.Column.Header + ".Background")
                                                                {Source = dataItem};
            editElement.SetBinding(TextBox.BackgroundProperty, backgroundBinding);

            System.Windows.Data.Binding foreGroundBinding = new System.Windows.Data.Binding(cell.Column.Header + ".Foreground")
                                                                {Source = dataItem};
            editElement.SetBinding(TextBox.ForegroundProperty, foreGroundBinding);

            return editElement;
        }
...

Mediator

Enough said about this class. You know what it does ... distributes Tasks to registered users ... and picks the right TaskScheduler.

...
        public bool Broadcast(string key, params object[] message)
        {
            List<WeakAction> wr;
            lock (_registeredHandlers)
            {
                if (!_registeredHandlers.TryGetValue(key, out wr))
                    return false;
            }

            foreach (var cb in wr)
            {
                Delegate action = cb.GetMethod();
                if (action == null) continue;
                switch (cb.TaskType)
                {
                    case TaskType.Background:
                        // check if already running
                        if (!_baseQueue.ContainsKey(cb.Target.Target))
                        {
                            var bgTask = _backgroundTaskFactory.StartNew(() => action.DynamicInvoke(message));
                            _baseQueue.Add(cb.Target.Target, bgTask);
                        }
                        break;

                    case TaskType.Periodic:
                        var periodicTask = new PeriodicTask(() => action.DynamicInvoke(message), _budget);
                        periodicTask.Start(_preemptiveScheduler);
                        break;

                    case TaskType.Sporadic:
                        // one Periodic to run all sporadics
                        var sporadicTask = new SporadicTask(() => action.DynamicInvoke(message), _budget);
                        sporadicTask.Start(_preemptiveScheduler);
                        break;
                    case TaskType.LongRunning:
                        //UI
                        _staTaskFactory.StartNew(() => action.DynamicInvoke(message));
                        break;
                }

            }

            lock (_registeredHandlers)
            {
                wr.RemoveAll(wa => wa.HasBeenCollected);
            }

            return true;
        }
...

PriorityTaskScheduler

Implements rate-monotonic algorithm. Waiting queue, ready to run queue ..didn't we talk about it already?

...
       public PriorityTaskScheduler()
        {
            //TODO: create category
            //_messageCounter = new PerformanceCounter();
            _tsp = Convert.ToInt32(ConfigurationManager.AppSettings["TASK_SCHEDULER_PERIOD"]);
            _buferSize = Convert.ToInt32(ConfigurationManager.AppSettings["BUFFER_SIZE"]);
            _threshold = Convert.ToDouble(ConfigurationManager.AppSettings["SUSPENSION_THRESHOLD"]);
            _readyToRunQueue = new BlockingCircularBuffer<PeriodicTask>(_buferSize);
            _waitingQueue = new ConcurrentQueue<Task>();

            var executor = new System.Timers.Timer(_tsp);
            executor.Elapsed += WaitingQueueToReadyToRun;
            executor.Start();

            for (int i = 0; i < Environment.ProcessorCount; i++)
            {
                _backgroundTaskFactory.StartNew(() =>
                {
                    while (true)
                    {
                        var task = _readyToRunQueue.Dequeue();
#if DEBUG
                        _log.Debug("Message Dequeued");
                        //_messageCounter.Decrement();
#endif    
                        if (TryExecuteTask(task))
                        {
                            var span = DateTime.UtcNow - task.TimeCreated;
                            if (DateTime.UtcNow > task.Deadline)
                            {
                                _log.Warn(String.Format("Real-time Deadline exceeded : {0}", span.TotalMilliseconds));
                                //throw new ApplicationException("Real-time Deadline exceeded");
                            }
#if DEBUG
                            _log.Debug("Message Done");
#endif
                        }
                    }
                });
            }    
        }

        private void WaitingQueueToReadyToRun(object sender, System.Timers.ElapsedEventArgs e)
        {
            Task task;
            while (_waitingQueue.TryDequeue(out task))
            {
                // check budget and invoke a context switch
                PeriodicTask headTask;
                if (task is PeriodicTask)
                {
                    headTask = (PeriodicTask)task;
                    var nextToRun = _readyToRunQueue.Peek();

                    if ((nextToRun != null) && (nextToRun.Status == TaskStatus.WaitingToRun)
                        && headTask.Deadline < nextToRun.Deadline)
                    {
                        _log.Info("Context switching at: " + DateTime.UtcNow);
                        var dequeuedTask = _readyToRunQueue.Dequeue();
                        _readyToRunQueue.Enqueue(headTask);
                        _readyToRunQueue.Enqueue(dequeuedTask);
                    }
                    else
                    {
                        // enqueue exactly once: the branch above has already queued headTask
                        _readyToRunQueue.Enqueue(headTask);
                    }
                }  
            }
        }

        protected override void QueueTask(Task task)
        {
#if DEBUG
            _log.Debug("Message Enqueued");
            //_messageCounter.Increment();
#endif
            // jump the queue
            if (task is SporadicTask)
            {
                TryExecuteTask(task);
                _log.Info("Sporadic jumped the queue at: " + DateTime.UtcNow);
                return;
            }
            
            _waitingQueue.Enqueue(task); 
        }
...
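
The listing skips the other two members that every TaskScheduler subclass has to override, TryExecuteTaskInline and GetScheduledTasks. Here is a minimal, single-threaded sketch of the “sporadic jumps the queue” idea with all three overrides in place. JumpQueueScheduler, UrgentTask and RunPending are hypothetical names; unlike the real PriorityTaskScheduler there are no worker threads, timer or deadlines, and the queued tasks only run when RunPending() is called.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Stand-in for a sporadic (user interaction) task.
public sealed class UrgentTask : Task
{
    public UrgentTask(Action action) : base(action) { }
}

// Sketch only: urgent tasks run immediately, everything else waits in a queue.
public sealed class JumpQueueScheduler : TaskScheduler
{
    private readonly ConcurrentQueue<Task> _waiting = new ConcurrentQueue<Task>();

    protected override void QueueTask(Task task)
    {
        if (task is UrgentTask)
        {
            TryExecuteTask(task);   // jump the queue: run on the calling thread
            return;
        }
        _waiting.Enqueue(task);
    }

    // Never run queued tasks inline; they only run from RunPending().
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return false;
    }

    // Used by the debugger to list the tasks that are still waiting.
    protected override IEnumerable<Task> GetScheduledTasks()
    {
        return _waiting.ToArray();
    }

    // Runs everything that is waiting; returns the number of tasks executed.
    public int RunPending()
    {
        int executed = 0;
        Task task;
        while (_waiting.TryDequeue(out task))
        {
            if (TryExecuteTask(task)) executed++;
        }
        return executed;
    }
}
```

Starting an ordinary task and then an UrgentTask on this scheduler runs the urgent one first, even though it was started second.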

BaseServiceObserver

Uses Rx to conflate the data

...
        public virtual void AddServicesToObserve(IEnumerable<IService> services)
        {
            _entityIndex = new ConcurrentDictionary<ulong, Entity>();

            var observer = services.Select(s => 
                Observable.FromEvent<EventArgs<DataRecord>>(h => s.DataReceived += h, h => s.DataReceived -= h))
                .ToList()
                .Merge();

            observer.Select(x => x.EventArgs.Value)
                    .Where(x => x.DataRecordKey != null)
                    .BufferWithTime(TimeSpan.FromMilliseconds(_od))
                    .Where(x => x.Count > 0)
                    .Subscribe(DataReceived, LogError);
        }
...

BaseViewModel

Resolves the View and calls Initialise() so we can get rid of xaml.cs

...
        protected BaseViewModel(string view, bool vmResolvesView, bool signedMaintenanceContract)
        {
            Log = LogManager.GetLogger(view);

            // ViewModel resolves the view
            if (vmResolvesView)
            {
                var attr = (ViewAttribute)GetType().GetCustomAttributes(typeof(ViewAttribute), true).FirstOrDefault();
                if (attr == null) throw new ApplicationException("ViewAttribute is missing");

                ViewReference = (FrameworkElement)Activator.CreateInstance(attr.ViewType);

                Uri resourceLocater = new Uri("/" + attr.ViewType.Module.ScopeName.Replace(".dll", "") + ";component/Views/" +
                                                attr.ViewType.Name + ".xaml", UriKind.Relative);
                Application.LoadComponent(ViewReference, resourceLocater);
                ViewReference.DataContext = this;
            }
            // View already resolved the viewModel

            if (ViewReference is Window)
            {
                ((Window)ViewReference).Left = Convert.ToDouble(ViewReference.GetValue(WindowProperties.LeftProperty));
                ((Window)ViewReference).Top = Convert.ToDouble(ViewReference.GetValue(WindowProperties.TopProperty));
                ((Window)ViewReference).Width = Convert.ToDouble(ViewReference.GetValue(WindowProperties.WidthProperty));
                ((Window)ViewReference).Height = Convert.ToDouble(ViewReference.GetValue(WindowProperties.HeightProperty));
            }

            if (signedMaintenanceContract)
                DispatcherFacade = new MaintenanceEngineer(view, Log, Dispatcher);

        }
...

MaintenanceEngineer

Monitors the dispatcher queue and calls DoEvents extension if the queue is growing

...
        private long _operationsQueueCount = 0;
        private readonly ConcurrentDictionary<DispatcherOperation, object> _operations = new ConcurrentDictionary<DispatcherOperation, object>();

        private void Heartbeat(long l)
        {
            var heartbeat = new Heartbeat(_dynamicViewName, String.Format("{0} View heartbeat sent at: {1}", _dynamicViewName, DateTime.UtcNow.ToLongTimeString()), DateTime.UtcNow, false);
            Action w = () => Mediator.GetInstance.Broadcast(Topic.ShellStateUpdated, heartbeat);
            AddToDispatcherQueue(w);
        }

        private void MonitorDispatcherQueue(long l)
        {
            // Read the 64-bit counter atomically; other threads update it concurrently.
            long queued = Interlocked.Read(ref _operationsQueueCount);

            if (queued != 0)
                _log.Info(String.Format("Dispatcher operations in queue: {0}", queued));

            // _qs is the queue size threshold above which the queue is flushed.
            if (queued > _qs)
            {
                _log.Info("Pushing all Dispatcher operations");
                Application.Current.DoEvents();
                _operations.Clear();
                Interlocked.Exchange(ref _operationsQueueCount, 0);
            }
        }


        #region IDispatcherFacade Members

        public void AddToDispatcherQueue(Delegate workItem)
        {
            // Count the operation before it is queued so that a fast completion
            // cannot decrement the counter before it has been incremented.
            Interlocked.Increment(ref _operationsQueueCount);

            var operation = _dispatcher.BeginInvoke(DispatcherPriority.Background, workItem);
            _operations.TryAdd(operation, null);

            operation.Completed += (s, o) =>
            {
                Interlocked.Decrement(ref _operationsQueueCount);
                object t;
                _operations.TryRemove((DispatcherOperation)s, out t);
            };
        }
        #endregion
...
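
The DoEvents extension called from MonitorDispatcherQueue is not listed here. As a rough sketch only, such an extension is commonly written by pushing a nested DispatcherFrame and leaving it once everything queued at Background priority or above has run. The names ApplicationExtensions and PumpCurrentDispatcher are hypothetical, the sketch assumes it is called on the thread that owns the dispatcher, and the version in the project source may well differ.

```csharp
using System;
using System.Windows;
using System.Windows.Threading;

// Hypothetical helper; not taken from the project source.
public static class ApplicationExtensions
{
    // Assumption: invoked on the thread whose dispatcher queue should be drained.
    public static void DoEvents(this Application application)
    {
        PumpCurrentDispatcher();
    }

    public static void PumpCurrentDispatcher()
    {
        var frame = new DispatcherFrame();

        // Queue an exit marker behind the work that is already waiting.
        // Operations of the same priority run in the order they were queued,
        // so earlier Background (or higher) operations run before the marker.
        Dispatcher.CurrentDispatcher.BeginInvoke(
            DispatcherPriority.Background,
            new Action(() => frame.Continue = false));

        // Run a nested message loop until the marker clears frame.Continue.
        Dispatcher.PushFrame(frame);
    }
}
```

Because the marker is queued at Background priority, the same priority AddToDispatcherQueue uses, operations queued at lower priorities such as ApplicationIdle would stay in the queue.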

RealService

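Talks to the remote services over WCF named pipes: an outgoing channel for subscription requests and a hosted endpoint for incoming published data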

...
        public RealService()
        {
            // out channel
            _channelFactory = new ChannelFactory<IRemoteSubscriptionService>(
                new NetNamedPipeBinding(NetNamedPipeSecurityMode.None) { MaxReceivedMessageSize = 5000000, MaxBufferSize = 5000000 },
                "net.pipe://WPFRealTime/SubscriptionService");
            _proxy = _channelFactory.CreateChannel();

            // in channel
            ServiceHost host = new ServiceHost(new ConnectionListener(this));
            host.AddServiceEndpoint(typeof(IRemotePublishingService),
                 new NetNamedPipeBinding(NetNamedPipeSecurityMode.None) { MaxReceivedMessageSize = 5000000, MaxBufferSize = 5000000 },
                "net.pipe://WPFRealTime/Bond/PublishingService");
            host.Open();
        }

        private void SendData(DataRecord data)
        {
            EventHandler<EventArgs<DataRecord>> handler = DataReceived;
            if (handler != null)
            {
                handler(this, new EventArgs<DataRecord>(data));
            }
        }

        #region IService Members
        [RegisterInterest(Topic.BondServiceGetData, TaskType.Background)]
        public void GetData()
        {
            try
            {
                _proxy.GetData(new RequestRecord() { AssetType = AssetType });
            }
            catch (Exception ex)
            {
                throw new ApplicationException("WCF channel is closed", ex);
            }
        }

        public event EventHandler<EventArgs<DataRecord>> DataReceived;
...

BondServiceObserver

Note that we can change "GUI-owned" Entities before passing them to the Dispatcher

    [Export(typeof(BaseServiceObserver))]
    public class BondServiceObserver : BaseServiceObserver
    {
        public BondServiceObserver()
        {
            AddEventExploder(EventExploder);
        }

        public override void AddServicesToObserve(IEnumerable<IService> services)
        {
            //pass filter
            base.AddServicesToObserve(services.Where(s => s.AssetType == AssetType.Bond));
        }

        public void EventExploder(IEnumerable<Entity> messages)
        {
            // apply rules
            RuleEngine.ApplyRules(messages);

            // broadcast news or updates using Mediator EventExploder
            Mediator.GetInstance.Broadcast(Topic.BondServiceDataReceived, messages); 
        }
    }

BondViewModel

Sample View Model

...
        public BondViewModel() : base("Bond Module", true, true)
        {
            DynamicViewName = "Bond Module";
            _grid = GetRef<DataGrid>("MainGrid");
 
            // InputManager.Current.PreProcessInput += new PreProcessInputEventHandler(Current_PreProcessInput);
            // InputManager.Current.PostProcessInput += new ProcessInputEventHandler(Current_PostProcessInput);

            Entities = new NotifyCollection<Entity>(EntityBuilder.LoadMetadata(AssetType.Common, AssetType.Bond));
            CreateColumnsCommand = new SimpleCommand<Object, EventToCommandArgs>((parameter) => true, CreateColumns);
        }

        [RegisterInterest(Topic.BondServiceDataReceived, TaskType.Periodic)]
        private void DataReceived(IEnumerable<Entity> entities)
        {
            Action w = () => Entities.AddOrUpdate(entities, false);
            DispatcherFacade.AddToDispatcherQueue(w);
        }

        [RegisterInterest(Topic.BondModuleHang, TaskType.Sporadic)]
        private void Hang()
        {
            ProcessOnDispatcherThread(() => Thread.Sleep(10000));
        }

        [RegisterInterest(Topic.BondModuleOpen, TaskType.Sporadic)]
        private void OpenClose(bool state)
        {
            ProcessOnDispatcherThread(() =>
            {
                if (state)
                    ((Window)ViewReference).Show();
                else
                    ((Window)ViewReference).Hide();
            });
        }
...
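
The view model subscribes to topics simply by tagging methods with RegisterInterest. The Mediator itself is not listed in this article, so the following is only a minimal sketch of how attribute-based registration can work through reflection. DemoTopic, DemoRegisterInterestAttribute, DemoMediator and DemoViewModel are hypothetical stand-ins, and the sketch invokes handlers synchronously, ignoring the TaskType scheduling that the real Mediator applies.

```csharp
using System;
using System.Collections.Generic;
using System.Reflection;

// Hypothetical stand-ins for the article's Topic enum and RegisterInterest attribute.
public enum DemoTopic { DataReceived, ModuleHang }

[AttributeUsage(AttributeTargets.Method)]
public sealed class DemoRegisterInterestAttribute : Attribute
{
    public DemoRegisterInterestAttribute(DemoTopic topic) { Topic = topic; }
    public DemoTopic Topic { get; private set; }
}

public sealed class DemoMediator
{
    private readonly Dictionary<DemoTopic, List<Action<object>>> _handlers =
        new Dictionary<DemoTopic, List<Action<object>>>();

    // Scans the subscriber for tagged methods, private ones included.
    public void Register(object subscriber)
    {
        const BindingFlags flags =
            BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic;

        foreach (MethodInfo method in subscriber.GetType().GetMethods(flags))
        {
            object[] attrs = method.GetCustomAttributes(typeof(DemoRegisterInterestAttribute), true);
            foreach (DemoRegisterInterestAttribute attr in attrs)
            {
                MethodInfo target = method;
                bool takesPayload = target.GetParameters().Length == 1;

                List<Action<object>> list;
                if (!_handlers.TryGetValue(attr.Topic, out list))
                    _handlers[attr.Topic] = list = new List<Action<object>>();

                // Parameterless handlers (like Hang above) receive no payload.
                list.Add(payload => target.Invoke(
                    subscriber, takesPayload ? new[] { payload } : new object[0]));
            }
        }
    }

    // Synchronous dispatch; the real Mediator schedules by TaskType instead.
    public void Broadcast(DemoTopic topic, object payload)
    {
        List<Action<object>> list;
        if (_handlers.TryGetValue(topic, out list))
            foreach (Action<object> handler in list) handler(payload);
    }
}

public sealed class DemoViewModel
{
    public readonly List<string> Received = new List<string>();
    public int HangCalls;

    [DemoRegisterInterest(DemoTopic.DataReceived)]
    private void DataReceived(string message) { Received.Add(message); }

    [DemoRegisterInterest(DemoTopic.ModuleHang)]
    private void Hang() { HangCalls++; }
}
```

With this in place, `mediator.Register(vm)` followed by `mediator.Broadcast(DemoTopic.DataReceived, "tick")` reaches the private DataReceived method without the view model ever referencing the publisher.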

Bootstrapper

Uses MEF to discover and load components

...
        private readonly Action<Lazy<IDynamicViewModel>> _createView = ((t) =>
        {
            var vm = t.Value;

            Mediator.GetInstance.Register(vm);
            Window view = (Window)((BaseViewModel)vm).ViewReference;
            RunningWindows.TryAdd(vm.DynamicViewName, view);
            var heartbeat = new Heartbeat(vm.GetType().ToString(), String.Format("{0} View loaded at: {1}", vm.GetType().ToString(), DateTime.UtcNow.ToLongTimeString()), DateTime.UtcNow, true);

            Mediator.GetInstance.Broadcast(Topic.ShellStateUpdated, heartbeat);
            //view.Show();
            view.Closed += (sender, e) => view.Dispatcher.InvokeShutdown();
            Dispatcher.Run();
        });

        private void InjectDynamicViewModels(bool multiDispatchers)
        {
            foreach (var lazy in DynamicViewModels)
            {
                Lazy<IDynamicViewModel> localLazy = lazy;
                if (multiDispatchers)
                {
                    Mediator.GetInstance.Broadcast(Topic.BootstrapperLoadViews, localLazy); 
                }
                else
                {
                    CreateView(localLazy);
                }
                
            }
        }

        private void InjectServices()
        {
            foreach (var lazy in Services)
            {
                var service = lazy.Value;
                Mediator.GetInstance.Register(service);
                var heartbeat = new Heartbeat(service.GetType().ToString(), String.Format("{0} Service loaded at: {1}", service.GetType(), DateTime.UtcNow.ToLongTimeString()), DateTime.UtcNow, true);

                Mediator.GetInstance.Broadcast(Topic.ShellStateUpdated, heartbeat);
            }
            //inject service observers
            foreach (var lazy in ServiceObservers)
            {
                lazy.Value.AddServicesToObserve(Services.Select(s => s.Value));
            }
        }

        private void InjectStaticViewModels(MainWindow mainWindow)
        {
            foreach (var vm in StaticViewModels)
            {
                Mediator.GetInstance.Register(vm);
                mainWindow.RibbonRegion.Items.Add(new TabItem { Content = ((BaseViewModel)vm).ViewReference, Header = vm.StaticViewName });
            }
        }
...
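
The _createView delegate ends with Dispatcher.Run(), which suggests that in multi-dispatcher mode each dynamic view is created on its own thread. The thread creation itself is not shown above, so here is a minimal sketch of the usual pattern, assuming one STA thread per window. DedicatedDispatcher and RunWindowOnOwnThread are hypothetical names, and unlike the Bootstrapper (which leaves Show to the OpenClose topic) the sketch shows the window immediately.

```csharp
using System;
using System.Threading;
using System.Windows;
using System.Windows.Threading;

// Hypothetical helper; not taken from the project source.
public static class DedicatedDispatcher
{
    public static Thread RunWindowOnOwnThread(Func<Window> createWindow)
    {
        var thread = new Thread(() =>
        {
            // The window must be created on the thread that will pump its messages.
            Window window = createWindow();

            // Stop this thread's dispatcher when the window goes away,
            // otherwise Dispatcher.Run() below never returns.
            window.Closed += (s, e) => window.Dispatcher.InvokeShutdown();

            window.Show();

            // Start the message loop for this thread; blocks until shutdown.
            Dispatcher.Run();
        });

        // WPF UI elements require a single-threaded apartment.
        thread.SetApartmentState(ApartmentState.STA);
        thread.IsBackground = true;
        thread.Start();
        return thread;
    }
}
```

A window running this way can hang (as the Hang handler in BondViewModel simulates) without freezing windows owned by other dispatchers, at the price that every cross-window call has to be marshalled to the owning dispatcher.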

Finale

My requirements were to build a “real-time low-latency multi-threaded front-office trading application using WPF/MVVM” which "needs to be responsive all the time, display trading data very fast, never hang, freeze, crash or run out of memory".

I achieved that by implementing a rate-monotonic scheduling algorithm on top of the .NET Task Parallel Library, so it is real-time.
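
For readers new to the term: rate-monotonic scheduling gives periodic tasks with shorter periods higher priority. The sketch below shows only that ordering plus the classic Liu and Layland utilization test, which is a sufficient (not necessary) check for independent periodic tasks whose deadlines equal their periods. The type names and any timings fed to it are made up for illustration, and it does not reproduce the TaskScheduler in this project.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical description of a periodic task: how often it runs and how long it takes.
public sealed class PeriodicTask
{
    public PeriodicTask(string name, double periodMs, double costMs)
    {
        Name = name; PeriodMs = periodMs; CostMs = costMs;
    }
    public string Name { get; private set; }
    public double PeriodMs { get; private set; }
    public double CostMs { get; private set; }
}

public static class RateMonotonic
{
    // Shorter period means higher priority; index 0 is the highest.
    public static IList<PeriodicTask> OrderByPriority(IEnumerable<PeriodicTask> tasks)
    {
        return tasks.OrderBy(t => t.PeriodMs).ToList();
    }

    // Fraction of processor time the task set needs.
    public static double Utilization(IEnumerable<PeriodicTask> tasks)
    {
        return tasks.Sum(t => t.CostMs / t.PeriodMs);
    }

    // Liu-Layland bound for n tasks: n * (2^(1/n) - 1).
    public static double Bound(int n)
    {
        return n * (Math.Pow(2.0, 1.0 / n) - 1.0);
    }

    // True when the set is guaranteed schedulable by this sufficient test.
    public static bool PassesSufficientTest(ICollection<PeriodicTask> tasks)
    {
        return tasks.Count > 0 && Utilization(tasks) <= Bound(tasks.Count);
    }
}
```

A task set that fails this test is not necessarily unschedulable; it just needs a more exact analysis.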

I run the GUI on more than one Dispatcher, so it is very responsive.

I let data flow freely and modify "GUI elements" on non-UI threads, so the GUI updates very fast.

I shamelessly stole components from fellow WPF developers, namely Sacha Barber, Marlon Grech and Kent Boogaart.

The only place where I think I failed is in providing a generic mechanism to handle sporadic events. As it stands, it is up to the developer to mark an event as sporadic. It would be nice to have a mechanism where every user action is automatically picked up before it gets into WPF event routing and is processed as a sporadic event. I tried to use System.Windows.Input.InputManager for this but failed. So if anyone has an idea of how to intercept any user input in WPF and process it as a sporadic event in our TaskScheduler, I would like to hear from you.

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.
