Queuing Windows Services work items using MSMQ, and live progress using WCF Net Named Pipe

4 Mar 2015 | CPOL | 12 min read
How to use MSMQ to queue work items for a Windows service to execute, and then use WCF Net Named Pipe to report progress to a WPF application.

 

Introduction

This article will showcase how to queue up work items using MSMQ, process them with a Windows service, and then report progress to a WPF application using WCF NetNamedPipe.

Background

A while back, I had to parse through 3+ GB worth of relational data on a somewhat regular (daily) basis. Originally I constructed my parser such that everything happened in one application. When I needed to parse, I just fired up the app, let it chew through the data for a few hours, and then shut it down.

It worked, but the architecture tied up my machine for hours on end, and if the parser failed, I had to start over from the beginning. It was a lot of extra work to restart the failed portion and pick up where it left off in the list of work items. Additionally, I didn't always care about visualizing progress; and when I did, I didn't want grandiose command-flag logic in the rest of the application just for toggling the UI.

What I wanted was to queue up specific operations on a dedicated box, and have it automatically start processing. I wanted to be able to queue up operations individually at any time, or out of order. I also wanted to be able to fire up a dedicated app on the box if I cared to check progress.

In order to make this happen, I needed MSMQ for the operations, a Windows service to consume the queue, and a WCF net named pipe to communicate progress across processes to another application, if one was there to listen.

Outline

This article has two sections:

  1. The Setup
  2. The Code

The Setup

First, we'll need to turn on MSMQ, if it's not already turned on. Go to Control Panel, and then "Turn Windows Features on or off".

Image 1

Then, select MSMQ to turn it on.

Image 2

Next, we'll launch compmgmt.msc, and scroll down to Private Queues.

Image 3

Right click, and add a new queue.

Image 4

If you drill into the properties of the queue, you can set up so that you get a journal entry when a queue item is consumed, as well as some other enterprise-y type features.

A hard-earned piece of advice if you decide to turn on journaling: you have to manually empty the journal from time to time - Limit journal storage to (KB) doesn't mean "flush old entries after this size", it means "stop writing to the journal when the journal gets to this size". 

Image 5

While we're here, let's write down the name of this queue. Mine is win81-tqgmg4i5\private$\HardWorkingQueue. We'll need this later.

Note: You should take the time now to click on the security tab:

Image 6

You'll notice that by default, "Everyone" is allowed to use our queue. For purposes of this article, this is fine for now; but before you go deploying this in a production environment, you'll want to remove "Everyone", and grant full control to the "System" account from your local machine. We don't want everybody who has a logon to this box using our queue.

More importantly though, we need to add the account that our windows service is going to be running under (which is System), and if we don't do it now, we'll have problems down the line.

Image 7
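If you'd rather script the queue setup than click through compmgmt.msc, here is a minimal sketch of the same steps in code. It assumes a reference to System.Messaging and a hypothetical local queue path of .\private$\HardWorkingQueue. The account names ("SYSTEM" and "Everyone") are also assumptions, and may need adjusting for your machine's locale or domain.

```csharp
using System;
using System.Messaging;

public static class QueueSetup
{
    // Hypothetical path; "." stands for the local machine.
    private const string QueuePath = @".\private$\HardWorkingQueue";

    public static void Main()
    {
        // Create the private queue only if it doesn't exist yet.
        using (var queue = MessageQueue.Exists(QueuePath)
            ? new MessageQueue(QueuePath)
            : MessageQueue.Create(QueuePath))
        {
            // Grant full control to the account the service runs under
            // (assumed here to be Local System).
            queue.SetPermissions("SYSTEM", MessageQueueAccessRights.FullControl);

            // Production hardening: revoke the default access given to Everyone.
            // Leave this out while following along with the article.
            queue.SetPermissions("Everyone",
                MessageQueueAccessRights.FullControl,
                AccessControlEntryType.Revoke);

            Console.WriteLine("Queue ready at {0}", queue.Path);
        }
    }
}
```

Run it from an elevated prompt, since creating queues and changing their permissions typically requires administrative rights.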

The Code

Now that we've got that out of the way, let's do some code!

The code has the following sections:

  1. Enqueuing a message
  2. WCF Named Pipe
  3. The Windows Service
  4. Displaying progress using WPF
  5. Bonus Points - Enqueuing work via Web API

Enqueuing a message

Let's start off by putting a work item on the queue. Let's keep it simple for the moment, and add a console application.

C#
using System;
using System.Configuration;
using System.Messaging;

class Program
{
    static void Main(string[] args)
    {
        GatherAndSend();
    }

    private static void GatherAndSend()
    {
        Console.WriteLine("type some stuff to queue up");
        var input = Console.ReadLine();

        using (var mq = new MessageQueue(ConfigurationManager.AppSettings["OperationMessageQueuePath"]))
        {
            var qItem = new QueuedWorkItem() { Name = input };
            using (var msg = new System.Messaging.Message(qItem))
            {
                msg.Label = "Queued Item from the console";
                msg.Formatter = new XmlMessageFormatter(new Type[] { typeof(QueuedWorkItem) });
                mq.Send(msg);
                Console.WriteLine("Message sent. Message ID is {0}", msg.Id);
            }
        }

        Console.WriteLine("write another message to the queue? Y/N");
        var decision = Console.ReadLine();

        //the prompt asks for Y/N, so accept either case
        if (String.Equals(decision, "y", StringComparison.OrdinalIgnoreCase))
        { GatherAndSend(); }

    }
}

Really easy, right? Instantiate the message queue using the name we got from above, create a message, declare the formatter, and send it.  Let's run our console, give it some input, and then check out the message queue!

Image 8

Image 9

THIS is why we use XML over binary - see how clear and easy this is to tell what we're queuing up?
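The QueuedWorkItem type itself isn't shown above, so here is a minimal sketch of what it might look like, assuming it carries nothing but the Name property used by the console app. XmlMessageFormatter relies on XmlSerializer, so the type needs to be public with a parameterless constructor. The ToXml and FromXml helpers are hypothetical, added only so you can see roughly what the message body looks like without touching a queue.

```csharp
using System;
using System.IO;
using System.Xml.Serialization;

// Assumed shape of the work item; only Name appears in the article.
public class QueuedWorkItem
{
    public string Name { get; set; }
}

public static class QueuedWorkItemDemo
{
    // Serializes the item the same general way the XML formatter would.
    public static string ToXml(QueuedWorkItem item)
    {
        var serializer = new XmlSerializer(typeof(QueuedWorkItem));
        using (var writer = new StringWriter())
        {
            serializer.Serialize(writer, item);
            return writer.ToString();
        }
    }

    // Reads the item back out of its XML representation.
    public static QueuedWorkItem FromXml(string xml)
    {
        var serializer = new XmlSerializer(typeof(QueuedWorkItem));
        using (var reader = new StringReader(xml))
        {
            return (QueuedWorkItem)serializer.Deserialize(reader);
        }
    }

    public static void Main()
    {
        var xml = ToXml(new QueuedWorkItem { Name = "parse-daily-export" });
        Console.WriteLine(xml);
        Console.WriteLine(FromXml(xml).Name);
    }
}
```

If your real work items carry more data, add public read/write properties for it; anything XmlSerializer can't handle won't make it onto the queue.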

Ok, let's move on.

The WCF Named Pipe

First, let's start by defining the WCF service which will act as the intermediary between the two processes:

All WCF services start with a service contract

C#
[ServiceContract]
internal interface IPipeService
{
    [OperationContract]
    void RecieveMessage(String message);
}

followed by an implementation

C#
[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]
internal class PipeService : IPipeService
{
    public static string URI = "net.pipe://localhost/Pipe";

    //Invoked whenever a message arrives over the pipe
    public Action<String> MessageReceived = null;

    //The method name must match the spelling used in the IPipeService contract
    public void RecieveMessage(String message)
    {
        if (MessageReceived != null)
        { MessageReceived(message); }
    }
}

Notice here that our implementation doesn't actually handle the business logic of what happens when the WCF service receives the data - it relies on a delegate to handle that. We'll come back to this in a bit.

I went with the internal keyword here. Normally, we would consider ourselves done after declaring the contract and an implementation. However, both the sender and receiver have to do a small amount of setup and teardown work in order to use WCF services, and I'd rather not repeat that every time I want to send or receive a message. So we're going to provide a simpler way to interact with the pipe, and expose that to our developer instead of the raw WCF infrastructure.

On the sender side, we're going to build the following:

C#
public class Sender
{
    private static Object _Lock = new Object();
    private static EndpointAddress _endpointAddress = new EndpointAddress(String.Format("{0}/{1}", PipeService.URI, Receiver.DefaultPipeName));        

    /// <summary>
    /// Attempts to send the message to the proxy at the pre-configured endpoint
    /// </summary>
    /// <param name="message">The message to send</param>
    /// <returns>True, upon success</returns>
    public static Boolean SendMessage(String message)
    {
        var success = false;
        try
        {
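            lock (_Lock) //ensure thread exclusivity when sending messages across the wire
            {
                var proxy = ChannelFactory<IPipeService>.CreateChannel(new NetNamedPipeBinding(), _endpointAddress);
                var channel = (IClientChannel)proxy;
                try
                {
                    proxy.RecieveMessage(message);
                    channel.Close(); //release the pipe connection once the message is delivered
                }
                catch
                {
                    channel.Abort(); //a faulted channel can't be closed gracefully
                    throw;
                }
            }

            success = true;
        }
        catch (Exception) //Most likely, there was nobody to send a message to.
        { } //TODO : Add some logging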

        return success;
    }       
}

This allows for some really easy interaction - in order to send a message over the pipe to the consuming side, all we have to do is invoke Sender.SendMessage. Much easier. (And, we're thread-safe!)

Take notice here that we're also recreating the proxy every time we attempt to send the message - that's because as the windows service runs, (and sends messages) the host of the service may intermittently come on and offline as we periodically drop by to check progress. We want it to work if someone's watching, and silently fail otherwise. If we make it a singleton, we only have one chance to check if the host is present; with this, we get an opportunity to check every time before sending.

Now, the receiving side is a bit different.

First, we'll start by declaring our member level variables:

C#
public const String DefaultPipeName = "Pipe1";

private PipeService _ps = new PipeService();
private ServiceHost _host = null;        
private Boolean _operational { get; set; }

#region PipeName

private String _PipeName = String.Empty;

/// <summary>
/// Gets the name of the pipe being used by this receiver
/// </summary>
public String PipeName
{
    get { return _PipeName; }            
}

#endregion

and then we'll declare our constructors

C#
public Receiver(Action<String> messageReceivedAction) : this(DefaultPipeName, messageReceivedAction) { }

public Receiver(String pipeName, Action<String> messageReceivedAction)
{
    _PipeName = pipeName; 
    _ps.MessageReceived = messageReceivedAction;
} 

In the constructor, we're passing in a delegate that's going to be executed when data is received from the sender. We default it to use a predefined name, but also allow the caller to override that with something else, if necessary.

Here's where our magic starts - where we actually start the service:

C#
/// <summary>
/// Performs the act of starting the WCF host service
/// </summary>
/// <returns>true, upon success</returns>
public Boolean ServiceOn()
{

    try
    {
        _host = new ServiceHost(_ps, new Uri(PipeService.URI));
        _host.AddServiceEndpoint(typeof(IPipeService), new NetNamedPipeBinding(), _PipeName);
        _host.Open();
        _operational = true;
    }
    catch (Exception ex)
    {
        _operational = false;
    }

    return _operational;
}

We're starting up a new WCF service host based on our constructed instance of the IPipeService, and then using NetNamedPipeBinding as our binding protocol to bind the operations of IPipeService's service contract to the pipe located at _PipeName. If everything started successfully, we then store true, and return it.

To stop the WCF service:

C#
/// <summary>
/// Stops the hosting service
/// </summary>
public void ServiceOff()
{
    if (_host == null)
    { return; } //already turned off
    
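    if (_host.State == CommunicationState.Faulted)
    { _host.Abort(); } //a faulted host throws if we try to Close() it
    else if (_host.State != CommunicationState.Closed)
    { _host.Close(); }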
    
    _operational = false;
}
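To see the whole named pipe round trip in one place, here is a self-contained sketch that hosts a pipe and sends to it from the same process. It is not the article's Sender and Receiver classes: the names IDemoPipeService, DemoPipeService, Push, and the net.pipe://localhost/DemoPipe address are all made up for illustration. It assumes a reference to System.ServiceModel on the .NET Framework.

```csharp
using System;
using System.ServiceModel;

[ServiceContract]
public interface IDemoPipeService
{
    [OperationContract]
    void Push(string message);
}

// Single instance, so the host can be handed a pre-built object with its delegate set.
[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]
public class DemoPipeService : IDemoPipeService
{
    public Action<string> MessageReceived;

    public void Push(string message)
    {
        var handler = MessageReceived;
        if (handler != null) { handler(message); }
    }
}

public static class PipeRoundTrip
{
    // Hosts the pipe, sends one message through it, and returns what the host received.
    public static string RoundTrip(string message)
    {
        string received = null;
        var service = new DemoPipeService { MessageReceived = m => { received = m; } };

        var host = new ServiceHost(service, new Uri("net.pipe://localhost/DemoPipe"));
        host.AddServiceEndpoint(typeof(IDemoPipeService), new NetNamedPipeBinding(), "Pipe1");
        host.Open();
        try
        {
            var factory = new ChannelFactory<IDemoPipeService>(
                new NetNamedPipeBinding(),
                new EndpointAddress("net.pipe://localhost/DemoPipe/Pipe1"));
            var proxy = factory.CreateChannel();
            proxy.Push(message);              // two-way call: returns after the handler ran
            ((IClientChannel)proxy).Close();
            factory.Close();
        }
        finally
        {
            host.Close();
        }
        return received;
    }

    public static void Main()
    {
        Console.WriteLine(RoundTrip("hello over the pipe"));
    }
}
```

In the article's setup the two halves live in different processes, but the moving parts are the same: the listener opens a ServiceHost on a pipe address, and the sender creates a channel to that address.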

 

The Windows Service

Let's begin this step by adding a Windows Service to the solution. You can name it whatever you want. Windows Services are basically programs that always run in the background and do not provide a UI. Wikipedia has a good article which provides a bit more detail on what it is, if you're interested.

Image 10

I have to really chastise Microsoft here for a moment, because they leave a lot of necessary things out of the default project that Windows Services need in order to have happy users post install. 

  1. They leave out the install configuration scaffolding
  2. They leave out a proxy to System.Configuration.ConfigurationManager

I'll digress for a minute to explain point 2. When your Windows service is installing, it's not running in its own application context; it's running under the context of the installer (InstallUtil.exe). Since we're not running in our own execution context at install time, we lose our normal access to the app.config, and by extension, the ConfigurationManager.

Let's start by declaring our installation scaffolding. Add a reference to System.Configuration.Install, and add the following class:

C#
[RunInstaller(true)]
public partial class HardWorkingServiceInstaller : System.Configuration.Install.Installer
{
}

What we're doing here is making a custom installer. We'll use this to set attributes of our windows service, like the name, description, account, and start type.

Next, we need a way to access the app.config values during install time. Add the following class:

C#
internal static class InstallTimeConfigurationManager
{
     public static string GetConfigurationValue(string key)
     {
         Assembly service = Assembly.GetAssembly(typeof(HardWorkingServiceInstaller));
         var config = ConfigurationManager.OpenExeConfiguration(service.Location);

         return config.AppSettings.Settings[key].Value;
     }
}

This will load the app.config, and provide programmatic access to the app.config values at install time.

Now, we can come back to our custom installer, and flesh out the class.

C#
[RunInstaller(true)]
public partial class HardWorkingServiceInstaller : System.Configuration.Install.Installer
{
    public HardWorkingServiceInstaller() : base()
    {            
        var serviceProcessInstaller = new ServiceProcessInstaller();
        var serviceInstaller = new ServiceInstaller();

        //Service Account Information
        serviceProcessInstaller.Account = ServiceAccount.LocalSystem;
        serviceProcessInstaller.Username = null;
        serviceProcessInstaller.Password = null;

        //Service Information
        serviceInstaller.DisplayName = InstallTimeConfigurationManager.GetConfigurationValue("ServiceDisplayName");
        serviceInstaller.Description = InstallTimeConfigurationManager.GetConfigurationValue("ServiceDescription");
        serviceInstaller.StartType = ServiceStartMode.Automatic;
        serviceInstaller.DelayedAutoStart = true;

        //This must be identical to the WindowsService.ServiceBase name
        //set in the constructor of WindowsService.cs
        serviceInstaller.ServiceName = InstallTimeConfigurationManager.GetConfigurationValue("SystemServiceName");

        this.Installers.Add(serviceProcessInstaller);
        this.Installers.Add(serviceInstaller);

        this.Committed += Installer_Committed;
    }

    private void Installer_Committed(Object sender, InstallEventArgs e)
    {
        //auto start the service once the installation is finished
        using (var controller = new ServiceController(InstallTimeConfigurationManager.GetConfigurationValue("SystemServiceName")))
        {
            controller.Start();
        }
    }
}

We're basically telling the installer to run under the System account for the local machine, set up the display name, and description for discovery in the Windows Service Manager, and set it to delayed auto start. Additionally, once we've installed it, we'll set it to start right now (so we don't forget to do that....like I might have...more than once....)

Now, add a reference to System.Messaging, and come over to the HardWorkingService class which inherits from ServiceBase

C#
#region OperationMessageQueue

private MessageQueue _OperationMessageQueue;

protected MessageQueue OperationMessageQueue
{
    get
    {
        if (_OperationMessageQueue == null)
        {
            _OperationMessageQueue = new MessageQueue(ConfigurationManager.AppSettings["OperationMessageQueuePath"]);
            _OperationMessageQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(QueuedWorkItem) });
            _OperationMessageQueue.ReceiveCompleted += Mq_ReceiveCompleted;
        }
        return _OperationMessageQueue;
    }
}

private void Mq_ReceiveCompleted(object sender, ReceiveCompletedEventArgs e)
{
    throw new NotImplementedException();
}

#endregion

What we're doing is setting up a delegate to execute when an item is piped through the queue. Admittedly, XML is not the most efficient of our MQ options here - we could go binary - however, if we wish to later analyze the queue's journal, XML allows for a human readable log of what went through the queue.

Let's take a second to fill out the Start/Stop operations:

C#
protected override void OnStart(string[] args)
{
    OperationMessageQueue.BeginReceive();
}

protected override void OnStop()
{
    OperationMessageQueue.ReceiveCompleted -= Mq_ReceiveCompleted;
    OperationMessageQueue.Dispose();
}

protected override void OnPause()
{
    OperationMessageQueue.ReceiveCompleted -= Mq_ReceiveCompleted;
}

protected override void OnContinue()
{
    OperationMessageQueue.ReceiveCompleted += Mq_ReceiveCompleted;
    OperationMessageQueue.BeginReceive();
}

Nothing special really. The important thing to remember is that when the Windows service manager pauses your application, you actually continue to run - objects stay in memory, and processes, threads, and whatnot continue to execute. Basically, you've received a friendly request to stop what you're doing and go into a low-power state. For us, that means uncoupling our event handler for when messages are received from the queue, and wiring it back up when it's time to continue.

Let's come back now to that message received event handler:

C#
private void Mq_ReceiveCompleted(object sender, ReceiveCompletedEventArgs e)
{
    using (var msg = OperationMessageQueue.EndReceive(e.AsyncResult))
    {
        try
        {
            var qItem = msg.Body as QueuedWorkItem;
            ProcessQueuedWorkItem(qItem);
        }
        catch (Exception ex)
        {
            //TODO : Write to the log we failed in some way
            Environment.Exit(Environment.ExitCode);
        }
    }

    OperationMessageQueue.BeginReceive();
}

MSMQ's API follows .NET's Begin/End asynchronous pattern, with completion surfaced through an event. That is, a Begin method starts the async operation, an event handler is called when it completes, and we extract the result from an End method. This is fine - the only gotcha here is that we need to kick the process off again so we collect the next message from the queue; otherwise, our Windows service will process one and only one message.

This is not the only way to do it. We could block in a while-true loop on the Receive method (http://msdn.microsoft.com/en-us/library/y918yfy2(v=vs.110).aspx) - I mean, it's not like we're working on the UI thread, after all. However, I prefer the async implementation, as it allows our application to yield resources back to the system while waiting for messages.
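For comparison, here is a rough sketch of what the blocking alternative could look like. It isn't part of the article's service: the BlockingReceiveLoop class, the five second timeout, and the .\private$\HardWorkingQueue path are assumptions for illustration. The timeout is there so the loop gets a chance to notice a stop request; Receive signals a timeout by throwing a MessageQueueException with the IOTimeout error code.

```csharp
using System;
using System.Messaging;
using System.Threading;

public static class BlockingReceiveLoop
{
    // Blocks on the queue until cancelled, handing each message label to the callback.
    public static void Run(string queuePath, Action<string> handleLabel, CancellationToken token)
    {
        using (var queue = new MessageQueue(queuePath))
        {
            while (!token.IsCancellationRequested)
            {
                try
                {
                    // Wait up to five seconds for the next message.
                    using (var msg = queue.Receive(TimeSpan.FromSeconds(5)))
                    {
                        handleLabel(msg.Label);
                    }
                }
                catch (MessageQueueException ex)
                {
                    // A timeout just means the queue was empty; anything else is a real error.
                    if (ex.MessageQueueErrorCode != MessageQueueErrorCode.IOTimeout)
                    { throw; }
                }
            }
        }
    }

    public static void Main()
    {
        using (var cts = new CancellationTokenSource())
        {
            // Ctrl+C asks the loop to stop instead of killing the process.
            Console.CancelKeyPress += (s, e) => { e.Cancel = true; cts.Cancel(); };
            Run(@".\private$\HardWorkingQueue", label => Console.WriteLine("Got: " + label), cts.Token);
        }
    }
}
```

The trade-off is a dedicated thread that sits parked in Receive, in exchange for code that reads top to bottom.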

Now for the magic:

C#
private void ProcessQueuedWorkItem(QueuedWorkItem item)
{
    NamedPipe.Sender.SendMessage(String.Format("Starting work on {0}", item.Name));

    var delay = new TimeSpan(0,0,3);
    for(var i = 1; i <= 5; i++) //count from 1 so the messages read "item 1 of 5" through "item 5 of 5"
    {
        NamedPipe.Sender.SendMessage(String.Format("beginning work on item {0} of 5 for {1}", i, item.Name));
        System.Threading.Thread.Sleep(delay); //yep, we're "working" really hard here :)
        NamedPipe.Sender.SendMessage(String.Format("completed work on item {0} of 5 for {1}", i, item.Name));
    }

    NamedPipe.Sender.SendMessage(String.Format("Completed work on {0}", item.Name));
}

Since we routed all of our WCF standup and error checking logic into the Sender class, it's really quite easy to pipe a message to another process. 

I think the real gotcha here is that it requires an inversion of thinking. A first thought would be that the Windows service would be the WCF host, and the Windows app would be the consumer. In actuality, it's the other way around: our Windows service sends each message to a fixed address, and if there's nobody hosting the pipe at that address, we just route the message to the bit bucket.

At this point, we're ready to start up the windows service, and let it start crunching on the local queue. Use InstallUtil.exe to install the service (make sure you're running as administrator), and attach a Visual Studio debugger to the running process if you want to step through your code.

When you're done, you should have two things:

A confirmation from InstallUtil:

Image 11

and your service now listed in the list of windows services:

Image 12

We configured our service to automatically start, and look! It's running! Also, look around, the name, description, and other variables are present! And if you check the MSMQ, it should be empty now as well, which means that we're actively consuming instructions off the queue.

Displaying the messages using WPF

At this point, we've achieved two of our three goals - being able to process instructions automatically, and being able to queue up instructions in any order we choose. Now, let's get some real-time progress.

Let's add a WPF project, and add MVVM light from NuGet. I think their NuGet powershell script needs to be updated, because every time I add it, the namespace doesn't get set up right in the app.xaml. Let's fix that. Head over to app.xaml, and adjust it like so:

XML
<Application 
    x:Class="ProgressObserver.App" 
    xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation" 
    xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" 
    StartupUri="MainWindow.xaml" 
    xmlns:d="http://schemas.microsoft.com/expression/blend/2008" 
    d1p1:Ignorable="d" 
    xmlns:d1p1="http://schemas.openxmlformats.org/markup-compatibility/2006"
    xmlns:vm="clr-namespace:ProgressObserver.ViewModel">
  <Application.Resources>
    <vm:ViewModelLocator x:Key="Locator" d:IsDataSource="True" />
  </Application.Resources>
</Application>

By default, MVVM light declares the xml namespace at the individual resource level. I just bump it to the document level because for purposes of this project, we're going to use everything in this namespace, and we're not going to have conflicting namespaces.

Let's head over to the view for the window:

XML
<Window x:Class="ProgressObserver.MainWindow"
        xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
        xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
        xmlns:converter="clr-namespace:ProgressObserver.Converters"
        Title="MainWindow" Height="350" Width="525"
        DataContext="{Binding Source={StaticResource Locator}, Path=Main}">
    <Window.Resources>
        <ResourceDictionary>
        <converter:LocalTimeConverter x:Key="localTime" />
        </ResourceDictionary>
    </Window.Resources>
    <Grid>
        <ItemsControl ItemsSource="{Binding Messages}">
            <ItemsControl.ItemTemplate>
                <DataTemplate>
                    <StackPanel Orientation="Horizontal">
                        <TextBlock Text="{Binding Recieved, Converter={StaticResource localTime}}" />
                        <TextBlock Text=":" />
                        <TextBlock Text="{Binding Message}" />
                    </StackPanel>
                </DataTemplate>
            </ItemsControl.ItemTemplate>            
        </ItemsControl>
    </Grid>
</Window>

Nothing much special here, other than using an IValueConverter to convert from UTC to local time for the user. I will point out one thing - when I was a noob to MVVM light, one thing I didn't really appreciate right away was declaring the data context for the window in the XAML instead of code-behind. Since we've declared our locator as a static resource in the app.xaml file, we can hang all of the application's view models off of it as properties. Here, our main window's view model is just one of the properties of our locator.
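The MessageItem and LocalTimeConverter types that the view binds to aren't listed here, so the following is a guess at their minimal shape rather than the downloadable code. It assumes MessageItem stamps itself with the UTC time on creation, and that the converter formats the local time with the long time pattern ("T"). The Recieved spelling is kept because that is the property name the XAML binding uses.

```csharp
using System;
using System.Globalization;
using System.Windows.Data;

// Assumed shape: a progress message plus the UTC time it arrived.
public class MessageItem
{
    public MessageItem(string message)
    {
        Message = message;
        Recieved = DateTime.UtcNow;
    }

    public DateTime Recieved { get; private set; }
    public string Message { get; private set; }
}

// Converts a UTC DateTime into a local time string for display.
public class LocalTimeConverter : IValueConverter
{
    public object Convert(object value, Type targetType, object parameter, CultureInfo culture)
    {
        if (value is DateTime)
        { return ((DateTime)value).ToLocalTime().ToString("T", culture); }

        return value; // not a DateTime; pass it through untouched
    }

    public object ConvertBack(object value, Type targetType, object parameter, CultureInfo culture)
    {
        // The view only displays the time, so converting back is never needed.
        throw new NotSupportedException();
    }
}
```

Storing UTC and converting at the edge keeps the view model free of time zone concerns; the view decides how the timestamp should look.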

Almost done, let's head over to the MainViewModel.

We need to set up the WCF service host - this allows the windows service to inter-process communicate to our app:

C#
#region Receiver Property

/// <summary>
/// Member-level variable backing <see cref="Receiver" />
/// </summary>
private Receiver _Receiver = null;

/// <summary>
/// Gets and sets the <see cref="Receiver" /> property.         
/// </summary>
public Receiver Receiver
{
    get { 
        if(_Receiver == null)
        { _Receiver = new Receiver(this.OnMessageReceived); }

         return _Receiver; 
    }
    set
    {                
        Set(() => Receiver, ref _Receiver, value);
    }
}

#endregion

Notice how we passed a delegate to the constructor which tells the pipe what method to execute when progress is reported from our windows service.

In our constructor, we'll start the WCF host

C#
/// <summary>
/// Initializes a new instance of the MainViewModel class.
/// </summary>
public MainViewModel()
{
    Messages = new ObservableCollection<MessageItem>();
    Receiver.ServiceOn();
}

And then finally, flesh out our message handling delegate:

C#
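private void OnMessageReceived(String message)
{
    //Application.Current.Dispatcher (System.Windows) always belongs to the UI thread;
    //Dispatcher.CurrentDispatcher would return the dispatcher of whichever thread called us
    Application.Current.Dispatcher.Invoke(() => {
        Messages.Add(new MessageItem(message));
    });
}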

Notice how we're wrapping the add in a dispatched call. When we add the message, we're not executing on the UI thread. If we were working with a scalar type (int, string, etc), WPF would automatically marshal the PropertyChanged notification for us all the way to the UI thread. Since we're modifying a collection type, WPF doesn't do the routing for us, so we have to invoke this code on the UI thread - which we can do via the dispatcher.

Now, let's start everything up. If you set Visual Studio to run both our console and our visualizer, while the service is running, you should be able to do the following:

Image 13

Bonus Points!

If your MSMQ/work machine is also a web server, you can kick off queued operations via a web page via WebAPI! Download the code to see how this is done.

Image 14
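The download has the real implementation. As a rough idea of the approach, here is a sketch of a Web API 2 controller that puts a work item on the same queue. The WorkItemsController name, the route, and the validation are assumptions, not the article's code; it reuses the OperationMessageQueuePath app setting from the console example and assumes QueuedWorkItem has only a Name property.

```csharp
using System.Configuration;
using System.Messaging;
using System.Web.Http;

// Assumed shape of the work item; only Name appears in the article.
public class QueuedWorkItem
{
    public string Name { get; set; }
}

public class WorkItemsController : ApiController
{
    // POST api/workitems  (hypothetical route, using the default Web API conventions)
    public IHttpActionResult Post([FromBody] QueuedWorkItem item)
    {
        if (item == null || string.IsNullOrWhiteSpace(item.Name))
        { return BadRequest("A work item name is required."); }

        using (var mq = new MessageQueue(ConfigurationManager.AppSettings["OperationMessageQueuePath"]))
        using (var msg = new Message(item))
        {
            msg.Label = "Queued Item from Web API";
            msg.Formatter = new XmlMessageFormatter(new[] { typeof(QueuedWorkItem) });
            mq.Send(msg);

            // Hand the MSMQ message ID back so the caller can correlate it later.
            return Ok(msg.Id);
        }
    }
}
```

Keep in mind that the application pool identity needs send permission on the queue, just like the service account needed access earlier.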

History

2014-12-13 - Initial post

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)