Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles / Languages / XML

Efficient Message Correlation in MSMQ

5.00/5 (5 votes)
7 Dec 2018CPOL6 min read 23K   664  
This article describes more scalable implementation of MessageQueue.ReceiveByCorrelationId method.

Introduction

Let's assume that we need to interface with 3rd party system through MSMQ by sending messages and receiving response back. Let's also assume that we need to do so within single execution context, meaning we would like to do some work locally, then send a message to 3rd party module via MSMQ, receive a response, and continue local processing. This can be achieved using .NET's async/await pattern together with MSMQ's MessageQueue.ReceiveByCorrelationId method - after sending a message, we can remember its ID and wait for the resulting message by calling ReceiveByCorrelationId and passing that ID as an argument.

In this article, I will demonstrate two different approaches of retrieving messages from MSMQ by Correlation ID. First approach, I call it "Remote Sync", is a standard way suggested by MSMQ API via MessageQueue.ReceiveByCorrelationId method. The second approach, I call it "Local Sync", is custom implementation of ReceiveByCorrelationId that internally uses MSMQ's MessageQueue.Receive method and resolves message correlation locally.

In this article, I will not cover the basics of MSMQ, assuming that you are already familiar with the main concepts of Message Oriented Design. There are plenty of good articles regarding the topic on Code Project. In addition, the following article describes how to correlate request/response messages by using System.Messaging.

Why?

In general, when designing Message Oriented System, you should avoid using synchronous Send/ReceiveByCorrelationId technique in favor of using an asynchronous Send/Receive. It may sound simple, though, it can actually change the whole architecture dramatically, because you would have to break your execution context apart and have Sender and Receiver as separate independent units. It leads to the fundamental question: in the application, when user clicks a "Calc" button, do you block current thread while waiting for response, or, setup a callback that would fire upon receiving response, and let user continue to work with the app? The first approach is easy, but less scalable (blocking UI), while the second one is more difficult to implement, though it is more user-friendly and better scalable (responsive UI).

It is a known fact that ReceiveByCorrelationId method (same applies to ReceiveByLookupID and ReceiveById) is a performance killer when you have many parallel threads waiting for response and big number of messages being pushed through MSMQ. This is due to the nature of how correlation resolution is implemented internally in that method. Under the hood, it uses a Cursor that would Peek all the messages in the queue till it finds the message with matching Correlation ID. When it finds the message, it would call Receive method to receive it. When there are too many threads constantly iterating over the cursor, performance goes down very quickly. According to Microsoft:

Important Note: ReceiveByCorrelationId uses sequence search to find a message in a queue. Use it with carefulness; it might be inefficient when a high number of messages reside in a queue.

In the alternative approach, we will maintain internal Dictionary of all message IDs that we send, and for every message picked from the queue, have the receiving thread look for the matching IDs in that Dictionary. The downside of this approach is that the information about all messages being processed is stored in-memory, and can be lost if the server goes down.

Demo Application

The demo app is a console application accepting two possible arguments: -l (Local Sync) and -r (Remote Sync). If no arguments were specified, Local Sync will be used by default.

There are several constants hard-coded in the app that you can alter if needed (see private constants in Program.cs):

  • InputQueue, OutputQueue: The names of demo input and output queues
  • TimeoutSeconds: Timeout in seconds of wait time for receiving response. Note that in Remote Sync, the timeout is used for every message as defined; in Local Sync, since messages are being sent in bulk, the timeout is multiplied by the number of messages being sent at once.
  • MaxItems: The number of messages used in the demo (50,000 by default)
  • MaxBuffer: The number of messages being sent to MSQM at once in Local Sync scenario
  • UseLocalSyncByDefault: Flag indicating whether to use Local Sync demo by default

Depending on current Processor Type, the app will instantiate either MsmqSyncLocal, for Local Sync Demo, or MsmqSyncRemote, for Remote Sync Demo. Both of them have asynchronous ProcessAsync method accepting string data along with cancellation token, and returning Task<string>. This method is supposed to send data to an input queue, wait, and receive the result back.

As a next step, the app will run several (per number of CPU cores) worker threads emulating this 3rd party processor I mentioned earlier. The job of each Worker is to continuously retrieve incoming messages from an input queue, convert them to integers, calculate the square root, and send the result to an output queue. The code of worker method is shown below:

C#
private static async Task RunWorkerAsync(int workerIndex, CancellationToken cancellationToken)
{
    var inputQueue = new MessageQueue(string.Format(@".\private$\{0}", InputQueue), 
                     QueueAccessMode.Receive)
    {
        Formatter = new ActiveXMessageFormatter(),
        MessageReadPropertyFilter = {Id = true, Body = true}
    };

    var outputQueue = new MessageQueue(string.Format(@".\private$\{0}", OutputQueue), 
                      QueueAccessMode.Send)
    {
        Formatter = new ActiveXMessageFormatter()
    };

    while (!cancellationToken.IsCancellationRequested)
    {
        var message = await inputQueue.ReceiveAsync(cancellationToken);
        var data = (string) message.Body;

        try
        {
            // Process Data
            var intData = int.Parse(data);
            var result = Math.Sqrt(intData).ToString(CultureInfo.InvariantCulture);
            outputQueue.Send(new Message(result, new ActiveXMessageFormatter())
            {
                CorrelationId = message.Id,
                Label = string.Format("Worker {0}", workerIndex)
            });
        }
        catch (Exception ex)
        {
            outputQueue.Send(new Message(ex.ToString(), new ActiveXMessageFormatter())
            {
                CorrelationId = message.Id,
                Label = string.Format("ERROR: Worker {0}", workerIndex)
            });
        }
    }
}

