Source code is available at the WpfRealTime CodePlex site.
Prelude
On July 1st, 2009 I received a phone call from a recruitment agent
promising to make me “an offer I couldn't refuse”. I had no clue how he
got hold of my name, phone number or place of work, but he seemed to
have assumed a very strong connection between me and the WPF/MVVM that
he kept repeating, slightly raising his voice on the second M. At the
time I was quite happily tucked away on a “not so easily reachable
floor” of the middle office IT in some bank. The project I was working
on was strategic but utterly meaningless, which was almost perfect, so I
decided I was not going to be easily budged.
Trying to avert his suspicions about my MVVM connections, I agreed to
meet the client, where after a quick cross-examination my MVVM
involvement became apparent. The verdict was delivered shortly after
that and I was contracted to build a “real-time low-latency
multi-threaded front-office trading application using WPF/MVVM”.
Requirements
My new manager was precise, concise and wasted no time in furnishing
me with a set of requirements. Here they are:
- The application needs to be responsive all the time, display trading
data “very fast”, never hang, freeze, crash or run out of memory.
I moved desks several
times trying to hide (a technique I had successfully employed before),
but my manager kept finding me every time and chasing me up for an
update. I had no choice but to get on with my task. So I “googled” and
apparently “responsive and fast” had something to do with
real-time or low latency (we will look at this in detail in a
minute), “hang, freeze and crash” is a field of study in complex
adaptive systems, and I decided to throw in composite UI on top since
it is a must in modern UI systems.
So my technical requirements were:
- real-time reliability
- adaptability
- extensibility, scalability and modularity (which are all the same thing
but sound better put together in one sentence)
Crash Course in Real-Time Systems Development
The term “real-time systems” can refer to a lot of different things.
One property of RT systems is that they are aware of the hardware they
run on and use very low-level programming languages to make
hardware-specific calls. They often run directly on the hardware (no operating
system) or with the help of real-time executives or real-time
operating systems. Because they are designed to run on specific
hardware they are not easily scalable, portable, etc. That is why general
design patterns won't apply to real-time systems – they are “too
generic”.
However, the main property of RT systems is that they are designed around a
“real-time constraint”. That does not mean that they run fast (that
would be just low latency) but that all tasks are guaranteed to complete
execution within a defined time limit, e.g. I guarantee to
display any newly arrived data within 50 ms.
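The difference between “fast on average” and “always within the constraint” is easy to show with numbers. The sketch below is only an illustration: LatencyReport and the sample latencies are made up for this example and are not part of the framework.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper, not part of WpfRealTime: contrasts "low latency"
// (a good average) with a real-time constraint (a bound on every sample).
public static class LatencyReport
{
    // Low latency is usually judged by the average.
    public static double AverageMs(IEnumerable<double> latenciesMs)
    {
        return latenciesMs.Average();
    }

    // A real-time constraint must hold for every single update.
    public static bool MeetsConstraint(IEnumerable<double> latenciesMs, double constraintMs)
    {
        return latenciesMs.All(l => l <= constraintMs);
    }
}
```

With the made-up samples 5, 5, 5 and 120 ms the average is 33.75 ms, which looks fine against a 50 ms target, yet MeetsConstraint(samples, 50) is false because one update arrived late.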
So at the heart of each RT system is a real-time scheduling
algorithm. The general idea is that all tasks in the system can be
split into periodic and sporadic. Periodic tasks are known in advance:
we know when they start, how long they may run for (their budget or deadline)
and their priorities. Sporadic tasks can start at any time, so we can only allocate
an estimated budget (processor time) after which they need to be interrupted.
Some of the better-known algorithms are “priority-based pre-emptive
schedulers” and “earliest deadline first”. Earliest
deadline first, as the name suggests, orders tasks by
their deadlines (the time by which the task must have completed).
Priority-based algorithms order tasks by their
priorities and interrupt the running task if a task with a higher priority
arrives. One of the priority-based algorithms is the rate-monotonic
scheduler. Here is how it works:
The rate-monotonic scheduler puts all periodic tasks into the waiting
queue. At the start of a new period the scheduler examines the
waiting queue and moves the selected tasks into the “ready to run”
queue. At this point the scheduler performs a context switch if
there are any tasks in the “ready to run” queue with a deadline earlier
than the deadline of the task currently scheduled. The rate-monotonic
algorithm accumulates sporadic tasks and runs them as a batch in a
designated periodic task, since they are considered lower priority.
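The orderings mentioned above can be sketched in a few lines. WorkItem and SchedulingOrder are hypothetical names used only for this illustration. Note that in the textbook formulation rate-monotonic priorities are static and go to the task with the shortest period, whereas the description above (and the framework) compares deadlines when deciding on a context switch.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical task descriptor used only for this illustration.
public sealed class WorkItem
{
    public WorkItem(string name, TimeSpan period, DateTime deadline)
    {
        Name = name;
        Period = period;
        Deadline = deadline;
    }

    public string Name { get; private set; }
    public TimeSpan Period { get; private set; }
    public DateTime Deadline { get; private set; }
}

public static class SchedulingOrder
{
    // Earliest deadline first: the task whose deadline is nearest runs first.
    public static List<WorkItem> EarliestDeadlineFirst(IEnumerable<WorkItem> items)
    {
        return items.OrderBy(i => i.Deadline).ToList();
    }

    // Textbook rate-monotonic: the shortest period gets the highest priority.
    public static List<WorkItem> RateMonotonic(IEnumerable<WorkItem> items)
    {
        return items.OrderBy(i => i.Period).ToList();
    }

    // The context-switch test described above: pre-empt when the candidate
    // has an earlier deadline than the task currently scheduled.
    public static bool ShouldPreempt(WorkItem running, WorkItem candidate)
    {
        return candidate.Deadline < running.Deadline;
    }
}
```

The same two tasks can come out in a different order depending on which rule you pick, which is why it matters to choose one and stick to it.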
As we all know, starting from .NET 4.0 Microsoft ships the Task
Parallel Library (TPL), which allows developers to define tasks and
write custom task schedulers to run them. This is a perfect place for
us to implement our real-time framework. We will define PeriodicTask
and SporadicTask classes that inherit from the System.Threading.Tasks.Task
class, and we will implement our own PriorityTaskScheduler that inherits
from the TaskScheduler class. Since we can buffer the arriving
market data (with the help of Rx) and dispatch it to the GUI at
regular intervals, we call these tasks periodic. User interaction
(clicking buttons etc.) cannot be predicted, so we call these
tasks sporadic. We will use the rate-monotonic algorithm as a base, with
the only difference being the way we treat sporadic tasks. My requirement
is that the “GUI needs to be responsive all the time”, and that is why I
consider sporadic tasks a higher priority than any periodic task and
run them first.
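To give an idea of what such task classes might look like, here is a minimal sketch. The constructor shape (an Action plus a budget) and the TimeCreated and Deadline properties follow the way these classes are used in the listings further down; treating the budget as a TimeSpan and the Budget property on SporadicTask are my assumptions for this example, so check the real classes in the source.

```csharp
using System;
using System.Threading.Tasks;

// Assumed shape only; the real classes live in the WpfRealTime source.
public class PeriodicTask : Task
{
    public PeriodicTask(Action action, TimeSpan budget) : base(action)
    {
        TimeCreated = DateTime.UtcNow;
        // The task is expected to have finished by this time.
        Deadline = TimeCreated + budget;
    }

    public DateTime TimeCreated { get; private set; }
    public DateTime Deadline { get; private set; }
}

public class SporadicTask : Task
{
    public SporadicTask(Action action, TimeSpan budget) : base(action)
    {
        // Estimated processor time the task is allowed to use (assumption).
        Budget = budget;
    }

    public TimeSpan Budget { get; private set; }
}
```

Because both classes derive from Task, they can be started on any TaskScheduler with task.Start(scheduler), and a custom scheduler can tell them apart with a simple type check.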
Complex Adaptive Systems
Although I have seen a lot of research materials on how complex
systems should adapt to different conditions, monitor resources,
self-validate and restore components I have not seen a lot of
implementations of this in the UI world. So I am going to build
some of this functionality into the framework and hopefully that will
be just enough to meet my “never hang, freeze, or crash” requirement.
Since I am using a composite or modular UI it is very easy to implement
some component monitoring functionality. The main shell establishes a
connection with its modules and sends regular heartbeats to make sure
all parts of the system are responsive. Should any of the modules fail
to respond we will need to unload that view from memory and, where
possible, replace it with a new instance. I am going to run each view
as a new Window class so it can be assigned its own dispatcher. This
way if a particular view “hangs” the rest of the UI is still “live and
kicking”. Our main shell “never hangs or freezes” simply because we are not
doing any work on its dispatcher. Its job is to load the system's
components, monitor the heartbeats and reload other modules/views. And
because each view has its own dispatcher we can process a lot more
updates without slowing down the whole application.
Each view will also be assigned a MaintenanceEngineer class whose job
is to monitor the view's memory usage, send heartbeats and monitor the
dispatcher queue. It can decide whether we are falling behind
with periodic updates or user interactions (sporadic tasks).
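The heartbeat bookkeeping on the shell side can be very small. The sketch below is not the framework's code: HeartbeatMonitor, its timeout parameter and the view names in the usage note are made up to show the idea of remembering when each view was last heard from and listing the ones that have gone quiet.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

// Hypothetical shell-side helper: records the last heartbeat per view and
// reports the views that have been silent for longer than the timeout.
public sealed class HeartbeatMonitor
{
    private readonly ConcurrentDictionary<string, DateTime> _lastSeen =
        new ConcurrentDictionary<string, DateTime>();
    private readonly TimeSpan _timeout;

    public HeartbeatMonitor(TimeSpan timeout)
    {
        _timeout = timeout;
    }

    // Called whenever a heartbeat message arrives from a view.
    public void Record(string viewName, DateTime sentAtUtc)
    {
        _lastSeen[viewName] = sentAtUtc;
    }

    // Views returned here are candidates for unloading and reloading.
    public List<string> FindUnresponsive(DateTime nowUtc)
    {
        return _lastSeen
            .Where(kv => nowUtc - kv.Value > _timeout)
            .Select(kv => kv.Key)
            .OrderBy(name => name)
            .ToList();
    }
}
```

For example, with a 5 second timeout, a view that last reported 10 seconds ago is listed while one that reported 2 seconds ago is not. Passing the current time in as a parameter keeps the check easy to test.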
Composite UI
When it comes to composite UI WPF developers are spoilt for choice.
For my purposes I narrowed my choice down to three frameworks namely Prism,
MeffedMVVM by Marlon Grech and Cinch by Sacha Barber. I was only
guided by the principle that using one source is plagiarism but more
than one is research work. That suits me because I am just going to
pinch some useful features for my real-time framework.
Prism
The best thing about Prism is ...eh ...well modularity. That is
achieved with the help of the Bootstrapper class that loads modules. I much
prefer the way Marlon does it with MEF so I think we'll pinch it from
his MeffedMVVM. The second best thing is … decoupling … achieved with the
EventAggregator class. Again I much prefer the way Sacha does it with
his Mediator class, which he lifted from Karl Shifflett's work, who lifted it
from Marlon, who lifted it from Josh Smith (but not necessarily in that order).
I am going to make some changes to it as well that we can discuss
later. There are also Region Adapters, which I am not going to use
(because they are way over-engineered and pretty useless in my opinion),
and Unity, which I am not going to use either because I can use MEF to
instantiate my objects and I don't want to build long dependency
chains as Unity does. (I have this idea that ViewModels and Services
should be completely decoupled and only communicate via the Mediator, where
they subscribe to topics and broadcast async messages to these topics:
sort of a GUI enterprise bus. Bear with me, perhaps I can explain it
better a bit later.) So in fact from Prism we borrow ...er ...nothing.
MeffedMVVM
In MeffedMVVM Marlon uses MEF to link Views and ViewModels. I am going
to use his idea but I will use MEF to resolve ViewModels and Services.
There is no need for them to reference each other because they both
have a reference to the singleton Mediator class that provides publish /
subscribe mechanics and chooses the TaskScheduler to run the tasks.
There is also a strong “one View – one ViewModel” connection that is
defined at design time, so there is no need to link them dynamically at
run time. Marlon also defines “design time” behaviour for his views so
each control has some dummy data that can be presented in Blend. I am
going to use this idea but apply it to the Services rather than the
Views. So … each Service has a MockService version of itself
and with a simple config change you should be able to run the GUI with
real or mock services. This allows you to run and work on the GUI when the
server is down (as they do) or the server-side component is not ready (I don't think this is what Marlon meant by his “design time data”
but anyhow...).
So from MeffedMVVM we are going to borrow ...well MEF and MVVM.
Cinch
Now this is where we are going to lift a lot of stuff from. So ... thank
you Sacha.
We start with WeakEventAction, WeakAction, EventToCommandTrigger and a slightly
modified Mediator.
They are brilliant and there is no need for me to repeat how they
work since you can just refer to the original post on Cinch by Sacha
Barber.
MVVM and whatnot
As you have probably guessed I am going to use MVVM so there are some
SimpleCommand and AttachedProperties classes
lying around. It is probably impossible to trace the author of these
by now so I will attribute them to Sacha Barber anyway.
Another class worth mentioning is the Entity. It is an old .NET trick to
bind data to the grid with a custom PropertyDescriptor and ITypedList.
So all data is passed around as Entity objects; google “property bag
.NET” if you need more details on how it works.
I am also using Reactive Extensions (Rx) to throttle and conflate arriving data.
Design
Well … I have to say that the restaurant in this bank was absolutely
dire so I decided to crack on with my design.
So with all the usual MVVM suspects present - Views, ViewModels and
Services - we also have the Shell, which just loads / unloads modules
and monitors heartbeat rates, resources, etc. And we have modules that
implement business-specific logic. Each module has Views, a RibbonView,
ViewModels and ServiceObservers. When the Shell starts, the
Bootstrapper discovers the modules on disk using MEF. It
adds RibbonViews to the Shell's content, loads Views (each on its own
dispatcher) in separate Windows and creates Services. It also
instantiates the Mediator, where ViewModels and Services “register
their interest”. The Mediator has a list of topics that
define all possible flows in the system, and for each topic it keeps a list of
delegates to run when a message is broadcast on that topic.
So ViewModels and Services are all loaded in memory chatting away via the
Mediator message bus, the Mediator picks the right TaskScheduler to
guarantee real-time updates, ViewModels update Views and the Shell is
talking to MaintenanceEngineers to make sure everybody is playing
nicely.
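The “list of delegates per topic” idea boils down to a dictionary. The sketch below is a deliberately stripped-down stand-in for the Mediator: TopicBus is a made-up name, and unlike the real class it holds strong references, takes a single message object and runs handlers synchronously rather than handing them to a TaskScheduler.

```csharp
using System;
using System.Collections.Generic;

// Simplified stand-in for the Mediator: topics map to lists of handlers.
public sealed class TopicBus
{
    private readonly object _gate = new object();
    private readonly Dictionary<string, List<Action<object>>> _handlers =
        new Dictionary<string, List<Action<object>>>();

    // A ViewModel or Service "registers its interest" in a topic.
    public void Register(string topic, Action<object> handler)
    {
        lock (_gate)
        {
            List<Action<object>> list;
            if (!_handlers.TryGetValue(topic, out list))
            {
                list = new List<Action<object>>();
                _handlers[topic] = list;
            }
            list.Add(handler);
        }
    }

    // Returns false when nobody has registered for the topic.
    public bool Broadcast(string topic, object message)
    {
        Action<object>[] snapshot;
        lock (_gate)
        {
            List<Action<object>> list;
            if (!_handlers.TryGetValue(topic, out list)) return false;
            // Copy under the lock so handlers can run outside it.
            snapshot = list.ToArray();
        }
        foreach (var handler in snapshot) handler(message);
        return true;
    }
}
```

Neither side needs a reference to the other: the sender only knows the topic name, and the receiver only knows which topic it cares about.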
Spoon? There is no spoon (xaml.cs)
I was debating (with myself) whether the Bootstrapper should resolve the View
or the ViewModel, which brings us to the old argument of whether the V should
reference the VM or the VM should reference the V. To be honest I don't
think there is much of a difference as long as you pick your pattern
and stick to it. MVVM, MVC and MVP are all quite similar with some
subtle differences (refer to my previous MVVM post). Anyway the point
is that the main goal of MVVM is to allow “Blend-ability” and that to me means one thing only – no xaml.cs. There are a lot of good
frameworks where people still write code in the xaml.cs and so you
need to switch between the VM and xaml.cs to find where things are, which
results in a bit of a mess.
So I took “no xaml.cs” quite literally and ... removed xaml.cs. If
you look at the project you will only find xaml files with no
corresponding xaml.cs. This is possible because the only thing that you
really need from the .cs file is to call Initialise() and assign the
DataContext, which is what I do in the BaseViewModel. So effectively
each ViewModel creates its View, initialises its content and assigns the
DataContext, which makes sense to me since it's the VM's job to control the
View.
After another “update meeting” my manager convinced me that I still
didn't see the bigger picture. So here it is – the bigger picture:
And 'cause I think drawing “bigger picture” diagrams is kids' play – I
got my four year old and three year old to do it for me (well.. actually no I did it myself :( )
Picture 1 - Bigger Picture
So if we go from back to front, the first point of data entry is the
Services. All services have real and mock implementation classes that
can be discovered and loaded by MEF. In my case real services are
loaded in a separate process and communicate via WCF named pipes, whereas
mock services are simply referenced by the main application. All you need
to do is change the Solution Configuration in VS to build the real or mock
version. This is achieved with a post-build script that changes the
config file values after the build. This is a great way to
test/develop your application without connecting to a server.
Services receive the data and build key-value pair data contracts.
These events are “exploded” and observed by Rx. They are then throttled and thrown at the GUI at regular intervals
(this is the first part of our real-time constraint). Normally there is a
one-to-one connection between service and service observer, but not
necessarily: one ServiceObserver can observe more than one service and
use the Rx Merge extension to present these events as if they came from
the same source (take a look at the BaseServiceObserver class).
The ServiceObserver conflates data if a newer version arrives
before the previous version was dispatched. It builds the Entity
with an EntityBuilder and uses the Mediator to broadcast these Tasks to
whoever is interested.
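Conflation itself is a simple idea: within one buffered batch, only the latest value per key survives. Here is a minimal sketch without Rx; the Conflation class and the instrument keys in the usage note are made up for this example, and the real observer works on DataRecord and Entity objects rather than plain key-value pairs.

```csharp
using System.Collections.Generic;

// Hypothetical helper showing conflation of one buffered batch of updates.
public static class Conflation
{
    // Later updates overwrite earlier ones, so each key keeps its newest value.
    public static Dictionary<TKey, TValue> Conflate<TKey, TValue>(
        IEnumerable<KeyValuePair<TKey, TValue>> updates)
    {
        var latest = new Dictionary<TKey, TValue>();
        foreach (var update in updates)
        {
            latest[update.Key] = update.Value;
        }
        return latest;
    }
}
```

If a batch contains GILT10Y at 101.2, BUND10Y at 99.5 and then GILT10Y again at 101.4, only two entries reach the GUI and GILT10Y shows 101.4. The intermediate tick is never displayed, which is exactly the point.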
And so Entities enter the Mediator, which finds which ViewModels
have registered their interest and which delegates it needs to
run. The Mediator creates a task that encapsulates the delegate to run
and the newly arrived data for that delegate, and passes that
task to the appropriate TaskScheduler. The LongRunningTaskScheduler runs
all tasks on a separate thread using processor affinity, so all
services process their data on just one processor, leaving the other
processors to the PriorityTaskScheduler. Remember what we
said about real-time systems being aware of the hardware they run on?
In my case I know I will use a four-core machine, and through a series of
tests I determined that this application runs best when all services
run on one processor and the other three are used by the PriorityTaskScheduler
to run periodic and sporadic tasks. It's up to you to performance-tune
your application for the hardware that you use. (Because the TaskScheduler is your
single point of entry for all data flows in the system, it is very easy to
log all the stats that you need there.)
Picture 2 - Processor Affinity
The PriorityTaskScheduler starts three threads (one for each core) that
process the incoming tasks. Tasks are queued in the
waiting queue and every defined period moved to the ready to run queue
(performing a context switch if necessary). The delegates that we
run are methods defined on ViewModels that registered their
interest in the specific topic. Such a method notifies the View that the
Entity has been updated.
Traditionally it would have to raise the INotifyPropertyChanged event for that.
There is a problem with this approach. Any Windows old-timer knows
that “UI elements can only be updated on the thread that owns them”.
This is actually not true, or shall we say not exactly true … It should
be rephrased as “GUI framework developers want you to update UI
elements on the thread that owns them”. WPF developers are no
different, and for that reason they force us to raise
INotifyPropertyChanged from the property setter and do
Dispatcher.BeginInvoke every time we need to update a property. This
is clearly not going to work for us since Entities are being updated
constantly in real time on many different threads, so imagine if we
called Dispatcher.BeginInvoke all the time. The GUI would not be “very
responsive” at all. So instead we are going to let the Entity be updated on
multiple threads and move the INotifyPropertyChanged call into a separate
method that can be called whenever we are ready to notify the View. I can
see that some of you are starting to think about a potential race
condition, and that is true (to an extent): it is possible that the
value of the property we display is different from the current value
of the property (another thread came in and changed it). However,
since we guarantee to raise INotifyPropertyChanged within the
specified real-time constraint (let's say 50 ms) we therefore guarantee
that the user will see the most recent data as it arrived to us with a maximum 50 ms delay. The actual call to INPC is done within
NotifyCollection (we will look at it in more detail in a sec) so all a VM
developer needs to do is call the AddOrUpdate method and Bob's your
uncle.
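Here is roughly what “update freely, notify later” looks like in isolation. DeferredCell is a hypothetical class written for this example (the framework's Entity is a property bag and does more); the point is only that the setter stays silent and a separate method raises PropertyChanged when the scheduler decides it is time.

```csharp
using System.ComponentModel;
using System.Threading;

// Hypothetical example class: setters never raise PropertyChanged themselves.
public class DeferredCell : INotifyPropertyChanged
{
    private string _displayValue;
    private int _dirty; // 1 when a change has not been announced yet

    public event PropertyChangedEventHandler PropertyChanged;

    // May be set from any thread, as often as data arrives.
    public string DisplayValue
    {
        get { return Volatile.Read(ref _displayValue); }
        set
        {
            Volatile.Write(ref _displayValue, value);
            Interlocked.Exchange(ref _dirty, 1);
        }
    }

    // Called once per period; raises a single notification for all changes
    // since the last call. Returns false when nothing changed.
    public bool NotifyIfDirty()
    {
        if (Interlocked.Exchange(ref _dirty, 0) == 0) return false;
        var handler = PropertyChanged;
        if (handler != null) handler(this, new PropertyChangedEventArgs("DisplayValue"));
        return true;
    }
}
```

Three quick price updates followed by one NotifyIfDirty call produce one PropertyChanged event, and the binding reads whatever the latest value is at that moment. In the framework the notifying call is marshalled onto the view's dispatcher.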
Now remember what I said about generic design patterns not always
working for real-time applications? This is certainly the case here,
'cause in a pure WPF UI architecture the Entity would have no
knowledge of UI-specific elements, e.g. background colour. But
since we don't care about patterns but care about real-time we can
save a lot of UI thread time by doing a lot of work in advance. That's
why the Entity knows about its cell content, colour and size, and assigns them
on a background thread just to notify the UI thread later.
So the Entity becomes a pulsing, constantly updated cache of
UI-bound elements, and the GUI is guaranteed to be notified about the latest
state within the real-time constraint.
This is how the data flows in the WPF Real-Time framework: Events (coming
from the server) turn into Entities (conflated and throttled),
which turn into Tasks (guaranteed real-time execution).
Picture 3 - WPF Real-Time App
By allowing data to flow freely and not worrying about locks, but
guaranteeing the RT constraint instead, we are able to achieve a “very
responsive GUI” and “very fast updates”.
This pretty much is the bigger picture. I just want to highlight some
of the main classes that are worth lifting for your future projects.
Here they are:
Key Classes
BlockingCircularBuffer
I lifted this one from the C++ Boost library's CircularBuffer class but use it
like a BlockingCollection from .NET, so it is in fact similar to
BlockingCollection. The only difference is that unlike BlockingCollection
it uses a fixed-size array, which is more memory efficient.
...
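public void Enqueue(T item)
{
    lock (_lock)
    {
        // Block producers while the buffer is full or writing is suspended.
        while (IsFull() || _writingSuspended)
        {
            Monitor.Wait(_lock);
        }
        _buffer[_writerIndex] = item;
        _writerIndex++;
        _writerIndex %= _size;
        // PulseAll rather than Pulse: producers and consumers wait on the
        // same lock, so a single Pulse could wake another producer and
        // leave every consumer asleep.
        Monitor.PulseAll(_lock);
    }
}
...
public T Dequeue()
{
    T item;
    lock (_lock)
    {
        // Block consumers while there is nothing to read.
        while (IsEmpty())
        {
            Monitor.Wait(_lock);
        }
        item = _buffer[_readerIndex];
        _readerIndex++;
        _readerIndex %= _size;
        Monitor.PulseAll(_lock);
    }
    return item;
}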
...
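The listing above leaves out IsFull() and IsEmpty(). One common way to tell a full ring from an empty one (in both cases the reader and writer indexes are equal) is to keep an item count. The non-blocking sketch below shows just that index arithmetic; RingBuffer is a made-up class and I am not claiming the framework's buffer tracks fullness the same way.

```csharp
using System;

// Hypothetical non-blocking ring buffer showing the index arithmetic only.
public sealed class RingBuffer<T>
{
    private readonly T[] _buffer;
    private int _readerIndex;
    private int _writerIndex;
    private int _count; // distinguishes "full" from "empty"

    public RingBuffer(int size)
    {
        if (size <= 0) throw new ArgumentOutOfRangeException("size");
        _buffer = new T[size];
    }

    public bool IsFull { get { return _count == _buffer.Length; } }
    public bool IsEmpty { get { return _count == 0; } }

    // Returns false instead of blocking when there is no room.
    public bool TryEnqueue(T item)
    {
        if (IsFull) return false;
        _buffer[_writerIndex] = item;
        _writerIndex = (_writerIndex + 1) % _buffer.Length;
        _count++;
        return true;
    }

    // Returns false instead of blocking when there is nothing to read.
    public bool TryDequeue(out T item)
    {
        item = default(T);
        if (IsEmpty) return false;
        item = _buffer[_readerIndex];
        _buffer[_readerIndex] = default(T); // let the GC reclaim the slot
        _readerIndex = (_readerIndex + 1) % _buffer.Length;
        _count--;
        return true;
    }
}
```

This version is not thread-safe on its own; the blocking variant gets its safety from wrapping the same steps in lock and Monitor.Wait as shown above.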
NotifyCollection
This class implements INotifyCollectionChanged, so we can just raise OnCollectionChanged
with NotifyCollectionChangedEventArgs in AddOrUpdate to minimize the number of updates.
This allows us to store the data in a Dictionary (so it is easier and quicker to search)
and only raise OnCollectionChanged without searching and updating the elements of the collection.
public class NotifyCollection<T> : Collection<T>, ITypedList, INotifyCollectionChanged where T : SelfExplained
{
    ...
    public void AddOrUpdate(IEnumerable<T> items, bool isReplace)
    {
        foreach (var item in items)
        {
            if (!Contains(item))
            {
                Add(item);
                OnCollectionChanged(new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Add, item));
            }
            else
            {
                if (isReplace)
                {
                    OnCollectionChanged(new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Replace, item, item));
                }
                else
                {
                    foreach (var property in item.GetPropertyNames())
                    {
                        item.OnPropertyChanged(property);
                    }
                }
            }
        }
    }
    ...
}
WpfGridColumn
This is where the binding magic happens
...
protected override FrameworkElement GenerateEditingElement(DataGridCell cell, object dataItem)
{
    TextBox editElement = new TextBox {BorderThickness = new Thickness(0.0), Padding = new Thickness(0.0)};
    // Each column binds to "<Header>.DisplayValue", "<Header>.Background"
    // and "<Header>.Foreground" on the Entity.
    System.Windows.Data.Binding textBinding = new System.Windows.Data.Binding(cell.Column.Header + ".DisplayValue")
        {Source = dataItem};
    editElement.SetBinding(TextBox.TextProperty, textBinding);
    System.Windows.Data.Binding backgroundBinding = new System.Windows.Data.Binding(cell.Column.Header + ".Background")
        {Source = dataItem};
    // The element is a TextBox, so use its own dependency properties.
    editElement.SetBinding(TextBox.BackgroundProperty, backgroundBinding);
    System.Windows.Data.Binding foreGroundBinding = new System.Windows.Data.Binding(cell.Column.Header + ".Foreground")
        {Source = dataItem};
    editElement.SetBinding(TextBox.ForegroundProperty, foreGroundBinding);
    return editElement;
}
...
Mediator
Enough said about this class. You know what it does ... distributes Tasks to registered users ... and picks the right TaskScheduler
...
public bool Broadcast(string key, params object[] message)
{
    List<WeakAction> wr;
    WeakAction[] snapshot;
    lock (_registeredHandlers)
    {
        if (!_registeredHandlers.TryGetValue(key, out wr))
            return false;
        // Copy under the lock so that handlers added or removed by other
        // threads cannot invalidate the enumeration below.
        snapshot = wr.ToArray();
    }
    foreach (var cb in snapshot)
    {
        Delegate action = cb.GetMethod();
        if (action == null) continue;
        switch (cb.TaskType)
        {
            case TaskType.Background:
                if (!_baseQueue.ContainsKey(cb.Target.Target))
                {
                    var bgTask = _backgroundTaskFactory.StartNew(() => action.DynamicInvoke(message));
                    _baseQueue.Add(cb.Target.Target, bgTask);
                }
                break;
            case TaskType.Periodic:
                var periodicTask = new PeriodicTask(() => action.DynamicInvoke(message), _budget);
                periodicTask.Start(_preemptiveScheduler);
                break;
            case TaskType.Sporadic:
                var sporadicTask = new SporadicTask(() => action.DynamicInvoke(message), _budget);
                sporadicTask.Start(_preemptiveScheduler);
                break;
            case TaskType.LongRunning:
                _staTaskFactory.StartNew(() => action.DynamicInvoke(message));
                break;
        }
    }
    lock (_registeredHandlers)
    {
        // Drop handlers whose targets have been garbage collected.
        wr.RemoveAll(wa => wa.HasBeenCollected);
    }
    return true;
}
...
PriorityTaskScheduler
Implements the rate-monotonic algorithm. Waiting queue, ready to run queue ... didn't we talk about it already?
...
public PriorityTaskScheduler()
{
    _tsp = Convert.ToInt32(ConfigurationManager.AppSettings["TASK_SCHEDULER_PERIOD"]);
    _buferSize = Convert.ToInt32(ConfigurationManager.AppSettings["BUFFER_SIZE"]);
    _threshold = Convert.ToDouble(ConfigurationManager.AppSettings["SUSPENSION_THRESHOLD"]);
    _readyToRunQueue = new BlockingCircularBuffer<PeriodicTask>(_buferSize);
    _waitingQueue = new ConcurrentQueue<Task>();
    // Every period, move waiting tasks into the ready-to-run queue.
    var executor = new System.Timers.Timer(_tsp);
    executor.Elapsed += WaitingQueueToReadyToRun;
    executor.Start();
    // One worker loop per processor drains the ready-to-run queue.
    for (int i = 0; i < Environment.ProcessorCount; i++)
    {
        _backgroundTaskFactory.StartNew(() =>
        {
            while (true)
            {
                var task = _readyToRunQueue.Dequeue();
#if DEBUG
                _log.Debug("Message Dequeued");
#endif
                if (TryExecuteTask(task))
                {
                    var span = DateTime.UtcNow - task.TimeCreated;
                    if (DateTime.UtcNow > task.Deadline)
                    {
                        _log.Warn(String.Format("Real-time Deadline exceeded : {0}", span.TotalMilliseconds));
                    }
#if DEBUG
                    _log.Debug("Message Done");
#endif
                }
            }
        });
    }
}
private void WaitingQueueToReadyToRun(object sender, System.Timers.ElapsedEventArgs e)
{
    Task task;
    while (_waitingQueue.TryDequeue(out task))
    {
        PeriodicTask headTask;
        if (task is PeriodicTask)
        {
            headTask = (PeriodicTask)task;
            var nextToRun = _readyToRunQueue.Peek();
            if ((nextToRun != null) && (nextToRun.Status == TaskStatus.WaitingToRun)
                && headTask.Deadline < nextToRun.Deadline)
            {
                _log.Info("Context switching at: " + DateTime.UtcNow);
                // Re-queue the more urgent task ahead of the one it displaces.
                var dequeuedTask = _readyToRunQueue.Dequeue();
                _readyToRunQueue.Enqueue(headTask);
                _readyToRunQueue.Enqueue(dequeuedTask);
            }
            else
            {
                // No context switch needed: queue the task exactly once.
                _readyToRunQueue.Enqueue(headTask);
            }
        }
    }
}
protected override void QueueTask(Task task)
{
#if DEBUG
    _log.Debug("Message Enqueued");
#endif
    if (task is SporadicTask)
    {
        // Sporadic (user-driven) tasks bypass both queues and run immediately.
        TryExecuteTask(task);
        _log.Info("Sporadic jumped the queue at: " + DateTime.UtcNow);
        return;
    }
    _waitingQueue.Enqueue(task);
}
...
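The listing above only shows QueueTask. Any class deriving from TaskScheduler also has to override TryExecuteTaskInline and GetScheduledTasks, so for reference here is the smallest complete custom scheduler I can think of. SingleThreadScheduler is a made-up example that runs everything on one dedicated thread in arrival order; it has none of the waiting queue, deadline or sporadic-task logic of the PriorityTaskScheduler.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Minimal illustrative scheduler: one worker thread, FIFO order.
public sealed class SingleThreadScheduler : TaskScheduler, IDisposable
{
    private readonly BlockingCollection<Task> _queue = new BlockingCollection<Task>();
    private readonly Thread _worker;

    public SingleThreadScheduler()
    {
        _worker = new Thread(() =>
        {
            // Blocks until a task arrives; ends after CompleteAdding().
            foreach (var task in _queue.GetConsumingEnumerable())
            {
                TryExecuteTask(task);
            }
        });
        _worker.IsBackground = true;
        _worker.Start();
    }

    // Called by Task.Start(scheduler) and TaskFactory.StartNew.
    protected override void QueueTask(Task task)
    {
        _queue.Add(task);
    }

    // Refuse inlining so tasks always run on the worker thread.
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return false;
    }

    // Used by the debugger to list pending tasks.
    protected override IEnumerable<Task> GetScheduledTasks()
    {
        return _queue.ToArray();
    }

    public void Dispose()
    {
        _queue.CompleteAdding();
    }
}
```

A task started with task.Start(new SingleThreadScheduler()) runs on the worker thread rather than the caller's thread. Swapping the BlockingCollection for a waiting queue plus a ready-to-run buffer is where the rate-monotonic behaviour would come in.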
BaseServiceObserver
Uses Rx to conflate the data
...
public virtual void AddServicesToObserve(IEnumerable<IService> services)
{
    _entityIndex = new ConcurrentDictionary<ulong, Entity>();
    var observer = services.Select(s =>
            Observable.FromEvent<EventArgs<DataRecord>>(h => s.DataReceived += h, h => s.DataReceived -= h))
        .ToList()
        .Merge();
    observer.Select(x => x.EventArgs.Value)
        .Where(x => x.DataRecordKey != null)
        .BufferWithTime(TimeSpan.FromMilliseconds(_od))
        .Where(x => x.Count > 0)
        .Subscribe(DataReceived, LogError);
}
...
BaseViewModel
Resolves the View and calls Initialise() so we can get rid of xaml.cs
...
protected BaseViewModel(string view, bool vmResolvesView, bool signedMaintenanceContract)
{
    Log = LogManager.GetLogger(view);
    if (vmResolvesView)
    {
        var attr = (ViewAttribute)GetType().GetCustomAttributes(typeof(ViewAttribute), true).FirstOrDefault();
        if (attr == null) throw new ApplicationException("ViewAttribute is missing");
        ViewReference = (FrameworkElement)Activator.CreateInstance(attr.ViewType);
        Uri resourceLocater = new Uri("/" + attr.ViewType.Module.ScopeName.Replace(".dll", "") + ";component/Views/" +
            attr.ViewType.Name + ".xaml", UriKind.Relative);
        Application.LoadComponent(ViewReference, resourceLocater);
        ViewReference.DataContext = this;
    }
    if (ViewReference is Window)
    {
        ((Window)ViewReference).Left = Convert.ToDouble(ViewReference.GetValue(WindowProperties.LeftProperty));
        ((Window)ViewReference).Top = Convert.ToDouble(ViewReference.GetValue(WindowProperties.TopProperty));
        ((Window)ViewReference).Width = Convert.ToDouble(ViewReference.GetValue(WindowProperties.WidthProperty));
        ((Window)ViewReference).Height = Convert.ToDouble(ViewReference.GetValue(WindowProperties.HeightProperty));
    }
    if (signedMaintenanceContract)
        DispatcherFacade = new MaintenanceEngineer(view, Log, Dispatcher);
}
...
MaintenanceEngineer
Monitors the dispatcher queue and calls DoEvents extension if the queue is growing
...
private long _operationsQueueCount = 0;
private readonly ConcurrentDictionary<DispatcherOperation, object> _operations = new ConcurrentDictionary<DispatcherOperation, object>();
private void Heartbeat(long l)
{
    var heartbeat = new Heartbeat(_dynamicViewName, String.Format("{0} View heartbeat sent at: {1}", _dynamicViewName, DateTime.UtcNow.ToLongTimeString()), DateTime.UtcNow, false);
    Action w = () => Mediator.GetInstance.Broadcast(Topic.ShellStateUpdated, heartbeat);
    AddToDispatcherQueue(w);
}
private void MonitorDispatcherQueue(long l)
{
    if (_operationsQueueCount != 0)
        _log.Info(String.Format("Dispatcher Operations In Queue {0}, ", _operationsQueueCount));
    if (_operationsQueueCount > _qs)
    {
        _log.Info("Pushing all Dispatcher operations");
        Application.Current.DoEvents();
        _operations.Clear();
        Interlocked.Exchange(ref _operationsQueueCount, 0);
    }
}
#region IDispatcherFacade Members
public void AddToDispatcherQueue(Delegate workItem)
{
    // Count the operation before queuing it so that a fast completion on
    // the dispatcher thread cannot decrement the counter first.
    Interlocked.Increment(ref _operationsQueueCount);
    var operation = _dispatcher.BeginInvoke(DispatcherPriority.Background, workItem);
    _operations.TryAdd(operation, null);
    operation.Completed += (s, o) =>
    {
        Interlocked.Decrement(ref _operationsQueueCount);
        object t;
        _operations.TryRemove((DispatcherOperation)s, out t);
    };
}
#endregion
...
RealService
some WCF stuff
...
public RealService()
{
    _channelFactory = new ChannelFactory<IRemoteSubscriptionService>(
        new NetNamedPipeBinding(NetNamedPipeSecurityMode.None) { MaxReceivedMessageSize = 5000000, MaxBufferSize = 5000000 },
        "net.pipe://WPFRealTime/SubscriptionService");
    _proxy = _channelFactory.CreateChannel();
    ServiceHost host = new ServiceHost(new ConnectionListener(this));
    host.AddServiceEndpoint(typeof(IRemotePublishingService),
        new NetNamedPipeBinding(NetNamedPipeSecurityMode.None) { MaxReceivedMessageSize = 5000000, MaxBufferSize = 5000000 },
        "net.pipe://WPFRealTime/Bond/PublishingService");
    host.Open();
}
private void SendData(DataRecord data)
{
    EventHandler<EventArgs<DataRecord>> handler = DataReceived;
    if (handler != null)
    {
        handler(this, new EventArgs<DataRecord>(data));
    }
}
#region IService Members
[RegisterInterest(Topic.BondServiceGetData, TaskType.Background)]
public void GetData()
{
    try
    {
        _proxy.GetData(new RequestRecord() { AssetType = AssetType });
    }
    catch (Exception ex)
    {
        throw new ApplicationException("WCF channel is closed", ex);
    }
}
public event EventHandler<EventArgs<DataRecord>> DataReceived;
...
BondServiceObserver
Note we can change "GUI owned" Entities before passing them to the Dispatcher
[Export(typeof(BaseServiceObserver))]
public class BondServiceObserver : BaseServiceObserver
{
    public BondServiceObserver()
    {
        AddEventExploder(EventExploder);
    }
    public override void AddServicesToObserve(IEnumerable<IService> services)
    {
        base.AddServicesToObserve(services.Where(s => s.AssetType == AssetType.Bond));
    }
    public void EventExploder(IEnumerable<Entity> messages)
    {
        RuleEngine.ApplyRules(messages);
        Mediator.GetInstance.Broadcast(Topic.BondServiceDataReceived, messages);
    }
}
BondViewModel
Sample View Model
...
public BondViewModel() : base("Bond Module", true, true)
{
    DynamicViewName = "Bond Module";
    _grid = GetRef<DataGrid>("MainGrid");
    Entities = new NotifyCollection<Entity>(EntityBuilder.LoadMetadata(AssetType.Common, AssetType.Bond));
    CreateColumnsCommand = new SimpleCommand<Object, EventToCommandArgs>((parameter) => true, CreateColumns);
}
[RegisterInterest(Topic.BondServiceDataReceived, TaskType.Periodic)]
private void DataReceived(IEnumerable<Entity> entities)
{
    Action w = () => Entities.AddOrUpdate(entities, false);
    DispatcherFacade.AddToDispatcherQueue(w);
}
[RegisterInterest(Topic.BondModuleHang, TaskType.Sporadic)]
private void Hang()
{
    ProcessOnDispatcherThread(() => Thread.Sleep(10000));
}
[RegisterInterest(Topic.BondModuleOpen, TaskType.Sporadic)]
private void OpenClose(bool state)
{
    ProcessOnDispatcherThread(() =>
    {
        if (state)
            ((Window)ViewReference).Show();
        else
            ((Window)ViewReference).Hide();
    });
}
...
Bootstrapper
Uses MEF to discover and load components
...
private readonly Action<Lazy<IDynamicViewModel>> _createView = ((t) =>
{
    var vm = t.Value;
    Mediator.GetInstance.Register(vm);
    Window view = (Window)((BaseViewModel)vm).ViewReference;
    RunningWindows.TryAdd(vm.DynamicViewName, view);
    var heartbeat = new Heartbeat(vm.GetType().ToString(), String.Format("{0} View loaded at: {1}", vm.GetType().ToString(), DateTime.UtcNow.ToLongTimeString()), DateTime.UtcNow, true);
    Mediator.GetInstance.Broadcast(Topic.ShellStateUpdated, heartbeat);
    view.Closed += (sender, e) => view.Dispatcher.InvokeShutdown();
    Dispatcher.Run();
});
private void InjectDynamicViewModels(bool multiDispatchers)
{
    foreach (var lazy in DynamicViewModels)
    {
        Lazy<IDynamicViewModel> localLazy = lazy;
        if (multiDispatchers)
        {
            Mediator.GetInstance.Broadcast(Topic.BootstrapperLoadViews, localLazy);
        }
        else
        {
            CreateView(localLazy);
        }
    }
}
private void InjectServices()
{
    foreach (var lazy in Services)
    {
        var service = lazy.Value;
        Mediator.GetInstance.Register(service);
        var heartbeat = new Heartbeat(service.GetType().ToString(), String.Format("{0} Service loaded at: {1}", service.GetType(), DateTime.UtcNow.ToLongTimeString()), DateTime.UtcNow, true);
        Mediator.GetInstance.Broadcast(Topic.ShellStateUpdated, heartbeat);
    }
    foreach (var lazy in ServiceObservers)
    {
        lazy.Value.AddServicesToObserve(Services.Select(s => s.Value));
    }
}
private void InjectStaticViewModels(MainWindow mainWindow)
{
    foreach (var vm in StaticViewModels)
    {
        Mediator.GetInstance.Register(vm);
        mainWindow.RibbonRegion.Items.Add(new TabItem { Content = ((BaseViewModel)vm).ViewReference, Header = vm.StaticViewName });
    }
}
...
Finale
My requirements were to build a “real-time low-latency
multi-threaded front-office trading application using WPF/MVVM” which "needs to be responsive all the time, display trading
data very fast, never hang, freeze, crash or run out of memory".
I have achieved that by implementing a rate-monotonic scheduling algorithm using the .NET Task Parallel Library, and so it is real-time.
I run the GUI on more than one Dispatcher so it is very responsive.
I allowed data to flow freely and modify "GUI elements" on non-UI threads so it updates the GUI very fast.
I shamelessly stole components from fellow WPF developers, namely Sacha Barber, Marlon Grech and Kent Boogaart.
The only place where I think I failed is in providing a generic
mechanism to handle sporadic events. As it is, it's up to the developer to
mark an event as sporadic. What would be nice is to have a mechanism
in place where every user action is automatically picked up
before it gets into WPF event routing and processed as a sporadic event.
I was looking to use System.Windows.Input.InputManager for this but failed.
So if anyone has any idea
on how to intercept any user input in WPF and process it as a sporadic
event in our TaskScheduler I would like to hear from you.