Introduction
This library implements messaging for worker threads within a process. All threads communicate with each other via a single point of access, the message queue or mailbox. Messages are processed serially by the thread. The mailbox ensures messages are queued and not lost until the thread can get around to processing them. This is an asynchronous messaging system, so sending threads don't block. By declaring all the data and code for a thread inside of a class, there are no concurrency issues with the thread's data. I find this approach much cleaner and easier to maintain than sharing objects between threads and having to use lock statements peppered throughout the code with the inevitable deadlocks occurring.
History
In the past, I've used Operating Systems that provided message queues: VRTX-32 and OS/2. For some reason, Windows has never provided such an object.
The diagram above gives a quick overview of how this works:
- Some external thread sends a message to Thread 1.
- After completing the processing, Thread 1 sends Results 1 Message off to Thread 2.
- Thread 2 then performs some processing, and when it completes, it sends a Results 2 Message to Thread 3 and an Ack Message to Thread 1.
- Thread 3 may just update a database as a result of processing the Results 2 Message.
Internals of the Library
There are a couple of classes in this library, as follows:
- Message Queue
- Managed Thread
- Message Classes
- Thread Manager
The Message Queue
The message queue, or mailbox, allows any thread to send a message to and the owning thread to receive these messages. The message queue consists of three components: a Queue
, a ManualResetEvent
, and an object that is used for locking access to the other components.
The message queue has three methods: SendMessage()
for any thread to send a message to the thread owning the queue, GetMessage()
which is used only by the thread owning the queue and waits forever until a message arrives, and GetMessage(int waitTime)
which does the same as GetMessage()
, but will return after waitTime
milliseconds if no message is received. This would allow your thread to perform some periodic processing. Alternatively, you could create a timer that would send a message to your thread to perform some periodic function.
Implementation
internal class MessageQueue
{
private Queue<BaseMessage> msgQueue;
private ManualResetEvent messageArrivedEvent;
private object mutexObject;
private readonly string threadName;
public MessageQueue(string threadName)
{
msgQueue = new Queue<BaseMessage>();
messageArrivedEvent = new ManualResetEvent(false);
mutexObject = new object();
this.threadName = threadName;
}
public void SendMessage(BaseMessage msg)
{
lock(mutexObject)
{
msgQueue.Enqueue(msg);
messageArrivedEvent.Set();
}
}
public BaseMessage GetMessage()
{
BaseMessage msg = null;
if(msgQueue.Count == 0)
messageArrivedEvent.WaitOne();
lock (mutexObject)
{
msg = msgQueue.Dequeue();
messageArrivedEvent.Reset();
}
return msg;
}
public BaseMessage GetMessage(int waitTime)
{
BaseMessage msg = null;
if (msgQueue.Count > 0 || messageArrivedEvent.WaitOne(waitTime) == true)
{
lock (mutexObject)
{
msg = msgQueue.Dequeue();
messageArrivedEvent.Reset();
}
}
return msg;
}
The Managed Thread
This class wraps the functionality of a thread from the thread pool and a message queue. It implements the IThread
interface, which allows manipulating the thread and sending it messages.
This class provides public methods that perform the following:
- Start the thread running after creating it.
- Stop the thread.
- Send messages to the thread.
public interface IThread
{
string Name { get; }
void StartThread();
void StopThread();
void SendMessage(BaseMessage msg);
}
During construction, a reference to a message handling function is provided. This is where you would implement processing of your specific messages.
Once the managed thread starts running, it enters a loop where it waits at the message queue for the next message to process. The stop thread message causes the thread to exit the loop.
private void mainLoop(object state)
{
Thread.CurrentThread.Name = name;
bool run = true;
while (run == true)
{
BaseMessage msg = null;
if (waitTimeInMilliseconds == -1)
msg = msgQueue.GetMessage();
else
msg = msgQueue.GetMessage(waitTimeInMilliseconds);
if (msg != null)
{
switch (msg.MsgType)
{
case MessageType.StopThread:
run = false;
break;
default:
messageHandler(msg);
break;
}
}
else
messageHandler(msg);
}
}
Message Classes
All useful messages inherit from BaseMessage
. This class defines the specific message types you want for your design.
public enum MessageType
{
StopThread,
Message1,
Message2A,
Message2B
}
public abstract partial class BaseMessage
{
public readonly MessageType MsgType;
public BaseMessage(MessageType msgType)
{
this.MsgType = msgType;
}
}
For this example program, I've created three specific message classes.
This is the message passed around for the loop test:
public class Message1 : BaseMessage
{
public readonly int DelayInMilliSeconds;
public Message1(int delayInMilliSeconds)
: base(MessageType.Message1)
{
DelayInMilliSeconds = delayInMilliSeconds;
}
}
This is the message that starts the load and sequence test:
public class Message2A : BaseMessage
{
public Message2A()
: base(MessageType.Message2A)
{
}
}
This is the message sent back for the load and sequence test:
public class Message2B : BaseMessage
{
public readonly int SequenceNumber;
public readonly string ThreadName;
public Message2B(int sequenceNumber, string threadName)
: base(MessageType.Message2B)
{
SequenceNumber = sequenceNumber;
ThreadName = threadName;
}
The Thread Manager
This class is the connection point for your application code. It implements the IThreadManager
interface.
public interface IThreadManager
{
IThread CreateThread(string threadName, Action<BaseMessage> messageHandler);
IThread CreateThread(
string threadName,
Action<BaseMessage> messageHandler,
int waitTimeInMilliseconds);
List<IThread> GetThreads();
void StartAllThreads();
void StopAllThreads();
void ClearAllThreads();
void SendMessage(BaseMessage msg);
event Action<BaseMessage> ReceivedMessage;
}
This allows your application code to:
- Obtain a reference to a
ThreadManager
instance. - Create any number of threads for your system.
- Start and stop all threads running.
- Obtain an
IThread
list of all the current threads. - Send a message to any of those threads.
- By wiring in an event, receive messages from the threads.
Using the Library
I haven't found a clean way of adding to the message enumerations and classes without touching the message class code, so currently, the library source code has to be included in your solution and custom changes made for your specific application.
I've included a WinForms app that I use to test the library, and demos it also. There are two tests I've written.
Loop Test
In this test, the WinForms app creates three threads. The WinForms thread sends Message 1 to thread 1, which then sleeps for the specified time before passing the message on to thread 2. Thread 2 sleeps for the specified time and passes the message on to thread 3, which also sleeps for the specified time before finally sending the message back to the WinForms thread, completing the test.
Creating and Starting the Threads
The following code from Form1::btnStartSleepLoop_Click()
initiates the test.
Example1
is a class that provides the custom message handler needed by the Managed Thread class. Note that worker3
is passed a reference to the WinForms mailbox, where it will send Message 1 when it completes. The managed thread is then created with a reference to the message handler implemented in worker3
. Ditto for threads 2 and 1. Next, the WinForms thread wires in the ReceivedMessage
event to receive the message from thread 3 when it completes. All the managed threads are then started, and thread 1 is sent Message 1 to start the test.
threadMgr = ThreadManager.GetThreadManager();
Example1 worker3 = new Example1(
"Thread3",
threadMgr.SendMessage);
IThread thread3 = threadMgr.CreateThread(worker3.ThreadName, worker3.MessageHandler);
threads.Add(worker3.ThreadName, thread3);
Example1 worker2 = new Example1(
"Thread2",
thread3.SendMessage);
IThread thread2 = threadMgr.CreateThread(worker2.ThreadName, worker2.MessageHandler);
threads.Add(worker2.ThreadName, thread2);
Example1 worker1 = new Example1(
"Thread1",
thread2.SendMessage);
IThread thread1 = threadMgr.CreateThread(worker1.ThreadName, worker1.MessageHandler);
threads.Add(worker1.ThreadName, thread1);
threadMgr.ReceivedMessage += new Action<BaseMessage>(threadMgr_NewMessage);
threadMgr.StartAllThreads();
Message1 msg = new Message1(Convert.ToInt32(txtDelay.Text) * 1000);
thread1.SendMessage(msg);
Example1 Class
This class has a send message delegate where it will send the message on to when it is finished processing. A reference to MessageHandler()
is passed to the Managed Thread, which then calls this method every time a message is received.
public class Example1
{
public readonly string ThreadName;
private Action<BaseMessage> sendMsg;
public Example1(string threadName, Action<BaseMessage> sendMsg)
{
ThreadName = threadName;
this.sendMsg = sendMsg;
}
public void MessageHandler(BaseMessage msg)
{
switch (msg.MsgType)
{
case MessageType.Message1:
{
Message1 msg1 = (Message1)msg;
System.Threading.Thread.Sleep(msg1.DelayInMilliSeconds);
sendMsg(msg1);
}
break;
}
}
When thread 3 completes, the WinForms thread receives the message in threadMgr_NewMessage()
. The method loopTestUpdateForm()
handles the Invoke required to get back on the UI thread and update the controls on the form.
void threadMgr_NewMessage(BaseMessage msg)
{
if (msg != null)
{
switch (msg.MsgType)
{
case MessageType.Message1:
threadMgr.ReceivedMessage -= threadMgr_NewMessage;
threadMgr.StopAllThreads();
threadMgr.ClearAllThreads();
threads.Clear();
loopTestUpdateForm();
break;
}
}
}
Load and Sequence Test
This test is designed to stress the system and detect missing messages and messages received out of sequence. Message 2A is sent to the three threads with the requested number of Message 2Bs to be sent back. Each thread sends back the required number of messages as fast as possible in a loop, with an ascending and continuous sequence number starting from 0. This allows the WinForms thread to check that not only were the expected number of messages received, but that they arrived in order.
Starting the Test
Three threads are created as before, except that this time, Example2
objects implement the message handlers, and they all send messages back to the WinForms thread.
Example2 Class
public class Example2
{
public readonly string ThreadName;
private int messageSequenceNumber;
private int numberOfMessagesToSend;
private Action<BaseMessage> sendMsg;
public Example2(string threadName, int numberOfMessagesToSend,
Action<BaseMessage> sendMsg)
{
ThreadName = threadName;
messageSequenceNumber = 0;
this.numberOfMessagesToSend = numberOfMessagesToSend;
this.sendMsg = sendMsg;
}
public void MessageHandler(BaseMessage msg)
{
switch (msg.MsgType)
{
case MessageType.Message2A:
{
while (messageSequenceNumber < numberOfMessagesToSend)
{
Message2B outboundMsg =
new Message2B(messageSequenceNumber, ThreadName);
sendMsg(outboundMsg);
messageSequenceNumber++;
}
}
break;
}
}
Points of Interest
I've implemented this messaging scheme on an ad-hoc basis many times for various customers, and finally decided to package it and make it more reusable for future projects.
One important caveat is that when passing references around between threads, the threads will still have asynchronous access to the same object. The partitioning from a good multi-threaded design will eliminate this problem.
History
- 05/15/2010 - First version.