Introduction
Let's assume that we need to interface with a third-party system through MSMQ by sending messages and receiving responses. Let's also assume that we need to do so within a single execution context: we do some work locally, send a message to the third-party module via MSMQ, receive the response, and continue local processing. This can be achieved by combining .NET's async/await pattern with MSMQ's MessageQueue.ReceiveByCorrelationId method: after sending a message, we remember its ID and wait for the response by calling ReceiveByCorrelationId with that ID as an argument.
In this article, I will demonstrate two different approaches to retrieving messages from MSMQ by correlation ID. The first approach, which I call "Remote Sync", is the standard one offered by the MSMQ API through the MessageQueue.ReceiveByCorrelationId method. The second approach, which I call "Local Sync", is a custom implementation of ReceiveByCorrelationId that internally uses MSMQ's MessageQueue.Receive method and resolves message correlation locally.
In this article, I will not cover the basics of MSMQ; I assume that you are already familiar with the main concepts of message-oriented design. There are plenty of good articles on the topic on Code Project. In addition, the following article describes how to correlate request/response messages by using System.Messaging.
Why?
In general, when designing a message-oriented system, you should avoid the synchronous Send/ReceiveByCorrelationId technique in favor of an asynchronous Send/Receive. This may sound simple, but it can change the whole architecture dramatically, because you have to break your execution context apart and turn the sender and the receiver into separate, independent units. This leads to a fundamental question: when the user clicks a "Calc" button in the application, do you block the current thread while waiting for the response, or do you set up a callback that fires when the response arrives and let the user continue working with the app? The first approach is easy but less scalable (blocking UI); the second one is more difficult to implement, but it is more user-friendly and scales better (responsive UI).
It is well known that the ReceiveByCorrelationId method (the same applies to ReceiveByLookupId and ReceiveById) is a performance killer when many parallel threads are waiting for responses and a large number of messages is being pushed through MSMQ. This is due to the way correlation resolution is implemented internally: under the hood, the method uses a Cursor that Peeks at the messages in the queue one by one until it finds the message with a matching correlation ID, and then calls Receive to take that message off the queue. When many threads are constantly iterating over cursors, performance degrades very quickly. According to Microsoft:
Important Note: ReceiveByCorrelationId uses sequence search to find a message in a queue. Use it with carefulness; it might be inefficient when a high number of messages reside in a queue.
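To see why a sequential search hurts, here is a deliberately simplified cost model in C#. It assumes that the responses sit in the queue in the reverse order of the requests (a worst case chosen only for illustration) and it merely counts how many messages are examined; it does not model MSMQ's cursors, locking, or I/O. The class CorrelationCostModel is a hypothetical name, not part of the demo.

```csharp
using System.Collections.Generic;

// Simplified cost model (NOT how MSMQ is implemented internally): counts how many
// messages are examined to match every pending request to its response.
public static class CorrelationCostModel
{
    // "Remote Sync" style: for each request, scan the queue from the head (like a
    // cursor peeking message by message) until the correlation ID matches, then
    // remove that message.
    public static long CountSequentialPeeks(IList<int> queue, IEnumerable<int> requests)
    {
        var remaining = new List<int>(queue);
        long peeks = 0;
        foreach (var id in requests)
        {
            for (var i = 0; i < remaining.Count; i++)
            {
                peeks++;
                if (remaining[i] == id)
                {
                    remaining.RemoveAt(i);
                    break;
                }
            }
        }
        return peeks;
    }

    // "Local Sync" style: a single reader takes each message once and resolves it
    // with one hash lookup against the pending request IDs.
    public static long CountDictionaryLookups(IList<int> queue, IEnumerable<int> requests)
    {
        var pending = new Dictionary<int, bool>();
        foreach (var id in requests)
            pending[id] = true;
        long lookups = 0;
        foreach (var correlationId in queue)
        {
            lookups++;
            pending.Remove(correlationId);
        }
        return lookups;
    }
}
```

With 1,000 messages in that order, the sequential search examines 1,000 + 999 + ... + 1 = 500,500 messages, while the single reader examines 1,000. The quadratic growth of per-request searching is what makes it degrade as the queue fills up.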
In the alternative approach, we maintain an internal Dictionary of all the message IDs we send, and a receiving thread looks up the correlation ID of every message it picks from the queue in that Dictionary. The downside of this approach is that the information about all messages being processed is stored in memory and can be lost if the server goes down.
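The core of the local approach can be sketched as a small correlation table. This is a minimal sketch under stated assumptions, not the demo's MsmqSyncLocal class: PendingRequests, Register, and Complete are hypothetical names, message IDs are plain strings, and it ignores the race in which a reply arrives before Register is called (the article handles that separately with m_bag).

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical in-memory correlation table: senders register the ID of each
// message they send, and a single receiving loop completes the matching
// request by the reply's correlation ID.
public sealed class PendingRequests
{
    private readonly ConcurrentDictionary<string, TaskCompletionSource<string>> m_pending =
        new ConcurrentDictionary<string, TaskCompletionSource<string>>();

    // Called by the sender once it knows the message ID.
    public Task<string> Register(string messageId)
    {
        var tcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
        if (!m_pending.TryAdd(messageId, tcs))
            throw new InvalidOperationException("Duplicate message ID: " + messageId);
        return tcs.Task;
    }

    // Called by the receiving loop for every reply; returns false when no request
    // with that ID is pending (for example, because it already timed out).
    public bool Complete(string correlationId, string body)
    {
        TaskCompletionSource<string> tcs;
        return m_pending.TryRemove(correlationId, out tcs) && tcs.TrySetResult(body);
    }

    // Requests still waiting; this state lives only in memory.
    public int Count => m_pending.Count;
}
```

Each reply costs one dictionary lookup regardless of how many requests are waiting, which is the property the cost model above relies on.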
Demo Application
The demo app is a console application that accepts one of two arguments: -l (Local Sync) or -r (Remote Sync). If no argument is specified, Local Sync is used by default.
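The demo's argument handling is not shown in the article. A minimal sketch of the behavior described above might look like this; DemoArgs and UseLocalSync are hypothetical names, the default is passed in to mirror the UseLocalSyncByDefault constant, and rejecting unknown arguments is an assumption.

```csharp
using System;

// Hypothetical helper mirroring the demo's command line: "-l" selects Local Sync,
// "-r" selects Remote Sync, and no argument falls back to the configured default.
public static class DemoArgs
{
    public static bool UseLocalSync(string[] args, bool useLocalSyncByDefault)
    {
        if (args.Length == 0)
            return useLocalSyncByDefault;
        switch (args[0])
        {
            case "-l": return true;
            case "-r": return false;
            // Assumption: anything else is treated as a usage error.
            default: throw new ArgumentException("Expected -l or -r, got: " + args[0]);
        }
    }
}
```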
There are several constants hard-coded in the app that you can alter if needed (see the private constants in Program.cs):
InputQueue, OutputQueue: the names of the demo input and output queues.
TimeoutSeconds: the time, in seconds, to wait for a response. In Remote Sync, the timeout applies to every message as defined; in Local Sync, since messages are sent in bulk, the timeout is multiplied by the number of messages sent at once.
MaxItems: the number of messages used in the demo (50,000 by default).
MaxBuffer: the number of messages sent to MSMQ at once in the Local Sync scenario.
UseLocalSyncByDefault: a flag indicating whether the Local Sync demo is used by default.
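The timeout rule for the two modes can be written down as a small function. This is a sketch assuming that "the number of messages sent at once" is MaxBuffer; DemoTimeouts and EffectiveTimeout are hypothetical names, and the demo may compute the value differently.

```csharp
using System;

// Hypothetical illustration of the timeout rule described for TimeoutSeconds.
public static class DemoTimeouts
{
    public static TimeSpan EffectiveTimeout(bool localSync, int timeoutSeconds, int maxBuffer)
    {
        // Remote Sync waits for each message individually.
        if (!localSync)
            return TimeSpan.FromSeconds((double)timeoutSeconds);
        // Local Sync sends up to maxBuffer messages at once (assumption: the
        // multiplier is MaxBuffer), so the per-message timeout is scaled by it.
        return TimeSpan.FromSeconds((double)timeoutSeconds * maxBuffer);
    }
}
```

For example, with TimeoutSeconds = 30 and MaxBuffer = 100 (illustrative values only), Local Sync would wait up to 3,000 seconds while Remote Sync waits 30 seconds per message.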
Depending on the selected processor type, the app instantiates either MsmqSyncLocal, for the Local Sync demo, or MsmqSyncRemote, for the Remote Sync demo. Both classes have an asynchronous ProcessAsync method that accepts string data along with a cancellation token and returns Task<string>. This method sends the data to the input queue, waits for the response, and returns the result.
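Because both processors share the same ProcessAsync signature, the caller-side flow from the introduction (local work, an MSMQ round trip, more local work) can be written once against that signature. The article does not mention a shared interface; IMsmqProcessor, DelegateProcessor (a test double so the flow runs without MSMQ), and Workflow.RunAsync are hypothetical names used only for this sketch.

```csharp
using System;
using System.Globalization;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical common contract matching the ProcessAsync signature of both processors.
public interface IMsmqProcessor
{
    Task<string> ProcessAsync(string data, CancellationToken ct);
}

// Test double that lets the flow below run without MSMQ.
public sealed class DelegateProcessor : IMsmqProcessor
{
    private readonly Func<string, CancellationToken, Task<string>> m_process;

    public DelegateProcessor(Func<string, CancellationToken, Task<string>> process)
    {
        m_process = process;
    }

    public Task<string> ProcessAsync(string data, CancellationToken ct)
    {
        return m_process(data, ct);
    }
}

public static class Workflow
{
    // One execution context: local work, a round trip, then more local work.
    public static async Task<string> RunAsync(IMsmqProcessor processor, int input, CancellationToken ct)
    {
        var request = (input * 2).ToString(CultureInfo.InvariantCulture); // local work before
        var response = await processor.ProcessAsync(request, ct);         // round trip
        return request + " -> " + response;                               // local work after
    }
}
```

Swapping MsmqSyncLocal for MsmqSyncRemote would not change the calling code; only the cost of waiting for the response differs.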
Next, the app starts several worker threads (one per CPU core) that emulate the third-party processor mentioned earlier. Each Worker continuously retrieves incoming messages from the input queue, converts them to integers, calculates their square roots, and sends the results to the output queue. The code of the worker method is shown below:
private static async Task RunWorkerAsync(int workerIndex, CancellationToken cancellationToken)
{
    // Each worker owns its queue handles; dispose them when the loop exits.
    using (var inputQueue = new MessageQueue(string.Format(@".\private$\{0}", InputQueue),
                                             QueueAccessMode.Receive)
    {
        Formatter = new ActiveXMessageFormatter(),
        MessageReadPropertyFilter = {Id = true, Body = true}
    })
    using (var outputQueue = new MessageQueue(string.Format(@".\private$\{0}", OutputQueue),
                                              QueueAccessMode.Send)
    {
        Formatter = new ActiveXMessageFormatter()
    })
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            // ReceiveAsync is an extension method; MessageQueue has no built-in one.
            var message = await inputQueue.ReceiveAsync(cancellationToken);
            var data = (string) message.Body;
            try
            {
                var intData = int.Parse(data);
                var result = Math.Sqrt(intData).ToString(CultureInfo.InvariantCulture);
                // The request's Id becomes the reply's CorrelationId so the caller can match it.
                outputQueue.Send(new Message(result, new ActiveXMessageFormatter())
                {
                    CorrelationId = message.Id,
                    Label = string.Format("Worker {0}", workerIndex)
                });
            }
            catch (Exception ex)
            {
                // Failures are sent back as replies too; the "ERROR" label marks them.
                outputQueue.Send(new Message(ex.ToString(), new ActiveXMessageFormatter())
                {
                    CorrelationId = message.Id,
                    Label = string.Format("ERROR: Worker {0}", workerIndex)
                });
            }
        }
    }
}
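The per-message logic of the worker does not depend on MSMQ, so it can be pulled out and exercised on its own. The following is a hypothetical, transport-free restatement (WorkerLogic and Handle are illustrative names, not part of the demo) that returns the reply body and label RunWorkerAsync would send.

```csharp
using System;
using System.Globalization;

// Hypothetical transport-free version of the worker's per-message logic.
public static class WorkerLogic
{
    public static (string Body, string Label) Handle(string data, int workerIndex)
    {
        try
        {
            var intData = int.Parse(data, CultureInfo.InvariantCulture);
            var result = Math.Sqrt(intData).ToString(CultureInfo.InvariantCulture);
            return (result, string.Format("Worker {0}", workerIndex));
        }
        catch (Exception ex)
        {
            // Errors travel back as ordinary replies, marked by the "ERROR" label prefix.
            return (ex.ToString(), string.Format("ERROR: Worker {0}", workerIndex));
        }
    }
}
```

Note that a negative input does not produce an error reply: Math.Sqrt returns NaN, so the body is "NaN" with a normal label. Only unparsable input takes the "ERROR" path.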
Remote Sync
As mentioned before, the Remote Sync processor uses the ReceiveByCorrelationId method to retrieve messages from the output queue. Here is the code that demonstrates it:
public async Task<string> ProcessAsync(string data, CancellationToken ct)
{
    // Queue handles are created per call; dispose them when the call completes.
    using (var inputQueue = new MessageQueue(string.Format(@".\private$\{0}", m_inputQueue),
                                             QueueAccessMode.Send)
    {
        Formatter = ActiveXFormatter
    })
    using (var outputQueue = new MessageQueue(string.Format(@".\private$\{0}", m_outputQueue),
                                              QueueAccessMode.Receive)
    {
        Formatter = ActiveXFormatter,
        MessageReadPropertyFilter = {Id = true, CorrelationId = true, Body = true, Label = true}
    })
    {
        var message = new Message(data, ActiveXFormatter);
        inputQueue.Send(message);
        // Message.Id is assigned by MSMQ during Send, so it is only available afterwards.
        var id = message.Id;
        try
        {
            // Wait for the worker's reply whose CorrelationId equals id.
            var resultMessage = await outputQueue.ReceiveByCorrelationIdAsync(id, ct);
            var label = resultMessage.Label;
            var result = (string) resultMessage.Body;
            // Error replies carry the error text in the body, so pass the reply, not the request.
            if (m_exceptionHandler != null && !string.IsNullOrEmpty(label) && label.Contains("ERROR"))
                throw m_exceptionHandler(result);
            return result;
        }
        catch (Exception)
        {
            // On cancellation, return null instead of propagating the error.
            if (!ct.IsCancellationRequested)
                throw;
            return null;
        }
    }
}
Note that I am not calling ReceiveByCorrelationId directly, but through an extension method called ReceiveByCorrelationIdAsync. This is because the original ReceiveByCorrelationId method does not work properly with timeouts and does not support cancellation tokens. Similarly, ReceiveAsync, used by the worker above, is not part of System.Messaging; it is an extension method as well.
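The article does not show ReceiveByCorrelationIdAsync, so the following is only one possible way to add cancellation and an overall timeout to a blocking receive: call it repeatedly with a short timeout on the thread pool and check the token between calls. BlockingReceive and its parameters are hypothetical. It assumes tryReceive returns null when its slice elapses without a message; the MSMQ receive methods report a timeout by throwing, so a real adapter would translate that exception into null.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (NOT the article's ReceiveByCorrelationIdAsync): adds
// cancellation and an overall timeout on top of a blocking receive that
// accepts its own short timeout, by calling it in slices.
public static class BlockingReceive
{
    public static async Task<T> ReceiveAsync<T>(
        Func<TimeSpan, T> tryReceive, TimeSpan slice, TimeSpan total, CancellationToken ct)
        where T : class
    {
        var clock = Stopwatch.StartNew();
        while (true)
        {
            // Cancellation is only observed between slices.
            ct.ThrowIfCancellationRequested();
            var remaining = total - clock.Elapsed;
            if (remaining <= TimeSpan.Zero)
                throw new TimeoutException("No message received within " + total + ".");
            var wait = remaining < slice ? remaining : slice;
            // The blocking call still occupies a thread-pool thread for up to 'wait'.
            var result = await Task.Run(() => tryReceive(wait), ct);
            if (result != null)
                return result;
        }
    }
}
```

The slice length trades cancellation latency against the number of calls. Note that this only fixes timeouts and cancellation: each waiting request still holds a thread and still performs its own search, which is the cost the Local Sync approach removes.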
Local Sync
In the Local Sync scenario, the ProcessAsync method has an identical signature. This time, however, it is truly asynchronous: no thread is held per request while it waits for the response.
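public async Task<string> ProcessAsync(string data, CancellationToken ct)
{
    // Wait for a free slot; the semaphore limits how many requests are in flight.
    await m_semaphore.WaitAsync(ct);
    var tcs = new TaskCompletionSource<string>();
    var message = new Message(data, ActiveXFormatter);
    try
    {
        m_inputQueue.Send(message);
    }
    catch
    {
        // Nothing was sent, so no reply will ever free this slot.
        m_semaphore.Release();
        throw;
    }
    var id = message.Id;    // assigned by MSMQ during Send
    m_items.TryAdd(id, tcs);
    // Handshake with ReceiveMessagesAsync via m_bag (explained below).
    var tcsForBag = new TaskCompletionSource<bool>();
    if (!m_bag.TryAdd(id, tcsForBag))
        m_bag[id].TrySetResult(true);
    // Wait for the reply, the timeout, or cancellation, whichever comes first.
    var task = await Task.WhenAny(Task.Delay(m_timeout, ct), tcs.Task);
    if (task != tcs.Task)
    {
        // Use a separate variable: a failed TryRemove would overwrite tcs with null.
        TaskCompletionSource<string> pending;
        if (m_items.TryRemove(id, out pending))
        {
            m_semaphore.Release();
            if (ct.IsCancellationRequested)
                pending.TrySetCanceled();
            else
                pending.TrySetException(new TimeoutException(string.Format
                    ("Timeout waiting for a message on queue [{0}]", m_outputQueue.QueueName)));
        }
    }
    return await tcs.Task;
}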
An instance of the MsmqSyncLocal class internally maintains a SemaphoreSlim that limits the number of messages sent to MSMQ at once. This number can be configured with the MaxBuffer constant.
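The throttling role of the semaphore can be observed in isolation. In this sketch, ThrottleDemo is a hypothetical name, Task.Delay stands in for the MSMQ round trip, and the slot is released in the same method for simplicity, whereas in MsmqSyncLocal it is released by whichever side removes the request from m_items.

```csharp
using System.Threading;
using System.Threading.Tasks;

// Hypothetical illustration: at most maxBuffer simulated requests are in flight.
public static class ThrottleDemo
{
    // Starts 'requests' simulated calls and returns the highest number that
    // were in flight at the same time.
    public static async Task<int> RunAsync(int requests, int maxBuffer)
    {
        var semaphore = new SemaphoreSlim(maxBuffer);
        var gate = new object();
        var inFlight = 0;
        var peak = 0;
        var tasks = new Task[requests];
        for (var i = 0; i < requests; i++)
        {
            tasks[i] = Task.Run(async () =>
            {
                // Like ProcessAsync: wait for a free slot before "sending".
                await semaphore.WaitAsync();
                try
                {
                    lock (gate)
                    {
                        inFlight++;
                        if (inFlight > peak)
                            peak = inFlight;
                    }
                    await Task.Delay(5); // stands in for the MSMQ round trip
                    lock (gate)
                    {
                        inFlight--;
                    }
                }
                finally
                {
                    // Free the slot once the "reply" has been handled.
                    semaphore.Release();
                }
            });
        }
        await Task.WhenAll(tasks);
        return peak;
    }
}
```

Capping the number of outstanding messages also bounds the size of the in-memory dictionaries, which matters given that their contents are lost if the process goes down.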
When ProcessAsync is called, it creates a TaskCompletionSource object and a Message, and immediately sends the message to the input queue. ProcessAsync awaits the Task of the completion source, so its result is what the caller eventually receives. The completion source, together with the ID of the message just sent, is added to an internal ConcurrentDictionary called m_items.
Another method, ReceiveMessagesAsync, which is started from the constructor, runs a loop (until the processor is stopped) that retrieves messages from the output queue. For every message received, it looks up the message's CorrelationId in the m_items dictionary. If a match is found, it completes the associated TaskCompletionSource, setting either its result (on success) or an exception (if the worker labeled the reply as an error). Cancellation and timeouts are handled on the ProcessAsync side.
private async Task ReceiveMessagesAsync()
{
    while (!m_stopToken.IsCancellationRequested)
    {
        // A single loop drains the output queue; there is no per-request searching.
        var message = await m_outputQueue.ReceiveAsync(m_stopToken.Token);
        var id = message.CorrelationId;
        var label = message.Label;
        var data = (string) message.Body;
        // Resolve the correlation off the loop so the next message can be read immediately.
        LogErrors(Task.Run(async () =>
        {
            TaskCompletionSource<string> tcs;
            var tcsForBag = new TaskCompletionSource<bool>();
            // If ProcessAsync has not reached m_bag yet, wait for its signal.
            if (m_bag.TryAdd(id, tcsForBag))
                await m_bag[id].Task;
            m_bag.TryRemove(id, out tcsForBag);
            // Whoever removes the entry (here or the timeout path) releases the semaphore.
            if (m_items.TryRemove(id, out tcs))
            {
                m_semaphore.Release();
                if (m_exceptionHandler != null && !string.IsNullOrEmpty(label) &&
                    label.Contains("ERROR"))
                    tcs.TrySetException(m_exceptionHandler(data));
                else
                    tcs.TrySetResult(data);
            }
        }));
    }
}
You may have noticed that there is one more ConcurrentDictionary involved (m_bag). It solves a race condition that occurs if ReceiveMessagesAsync receives a message before its ID has been added to m_items. If that happens, the receiving side cannot find the expected correlation ID in the dictionary.
In fact, if we knew the message ID before sending the message, we could simply populate m_items first and avoid the race condition altogether. However, this is not possible in MSMQ, because the ID is assigned only when the message is sent.
So, to coordinate message ID insertion, ProcessAsync attempts to insert a TaskCompletionSource object, keyed by that ID, into m_bag. If the insertion succeeds, nothing else needs to be done, because ReceiveMessagesAsync will remove that entry when the response arrives. Otherwise, the ID already exists in m_bag, which means that ReceiveMessagesAsync has already added it and is waiting; in that case, ProcessAsync signals the completion source so that the receiving side can continue.
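The handshake can be isolated into a small, self-contained model. This is a sketch, not the demo's code: Rendezvous, SenderArrived, and ReceiverArrivedAsync are hypothetical names, and in the article the same steps are performed inline by ProcessAsync and ReceiveMessagesAsync. Whichever side reaches an ID second proceeds without waiting; the receiver always removes the entry.

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical model of the m_bag handshake between sender and receiver.
public sealed class Rendezvous
{
    private readonly ConcurrentDictionary<string, TaskCompletionSource<bool>> m_bag =
        new ConcurrentDictionary<string, TaskCompletionSource<bool>>();

    // Sender side (ProcessAsync): called after the ID has been added to m_items.
    public void SenderArrived(string id)
    {
        var mine = new TaskCompletionSource<bool>();
        if (!m_bag.TryAdd(id, mine))
            m_bag[id].TrySetResult(true); // receiver got here first: release it
    }

    // Receiver side (ReceiveMessagesAsync): completes once the sender has registered the ID.
    public async Task ReceiverArrivedAsync(string id)
    {
        var mine = new TaskCompletionSource<bool>();
        if (m_bag.TryAdd(id, mine))
            await mine.Task;              // sender not there yet: wait for its signal
        TaskCompletionSource<bool> removed;
        m_bag.TryRemove(id, out removed); // either way, the receiver cleans up
    }

    public int Count => m_bag.Count;
}
```

In this sketch, an entry added by SenderArrived stays in the dictionary until a reply with that ID arrives, so a request that times out and never gets a reply leaves an entry behind; a production version would also remove it on the timeout path.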
Summary
In my tests, Remote Sync took approximately 65 seconds, as opposed to around 5 seconds for Local Sync in the same environment.
Since it was roughly 13 times slower in this test, the ReceiveByCorrelationId method should be avoided whenever possible. If an asynchronous workflow does not fit into your application architecture and performance is a concern, I believe local correlation is worth considering.