Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles / desktop / WPF

Experimenting with enterprise level bus messaging

4.98/5 (75 votes)
3 Sep 2010CPOL36 min read 439.5K   1.5K  
A look into messaging solutions using NServiceBus.

Table of Contents

Introduction

As some of you may be aware, I have just finished writing a glut of articles on V2 of my Cinch MVVM framework, and I declared to one and all that after I was done with that, I would be taking some time away from WPF/MVVM to write about things that have caught my eye over the past couple of months that I have been too busy to write about. This is the first such article, and I guess, in many ways, is a weird one in so far as I am not presenting anything that I have sweated blood and tears over as I did when working on Cinch. Rather, it is me looking at a few cool .NET goodies out there that are freely available and seeing what can be done with them.

For this article, I picked up NServiceBus which is a messaging service bus which utilises Microsoft Message Queue to allow communication between different processes.

NServiceBus boasts integration with quite a few different technologies such as:

  • Console apps
  • Web sites
  • WCF Services
  • Windows Forms
  • Windows Presentation Foundation apps
  • Silverlight apps

Sounded pretty cool, and just by some strange coincidence at work, we happen to have a pretty real requirement to talk to a whole plethora of other apps which we currently do using a Duplex WCF Service that makes use of MSMQBinding, where we are considering adding a lot of extra messaging, so I was intrigued to see what other sorts of messaging solutions between processes were out there and how easy they were to use.

As I say, for this article, I picked upon NServiceBus and set about creating a little demo app which is the main thrust of this article. By the time I finish this article, I hope I will have explained enough about my findings with NServiceBus, that you would be able to make a decision as to whether it would be a good fit for your own projects or not.

I think it is probably a good time to also just let you know a bit about the demo apps attached, and what I wanted to achieve. I chose to create a collaborative graph (again, using freely available .NET goodies GraphSharp) that two individual processes could contribute to, basically keeping each other up to date using NServiceBus. I will talk much more on this later, but for now, just know that the demo app has two processes which communicate with each other using NServiceBus, and where the goal is to keep a graph that each of the processes has a copy of, up to date using NServiceBus messages.

There are some prerequisites which are outlined below; please read them carefully before continuing with the rest of the article.

Prerequisites

In order to run the code associated with this article, you will need the following components installed:

  1. Visual Studio 2010 (as the demo app is a VS2010 solution)
  2. .NET 4.0 (as the demo app is .NET 4.0)
  3. MSMQ - Microsoft Message Queue (as NServiceBus is all about MSMQ)

Discussion About NServiceBus

Now, I can not claim to be an expert in NServiceBus, but what I can say is that I have studied the documentation carefully (what there is of it), and I have managed to get the demo apps working how I wanted them to work. That said, if there are any NServiceBus experts out there reading this that think I am way off base, apologies for that.

I should also point out that this article is by no means a comprehensive guide, and I have by no means used all the features of NServiceBus; far from it, there is plenty that I have not touched, such as load balancing, sagas, or sending messages across an actual network (as I do not have a network at home).

Typical Arrangements

From examining the samples that come with NServiceBus, it would appear that it supports the following types of communication:

TopologyDescription

Image 1

Full Duplex

Duplex messaging is supported; in fact, the demo app does this. But it seems to support that both parties must know about each other via configuration so they each know about each other's queues.

I initially thought it might be possible to have more than two fully duplex processes, but this did not seem to be possible from a configuration point of view. I think NServiceBus is really only meant to work in a duplex manner with two parties, those being a Server and a Client, where both parties know explicitly about each other, and only each other.

You can of course use Pub/Sub which allows broadcasting to many subscribers from one publisher and from a subscriber back to the publisher.

But for the demo app attached to this article, I needed full duplex; it just would have been nice to have more processes to prove it was not some slight of hand Remoting trick.

Image 2

Publisher/Subscriber

This simply allows a publisher to publish to many subscribers. As each subscriber knows about the publisher, it should be possible for a subscriber to send messages to the publisher.

There are some good examples available when you download NServiceBus which cover FullDuplex and PubSub.

Configuration

Later on in the Hosting section, I will talk about how much of the NServiceBus configuration can either be done in code or in actual configuration files. As the demo app uses the self hosting option, there is a lot less configuration information to deal with as a lot of it is done via code instead.

Even going down the self hosting route, there is still a minimum set of requirements that must be covered; here is an App.Config file from the demo app:

XML
<?xml version="1.0"?>
<configuration>
  <configSections>
    <section name="MsmqTransportConfig" 
             type="NServiceBus.Config.MsmqTransportConfig, NServiceBus.Core"/>
    <section name="UnicastBusConfig" 
             type="NServiceBus.Config.UnicastBusConfig, NServiceBus.Core"/>
    <section name="RijndaelEncryptionServiceConfig" 
             type="NServiceBus.Config.RijndaelEncryptionServiceConfig, NServiceBus.Core"/>
  </configSections>

  <!-- in order to configure remote endpoints use the format: "queue@machine" 
       input queue must be on the same machine as the process feeding off of it.
       error queue can (and often should) be on a different machine.
  -->

  <MsmqTransportConfig InputQueue="WpfPublisherBInputQueue" 
                       ErrorQueue="error" 
                       NumberOfWorkerThreads="1" 
                       MaxRetries="5"/>

  <UnicastBusConfig>
    <MessageEndpointMappings>
      <add Messages="MyMessages" 
           Endpoint="WpfPublisherAInputQueue" />
    </MessageEndpointMappings>
  </UnicastBusConfig>

  <RijndaelEncryptionServiceConfig 
    Key="gdDbqRpqdRbTs3mhdZh9qCaDaxJXl+e7"/>

  <runtime>
    <loadFromRemoteSources enabled="true"/>
  </runtime>

  <startup>
    <supportedRuntime version="v4.0" 
                      sku=".NETFramework,Version=v4.0"/>
  </startup>

</configuration>

Let's tackle this one section at a time.

MsmqTransportConfig

Is used to configure the MSMQ parameters; I think the parameter names speak for themselves.

UnicastBusConfig

Is used to configure the bus. This is the place where you need to tell the Bus what messages you will be allowing for the current process and what endpoint queue you will be using. Providing you have MSMQ installed NServiceBus create this queue for you.

RijndaelEncryptionServiceConfig

Configures the encryption service. And to be totally frank, I do not know what the key value here is, I stole this section from one of the NServiceBus samples.

Runtime

I stated at the beginning of this article, the demo app is .NET 4.0 / VS2010, and I could not seem to load any of the NServiceBus DLLs in VS2010 without the use of this section. This element specifies whether assemblies from remote sources should be granted full trust.

SupportedRuntime

This is not strictly needed for NServiceBus. But as I stated at the beginning of this article, the demo app is .NET4.0 / VS2010, so I had to add this in order for the .NET 4.0 framework to work with the older NServiceBus DLLs.

For more information on extra configuration, please read the NServiceBus documentation, particularly these pages:

API

The API is a little bit strange at first, as the author of NServiceBus has decided to use interfaces in a highly novel way that somehow creates a fluent interface description when chained together. In fact, if you download and examine some of the NServiceBus examples, it is quite typical to see things like this:

C#
class EndpointConfig : IConfigureThisEndpoint, AsA_Server, IWantCustomInitialization

Which does look a bit strange I think, but once you get into it, it's not that bad. I have outlined below some of the more common API interfaces and classes that you may need to use. I would encourage you to explore the API a bit further at your leisure.

As_A_Client

Sets MsmqTransport to be non-transactional, and it purges its queue of messages on startup. This means that it starts fresh every time, not remembering anything before a crash. Also, it processes messages using its own permissions, not those of the message sender.

As_A_Server

Sets MsmqTransport to be transactional and does not purge messages from its queue on startup. This makes it fault-tolerant. Also, it processes messages under the permissions of the message sender (called impersonation) which prevents elevation of privilege attacks

As_A_Publisher

Extends AsA_Server, and also indicates to the infrastructure that a storage for subscription requests is to be set up.

IConfigureThisEndpoint

In NServiceBus, this is an empty class that is simply used to dictate how you want the endpoint configured. It can be thought of as a marker class; here is a typical example:

C#
using NServiceBus;

namespace WpfPublisherB
{
    public class EndpointConfig : IConfigureThisEndpoint, AsA_Publisher {}
}

IWantCustomInitialization

In NServiceBus, this is an empty class that is simply used to dictate that you require custom initialisation, and from what I can tell, seems to only needed once per project that makes use of NServiceBus. It can be thought of as a marker class; here is a typical example, where we have a single method called Init() where we can do stuff:

C#
using NServiceBus;

namespace MyClient
{
    public class ClientInit : IWantCustomInitialization
    {
        public void Init()
        {
            //Do something custom here
        }
    }
}