Remote Sync

As mentioned before, Remote Sync Processor uses ReceiveByCorrelationId method to retrieve messages from output queue. Here is the code that demonstrates it:

C#
public async Task<string> ProcessAsync(string data, CancellationToken ct)
{
    var inputQueue = new MessageQueue(string.Format(@".\private$\{0}", m_inputQueue), 
                     QueueAccessMode.Send)
    {
        Formatter = ActiveXFormatter
    };

    var outputQueue = new MessageQueue(string.Format(@".\private$\{0}", m_outputQueue), 
                      QueueAccessMode.Receive)
    {
        Formatter = ActiveXFormatter,
        MessageReadPropertyFilter = {Id = true, CorrelationId = true, Body = true, Label = true}
    };

    var message = new Message(data, ActiveXFormatter);
    inputQueue.Send(message);
    var id = message.Id;

    try
    {
        //var resultMessage = outputQueue.ReceiveByCorrelationId(id, m_timeout);
        var resultMessage = await outputQueue.ReceiveByCorrelationIdAsync(id, ct);

        var label = resultMessage.Label;
        var result = (string) resultMessage.Body;

        if (m_exceptionHandler != null && !string.IsNullOrEmpty(label) && label.Contains("ERROR"))
            throw m_exceptionHandler(data);

        return result;
    }
    catch (Exception)
    {
        if (!ct.IsCancellationRequested)
            throw;
        return null;
    }
}

Note that I am not using ReceiveByCorrelationId directly, but via an extension method called ReceiveByCorrelationIdAsync. This is due to the fact that original ReceiveByCorrelationId method does not properly work with timeouts and does not support cancellation tokens.

Local Sync

In the Local Sync scenario, the ProcessAsync method has identical signature. However, this time, it is truly asynchronous:

C#
public async Task<string> ProcessAsync(string data, CancellationToken ct)
{
    await m_semaphore.WaitAsync(ct);

    var tcs = new TaskCompletionSource<string>();
    var message = new Message(data, ActiveXFormatter);

    m_inputQueue.Send(message);

    var id = message.Id;

    m_items.TryAdd(id, tcs);

    var tcsForBag = new TaskCompletionSource<bool>();
    if (!m_bag.TryAdd(id, tcsForBag))
        m_bag[id].TrySetResult(true);

    var task = await Task.WhenAny(Task.Delay(m_timeout, ct), tcs.Task);

    if (task != tcs.Task)
    {
        if (m_items.TryRemove(id, out tcs))
        {
            m_semaphore.Release();
            if (ct.IsCancellationRequested)
                tcs.TrySetCanceled();
            else
                tcs.TrySetException(new TimeoutException(string.Format
                ("Timeout waiting for a message on queue [{0}]", m_outputQueue.QueueName)));
        }
    }

    return await tcs.Task;
}

An instance of MsmqSyncLocal class internally maintains a SemaphoreSlim that controls the number of messages we send to MSMQ at once. This number can be configured by MaxItems constant.

When ProcessAsync is called, it creates a Message and immediately sends it to the input queue. Then it creates a TaskCompletionSource object, whose Task property we will asynchronously return back to the caller. This completion source object as well as the message ID that we sent earlier are added to an internal ConcurrentDictionary called m_items.

Another method ReceiveMessagesAsync, that is being executed from constructor, runs an infinite loop retrieving messages from the output queue. For every message received, it checks its CorrelationId in the m_items dictionary. If the match is found, it would signal the TaskCompletionSource associated with it setting either Result, if successful, or Cancellation/Error.

C#
private async Task ReceiveMessagesAsync()
{
    while (!m_stopToken.IsCancellationRequested)
    {
        var message = await m_outputQueue.ReceiveAsync(m_stopToken.Token);
        var id = message.CorrelationId;
        var label = message.Label;
        var data = (string) message.Body;

        LogErrors(Task.Run(async () =>
        {
            TaskCompletionSource<string> tcs;

            var tcsForBag = new TaskCompletionSource<bool>();
            if (m_bag.TryAdd(id, tcsForBag))
                await m_bag[id].Task;
            m_bag.TryRemove(id, out tcsForBag);

            if (m_items.TryRemove(id, out tcs))
            {
                m_semaphore.Release();
                if (m_exceptionHandler != null && !string.IsNullOrEmpty(label) && 
                                                  label.Contains("ERROR"))
                    tcs.TrySetException(m_exceptionHandler(data));
                else
                    tcs.TrySetResult(data);
            }
        }));
    }
}

You may have noticed that there is one more ConcurrentDictionary involved (m_bag). It solves the problem of condition racing that may occur in case if ReceiveMessagesAsync receives a message before its ID has been added to m_items. If that happens, we won't be able to find expected correlation ID in the dictionary.

In fact, if we knew the Message ID prior to sending it, we could have simply populated m_items prior to sending the message, hence avoiding the race condition. However, this is not possible in MSMQ.

So, in order to coordinate message ID insertion, ProcessAsync would attempt to insert a TaskCompletionSource object along with that ID into the m_bag. If successful, we don't have to do anything else because that element would be removed by ReceiveMessagesAsync. Otherwise, if that ID already exists in the m_bag, which means that it has already been added by ReceiveMessagesAsync, we would signal its completion in order to let receiving process know that it can continue execution.

Summary

My testing indicated that Remote Sync is taking approximately 65 seconds as opposed to Local Sync taking around 5 seconds in the same environment.

Being more than 10 times slower than it could, ReceiveByCorrelationId method should be avoided as much as possible. If asynchronous workflow does not fit into your application architecture and performance is a concern, I believe local correlation is something to consider as an option.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)