Implementing a Thread Safe Message Queue in C#

14 Jul 2020 - MIT license - 5 min read
In this article, we explore a thread safe message passing mechanism for communicating between two or more threads in a running process.

Introduction

Multi-threading is difficult. There's no way around it. However, there are certain techniques that are solid and make it easier to work with multi-threaded applications. One technique for communicating between two or more threads safely is a message queue. Another is to use the Windows message queue that all windows have available. We'll be diving into both techniques here.

Update: Added windowed receiver technique

Conceptualizing this Mess

Thread to thread communication is tricky because you can't safely access data shared between threads without some sort of synchronization mechanism. There are plenty of options available to choose from, and one such option is a message queue.

A message queue allows a sender to post messages which another thread then picks up and responds to. Both posting a message and reading a message are thread safe. This way, you can communicate with other threads by sending messages to the queue.

The sender's job is to pass command messages to other threads. This is a relatively simple operation: we add a message to a thread safe queue (a ConcurrentQueue<T>) and then use a mechanism for efficiently notifying the other thread that messages are available. That notification is provided by a SemaphoreSlim. It is possible to run the receiver without a semaphore by polling the queue in a loop, but this increases CPU usage because the loop keeps running even when the queue is empty. Polling might still be appropriate for your needs depending on what sort of work you're doing in the thread.

The receiver's job is to spin a loop looking for messages. Inside the loop, we wait on the semaphore, and then switch/case over the messages, performing the appropriate action.

The message queue will keep storing messages until a thread wakes up and receives them. Consequently, the queue allows for a backlog of messages. This is important in case a thread can't respond to messages fast enough. Luckily, our concurrent queue easily enables this.
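
The backlog behavior can be sketched as a small console program. This is a minimal illustration, not the demo's code: it assumes messages are plain int ids rather than the Message type used later in the article, and the class name BacklogSketch is made up for the example. Three increments and a stop are posted before the receiver thread even exists, and the receiver drains them in order once it starts.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

static class BacklogSketch
{
    const int MSG_STOP = 0;
    const int MSG_INCREMENT = 2;

    // Posts a backlog first, then starts the receiver.
    // Returns the counter value after the receiver has stopped.
    public static int Run()
    {
        var messages = new ConcurrentQueue<int>();
        var available = new SemaphoreSlim(0);
        int counter = 0;

        // no receiver is running yet, so these simply pile up in the queue
        for (int i = 0; i < 3; ++i)
        {
            messages.Enqueue(MSG_INCREMENT);
            available.Release();
        }
        messages.Enqueue(MSG_STOP);
        available.Release();

        var receiver = new Thread(() =>
        {
            var done = false;
            while (!done)
            {
                // returns immediately while backlog entries remain
                available.Wait();
                int id;
                if (!messages.TryDequeue(out id))
                    continue;
                if (id == MSG_STOP)
                    done = true;
                else if (id == MSG_INCREMENT)
                    ++counter;
            }
        });
        receiver.Start();
        // wait for the receiver to finish before reading the counter
        receiver.Join();
        return counter;
    }

    static void Main()
    {
        Console.WriteLine(Run()); // prints 3
    }
}
```

Because the semaphore's count tracks the number of Release() calls, the receiver never blocks while unprocessed messages remain.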

If you need bidirectional communication, you'll have to create an additional queue and an additional semaphore and use it going the other direction. In this situation, each thread acts as both sender and receiver.
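
As a rough sketch of the bidirectional arrangement, the following console program uses one queue and semaphore pair per direction. The names (BidirectionalSketch, RoundTrip, requests, replies) and the "double the value" reply are invented for illustration, and messages are again simplified to plain ints.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

static class BidirectionalSketch
{
    // Sends one request to a worker thread and waits for its reply.
    public static int RoundTrip(int value)
    {
        // caller -> worker
        var requests = new ConcurrentQueue<int>();
        var requestsAvailable = new SemaphoreSlim(0);
        // worker -> caller
        var replies = new ConcurrentQueue<int>();
        var repliesAvailable = new SemaphoreSlim(0);

        var worker = new Thread(() =>
        {
            // the worker is a receiver for requests...
            requestsAvailable.Wait();
            int request;
            if (requests.TryDequeue(out request))
            {
                // ...and a sender for replies
                replies.Enqueue(request * 2);
                repliesAvailable.Release();
            }
        });
        worker.Start();

        // the caller is a sender for requests...
        requests.Enqueue(value);
        requestsAvailable.Release();

        // ...and a receiver for replies
        repliesAvailable.Wait();
        int reply;
        replies.TryDequeue(out reply);
        worker.Join();
        return reply;
    }

    static void Main()
    {
        Console.WriteLine(RoundTrip(21)); // prints 42
    }
}
```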

One caveat: with this technique, the receiver can't be the main application thread in a Windows Forms app. The reason is that Windows Forms already spins a message loop on the main thread, and you don't have direct access to it. That's what Application.Run() does.

There's another way to communicate with a windowed thread: passing window messages. It works a lot like the queue described above, but you must subclass a native window. It's admittedly a bit limited in the parameters it accepts, since a window message carries only two integer values, but you can always use Control.Invoke() to do something similar. Basically, you process custom window messages and rely on Windows to keep a synchronized message queue. We can access it via a NativeWindow's WndProc() callback and the PostMessage() function, which we must call through P/Invoke.

Coding this Mess

The Windowless Receiver

As multi-threaded applications go, this one is pretty easy. First, we'll cover the relevant member variables:

C#
// these two members constitute our message queue
ConcurrentQueue<Message> _messages = new ConcurrentQueue<Message>();
SemaphoreSlim _messagesAvailable = new SemaphoreSlim(0);

The _messages queue holds our pending messages, and the _messagesAvailable semaphore is used to signal when one or more messages are waiting to be processed.

We've also defined constants for the various messages:

C#
const int MSG_STOP = 0;
const int MSG_RESET = 1;
const int MSG_INCREMENT = 2;
const int MSG_DECREMENT = 3;

Next let's look at how we pass a message from a transmitting thread:

C#
// pass the increment message
_messages.Enqueue(new Message(MSG_INCREMENT, null));
// signal messages available
_messagesAvailable.Release(1);

Here, we have two steps. The first is to enqueue the message. The first parameter to Message (an alias for KeyValuePair<int,object[]>) is the message id, and the second parameter is an array of arguments for the message. We don't use the second parameter - the argument list - in the demo. The second step is to call Release() on the semaphore to signal a message is available.
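
Although the demo leaves the argument list empty, a message that does carry an argument might look like the sketch below. MSG_ADD and the class name are hypothetical and do not appear in the demo. The receiving side is run on the same thread purely to keep the example short.

```csharp
using System;
using System.Collections.Concurrent;
// the alias described above
using Message = System.Collections.Generic.KeyValuePair<int, object[]>;

static class MessageArgsSketch
{
    // hypothetical message id, not one of the demo's constants
    const int MSG_ADD = 100;

    public static int Run()
    {
        var messages = new ConcurrentQueue<Message>();

        // sender side: the amount to add travels in the argument array
        messages.Enqueue(new Message(MSG_ADD, new object[] { 5 }));

        // receiver side: Key holds the message id, Value holds the arguments
        int counter = 0;
        Message msg;
        if (messages.TryDequeue(out msg) && msg.Key == MSG_ADD)
        {
            counter += (int)msg.Value[0];
        }
        return counter;
    }

    static void Main()
    {
        Console.WriteLine(Run()); // prints 5
    }
}
```

Since the arguments are boxed into an object[], the receiver has to know the expected type for each message id and cast accordingly.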

Now let's take a look at the receiver thread:

C#
var thread = new Thread(() => {
    var done = false;
    while(!done)
    {
        // wait until a message becomes available
        _messagesAvailable.Wait();
        Message msg;
        // process messages
        // we use Try here because we're multithreaded
        // so it's possible that between the Wait() call
        // and the dequeue call the queue may be cleared
        if (_messages.TryDequeue(out msg))
        {
            switch(msg.Key)
            {
                case MSG_STOP: // stop
                    done = true;
                    break;
                case MSG_RESET: // reset
                    _counter = 0;
                    break;
                case MSG_INCREMENT: // +
                    ++_counter;
                    break;
                case MSG_DECREMENT: // -
                    --_counter;
                    break;
            }
            // signal the main thread to update the counter display
            PostMessage(_uiReceiver.Handle,WM_USER,_counter,0);
            // an alternative that doesn't require all the window
            // messaging stuff is:
            // Invoke(new Action(delegate 
            //       { CounterLabel.Text = "Count: " + _counter.ToString(); }));
        }
    }
});
// now that we've created the thread
// stash it so it doesn't go out of scope
_receiver = thread;
thread.Start();

Note that we could have used Control.Invoke() to safely update the display. That's just a quick and dirty way to pass something back down to the main thread. However, we're using our windowed message passing technique accessed through PostMessage(). We'll explore that in a bit.

Notice the key here is spinning a loop, waiting on the semaphore and then trying to dequeue a message before switching on the message id to figure out what to do.

That's all there is to it! Now you can safely communicate between threads using this technique. There's one more technique to explore though.

The Windowed Receiver

This method is necessary if you're trying to receive messages on the main UI thread in a Windows Forms app, for the reasons explained before. We subclass a NativeWindow and then use PostMessage() to communicate with it, letting Windows itself handle the synchronization.

Here is the P/Invoke declaration and constant we need:

C#
// DllImport requires: using System.Runtime.InteropServices;
[DllImport("user32.dll")]
static extern bool PostMessage(IntPtr hWnd, uint Msg, int wParam, int lParam);
const int WM_USER = 0x0400;

The WM_USER constant is what we must use for custom window messages to avoid collision with system message ids. You can use WM_USER, WM_USER+1, WM_USER+2, and so on, all the way up to 0x7FFF, which is the end of the WM_USER range. Meanwhile, PostMessage() allows us to asynchronously send a message to a window.

Here is the NativeWindow subclasser:

C#
// this class handles our native window receiver 
// incoming messages
private class _NativeReceiver : NativeWindow
{
    Main _main;
    public _NativeReceiver(Main main)
    {
        _main = main;
    }
    protected override void WndProc(ref System.Windows.Forms.Message m)
    {
        switch(m.Msg)
        {
            case WM_USER:
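                // update the UI. We truncate via ToInt64() because
                // ToInt32() can throw an OverflowException in a 64-bit
                // process when the value doesn't fit in 32 bits
                _main.CounterLabel.Text = "Count: " + unchecked((int)m.WParam.ToInt64());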
                break;
        }
                
        base.WndProc(ref m);
    }
}

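All we're doing is waiting for the WM_USER message and then setting the counter label to the value of the message parameter WParam. All of this happens on the UI thread. Note that WParam and LParam are IntPtrs here but in our PostMessage() declaration they're ints. That's okay, just convert the IntPtrs back to ints. Be aware, though, that IntPtr.ToInt32() throws an OverflowException in a 64-bit process when the value doesn't fit in 32 bits, which can happen when a negative int arrives zero-extended. Truncating the result of ToInt64() avoids that.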
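
The conversion can be isolated in a small helper. This is only a sketch: the class and method names are made up, and the 64-bit case assumes the value arrives zero-extended, which is one possible outcome of passing a negative int through the int-typed P/Invoke declaration.

```csharp
using System;

static class WParamConversion
{
    // Truncates a pointer-sized message parameter to a 32-bit int.
    // Unlike IntPtr.ToInt32(), this does not throw in a 64-bit process
    // when the upper 32 bits are not a sign extension of the lower 32.
    public static int ToInt(IntPtr param)
    {
        return unchecked((int)param.ToInt64());
    }

    static void Main()
    {
        Console.WriteLine(ToInt(new IntPtr(42))); // prints 42
        Console.WriteLine(ToInt(new IntPtr(-1))); // prints -1
        if (IntPtr.Size == 8)
        {
            // 0xFFFFFFFF is -1 zero-extended to 64 bits;
            // ToInt32() would throw here, the helper yields -1
            Console.WriteLine(ToInt(new IntPtr(0xFFFFFFFFL)));
        }
    }
}
```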

One more thing to cover is the setup of the receiver which we perform in the main form's constructor:

C#
// we must create a native window in
// order to receive custom window messages
// the thread it operates on is always 
// the thread it was created on. Here
// it's the UI thread.
_uiReceiver = new _NativeReceiver(this);
// make sure the handle is created 
// because we need it to subclass
var p = new CreateParams();
_uiReceiver.CreateHandle(p);

Finally, the calls to PostMessage() are quite simple, but the downside is that they only take two integer parameters. Fortunately, we don't need more than that, but if you did, you'd have to get clever. Anyway, here's the PostMessage() call:

C#
// signal the main thread to update the counter display
PostMessage(_uiReceiver.Handle,WM_USER,_counter,0);

You probably noticed it earlier being called from our windowless receiver. You can see we're using the first integer parameter and that's it. This is what sends the message to _NativeReceiver.WndProc().
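
If two integer parameters aren't enough, one way of "getting clever" is to pack several small values into a single int, much as Win32 itself packs coordinates into one parameter. The helper below is a hypothetical sketch (WParamPacking, Pack, Low and High are not part of the demo) and assumes each value fits in 16 bits.

```csharp
using System;

static class WParamPacking
{
    // Packs two 16-bit values into one 32-bit int:
    // low goes in bits 0-15, high goes in bits 16-31.
    public static int Pack(short low, short high)
    {
        return unchecked((int)(((uint)(ushort)high << 16) | (ushort)low));
    }

    // Recovers the value stored in bits 0-15.
    public static short Low(int packed)
    {
        return unchecked((short)(packed & 0xFFFF));
    }

    // Recovers the value stored in bits 16-31.
    public static short High(int packed)
    {
        return unchecked((short)((packed >> 16) & 0xFFFF));
    }

    static void Main()
    {
        int packed = Pack(-2, 7);
        Console.WriteLine(Low(packed));  // prints -2
        Console.WriteLine(High(packed)); // prints 7
    }
}
```

The sender would pass the packed value as the first integer parameter of PostMessage(), and the receiver would unpack it after converting WParam back to an int. For larger payloads, Control.Invoke() or the queue from the windowless receiver is likely the simpler route.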

There you have it! You now have two techniques for inter thread communication.

The Demo Application

The demo has 4 buttons: Start/Stop, Reset, Increment (+), and Decrement (-). The processing of messages doesn't happen until Start is clicked. Reset resets the counter. Increment (+) and Decrement (-) increase or decrease the counter by one, respectively. You can queue up messages by stopping the processing, and then start it with Start, at which point all pending messages are processed. Each of these actions involves communication in both directions: the UI thread posts a command to the receiver thread, and the receiver thread posts the updated count back to the UI thread.

History

  • 14th July, 2020 - Initial submission
  • 14th July, 2020 - Added windowed receiver

License

This article, along with any associated source code and files, is licensed under The MIT License