IHandleMessages<T>

NServiceBus provides an interface called IHandleMessages<T> which you can use to create message handling classes. This interface has a single method called Handle(T message).

These message handling classes should replace the generic T with an actual message type, where the message would typically be stored in a separate DLL that has been configured in the UnicastBusConfig section of App.Config. See the Configuration section of this article for more information on that.

Here is a typical message handling class:

C#
public class AddEdgeRequestMessageHandler : IHandleMessages<AddEdgeRequestMessage>
{
    public void Handle(AddEdgeRequestMessage message)
    {
        //Do stuff with message here
    }
}

It is important to note that at no point in your user code is a new instance of one of these classes ever instantiated; that job is done by the NServiceBus framework. I can only assume this is done via some bootstrapping/Reflection when the process is run.

Messages

As I just mentioned, you typically place all your actual messages in a separate DLL, which you must also allow in your App.Config, see the Configuration section of this article for more information on that.

Here is what a typical message class looks like; you can see that all message classes must implement the marker interface IMessage:

C#
public class AddVertexRequestMessage : IMessage
{
    public WireEncryptedString ConnectedToVertex { get; set; }
    public bool IsMale { get; set; }
    public WireEncryptedString NewVertexName { get; set; }
}

This class is expected to be used in conjunction with a message handler class that we just saw above.

Sagas

I have not used Sagas in the demo app, but essentially, these are long running operations that may contain many messages. As such, you would expect them to be stateful between these messages.

Using NServiceBus, you can explicitly define the data used for this state by implementing the interface IContainSagaData - all public get/set properties will be persisted by default:

C#
public class MySagaData : IContainSagaData
{
    // the following properties are mandatory
    public virtual Guid Id { get; set; }
    public virtual string Originator { get; set; }
    public virtual string OriginalMessageId { get; set; }

    // all other properties you want persisted - remember to make them virtual
}

NServiceBus uses NHibernate to transparently store your saga data in a database. It can also automatically generate the database schema for storing these classes (through the use of Fluent NHibernate). You can, as always, swap out these technologies - just implement the IPersistSagas interface.

A typical Saga implementation may look something like this, where we can tell it is started by a certain message type and that also handles other types of messages:

C#
public class MySaga : Saga<MySagaData>,
    IAmStartedByMessages<Message1>,
    IHandleMessages<Message2>
{
    public override void ConfigureHowToFindSaga()
    {
        ConfigureMapping<Message2>(s => s.SomeID, m => m.SomeID);
    }

    public void Handle(Message1 message)
    {
        this.Data.SomeID = message.SomeID;

        RequestTimeout(TimeSpan.FromHours(1), "some state");

        // rest of the code to handle Message1
    }

    public override void Timeout(object state)
    {
        // some business action like:
        if (!Data.Message2Arrived)
            ReplyToOriginator(new TiredOfWaitingForMessage2());
    }

    public void Handle(Message2 message)
    {
        // code to handle Message2

        Data.Message2Arrived = true;

        ReplyToOriginator(new AlmostDoneMessage { SomeID = message.SomeID });
    }
}

You can see that this Saga is started by a type of Message1 and also handles types of Message2.

The RequestTimeout method on the base class tells NServiceBus to send a message to another endpoint which will durably keep time for us. You'll need to add an entry to your UnicastBusConfig telling NServiceBus where that endpoint is. There's a process that comes with NServiceBus called the Timeout Manager which provides a basic implementation of this functionality.

When time is up, the Timeout Manager sends a message back to the Saga causing its Timeout method to be called with the same state object originally passed.

Important: Don't assume that other messages haven't arrived in the meantime.

Endpoints

The configuration of the endpoint will vary on what you want to do, as I mentioned early; this can be thought of as a marker class that NServiceBus uses when it is running. I assume there must be some Reflection at play to enable this to work. Anyway, the important things to note here are that you must inherit from the NServiceBus IConfigureThisEndPoint interface and then work out how the endpoint needs to work. Here is an example:

C#
using NServiceBus;

namespace WpfPublisherB
{
    public class EndpointConfig : IConfigureThisEndpoint, AsA_Publisher {}
}

As I also mentioned in the API section, you need to choose between AsA_Client/AsA_Server/AsA_Publisher, dependant on your needs.

Hosting

In much the same way as WCF needs to be hosted, so does NServiceBus. There are various options for doing this; in fact, NServiceBus comes with a generic host executable called NServiceBus.Host.exe, which if you download NServiceBus, you will see in most of the samples where they start NServiceBus.Host.exe within the Debug settings of the NServiceBus project you are trying to run.

Image 3

But this is only one way; there are others; in fact, NServiceBus also allows you to host NServiceBus.Host.exe as a Windows Service, where you would do this using the command line arguments specified below:

Image 4

For both of these options, you will need to configure NServiceBus. The demo app doesn't use either of these approaches; it uses the one shown below.

There is one further thing you can do which is to go the self hosting route, which is fairly easy to do using the following fluent code, which can be used to configure and create the NServiceBus IBus, which is really expected to only be done once.

C#
Bus = NServiceBus.Configure.With()
    .DefaultBuilder()
    .XmlSerializer()
    .RijndaelEncryptionService()
    .MsmqTransport()
        .IsTransactional(false)
        .PurgeOnStartup(true)
    .UnicastBus()
        .ImpersonateSender(false)
    .LoadMessageHandlers() // need this to load MessageHandlers
    .CreateBus()
    .Start();

Now, some of this may look a bit weird, so I will try and explain all the different parts here, but I will also be covering some of it in separate sections which we will get to later.

  • DefaultBuilder(): Tells NServiceBus to use the default IOC container, which at the time of writing this article was Spring .NET (though I read somewhere this is going to change to AutFac soon).
  • XmlSerializer(): Tells NServiceBus to serialize objects using XML.
  • RijndaelEncryptionService(): Tell NServiceBus to use Rijndael encryption.
  • MsmqTransport(): Sets up NServiceBus MSMQ options.
  • MsmqSubscriptionStorage(): Uses the MSMQ storage, rather than a database.
  • UnicastBus(): Sets up the NServiceBus bus.
  • LoadMessageHandlers(): Instructs NServiceBus to dynamically load all the message handler classes it can find (from what I can tell, this is done via some Reflection).
  • CreateBus(): Creates the IBus.
  • Start(): Starts the IBus.

You can read more about this at the NServiceBus website page: http://www.nservicebus.com/GenericHost.aspx.

IOC

NServiceBus uses an IOC internally. In fact, NServiceBus is able to work with a variety of IOC containers. At the time of writing this article, NServiceBus was v2.0 and used Spring .NET as its default IOC container.

It is able to use the following containers:

  • Autofac
  • StructureMap
  • Castle
  • Unity
  • Spring .NET (default with NServiceBus 2.0)

In fact, here is an example of the IBus self hosting configuration for Castle WindsorContainer, where we add in another service type that could then be resolved in the message handlers that you will see later.

C#
var castleContainer = new WindsorContainer();
castleContainer.AddComponent<IStringStorer, StringStorer>();

NServiceBus.Configure.With().CastleWindsorBuilder(castleContainer)
    .RijndaelEncryptionService()
        .XmlSerializer()
        .MsmqTransport()
            .IsTransactional(true)
                .PurgeOnStartup(true)
        .MsmqSubscriptionStorage()
        .UnicastBus()
        .CreateBus()
        .Start();

As you can see, I am creating a new IStringStorer and adding it to the WindsorContainer, which means that in any message handler class, you can do something like this:

C#
using System;
using MyMessages;
using NServiceBus;

namespace MyClient
{
    class DataResponseMessageHandler : IHandleMessages<DataResponseMessage>
    {

        public IStringStorer Storer { get; set; }

        public void Handle(DataResponseMessage message)
        {
            //Do something with Storer
        }
    }
}

Where the IStringStorer will get resolved using the WindsorContainer by NServiceBus.

If you do not like any of the standard IOC containers, you can write your own too; for more information on this, check out the NServiceBus IOC documentation link: http://www.nservicebus.com/Containers.aspx.

What Does the Demo App Look Like

I think the best way to show what the demo app looks like is by looking at a small video, so please click on the image below; note: there is no audio.

Image 5

Please click on the image above to watch the video

  • Best watched in full-screen, which you can choose once you are on the video page, bottom right

The things to note about this video are that one process WpfPublisherB is started off first, and adds various graph vertices/edges, and then WpfPublisherA is started and deals with the queued up messages that have been routed to it while is has not been running. Once both WpfPublisherA and WpfPublisherB are running, adding a Vertex/Edge to either of them will cause the other one to also do the same operation. This is, of course, done with messaging using NServiceBus.

What Does the Demo App Do

