This article addresses the question of how to get an event fired by Thread A to execute in the context of Thread B?
Many moons ago, I wrote the last Windows service I’d ever need. It has a pluggable architecture and today, there are over 20 modules doing all manner of business tasks such as sending emails, generating reports, performing automation, etc. Inside each plug-in module is a thread daemon which listens to events to orchestrate the operations of the other supporting module threads. This primarily consists of a thread which provides incoming data (what?), a thread which is the scheduler (when?), and a thread pool of worker threads which process said data (how?).
Recently, a new plug-in was created to enforce global trade compliance. A bug crept up in the new worker thread initialization which surprised me because it caused the incoming data queue thread to die even though that code hadn’t changed in years. The reason was that the incoming queue fired the event and therefore it was the thread which executed the delegate (aka function pointer). This function is defined in the thread daemon class which takes care of initializing and starting a worker thread. From an error handling point of view, this struck me as less than ideal. So the question was asked:
How do I get the thread daemon to be the thread which executes its own management functions which are triggered by events?
This got me thinking about WinForm apps whereby a worker thread must marshal execution of a delegate to the main UI thread to update the UI like changing the status bar. The pattern is to check if an invoke is required (am I the UI thread?) and if not, then you use the built in SynchronizationObject
to invoke the delegate in the UI thread.
public delegate void UpdateStatusEvent(string message, params object[] args);
public void UpdateStatus(string message, params object[] args)
{
if (_Status.InvokeRequired)
{
UpdateStatusEvent amc = new UpdateStatusEvent(UpdateStatus);
this.Invoke(amc, new object[] { message, args });
}
else
{
_LabelStatus.Text = string.Format(message, args);
_Status.Refresh();
}
}
With this pattern in my head, I thought I’d be able to implement this quite easily between my background worker threads. How wrong I was and hence this article! WinForms and WPF provide a synchronization context but there isn’t something in the framework you get for free in my case. From what I understand, they both implement a classic windows message pump which is used to marshal the execution from the worker thread to the UI thread. This will become important shortly, but suffice it to say after a lot of searching, a pre-baked, easy solution wasn’t on offer.
As I continued down this path, I searched for ISynchronizeInvoke
and IAsyncResult
which primarily returned questions and examples of the above pattern. I tried in vein to find an equivalent implementation except for worker threads. Surely, it wasn’t that complicated to get an event initiated in Thread A to execute in Thread B? I tried using the SynchronizationContext
class but quickly discovered it’s just a base class and didn’t do the work of marshalling the execution from Thread A to Thread B. So while I went through the motions, the code still executed in the wrong thread context (so why isn’t this class abstract if you must extend it to get it to work?). BTW, the way I was testing to see which thread was running was to refer to Thread.CurrentThread.ManagedThreadId
.
So now, I had wasted a lot of time trying to find an easy answer and had to accept some work on my part would be necessary in this scenario. What struck me was the notion of the classic windows pump being crucial for the synchronization context. Basically that it has a message queue running so a thread can enqueue a message and that message then be executed by another thread when it is able. So in the thread daemon, I defined a queue and in the OnDoWork
, created my rudimentary “message pump”.
private Queue<WaitCallback> _qWorkItems;
private object _oLock;
protected override void OnDoWork(DoWorkEventArgs e)
{
LogSvc.Debug(this, "Running");
while (!CancellationPending)
{
lock (_oLock)
{
while (_qWorkItems.Count > 0)
{
LogSvc.Debug(this, "Dequeue and invoke work item on thread {0}",
Thread.CurrentThread.ManagedThreadId);
var workItem = _qWorkItems.Dequeue();
workItem.Invoke(null);
}
Monitor.Wait(_oLock);
}
}
}
public void QueueWorkItem(WaitCallback callback)
{
LogSvc.Debug(this, "Enqueuing work item from event caller thread {0}",
Thread.CurrentThread.ManagedThreadId);
lock (_oLock)
{
_qWorkItems.Enqueue(callback);
Monitor.Pulse(_oLock);
}
}
I started with a Queue<Action>
which allowed me to accomplish asynchronous execution. However, I wanted to be able to support synchronous execution as well to support getting the return value from the delegate. So I looked at what ThreadPool.EnqueueUserWorkItem
used and settled on WaitCallback
.
Now we have our thread daemon setup to queue and execute operations in its thread context. What we need next is a synchronization context to allow the worker threads to marshal the delegate and data from their thread to the thread daemon thread. We’ll implement both ISynchronizeInvoke
and IAsyncResult
classes to nicely encapsulate this functionality. This will offer a test to see if an invoke is required and support both asynchronous and synchronous execution of the event delegate.
class ThreadAsyncResult : IAsyncResult
{
public bool IsCompleted { get; set; }
public WaitHandle AsyncWaitHandle { get; internal set; }
object _state;
public object AsyncState
{
get
{
if (Exception != null)
{
throw Exception;
}
return _state;
}
internal set
{
_state = value;
}
}
public bool CompletedSynchronously { get { return IsCompleted; } }
internal Exception Exception { get; set; }
}
class ThreadSynchronization : ISynchronizeInvoke
{
public readonly int _nExecutingContextID = 0;
private ThreadManager _manager;
public ThreadSynchronization(ThreadManager manager)
{
_nExecutingContextID = Thread.CurrentThread.ManagedThreadId;
_manager = manager;
Log.Debug("Synchronization context created for thread {0}", _nExecutingContextID);
}
public bool InvokeRequired => Thread.CurrentThread.ManagedThreadId
!= _nExecutingContextID;
public IAsyncResult BeginInvoke(Delegate method, object[] args)
{
var result = new ThreadAsyncResult();
var manualResetEvent = new ManualResetEvent(false);
result.AsyncWaitHandle = manualResetEvent;
_manager.QueueWorkItem(delegate
{
try
{
result.AsyncState = method.DynamicInvoke(args);
}
catch (Exception ex)
{
Log.Err(ex);
result.Exception = ex;
}
finally
{
result.IsCompleted = true;
manualResetEvent.Set();
}
});
return result;
}
public object EndInvoke(IAsyncResult result)
{
if (!result.IsCompleted)
{
result.AsyncWaitHandle.WaitOne();
}
return result.AsyncState;
}
public object Invoke(Delegate method, object[] args)
{
var result = BeginInvoke(method, args);
EndInvoke(result);
return result.AsyncState;
}
}
So notice that ThreadSynchronization
is tied to our thread daemon object which implements QueueWortItem
. You could expose access to QueueWorkItem
in a different way if you wish. So, at long last, we have everything setup so we’re ready to alter the events themselves. These events, located within the thread daemon class, would have executed in another worker thread’s execution context. By instantiating the ThreadSynchronization
object, we can test if an invoke is required and enqueue the work to execute on the thread daemon thread and even get the return result of the event.
bool Incoming_Dequeued(object oData)
{
bool bReturn = false;
if (Sync.InvokeRequired)
{
var result = Sync.Invoke(new Func<object, bool>(Incoming_Dequeued),
new object[] { oData });
bReturn = TypeParser.ParseBool(result, false);
return bReturn;
}
return bReturn;
}
void ThreadPool_ThreadComplete(IModuleThread sender, object oResult)
{
if (Sync.InvokeRequired)
{
Sync.Invoke(new Action<IModuleThread, object>(ThreadPool_ThreadComplete),
new object[] { sender, oResult });
return;
}
}
At last, here is the pattern I was looking for all too familiar to anyone who has worked on WinForms or WPF. Now we can easily see if we’re the correct thread and if not, do an asynchronous or synchronous invoke of the delegate and data. When invoking, it’s easy to use an Action<TIn1, TIn2>
or Func<TIn1, Tin2, TOut>
, as required, to generate your delegate.
In conclusion, you can see why Microsoft didn’t have a prepackaged solution to this as they couldn’t presume the executing thread would implement a message pump in a strict fashion. They did provide ISynchronizeInvoke
which also needs IAsyncResult
. Just creating these objects and implementing their interfaces lays bare what you need to do. While I love how it encapsulates this functionality in a familiar manner, it’s not strictly necessary. Really just the implementation of the message pump in our executing thread along with a composite object containing the delegate, data, and a lock would be enough to marshal the required pieces across and signal the thread who fired the event when execution is complete. However, if like me, you are hunting for a best practice implementation, I’m very happy with how neatly the above solution turned out in the end.