MSMQ Outgoing Queue Mover

28 Jan 2012 | CPOL | 4 min read
How to move a WCF MSMQ message from one outgoing queue to another outgoing queue.

Introduction

While working on a WCF SOA solution for one of our clients, we were required to service the client through an MSMQ endpoint. We ran into MSMQ messages getting stuck in the outgoing queue because the wrong IP address was used. We didn't want to lose any data from our clients, so we decided to write a management utility that moves messages from the wrong outgoing queue to the right outgoing queue.

Background

There are a few things you should be familiar with (mostly as reference) to understand the inner workings of the process; the relevant protocol specifications are named in the overview below.

Overview

When a message is sent from a client to a WCF service, the message gets serialized. The serialization format depends on the binding being used, and in our case we use NetMsmqBinding. If you have a look at the WCF protocol stack, you will notice that the specification for this binding is MS-NMFMB (.NET Message Framing MSMQ Binding Protocol Specification), which is based on MC-NMF (.NET Message Framing Protocol Specification). If we look inside a raw MSMQ message that was sent, we find the following structure:

  • The .NET Message Framing header (red)
  • WCF channels message (green)
  • Size of the following string (blue)

[Figure: raw message bytes, color-coded as listed above]

The .NET Message Framing header

  • 0x00 – Version record, followed by 2 bytes.
    • 0x01 – Major version.
    • 0x00 – Minor version.
  • 0x01 – Mode record, followed by 1 byte.
    • 0x04 – Mode (single one-way message from a single source).
  • 0x02 – Via record, followed by via length. This is the important part.
    • 0x27 – ViaLength: 0x27 is 39, which is the length of the string that follows, "net.msmq://192.168.100.154/private/test".
  • 0x03 – Known encoding record, followed by one byte.
    • 0x07 – Binary, as specified in MC-NBFS.
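
To make the layout above concrete, here is a minimal sketch of a parser for these four records. The class and member names (FramingHeader, Info, Parse, BodyOffset) are hypothetical and not part of the article's download. The sketch assumes the records appear in exactly the order listed and that the Via length fits in a single byte (below 128); longer addresses use a multi-byte size encoding that is not handled here.

```csharp
using System;
using System.Text;

// Hypothetical helper for illustration; not part of the article's source code.
public static class FramingHeader
{
    public sealed class Info
    {
        public byte Major;
        public byte Minor;
        public byte Mode;
        public byte EncodingId;
        public string Via;
        public int BodyOffset; // index of the first byte after the header
    }

    // Parses the Version, Mode, Via and Known encoding records in that order.
    public static Info Parse(byte[] data)
    {
        int i = 0;
        var info = new Info();

        Expect(data, i++, 0x00);            // Version record
        info.Major = data[i++];
        info.Minor = data[i++];

        Expect(data, i++, 0x01);            // Mode record
        info.Mode = data[i++];

        Expect(data, i++, 0x02);            // Via record
        int viaLength = data[i++];
        if (viaLength > 0x7F)               // assumption: single-byte length only
            throw new NotSupportedException("multi-byte Via length is not handled");
        info.Via = Encoding.UTF8.GetString(data, i, viaLength);
        i += viaLength;

        Expect(data, i++, 0x03);            // Known encoding record
        info.EncodingId = data[i++];

        info.BodyOffset = i;
        return info;
    }

    private static void Expect(byte[] data, int index, byte expected)
    {
        if (data[index] != expected)
            throw new FormatException(
                "unexpected record type 0x" + data[index].ToString("X2") + " at offset " + index);
    }
}
```

Calling FramingHeader.Parse on the raw body bytes of the message shown above should yield the Via string "net.msmq://192.168.100.154/private/test", with BodyOffset pointing at the start of the serialized WCF Channels Message.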

The data that follows is the serialized WCF Channels Message. WCF uses a binary XML dictionary to serialize the message; you should have a look at Carlos Figueira's post about it. This is the original WCF Channels Message XML:

XML
<s:Envelope xmlns:s="http://www.w3.org/2003/05/soap-envelope">
  <s:Header>
    <a:Action s:mustUnderstand="1" 
      xmlns:a="http://www.w3.org/2005/08/addressing">http://tempuri.org/ITestMSMQ/bla</a:Action>
    <a:To s:mustUnderstand="1" 
      xmlns:a="http://www.w3.org/2005/08/addressing">net.msmq://192.168.100.154/private/test</a:To>
  </s:Header>
  <s:Body>
    <bla xmlns="http://tempuri.org/">
      <input xmlns:b="http://schemas.datacontract.org/2004/07/WcfService1" 
                   xmlns:i="http://www.w3.org/2001/XMLSchema-instance">
        <b:a>0</b:a>
      </input>
    </bla>
  </s:Body>
</s:Envelope>

This time we'll use MC-NBFX (.NET Binary Format: XML Data Structure) for the XML encoding and MC-NBFS (.NET Binary Format: SOAP Data Structure) for the dictionary strings. Looking at the fourth line of the green part (the WCF Channels Message part) and ignoring the first 3 bytes (this is the "bla" part), we reach byte 0x44.

WCF Channels message

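  • 0x44 – PrefixDictionaryElementA: an element with prefix "a" whose name is looked up in the dictionary.
  • 0x0C – To (from the dictionary strings).
  • 0x1E – PrefixDictionaryAttributeS: an attribute with prefix "s" whose name is looked up in the dictionary.
  • 0x00 – mustUnderstand (from the dictionary strings).
  • 0x82 – OneText record (the attribute value "1").
  • 0x99 – Chars8TextWithEndElement.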
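
The record-type bytes in this listing fall into fixed ranges in MC-NBFX: one record type per prefix letter for dictionary-named elements and attributes. The following sketch maps a record-type byte to a readable name for the records used here. NbfxRecords and Describe are hypothetical names, and the table is deliberately incomplete. Note that it applies only to record-type bytes; the byte that follows a PrefixDictionary record (0x0C or 0x00 above) is a dictionary string id, not a record type.

```csharp
using System;

// Hypothetical helper for illustration; covers only the records in the listing.
public static class NbfxRecords
{
    // Returns a readable name for a record-type byte.
    public static string Describe(byte record)
    {
        // 0x0C..0x25: PrefixDictionaryAttributeA..Z
        if (record >= 0x0C && record <= 0x25)
            return "PrefixDictionaryAttribute" + (char)('A' + (record - 0x0C));

        // 0x44..0x5D: PrefixDictionaryElementA..Z
        if (record >= 0x44 && record <= 0x5D)
            return "PrefixDictionaryElement" + (char)('A' + (record - 0x44));

        if (record == 0x82) return "OneText";
        if (record == 0x99) return "Chars8TextWithEndElement";

        return "unknown (0x" + record.ToString("X2") + ")";
    }
}
```

For example, Describe(0x1E) gives the attribute record for prefix "s", which matches the s:mustUnderstand attribute in the XML. In the listing, the last record is 0x99 (Chars8TextWithEndElement).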

The next byte is the one we're interested in. This is the size of the following string which is the address of the MSMQ queue. Now that we've got a general idea of what is going on inside the raw bytes of the message, we can alter it. These are the steps that we need to take:

  1. Load the original message from the outgoing queue.
  2. Alter both the framing and the WCF channels message addresses to the new queue address.
  3. Send the altered message to the new destination.
  4. Remove the old message from the source queue.

Using the code

  1. Loading the message is a pretty easy task with the System.Messaging API. We use Peek first so that the message stays in the source queue until the redirected message has been sent successfully.
    C#
    // Requires: using System; using System.IO; using System.Messaging; using System.Text;
    private void readMessage()
    {
        try
        {
            // Peek leaves the message in the queue; on timeout it throws
            // a MessageQueueException rather than returning null
            message = sourceQueue.Peek(TimeSpan.FromSeconds(TIMEOUT));
    
            if (message == null)
                throw new Exception("cannot peek message, queue might be empty");
        }
        catch (Exception ex)
        {
            throw new Exception("failed to read message from queue", ex);
        }
    }
  2. This step involves several actions:
    • We take the source address and prefix it with "net.msmq://", and do the same for the destination address.
    • Each of those strings is prefixed in the message with one byte that holds the string size, so we add the size byte.
    • We go over the original message bytes and look for the string "net.msmq://source_address". There should be two occurrences: one in the .NET framing header and one in the WCF Channels message. We replace each occurrence with "net.msmq://dest_address".
    • We put the altered data back in the message.
    C#
    private void alterMessage()
    {
        byte[] data = new byte[message.BodyStream.Length];
        byte[] source = getHost(PREFIX + sourceName);
        byte[] dest = getHost(PREFIX + destName);
        
        // read the whole body from the start of the stream
        message.BodyStream.Position = 0;
        message.BodyStream.Read(data, 0, data.Length);
    
        int offset = find(data, source);
    
        if (offset == NOT_FOUND)
            throw new Exception("cannot find source name in message");
    
        // get the total string size from the prefix byte which holds the size
        // (assumes a single size byte, i.e. addresses shorter than 128 bytes)
        byte sourceSize = data[offset - 1];
        byte destSize = Convert.ToByte(sourceSize - (source.Length - dest.Length));
    
        // prefix the source & dest with a byte that holds the string size
        source = prefixSize(source, sourceSize);
        dest = prefixSize(dest, destSize);
    
        data = replace(data, source, dest); // update framing message address
        data = replace(data, source, dest); // update wcf channels message address
    
        // put the altered data back into the message
        message.BodyStream = new MemoryStream(data);
    }
     
    private byte[] getHost(string name)
    {
        // name should be in the format ip\path, e.g. 192.168.100.1\private$\test
        string host = name.Substring(0, name.IndexOf('\\'));
    
        return Encoding.UTF8.GetBytes(host);
    }
    
    private byte[] prefixSize(byte[] data, byte dataSize)
    {
        // new array: one size byte followed by the original bytes
        byte[] newData = new byte[data.Length + 1];
    
        newData[0] = dataSize;
        Buffer.BlockCopy(data, 0, newData, 1, data.Length);
    
        return newData;
    }
  3. For resending the message, we use the System.Messaging API again.
    C#
    private void sendMessage()
    {
        // the using block closes the queue even if Send throws
        using (MessageQueue destQueue = new MessageQueue("FormatName:Direct=TCP:" + 
                          destName, QueueAccessMode.Send))
        {
            destQueue.Send(message, MessageQueueTransactionType.Single);
        }
    }
  4. The same goes for removing the message with the System.Messaging API. We simply receive it in order to remove it from the source queue.
    C#
    private void removeMessage()
    {
        // receiving by lookup id removes exactly the message we peeked earlier
        sourceQueue.ReceiveByLookupId(message.LookupId);
    }
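
The find and replace helpers called in alterMessage are not listed above. As a rough sketch of what such helpers could look like (an assumption, not the code from the download), the following class searches a byte array for a pattern and replaces its first occurrence. ByteRewriter, Find, ReplaceFirst and NotFound are hypothetical names.

```csharp
using System;

// Hypothetical stand-ins for the find/replace helpers; not the article's code.
public static class ByteRewriter
{
    public const int NotFound = -1;

    // Returns the index of the first occurrence of pattern at or after start,
    // or NotFound if the pattern does not occur.
    public static int Find(byte[] data, byte[] pattern, int start = 0)
    {
        for (int i = Math.Max(start, 0); i + pattern.Length <= data.Length; i++)
        {
            int j = 0;
            while (j < pattern.Length && data[i + j] == pattern[j])
                j++;

            if (j == pattern.Length)
                return i;
        }

        return NotFound;
    }

    // Replaces the first occurrence of source with dest and returns a new array.
    // Returns the input array unchanged if source does not occur.
    public static byte[] ReplaceFirst(byte[] data, byte[] source, byte[] dest)
    {
        int offset = Find(data, source);
        if (offset == NotFound)
            return data;

        byte[] result = new byte[data.Length - source.Length + dest.Length];
        int tailStart = offset + source.Length;

        Buffer.BlockCopy(data, 0, result, 0, offset);                 // bytes before the match
        Buffer.BlockCopy(dest, 0, result, offset, dest.Length);       // replacement
        Buffer.BlockCopy(data, tailStart, result, offset + dest.Length,
                         data.Length - tailStart);                    // bytes after the match
        return result;
    }
}
```

Because the size byte is part of both the search pattern and the replacement, calling ReplaceFirst twice updates the framing header first and the WCF Channels message second, and the rewritten first occurrence is not matched again as long as the source and destination hosts differ in length.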

Additional notes

  • Besides the address of the queue, the path can also be changed. This is not covered in this code, but it can be added quite easily.
  • If the outgoing queue is actively being processed, you should pause the queue and restart the MSMQ service.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)