The demo app is quite a simple thing really. It can be outlined in the following bullet points:

  • There are two identical WPF projects called WpfPublisherA/WpfPublisherB, both of which use a ViewModel called GraphLayoutViewModel, which sits in a common DLL called WpfCommon.
  • GraphLayoutViewModel provides a ViewModel that can be used to populate Vertices/Edges on a freely available WPF graphing library called GraphSharp.
  • GraphLayoutViewModel provides ICommand(s) to show two popup windows to allow the user to either add a new Vertex to the GraphSharp graph, or to create a new Edge within the GraphSharp graph. It should be noted that although the two WPF projects WpfPublisherA/WpfPublisherB use GraphLayoutViewModel, it is by no means shared state at all. They are two entirely different processes with separate AppDomains, that simply happen to use the WpfCommon DLL to construct their own GraphLayoutViewModel instance from. What actually happens is that whenever a new GraphSharp graph Vertex or Edge is added in one of these two processes, NServiceBus is used to communicate that change to the other process. So it can be thought of as a sort of synchronized graph. I also attempt to synchronize non-handled messages that a process may have received while it has not been running; basically, NServiceBus would have kept on sending the messages to the processes queue even while it is not running, up to a maximum number of retries (see App.Config), provided you do not tell NServiceBus to purge messages on startup (for the demo app, no purging of messages happens). Basically, what happens in the demo app happens in this order:
    1. The process is started.
    2. The NServiceBus message handler classes are invoked by NServiceBus, at which point all queued outstanding messages sent to this process are handled, in one of two ways:
      1. If the process is deemed to be online, then messages are attempted to be routed to GraphLayoutViewModel using the Cinch Mediator (more on this later).
      2. If the process is deemed to be offline, then all queued unhandled messages are added to the App (Application class), until the GraphLayoutViewModel is instantiated, at which point the App (Application class) passes all queued unhandled messages to the newly instantiated GraphLayoutViewModel, which goes through all these queued unhandled messages, adding the requested Vertices/Edges to the GraphSharp graph. After that is done, GraphLayoutViewModel sets a flag on the App (Application class) to tell it the process is now online, which tells any future NServiceBus incoming message handler calls to work as shown in step 1 just above this step.
    3. If GraphLayoutViewModel is not instantiated, create it, and hook up some Cinch Mediator message handlers, and accept the queued unhandled messages from the app (Application class), and loop through them as just described.
    4. Go to 2.
  • As it is WPF, I have, of course, used my own MVVM framework Cinch. This is a side detail, and is in no way the main thrust of this article, though I freely admit it has made it a lot easier to work with NServiceBus as I discuss below.

In a nutshell, that is what the demo app is trying to do. When you open the attached solution code, it will look like this:

Image 6

How Does it All Work

So we have covered a lot of NServiceBus ground I think, but now it is time to go through how the actual demo app works; watching the video should have given you some appreciation of how it all works, and I have just shown the overall solution structure, so all that is left is to go through those areas in a bit more detail, so these areas are shown below.

Common Parts

As I previously stated, there is a common DLL that the two other publisher processes use; this common DLL is called WpfCommon, and simply provides common classes that get used by the two other publisher processes. As this is essentially a bunch of MVVM WPF Windows/ViewModels and popups, it seemed logical to use my Cinch MVVM framework, though I will not be laboring on this; I just used it to make my life easier, so if you are not familiar with Cinch, try reading some of the articles on that.

The common elements are described below:

Helpers

There are a number of helpers which I will just quickly mention, but some of them we will cover in more details later on.

NativeCalls: Simply allows a drop shadow to be applied to a chrome-less window by using the DWM API calls. Thanks to Jerimiah Morrill for that one

INonHandledMessages: Is a simple interface that the two publisher projects App (Application class) implement in order to allow non-handled messages to be queued up in a globally available place.

CrossThreadTestRunner: This one is a pretty important class actually; as NServiceBus runs in an MTA (MultiThreadedAppartment state) and WPF runs in an STA (SingleThreadedApartment), we need some way of running the final NServiceBus message handling code in the GraphLayoutViewModel (that does the adding of the Vertices/Edges) in an STA manner. That is where this class is useful; it basically takes a workload delegate, starts a new thread to do this work, and runs it in an STA apartment. Here is the entire code for this class; it is very handy and one that I use from time to time.

C#
using System;
using System.Reflection;
using System.Security.Permissions;
using System.Threading;

namespace WpfCommon
{
    public class CrossThreadTestRunner
    {
        private Exception lastException;

        public void RunInMTA(ThreadStart userDelegate)
        {
            Run(userDelegate, ApartmentState.MTA);
        }

        public void RunInSTA(ThreadStart userDelegate)
        {
            Run(userDelegate, ApartmentState.STA);
        }

        private void Run(ThreadStart userDelegate, ApartmentState apartmentState)
        {
            lastException = null;

            Thread thread = new Thread(
              delegate()
              {
                  try
                  {
                      userDelegate.Invoke();
                  }
                  catch (Exception e)
                  {
                      lastException = e;
                  }
              });
            thread.SetApartmentState(apartmentState);

            thread.Start();
            thread.Join();

            if (ExceptionWasThrown())
                ThrowExceptionPreservingStack(lastException);
        }

        private bool ExceptionWasThrown()
        {
            return lastException != null;
        }

        [ReflectionPermission(SecurityAction.Demand)]
        private static void ThrowExceptionPreservingStack(Exception exception)
        {
            FieldInfo remoteStackTraceString = typeof(Exception).GetField(
              "_remoteStackTraceString",
              BindingFlags.Instance | BindingFlags.NonPublic);
            remoteStackTraceString.SetValue(exception, 
                    exception.StackTrace + Environment.NewLine);
            throw exception;
        }
    }
}

I will talk more about this class when I talk about GraphLayoutViewModel.

Popups

There are two popup windows and their associated ViewModels, but I do not really want to spend too much time on these as all these popups really do is allow operations in the main GraphLayoutViewModel to work. Just for clarity, this is what the two popups look like. If this interests you, just check out the code for them, but they really are not that interesting.

AddNewEdgePopupWindow

Image 7

AddNewVertexPopupWindow

Image 8

Views

There is really only one view called GraphLayoutView, which is the one that holds the GraphSharp graph; here is all the relevant markup for the XAML part of this class:

XML
<UserControl x:Class="WpfCommon.GraphLayoutView"
    xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
    xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
    xmlns:mc="http://schemas.openxmlformats.org/markup-compatibility/2006" 
    xmlns:d="http://schemas.microsoft.com/expression/blend/2008" 
    xmlns:graphsharp="clr-namespace:GraphSharp.Controls;assembly=GraphSharp.Controls"
    xmlns:local="clr-namespace:WpfCommon"
    xmlns:zoom="clr-namespace:WPFExtensions.Controls;assembly=WPFExtensions"
    mc:Ignorable="d" >

    <Grid>
        <Grid.Resources>
            <DataTemplate x:Key="demoTemplate" 
                      DataType="{x:Type local:PocVertex}">
                <StackPanel Orientation="Horizontal" Margin="5">
                    <Image x:Name="img" Source="../Images/boy.ico" 
                            Width="20" Height="20" />
                    <TextBlock Text="{Binding Path=ID, Mode=OneWay}" 
                              Foreground="White" />
                </StackPanel>
                <DataTemplate.Triggers>
                    <DataTrigger Binding="{Binding IsMale}" Value="false">
                        <Setter TargetName="img" Property="Source"
                                Value="../Images/girl.ico" />
                    </DataTrigger>
                </DataTemplate.Triggers>
            </DataTemplate>

            <Style TargetType="{x:Type graphsharp:VertexControl}">
                <Setter Property="Template">
                    <Setter.Value>
                        <ControlTemplate 
                               TargetType="{x:Type graphsharp:VertexControl}">
                            <Border BorderBrush="White" 
                                    Background="Black"
                    BorderThickness="2"
                    CornerRadius="10,10,10,10"
                    Padding="{TemplateBinding Padding}">
                                <ContentPresenter Content="{TemplateBinding Vertex}" 
                                    ContentTemplate="{StaticResource demoTemplate}"/>
                                
                                 <Border.Effect>
                                    <DropShadowEffect BlurRadius="2" Color="LightGray" 
                                                      Opacity="0.3" Direction="315"/>
                                </Border.Effect>
                            </Border>
                        </ControlTemplate>
                    </Setter.Value>
                </Setter>
            </Style>

            <Style TargetType="{x:Type graphsharp:EdgeControl}">
                <Style.Resources>
                    <ToolTip x:Key="ToolTipContent">
                        <StackPanel>
                            <TextBlock FontWeight="Bold" Text="Edge.ID"/>
                            <TextBlock Text="{Binding ID}"/>
                        </StackPanel>
                    </ToolTip>
                </Style.Resources>
                <Setter Property="ToolTip" 
                         Value="{StaticResource ToolTipContent}"/>
            </Style>

        </Grid.Resources>

        <Grid>
        .....
            .....
            .....
            .....
            .....
            .....

            <zoom:ZoomControl  Grid.Row="1" 
                    Zoom="0.2" ZoomBoxOpacity="0.5">
                <zoom:ZoomControl.Background>
                    <LinearGradientBrush EndPoint="0.5,1" StartPoint="0.5,0">
                        <GradientStop Color="Black" Offset="0"/>
                        <GradientStop Color="#FF3F3F3F" Offset="1"/>
                    </LinearGradientBrush>
                </zoom:ZoomControl.Background>

                <local:PocGraphLayout x:Name="graphLayout" Margin="10"
                Graph="{Binding Path=Graph}"
                LayoutAlgorithmType="{Binding Path=LayoutAlgorithmType, Mode=OneWay}"
                OverlapRemovalAlgorithmType="FSA"
                HighlightAlgorithmType="Simple" />
            </zoom:ZoomControl>
        </Grid>
    </Grid>
</UserControl>

And here is the code-behind. It can be seen that it implements a IGraphFunctions interface which the GraphLayoutViewModel uses (via the Cinch IViewAwareStatus UI service) to carry out cross threading operations that need dispatching to the UI thread. NServiceBus is not the same thread as the UI one, so needs marshalling. This is definitely my preferred pattern, as you can create a Mock IGraphFunctions which can be used inside your tests, and in my opinion, a ViewModel should never use a Dispatcher directly; that responsibility belongs to the View.

C#
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Windows;
using System.Windows.Controls;
using System.Windows.Data;
using System.Windows.Documents;
using System.Windows.Input;
using System.Windows.Media;
using System.Windows.Media.Imaging;
using System.Windows.Navigation;
using System.Windows.Shapes;
using System.Windows.Threading;

using Cinch;

namespace WpfCommon
{
    /// <summary>
    /// Interaction logic for GraphLayoutView.xaml
    /// </summary>
    public partial class GraphLayoutView : UserControl, IGraphFunctions
    {
        public GraphLayoutView()
        {
            InitializeComponent();
            Mediator.Instance.Register(this);
        }


        /// <summary>
        /// These methods will be called by the
        ///    <c>GraphLayoutViewModel</c> by using the Cinch
        /// IViewAwareStatus UI service in the ViewModel
        /// </summary>
        #region IGraphFunctions Members

        public void LayoutGraph()
        {
            //As NServiceBus calls may come from 
            //a different thread, may need to marshall
            //to UI thread. We do get some call
            //(internal ones) not from NServiceBus which may
            //not need marshalling, so only Marshall those 
            //that need it using Cinch.Dispatcher extensions
            Dispatcher.InvokeIfRequired(() =>
            {
                try
                {
                    graphLayout.Relayout();
                }
                catch
                {
                }
            }, DispatcherPriority.Send);
        }


        public void AddNewVertex(PocVertex newVertex)
        {
            //As Graph is bound to UI, we need 
            //to marshall calls to update it, on Views Dispatcher
            Dispatcher.Invoke((Action)(() =>
            {
                try
                {
                    graphLayout.Graph.AddVertex(newVertex);
                }
                catch
                {
                }
            }));

        }

        public void AddNewEdge(PocEdge newEdge)
        {
            //As Graph is bound to UI, we need 
            //to marshall calls to update it, on Views Dispatcher
            Dispatcher.Invoke((Action)(() =>
            {
                try
                {
                    graphLayout.Graph.AddEdge(newEdge);
                }
                catch
                {
                }
            }));
        }

        #endregion
    }
}
ViewModels

As I stated in numerous parts of this article, there is a ViewModel called GraphLayoutViewModel which deals with the actual creation of the graph, and also deals with adding vertices/edges to the graph. Most of this code is fairly straightforward and not really that interesting. There are, however, a couple of areas of interest (basically, the Command handlers and the Mediator Message Sinks), which are as follows:

Command Handler: Adding a New Vertex Locally and BroadCasting that Change Via NServiceBus

This code firstly adds a new Graph Vertex to the local processes GraphLayoutViewModel graph. The interesting thing is what happens after the new Vertex is added to the local processes graph. Basically, the NServiceBus Bus is used to send a AddVertexRequestMessage message to the other process where it will be handled via a NServiceBus message handler class which I will discuss in the The Publisher section. Here is the relevant code from GraphLayoutViewModel:

C#
private void ExecuteAddNodeCommand(Object args)
{
    AddNewVertexPopupWindowViewModel addNewVertexPopupWindowViewModel=
        new AddNewVertexPopupWindowViewModel(messageBoxService, this.Vertices);
    bool? result = uiVisualizerService.ShowDialog("AddNewVertexPopupWindow", 
        addNewVertexPopupWindowViewModel);

    if (result.HasValue && result.Value)
    {
        PocVertex newVertex = 
          new PocVertex(addNewVertexPopupWindowViewModel.NewVertexName.DataValue, 
          addNewVertexPopupWindowViewModel.IsMale);
        graph.AddVertex(newVertex);

        string edgeid = string.Format("{0}-{1}",
            addNewVertexPopupWindowViewModel.ConnectedToVertex.DataValue.ID,
            addNewVertexPopupWindowViewModel.NewVertexName.DataValue);
        PocEdge newEdge = new PocEdge(edgeid,
            addNewVertexPopupWindowViewModel.ConnectedToVertex.DataValue, newVertex);
        graph.AddEdge(newEdge);
        ((IGraphFunctions)viewAwareStatusService.View).LayoutGraph();

        //now tell other processes to Add new Node using NServiceBus
        Bus.Send<AddVertexRequestMessage>(m =>
        {
            m.ConnectedToVertex = 
               addNewVertexPopupWindowViewModel.ConnectedToVertex.DataValue.ID;
            m.IsMale = addNewVertexPopupWindowViewModel.IsMale;
            m.NewVertexName = addNewVertexPopupWindowViewModel.NewVertexName.DataValue;
        });

    }
}
Command Handler: Adding a New Edge Locally and Broadcasting that Change Via NServiceBus

This code firstly adds a new Graph edge to the local processes GraphLayoutViewModel graph. The interesting thing is what happens after the new Edge is added to the local processes graph. Basically, the NServiceBus Bus is used to send a AddEdgeRequestMessage message to the other process where it will be handled via a NServiceBus message handler class which I will discuss in the The Publisher section. Here is the relevant code from GraphLayoutViewModel:

C#
private void ExecuteAddEdgeCommand(Object args)
{
    AddNewEdgePopupWindowViewModel addNewEdgePopupWindowViewModel =
        new AddNewEdgePopupWindowViewModel(messageBoxService, this.Vertices);
    bool? result = uiVisualizerService.ShowDialog("AddNewEdgePopupWindow",
        addNewEdgePopupWindowViewModel);
    
    if (result.HasValue && result.Value)
    {
        PocVertex vertex1 = 
          addNewEdgePopupWindowViewModel.ConnectedToVertex1.DataValue;
        PocVertex vertex2 = 
          addNewEdgePopupWindowViewModel.ConnectedToVertex2.DataValue;
        string edgeid = string.Format("{0}-{1}", vertex1.ID,vertex2.ID);
        PocEdge newEdge = new PocEdge(edgeid, vertex1, vertex2);
        graph.AddEdge(newEdge);
        ((IGraphFunctions)viewAwareStatusService.View).LayoutGraph();
    
        //now tell other processes to Add new Node using NServiceBus
        Bus.Send<AddEdgeRequestMessage>(m =>
        {
            m.ConnectedFromVertex = vertex1.ID;
            m.ConnectedToVertex = vertex2.ID;
        });
    
    
        Bus.Send<AddEdgeRequestMessage>(m =>
        {
            m.ConnectedFromVertex = vertex1.ID;
            m.ConnectedToVertex = vertex2.ID;
        });
    }
}

Mediator Message Sink: Responding to a NServiceBus Message When a New Vertex/Edge is Added

You may recall from earlier on in this article, I mentioned that the NServiceBus message handler classes are strange beasts that you don't really control the lifecycle of. They just spring into life when you use NServiceBus at the opportune moment. Which is a bit weird, and the other thing that I personally find a bit weird is that these NServiceBus message handler classes don't really integrate that well with the rest of your app; they are fine if you simply want to persist something to the database right there in the message handler, but I did not want that. I wanted the rest of my code to know about it when a new message came in. Mmmm, thinking hat on, so it sounds like not only do we need an enterprise level bus a.k.a. NServiceBus, we need some sort of internal messaging to broadcast these NServiceBus messages to the rest of my app. Sounds a lot like the Mediator Pattern to me. Even more, my Cinch MVVM framework has one of those.

So guess what, the demo app's NServiceBus message handlers send internal Cinch Mediator messages, which the GraphLayoutViewModel has Mediator message sinks hooked up to listen for. One for listening for Vertices being added, and one for listening for Edges being added; these two Mediator message handler sinks are shown below.

I will be showing how these Mediator messages are generated in The Publisher section of this article; for now, just know that the messages are broadcast from one process, then handled in the other process via NServiceBus message handlers, and relayed to GraphLayoutViewModel (if the app is deemed to be online, (basically, has the via GraphLayoutViewModel been instantiated yet?)) via the use of my Cinch MVVM framework Mediator. If the process receiving the NServiceBus messages is not deemed to be online, the messages are queued up in the App (Application class) of the process until it instantiates the GraphLayoutViewModel, at which point the App hands in the non-handled messages, but more on this later. For now, just look at the normal behvaiour the two demo processes would exhibit if they were running, which is to process the Cinch MVVM framework Mediator messages in response to incoming NServiceBus messages.

C#
[MediatorMessageSink("AddVertexRequestMessage")]
public void OnAddVertexRequestMessage(AddVertexRequestMessage message)
{
    //NServicebus is MTA (MultiThreadingAppartment),
    //so we need to make sure we run our code that
    //affects UI in STA (SingleThreadingAppartment
    CrossThreadTestRunner runner = new CrossThreadTestRunner();
    AutoResetEvent ar = new AutoResetEvent(false);
    runner.RunInSTA(()=>
        {
            PocVertex existingVertex = 
              graph.Vertices.Where(v => v.ID == 
                     message.ConnectedToVertex.Value).First();
            PocVertex newVertex = 
              new PocVertex(message.NewVertexName, message.IsMale);

            //As Graph is bound to UI, we need 
            //to marshall calls to update it, on Views Dispatcher
            //do this using IGraphFunctions which View implements
            ((IGraphFunctions)viewAwareStatusService.View).AddNewVertex(newVertex);

            PocEdge newEdge = new PocEdge(string.Format("{0}-{1}", 
                                          existingVertex.ID, newVertex.ID),
                existingVertex, newVertex);

            //As Graph is bound to UI, we need to marshall
            //calls to update it, on Views Dispatcher
            //do this using IGraphFunctions which View implements
            ((IGraphFunctions)viewAwareStatusService.View).AddNewEdge(newEdge);

            ar.Set();
        });

    //Wait for STAThread operation to complete
    ar.WaitOne();

    //ask view to use Dispatcher to marshall stuff to UI thread
    ((IGraphFunctions)viewAwareStatusService.View).LayoutGraph();
}

[MediatorMessageSink("AddEdgeRequestMessage")]
public void OnAddEdgeRequestMessage(AddEdgeRequestMessage message)
{

    //NServicebus is MTA (MultiThreadingAppartment),
    //so we need to make sure we run our code that
    //affects UI in STA (SingleThreadingAppartment
    CrossThreadTestRunner runner = new CrossThreadTestRunner();
    AutoResetEvent ar = new AutoResetEvent(false);
    runner.RunInSTA(() =>
    {
        PocVertex existingFromVertex = 
          graph.Vertices.Where(v => v.ID == 
              message.ConnectedFromVertex.Value).First();
        PocVertex existingToVertex = 
          graph.Vertices.Where(v => v.ID == 
              message.ConnectedToVertex.Value).First();

        PocEdge newEdge = new PocEdge(string.Format("{0}-{1}", 
                              existingFromVertex.ID, existingToVertex.ID),
            existingFromVertex, existingToVertex);

        //As Graph is bound to UI, we need
        //to marshall calls to update it, on Views Dispatcher
        //do this using IGraphFunctions which View implements
        ((IGraphFunctions)viewAwareStatusService.View).AddNewEdge(newEdge);

        ar.Set();
    });

    //Wait for STAThread operation to complete
    ar.WaitOne();

    //ask view to use Dispatcher to marshall stuff to UI thread
    ((IGraphFunctions)viewAwareStatusService.View).LayoutGraph();
}

There are a couple of things to note in there, which are things I have already mentioned such as NServiceBus, being MTA (MultiThreadedAppartmentState thread), which we can deal with when using WPF. So we use the handy class CrossThreadTestRunner to create a new STA thread that does the work. But since we are creating a new thread, we need to wait for it to finish, which is easy to do; we just use a Thread.AutoResetEvent to do that. Then, after that, we are hit with the problem of not being on the UI thread; yes, NServiceBus is multi-threaded (that is configured in App.Config), so to deal with that, we need to use the views Dispatcher, which we can do using the View, via the IGraphFunctions interface that it implements, which if you recall looked like this for the GraphLayoutView.

C#
public partial class GraphLayoutView : UserControl, IGraphFunctions
{
    public GraphLayoutView()
    {
        InitializeComponent();
        Mediator.Instance.Register(this);
    }
    
    /// <summary>
    /// These methods will be called by the
    ///   <c>GraphLayoutViewModel</c> by using the Cinch
    /// IViewAwareStatus UI service in the ViewModel
    /// </summary>
    #region IGraphFunctions Members
    
    public void LayoutGraph()
    {
        //As NServiceBus calls may come from
        //a different thread, may need to marshall
        //to UI thread. We do get some call (internal ones)
        //not from NServiceBus which may
        //not need marshalling, so only Marshall those
        //that need it using Cinch.Dispatcher extensions
        Dispatcher.InvokeIfRequired(() =>
        {
            try
            {
                graphLayout.Relayout();
            }
            catch
            {
            }
        }, DispatcherPriority.Send);
    }
    
    
    public void AddNewVertex(PocVertex newVertex)
    {
        //As Graph is bound to UI, we need to marshall
        //calls to update it, on Views Dispatcher
        Dispatcher.Invoke((Action)(() =>
        {
            try
            {
                graphLayout.Graph.AddVertex(newVertex);
            }
            catch
            {
            }
        }));
    
    }
    
    public void AddNewEdge(PocEdge newEdge)
    {
        //As Graph is bound to UI, we need to marshall
        //calls to update it, on Views Dispatcher
        Dispatcher.Invoke((Action)(() =>
        {
            try
            {
                graphLayout.Graph.AddEdge(newEdge);
            }
            catch
            {
            }
        }));
    }


    #endregion
}

As I have said before, I prefer to use the view for Dispatcher operations; as for me, this is the most logical place for it; the ViewModel shouldn't really care about what thread a control was created on, in my opinion.

The Publisher

As I have been saying all through the article, there are two practically identical WPF processes called WpfPublisherA and WpfPublisherB, which are really nearly identical. The only real difference between them is the App.Config where the queues that NServiceBus uses are different, and the actual names of the processes. As such I will only be explaining one of these processes, and you should be fine with the other one.

Project Structure

The overall structure of one of these WPF publisher projects looks like this, which I think is pretty self-describing:

Image 9

Configuration

In terms of unique configuration per publisher process, there is really only the App.Config, which looks like this:

XML
<?xml version="1.0"?>
<configuration>
  <configSections>
    <section name="MsmqTransportConfig" 
          type="NServiceBus.Config.MsmqTransportConfig, NServiceBus.Core"/>
    <section name="UnicastBusConfig" 
          type="NServiceBus.Config.UnicastBusConfig, NServiceBus.Core"/>
    <section name="RijndaelEncryptionServiceConfig" 
          type="NServiceBus.Config.RijndaelEncryptionServiceConfig, NServiceBus.Core"/>
  </configSections>

  <!-- in order to configure remote endpoints use the format: "queue@machine" 
       input queue must be on the same machine as the process feeding off of it.
       error queue can (and often should) be on a different machine.
  -->

  <MsmqTransportConfig InputQueue="WpfPublisherBInputQueue" 
                       ErrorQueue="error" 
                       NumberOfWorkerThreads="1" 
                       MaxRetries="5"/>

  <UnicastBusConfig>
    <MessageEndpointMappings>
      <add Messages="MyMessages" 
           Endpoint="WpfPublisherAInputQueue" />
    </MessageEndpointMappings>
  </UnicastBusConfig>

  <RijndaelEncryptionServiceConfig 
    Key="gdDbqRpqdRbTs3mhdZh9qCaDaxJXl+e7"/>

  <runtime>
    <loadFromRemoteSources enabled="true"/>
  </runtime>

  <startup>
    <supportedRuntime version="v4.0" 
                      sku=".NETFramework,Version=v4.0"/>
  </startup>

</configuration>

But there is also some configuration for the entire processes endpoint, which is done by creating a EndpointConfig.cs file which looks like this (this is almost the same for both WpfPublisherA and WpfPublisherB with the exception that WpfPublisherA is configured for AsA_Server whilst WpfPublisherB is configured for AsA_Client). I initially had these both set as AsA_Publisher, but the author of NServiceBus saw this article and corrected me, so I amended the article to what he said; after all, he should know, it's his framework after all.

C#
using NServiceBus;

namespace WpfPublisherB
{
    public class EndpointConfig : IConfigureThisEndpoint, AsA_Server  {}
}

Application Class

You may recall earlier when we were discussing how the GraphLayoutViewModel could be sent unhandled messages that had been sent to the process while it was not running, and these came from the App (Application class); well, this is how that mechanism works.

The message handlers will check an "Online" flag that is only set to true when the GraphLayoutViewModel is instantiated, and if the "Online" flag is not found to be true, all messages are routed to two queues in the App (Application class), at which point, when the GraphLayoutViewModel is instantiated, these unhandled messages are pushed at it via an interface of my choosing, so it can loop through them and display the relevant new Vertices/Edges that the unhandled NServiceBus represents.

Here is the code from the App (Application class) in its entirety, which also shows how NServiceBus is being self hosted, and it also shows the Cinch bootstrapper being called (as I say, if you are not familiar with Cinch, please refer to the numerous articles I have written on it):

C#
using System;
using System.Collections.Generic;
using System.Configuration;
using System.Data;
using System.Linq;
using System.Windows;
using System.Reflection;

using NServiceBus;
using Cinch;
using WpfCommon;
using MyMessages;

namespace WpfPublisherB
{
    /// <summary>
    /// Interaction logic for App.xaml
    /// </summary>
    public partial class App : Application, INonHandledMessages
    {

        public App()
        {
            IsOnline = false;
            VertexNonHandledMessagesReceived = 
                 new Queue<AddVertexRequestMessage>();
            EdgeNonHandledMessagesReceived = 
                 new Queue<AddEdgeRequestMessage>();
        }

        public static IBus Bus { get; private set; }


        protected override void OnStartup(StartupEventArgs e)
        {
            base.OnStartup(e);

            CinchBootStrapper.Initialise(
              new List<Assembly> { typeof(AddNewVertexPopupWindow).Assembly });

            Bus = NServiceBus.Configure.With()
                .DefaultBuilder()
                .XmlSerializer()
                .RijndaelEncryptionService()
                .MsmqTransport()
                    .IsTransactional(false)
                    .PurgeOnStartup(false)
                .MsmqSubscriptionStorage()
                .UnicastBus()
                    .ImpersonateSender(false)
                .LoadMessageHandlers() // need this to load MessageHandlers
                .CreateBus()
                .Start();
        }

        #region INonHandledMessages Members

        public Queue<AddVertexRequestMessage> 
                   VertexNonHandledMessagesReceived { get; set; }
        public Queue<AddEdgeRequestMessage> 
                   EdgeNonHandledMessagesReceived { get; set; }
        public bool IsOnline { get; set; }

        #endregion
    }
}

Window Code-Behind

OK, so we now have some unhandled messages queued up, waiting to be dealt with by the GraphLayoutViewModel. But we can't do anything with them until the GraphLayoutViewModel gets instantiated, and furthermore, once these messages are handled, we want to revert back to how the messages should be handled when the processes are deemed to be running, which is to simply relay incoming messages straight into the GraphLayoutViewModel via the use of the Cinch Mediator. So how does all that occur? Well, if we follow it through.

In the Window code-behind, there is some code like this:

C#
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Windows;
using System.Windows.Controls;
using System.Windows.Data;
using System.Windows.Documents;
using System.Windows.Input;
using System.Windows.Media;
using System.Windows.Media.Imaging;
using System.Windows.Navigation;
using System.Windows.Shapes;
using System.Windows.Interop;

using MyMessages;
using WpfCommon;

namespace WpfPublisherB
{
    /// <summary>
    /// Interaction logic for Window1.xaml
    /// </summary>
    public partial class Window1 : Window, IGraphFunctions
    {
        public Window1()
        {
            InitializeComponent();
            this.SourceInitialized += OnSourceInitialized;
        }

        private void OnSourceInitialized(object sender, EventArgs e)
        {
            //make sure ViewModel can use NServiceBus
            GraphLayoutViewModel vm = (GraphLayoutViewModel)this.DataContext;
            vm.Bus = App.Bus;

            //tell ViewModel about App so it can process
            //received offline messages via nice interface
            vm.NonHandledMessageSource = 
                  (INonHandledMessages)(App)App.Current;

            ......
            ......
            ......
        }

        #region IGraphFunctions Members

        public void LayoutGraph()
        {
            ((IGraphFunctions)graphControl).LayoutGraph();
        }

        public void AddNewVertex(PocVertex newVertex)
        {
            ((IGraphFunctions)graphControl).AddNewVertex(newVertex);
        }

        public void AddNewEdge(PocEdge newEdge)
        {
            ((IGraphFunctions)graphControl).AddNewEdge(newEdge);
        }

        #endregion
    }
}

The important part of this is that when the GraphLayoutViewModel is instantiated, it has its NonHandledMessageSource property set to the App (Application class) instance, using a INonHandledMessages interface. So let's carry on and see what that GraphLayoutViewModel.NonHandledMessageSource property looks like, shall we?

C#
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.ComponentModel;
using GraphSharp.Controls;
using MEFedMVVM.ViewModelLocator;
using System.ComponentModel.Composition;
using Cinch;
using MyMessages;
using NServiceBus;
using System.Threading;

namespace WpfCommon
{
    public class PocGraphLayout : GraphLayout<PocVertex, PocEdge, PocGraph> { }

    [ExportViewModel("GraphLayoutViewModel")]
    [PartCreationPolicy(CreationPolicy.NonShared)]
    public class GraphLayoutViewModel : ViewModelBase
    {

        [ImportingConstructor]
        public GraphLayoutViewModel(
            IMessageBoxService messageBoxService,
            IViewAwareStatus viewAwareStatusService,
            IUIVisualizerService uiVisualizerService)
        {
            ......
            ......
            ......
            ......
            ......
        }

        private void AddNonHandledOffLineMessagesReceived(
                INonHandledMessages localNonHandledMessageSource)
        {
            try
            {
                //go through all missing Vertex messages and add them in
                foreach (AddVertexRequestMessage vertexNonHandled in
                    localNonHandledMessageSource.VertexNonHandledMessagesReceived)
                {


                    PocVertex newVertex = new PocVertex(
                vertexNonHandled.NewVertexName.Value, vertexNonHandled.IsMale);
                    PocVertex existingVertex = Graph.Vertices.Where(
                v => v.ID == vertexNonHandled.ConnectedToVertex.Value).Single();
                    Graph.AddVertex(newVertex);

                    PocEdge newEdge = new PocEdge(string.Format("{0}-{1}", 
                vertexNonHandled.ConnectedToVertex.Value,
                        newVertex.ID), existingVertex, newVertex);
                    Graph.AddEdge(newEdge);
                }


                //go through all missing Edge messages and add them in
                foreach (AddEdgeRequestMessage edgeNonHandled in
                    localNonHandledMessageSource.EdgeNonHandledMessagesReceived)
                {

                    PocVertex existingVertex1 = Graph.Vertices.Where(
                v => v.ID == edgeNonHandled.ConnectedFromVertex.Value).Single();
                    PocVertex existingVertex2 = Graph.Vertices.Where(
                v => v.ID == edgeNonHandled.ConnectedToVertex.Value).Single();

                    PocEdge newEdge = 
                        new PocEdge(string.Format("{0}-{1}", existingVertex1.ID,
                        existingVertex2.ID), existingVertex1, existingVertex2);
                    Graph.AddEdge(newEdge);
                }

                ((IGraphFunctions)viewAwareStatusService.View).LayoutGraph();

                //Tell message handlers to relay messages
                //directly to GraphLayoutViewModel using
                //Cinch Mediator from now on
                localNonHandledMessageSource.IsOnline = true;

            }
            catch
            {
                //user might have picked a Vertex that we dont have yet
            }
        }

        public INonHandledMessages NonHandledMessageSource
        {
            set
            {
                //get handle to INonHandledMessages (current App)
                AddNonHandledOffLineMessagesReceived(value);
            }
        }
    }
}

It can be seen that this GraphLayoutViewModel.NonHandledMessageSource property simply delegates the job to the AddNonHandledOffLineMessagesReceived() method, which dutifully goes about handling the unhandled messages, and the last thing it does is set the "OnLine" flag to true such that any new NServiceBus will not be handled like this. They will instead be routed directly from the NServiceBus message handler classes straight into the GraphLayoutViewModel by using the Cinch Mediator.

Message Handlers

The final piece of the puzzle is the actual NServiceBus mesage handler classes, which now that I have discussed things a bit, you should have no problem understanding. I will only outline one set of request/response messages as the other pair works the same way.

Request

The request is where all the real work happens. It is here where the message handler should do all its work. Now, as I have said all the way through this article, these message handler classes are automagically summoned into existence by the NServiceBus framework, and our application code has no control over that. So the only option for getting extra useful classes into these handlers is by using IOC, which I discussed earlier in this article. The other thing that was not so obvious to me for a while was how to get stuff from these NServiceBus mesage handler classes into the rest of my application (for example, into the GraphLayoutViewModel which both the demo processes make use of to bind against for their state). In the end, what I ended up doing was tackling an enterprise level disconnected messaging system a.k.a. NServiceBus with an internal disconnected messaging system a.k.a. the Cinch Mediator, and it seems to work very well.

Anyway, here is the full code for one of the NServiceBus mesage handler classes; see how it does different things based on the "Online" flag we discussed earlier:

C#
using System;
using System.Windows;

using MyMessages;
using Cinch;
using NServiceBus;
using WpfCommon;

namespace WpfPublisherB
{
    public class AddEdgeRequestMessageHandler : 
                   IHandleMessages<AddEdgeRequestMessage>
    {
        public void Handle(AddEdgeRequestMessage message)
        {

           INonHandledMessages NonHandledMessageSource = 
                        (INonHandledMessages)(App)App.Current;

           if (!NonHandledMessageSource.IsOnline)
           {
               ((INonHandledMessages)(App)
                   App.Current).EdgeNonHandledMessagesReceived.Enqueue(message);
           }
           else
           {
               AddEdgeResponseMessage response = null;
               try
               {
                   Mediator.Instance.NotifyColleagues<AddEdgeRequestMessage>(
                                 "AddEdgeRequestMessage", message);

                   response = App.Bus.CreateInstance<AddEdgeResponseMessage>(m =>
                   {
                       m.ResponseMessageStatus = 
                          "New Edge added to WpfPublisherB correctly";
                   });
               }
               catch
               {
                   response = App.Bus.CreateInstance<AddEdgeResponseMessage>(m =>
                   {
                       m.ResponseMessageStatus = 
                          "Failed to add new Edge to WpfPublisherB";
                   });
               }


               if (response != null)
                   App.Bus.Reply(response);
           }
        }
    }
}

Where the actual AddEdgeRequestMessage message class (see the MyMesages project) looks like this:

C#
[TimeToBeReceived("00:00:20")] // twenty seconds
public class AddEdgeRequestMessage : IMessage
{
    public WireEncryptedString ConnectedFromVertex { get; set; }
    public WireEncryptedString ConnectedToVertex { get; set; }
}

Note the use of the WireEncryptedString, which will be encrypted using the standard NServiceBus RijndaelEncryptionService which we turns on both in the App.Config and the self-hosting setup in App.xaml.cs.

Response

The response is pretty dumb, and simply shows a MessageBox to state that the request was successful; here is the NServiceBus message handler class in its entirety:

C#
using System;
using System.Windows;

using MyMessages;
using NServiceBus;

namespace WpfPublisherB
{
    class AddEdgeResponseMessageHandler : 
           IHandleMessages<AddEdgeResponseMessage>
    {
        public void Handle(AddEdgeResponseMessage message)
        {
            MessageBox.Show(message.ResponseMessageStatus);
        }
    }
}

Where the actual AddEdgeResponseMessage message class (see the MyMesages project) looks like this:

C#
[TimeToBeReceived("00:00:20")] // twenty seconds
public class AddEdgeResponseMessage : IMessage
{
    public WireEncryptedString ResponseMessageStatus { get; set; }
}

The Final Word

I just wanted to touch on something before I close this article, which seems to be the way in which Bus style frameworks (NServiceBus/ MassTransit/Rhino Service Bus) are developed compared to RPC technologies such as Remoting and WCF. Now, they may really suit some people, but I found that they did not really suit my coding style all that well. What I found to be a particular pain with pretty much all these three Bus frameworks that I looked at (and after much analysing, I did feel NServiceBus was the best one of all) was that the point where you handled incoming messages, you could not really do much at that point, due to the fact that the framework either automagically created the message handling classes (as I stated was the case with NServiceBus), or you simply could not get to the other parts of your application that easily to react to the incoming message.

OK, as I said in the rest of the article, I got around this by using my own internal messaging system (Mediator from my Cinch MVVM framework) to relay messages from the Bus framework (NServiceBus in my case) to the other parts of my app that wanted to know about incoming messages.

But should it really be that hard, I don't know. I certainly did not see anyone doing anything more than the basics when searching for examples on these three bus frameworks. Sure, people were using NHibernate in the message handlers to persist stuff, but that did not really get me that jazzed to be honest. I know how to use an ORM to store stuff to a database. I wanted to see stuff interacting with the rest of my app.

It seems to me, the authors of these bus frameworks intend them to work in a fashion where a message is handled and you persist something to a database within a message handler, nice and self contained I agree, but how useful is that. To me, not much, what I wanted was a messaging system that utilised MSMQ that was easy to use and that I could use to send messages around different processes/applications and technologies. NServiceBus did promise this. I admit, with some jiggery pockery, I got what I wanted, but not without having to resort to using an internal messager (Mediator from my Cinch MVVM framework) to relay the messages around the internals of the app once it came from MSMQ via NServiceBus into a message handler.

Perhaps, I just have it all wrong, and some bright spark reading this article will tell me, yeah, you should not really use a service bus approach like that at all, where I would go why not?

In fairness though, the author of NServiceBus does claim that NServiceBus should be used in conjunction with different technolgies such as WCF, so I think what I ended up with is pretty workable. Another thing in NServiceBus's favour is that I am new to it, and I am not a seasoned NServiceBus hacker, and I did not want to spend absolutely months working on this single article, so I could have missed something.

Comments About Original Article's Content From NServiceBus Author

I was extremely pleased to see that my little old article got the attention of the author of NServiceBus, and this is what he had to say:

  1. In full duplex communication between client and server, the server doesn't need to have the client queue configured. The scenario described in the article is actually a bit unusual - on the one hand, the processes are configured as publishers, but they don't actually publish. It is more common to see each of these clients sending messages to a server that will publish events about the changes back to all subscribed clients, who will (in turn) update their UI. Also, the security stuff is optional, and probably not needed for the scenario shown in the article.
  2. The TimeToBeReceived attributed is only relevant for message types - not handlers. It indicates how long messages can spend across all queues before they will be discarded, if they couldn't be delivered to their target queue by that time.

With respect to integrating NServiceBus into a multi-threaded client app - there is more complexity that needs to be dealt with when working in deeper business domains. I've described these challenges in an article I wrote for MSDN magazine a while ago: Optimizing a Large Scale Software + Services Application.

I have of course taken note of these comments from Udi and amended the articles code and text.

Important Amendment When Working with MVVM and WPF/SL

When I first published this article, I could not find much information out there on working with NServiceBus with WPF/SL using current best practices, and I mentioned in the article's text that I found working with the message handlers a bit of a nightmare, and did not like the fact that the NServiceBus framework automagically created these classes and I had no control over what went into these classes as I was not in control of the instantiation of the message handler classes.

Well, luckily, this article caught the eye of the NServiceBus author, who I have been having lots of email banter/chats with, and he talked me through how he would typically deal with working with WPF. In his words, this is what he stated in an email to me:

"I tend to have logic in controller objects which indeed are singletons, and thus are easily injected into handlers. Those controllers would have references to the current state of open View Models and manage them."

- Udi Dahan, email to Sacha Barber dated 02/09/2010

So that is all well and good, but how does that translate into code? Well, I will show you in just a second, but I just need to mention one thing. I have adjusted the attached code and outlined all code changes below to support Udi's advice, but I still use the Cinch Mediator as the primary way of getting stuff out of the NServiceBus message handler classes, and have added on the singleton controller idea, but when the message handler instructs the singleton controller that a new message has arrived, all that happens is that the singleton controller will call empty method stubs on the registered ViewModels. I feel this should still be enough to illustrate the point of what the singleton controller should do, should you use that approach.

Anyway, this section will show the list of amendments that I have made to the codebase since I published the article to support the singleton controller idea that Udi suggested in his email to me.

Starting With a Controller Class

So we know that we want to have a controller singleton, now it is up to you how you design that. For me, I thought it would be best to have one controller singleton per ViewModel type, so I came up with a GraphLayoutControllerSingleton which is the controller for GraphLayoutViewModel types. I am, of course, using an interface for the controller so it can be mocked. Anyway, here is the code:

C#
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using MyMessages;

namespace WpfCommon
{
    public interface IGraphLayoutController
    {
        void AddViewModel(GraphLayoutViewModel viewModelToNotifyWhenMessagesArrive);
        void NotifyViewModelsOfNewVertexMessage(AddVertexRequestMessage message);
        void NotifyViewModelsOfNewEdgeMessage(AddEdgeRequestMessage message);
    }

    /// <summary>
    /// After some discussions with NServiceBus author
    /// he suggested that the way he gets things from
    /// his MessageHandler classes when using WPF
    /// is using a controller that knows about other objects
    /// where the controller is a singleton, that broadcasts
    /// the message to the internal objects
    /// that wish to be notified. There could be one
    /// controller per  message type of per ViewModel type
    /// that is interested in the messages. The controller
    /// singleton is registered in the App.Xaml.cs
    /// </summary>
    public sealed  class GraphLayoutControllerSingleton : IGraphLayoutController
    {
        #region Data
        private List<GraphLayoutViewModel> 
                viewModelsToNotify = new List<GraphLayoutViewModel>();
        private static readonly GraphLayoutControllerSingleton 
                instance = new GraphLayoutControllerSingleton();
        #endregion

        #region Ctor
        // Explicit static constructor to tell C# compiler
        // not to mark type as beforefieldinit
        static GraphLayoutControllerSingleton()
        {
        }

        private GraphLayoutControllerSingleton()
        {
        }
        #endregion

        #region Public Properties
        public static GraphLayoutControllerSingleton Instance
        {
            get
            {
                return instance;
            }
        }
        #endregion

        #region IGraphLayoutController Members

        public void AddViewModel(GraphLayoutViewModel 
                    viewModelToNotifyWhenMessagesArrive)
        {
            viewModelsToNotify.Add(viewModelToNotifyWhenMessagesArrive);
        }

        public void NotifyViewModelsOfNewVertexMessage(AddVertexRequestMessage message)
        {
            foreach (GraphLayoutViewModel vm in viewModelsToNotify)
            {
                vm.AddVertexFromMessage(message);
            }
        }

        public void NotifyViewModelsOfNewEdgeMessage(AddEdgeRequestMessage message)
        {
            foreach (GraphLayoutViewModel vm in viewModelsToNotify)
            {
                vm.AddEdgeFromMessage(message);
            }
        }
        #endregion
    }
}

Registering Controller in the IOC Container

Now that we have a singleton controller, we need to register it with the IOC container that NServiceBus will use; this is easily done just after we do the Bus self-hosting code in App.xaml.cs for me, since it is a WPF project. The new section of code is shown below for App.xaml.cs:

C#
protected override void OnStartup(StartupEventArgs e)
{
    base.OnStartup(e);

    CinchBootStrapper.Initialise(
       new List<Assembly> { typeof(AddNewVertexPopupWindow).Assembly });

    Bus = NServiceBus.Configure.With()
        .DefaultBuilder()
            .RunCustomAction(() =>
              Configure.Instance.Configurer.RegisterSingleton(
                                 typeof(IGraphLayoutController),
        GraphLayoutControllerSingleton.Instance))
        .XmlSerializer()
        .RijndaelEncryptionService()
        .MsmqTransport()
            .IsTransactional(false)
            .PurgeOnStartup(false)
        .MsmqSubscriptionStorage()
        .UnicastBus()
            .ImpersonateSender(false)
        .LoadMessageHandlers() // need this to load MessageHandlers
        .CreateBus()
        .Start();
}

This is the only method in App.xaml.cs I had to change; all the rest of the code in App.xaml.cs remains the same as the original article's content. It should be clear from the code snippet that all we are doing is registering the GraphLayoutControllerInstance with the current IOC container that NServiceBus will use.

Registering ViewModels With Controller/Providing Message Methods

The next step is to actually add ViewModel instances to the GraphLayoutControllerInstance, such that they are available to call when we receive a new message in the NServiceBus message handlers. This is simply done in the constructor of the ViewModel (as I say, I went with one singleton controller per ViewModel type, but this is really driven by your application's needs; this suited the demo app's requirements).

C#
public GraphLayoutViewModel(IMessageBoxService messageBoxService,
    IViewAwareStatus viewAwareStatusService,
    IUIVisualizerService uiVisualizerService)
{
    ......
    ......
    ......
    ......

    //this is not used in the demo app as I am using the Mediator to get messages from
    //the NServiceBus message handlers into my ViewModels, but this shows an alternative
    //which I discuss near the bottom of the article. This is for demo purposes only
    GraphLayoutControllerSingleton.Instance.AddViewModel(this);
}

The next thing we need to do is simply provide the message handler methods that the GraphLayoutControllerInstance is excpecting to call for each registered ViewModel. For me, this simply looks like the following for the demo app's GraphLayoutViewModel:

C#
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.ComponentModel;
using GraphSharp.Controls;
using MEFedMVVM.ViewModelLocator;
using System.ComponentModel.Composition;
using Cinch;
using MyMessages;
using NServiceBus;
using System.Threading;

namespace WpfCommon
{
    public class PocGraphLayout : GraphLayout<PocVertex, 
                                  PocEdge, PocGraph> { }

    [ExportViewModel("GraphLayoutViewModel")]
    [PartCreationPolicy(CreationPolicy.NonShared)]
    public class GraphLayoutViewModel : ViewModelBase
    {
        ......
        ......
        ......
        ......

        #region Public Methods To Show How Singleton Controller Works
        //this is not used in the demo app as I 
        //am using the Mediator to get messages from
        //the NServiceBus message handlers into 
        //my ViewModels, but this shows an alternative
        //which I discuss near the bottom of the article.
        //This is for demo purposes only
        public void AddVertexFromMessage(AddVertexRequestMessage message)
        {
            //do stuff here
        }

        //this is not used in the demo app as I 
        //am using the Mediator to get messages from
        //the NServiceBus message handlers into 
        //my ViewModels, but this shows an alternative
        //which I discuss near the bottom of the article.
        //This is for demo purposes only
        public void AddEdgeFromMessage(AddEdgeRequestMessage message)
        {
            //do stuff here
        }
        #endregion
    }
}

Using Controller In Message Handlers

The very last step is to inject the GraphLayoutControllerInstance into the NServiceBus message handlers and use it. Shown below is an amended AddVertexRequestMessageHandler from the demo app. You should be able to see a new GraphLayoutControllerInstance property which will get set by the current IOC container in use with NServiceBus. You should also be able to see how this GraphLayoutControllerInstance gets used in the code where we simply pass the incoming message to the controller, and it will pass that to all the registered ViewModels that it has registered.

C#
using System;
using System.Windows;

using MyMessages;
using Cinch;
using NServiceBus;
using WpfCommon;


namespace WpfPublisherA
{
    public class AddVertexRequestMessageHandler : 
                    IHandleMessages<AddVertexRequestMessage>
    {
        //this is not used in the demo app
        //as I am using the Mediator to get messages from
        //the NServiceBus message handlers into
        //my ViewModels, but this shows an alternative
        //which I discuss near the bottom
        //of the article. This is for demo purposes only
        public IGraphLayoutController GraphLayoutController { get; set; }


        public void Handle(AddVertexRequestMessage message)
        {
            INonHandledMessages NonHandledMessageSource = 
                               (INonHandledMessages)(App)App.Current;

            if (!NonHandledMessageSource.IsOnline)
            {
                ((INonHandledMessages)
                  (App)App.Current).VertexNonHandledMessagesReceived.Enqueue(message);
            }
            else
            {
                AddVertexResponseMessage response = null;
                try
                {
                    Mediator.Instance.NotifyColleagues<AddVertexRequestMessage>(
                               "AddVertexRequestMessage", message);

                    //this is not used in the demo app as
                    //I am using the Mediator to get messages from
                    //the NServiceBus message handlers
                    //into my ViewModels, but this shows an alternative
                    //which I discuss near the bottom
                    //of the article. This is for demo purposes only
                    GraphLayoutController.NotifyViewModelsOfNewVertexMessage(message);

                    response = App.Bus.CreateInstance<AddVertexResponseMessage>(m =>
                    {
                        m.ResponseMessageStatus = 
                           "New Node added to WpfPublisherA correctly";
                    });
                }
                catch
                {
                    response = App.Bus.CreateInstance<AddVertexResponseMessage>(m =>
                    {
                        m.ResponseMessageStatus = 
                           "Failed to add new Node to WpfPublisherA";
                    });
                }

                if (response != null)
                    App.Bus.Reply(response);
            }
        }
    }
}

Unregistering Instances from the Controller

I have not allowed for ViewModel instances to be unregistered from the controller, but that would be easy enough to do, and is something you should consider when you create a real production system.

That's it

That is pretty much all I wanted to say on NServiceBus actually is, I hope you have enjoyed it, and maybe can see somewhere where it may be useful to you in your own projects. Could I just ask, if you enjoyed the article, could you please leave a vote/comment as a way of appreciation? Thanks. See you next time.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)