I want to offer this advice on thread communication because I have already given several answers on the use of threads to CodeProject members. I post my answers in the Questions & Answers section of the site, and they are usually well received. The only problem is that many questions make me reproduce nearly the same ideas over and over. I need to put some code and explanation in a permanent place and refer to it. At the same time, the techniques I offer may not fit the format of a whole article, so I think the Tips/Tricks form will serve my purpose well.
Here is the most typical situation. A developer creates some UI, most typically with System.Windows.Forms; WPF is less usual at this time. Some UI action requires long processing; this is where doing the work directly in the UI event handler does not work well, because the UI stays unresponsive until the handler returns. In this case, some developers don't know what to do, and some use threads but create a thread repeatedly, using BackgroundWorker, a regular thread, or a thread from the thread pool. In simple cases this is enough, but with massive processing and more complex logic the total number of threads becomes unpredictable. Of course, the extra cost of repeatedly creating and later abandoning a thread can easily cause a performance hit and, more importantly, a problem of clean-up (and, generally, of support).
In many such cases, I advise creating a fixed number of threads and keeping them running permanently, posting a task to such a thread when needed. What is relatively difficult to explain is how to keep a thread in a wait state (wasting zero CPU time) between tasks. It also takes some nerve to explain why spin wait is wrong.
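The difference between a wait state and a spin wait can be shown with a minimal sketch. This is an illustration only, not the code offered later in this tip; the class IdleWorker and its members Post and Stop are hypothetical names. The worker parks on an AutoResetEvent between tasks, so the operating system does not schedule it until something is posted.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical illustration: a permanently running worker that sleeps on a
// wait handle between tasks instead of polling in a loop.
class IdleWorker {
    readonly Queue<Action> tasks = new Queue<Action>();
    readonly AutoResetEvent wakeUp = new AutoResetEvent(false);
    readonly Thread thread;
    bool stopRequested;   // guarded by lock (tasks)

    public IdleWorker() { thread = new Thread(Body); }
    public void Start() { thread.Start(); }

    public void Post(Action task) {
        lock (tasks) tasks.Enqueue(task);
        wakeUp.Set();     // wake the worker if it is waiting
    }

    // Lets the worker finish the queued tasks, then waits for it to exit.
    public void Stop() {
        lock (tasks) stopRequested = true;
        wakeUp.Set();
        thread.Join();
    }

    void Body() {
        while (true) {
            // Blocking wait: the thread is descheduled here and uses no CPU.
            // A spin wait would instead be: while (tasks.Count == 0) { }
            // which keeps one core busy doing nothing useful.
            wakeUp.WaitOne();
            while (true) {   // drain everything posted so far
                Action task;
                lock (tasks) {
                    if (tasks.Count == 0) {
                        if (stopRequested) return;
                        break;
                    }
                    task = tasks.Dequeue();
                }
                task();
            }
        }
    }
}
```

Because the worker drains the whole queue after each wake-up, one signal is enough even when several tasks were posted while it was busy.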
Another problem, which seems to be different, is a desire to implement behavior similar to System.Windows.Forms.Control.Invoke or System.Windows.Threading.Dispatcher. There is a popular misconception about the latter class, caused by the property System.Windows.Threading.Dispatcher.CurrentDispatcher. It always returns a Dispatcher, but the Invoke method can only execute a delegate on the target thread if this thread is designed in a special way. In practice, such inter-thread invocation really takes place only if the target thread is the specially designed UI thread running the UI cycle of System.Windows.Forms.Application or a WPF Application. The problem is that the class Dispatcher is sealed. So, the question is how to provide similar functionality in a custom thread.
The two groups of problems depicted above can be resolved using some queue encapsulated in the thread used as a target of the invocation. First, I made a simple blocking queue class to provide both data transport and synchronization between threads:
using System;
using System.Threading;
using System.Collections.Generic;
public class DataQueue<ITEM> {
public DataQueue() { this.IsBlocking = true; }
public DataQueue(bool blocking) { this.IsBlocking = blocking; }
public EventWaitHandle WaitHandle { get { return FWaitHandle; } }
public void Close() {
this.FWaitHandle.Set();
}
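public void Submit(ITEM item) {
// Enqueue and signal under one lock: a Set() issued after releasing the lock could
// arrive after the consumer has emptied the queue, making it return default(ITEM).
lock (LockObject) {
Queue.Enqueue(item);
FWaitHandle.Set();
}
}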
public ITEM GetMessage() {
if (IsBlocking)
FWaitHandle.WaitOne();
ITEM result = default(ITEM);
lock (LockObject) {
if (Queue.Count > 0)
result = Queue.Dequeue();
if (IsBlocking && (Queue.Count < 1))
FWaitHandle.Reset();
return result;
}
}
public void Clear() {
lock (LockObject)
Clear(Queue);
}
public void Clear(Func<ITEM, bool> removalCriterion) {
lock (LockObject)
Clear(Queue, removalCriterion);
}
#region implementation
static void Clear(Queue<ITEM> queue) { queue.Clear(); }
static void Clear(Queue<ITEM> queue, Func<ITEM, bool> removalCriterion) {
if (removalCriterion == null) {
queue.Clear();
return;
}
Queue<ITEM> copy = new Queue<ITEM>();
while (queue.Count > 0) {
ITEM item = queue.Dequeue();
if (!removalCriterion(item))
copy.Enqueue(item);
}
while (copy.Count > 0)
queue.Enqueue(copy.Dequeue());
}
bool IsBlocking;
Queue<ITEM> Queue = new Queue<ITEM>();
EventWaitHandle FWaitHandle = new EventWaitHandle(false, EventResetMode.ManualReset);
Object LockObject = new object();
#endregion implementation
}
I recommend using blocking operation in nearly all cases; otherwise, the mechanism would behave like a spin wait, causing unwanted abuse of the CPU.
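For comparison, and not as part of the technique offered here: .NET Framework 4 and later include System.Collections.Concurrent.BlockingCollection<T>, which provides the same kind of blocking transport out of the box. The sketch below assumes that class is available; BlockingCollectionDemo and Run are hypothetical names used only for this illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

static class BlockingCollectionDemo {
    // Returns the items in the order in which the consumer thread received them.
    public static List<string> Run(params string[] items) {
        var queue = new BlockingCollection<string>();   // FIFO by default
        var received = new List<string>();

        var consumer = new Thread(() => {
            // GetConsumingEnumerable blocks while the collection is empty and
            // ends once CompleteAdding has been called and the items are drained.
            foreach (string item in queue.GetConsumingEnumerable())
                received.Add(item);
        });
        consumer.Start();

        foreach (string item in items)
            queue.Add(item);
        queue.CompleteAdding();   // plays the role of a termination message
        consumer.Join();          // after Join, received is safe to read
        return received;
    }
}
```

A hand-made queue such as DataQueue remains useful when the built-in class is not available or when extra operations, such as the selective Clear, are needed.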
Here is a simple sample of the target thread and its usage, a Console application:
using System;
using System.Threading;
class QueuedThread {
internal QueuedThread() { this.Thread = new Thread(Body); }
internal void Start() { this.Thread.Start(); }
internal void Abort() {
this.Queue.Close();
this.Thread.Abort();
}
internal void PostMessage(string message) {
this.Queue.Submit(message);
}
#region implementation
void Body() {
try {
while (true) {
string message = Queue.GetMessage();
if (message == null)
break;
ProcessMessage(message);
}
} catch (ThreadAbortException) {
} catch (Exception e) {
Console.WriteLine("{0}: {1}", e.GetType().Name, e.Message);
}
}
void ProcessMessage(string message) {
Console.WriteLine("message Processed {0}", message);
}
Thread Thread;
DataQueue<string> Queue = new DataQueue<string>();
#endregion implementation
}
class Program {
void Run() {
QueuedThread t = new QueuedThread();
t.Start();
t.PostMessage("first");
Thread.Sleep(400);
t.PostMessage("second");
Thread.Sleep(400);
t.PostMessage("third");
Console.WriteLine();
Console.WriteLine("Finished. Press any key...");
Console.ReadKey(true);
t.PostMessage(null);
}
static void Main(string[] args) {
new Program().Run();
}
}
I don't want to go deeper into the relatively complex issue of whether using the Abort method is acceptable (see the sample code). Sometimes such discussions lead to flame wars. The final decision should be made by the developer who wants to use my advice (and, generally, by any developer using threads). Anyway, when a data queue is used, Abort can easily be avoided by using a special “termination” message (like the null message in my simple example above).
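The termination-message approach can be sketched as follows. This is a minimal illustration under assumptions, not a replacement for the sample above: StoppableWorker, Post and Stop are hypothetical names, and the queue is built on Monitor.Wait and Monitor.Pulse rather than on DataQueue to keep the block self-contained. One practical reason to prefer this approach is that Thread.Abort throws PlatformNotSupportedException on .NET Core and .NET 5 or later.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical worker that stops on a sentinel message instead of Thread.Abort.
class StoppableWorker {
    readonly Queue<string> queue = new Queue<string>();
    readonly List<string> processed = new List<string>();
    readonly Thread thread;

    public StoppableWorker() { thread = new Thread(Body); }
    public void Start() { thread.Start(); }

    public void Post(string message) {
        lock (queue) {
            queue.Enqueue(message);
            Monitor.Pulse(queue);   // wake the worker if it is waiting
        }
    }

    // Posts the termination message (null) and waits for the thread to end.
    public IList<string> Stop() {
        Post(null);
        thread.Join();
        return processed;
    }

    void Body() {
        while (true) {
            string message;
            lock (queue) {
                while (queue.Count == 0)
                    Monitor.Wait(queue);   // releases the lock while blocked
                message = queue.Dequeue();
            }
            if (message == null)
                break;   // everything queued before the sentinel is already processed
            processed.Add(message);
        }
    }
}
```

Since the sentinel travels through the same queue as the data, the thread exits only after all earlier messages have been handled, and Join tells the caller when that has happened.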
The above may appear to be not flexible enough. The thread needs to know what to do with the data item; hence ProcessMessage is just one fixed method. This is where inter-thread invocation similar to Dispatcher should come into play: one can use delegates to implement processing tasks instead of just data elements.
So, how about delegate invocation? Well, nothing prevents adding a delegate to the ITEM type. This is just a bit more complex. The first thing to understand is that a parameter (or parameters) for the delegate invocation should be supplied as well. This is a minimalistic sample, again a Console application:
using System;
using System.Threading;
class DelegateInvocationThread {
class ActionWithParameter<PARAMETER> {
internal ActionWithParameter(Action<PARAMETER> action, PARAMETER parameterValue) {
this.Action = action;
this.Parameter = parameterValue;
}
internal static ActionWithParameter<PARAMETER> ExitAction { get { return FExitAction; } }
internal void Invoke() {
if (Action != null)
Action(Parameter);
}
// Only a missing action means "exit"; a null parameter is a legitimate argument.
internal bool ExitCondition { get { return Action == null; } }
Action<PARAMETER> Action;
PARAMETER Parameter;
static ActionWithParameter<PARAMETER> FExitAction =
new ActionWithParameter<PARAMETER>(null, default(PARAMETER));
}
internal DelegateInvocationThread() { this.Thread = new Thread(Body); }
internal void Start() { this.Thread.Start(); }
internal void Invoke(Action<string> action, string parameter) {
Queue.Submit(new ActionWithParameter<string>(action, parameter));
}
internal void InvokeExit() {
Queue.Submit(ActionWithParameter<string>.ExitAction);
}
internal void Abort() {
this.Queue.Close();
this.Thread.Abort();
}
#region implementation
void Body() {
try {
while (true) {
ActionWithParameter<string> action = Queue.GetMessage();
if (action.ExitCondition)
break;
action.Invoke();
}
} catch (ThreadAbortException) {
} catch (Exception e) {
Console.WriteLine("{0}: {1}", e.GetType().Name, e.Message);
}
}
void ProcessMessage(string message) {
Console.WriteLine("message Processed {0}", message);
}
Thread Thread;
DataQueue<ActionWithParameter<string>> Queue =
new DataQueue<ActionWithParameter<string>>();
#endregion implementation
}
class Program {
void Run() {
DelegateInvocationThread t = new DelegateInvocationThread();
t.Start();
t.Invoke(
delegate(string value) {
Console.WriteLine(string.Format("Method #1, parameter: {0}", value));
}, "first");
Thread.Sleep(400);
t.Invoke(
delegate(string value) {
Console.WriteLine(string.Format("Method #2, parameter: {0}", value));
}, "second");
Thread.Sleep(400);
t.Invoke(
delegate(string value) {
Console.WriteLine(string.Format("Method #3, parameter: {0}", value));
}, "third");
Console.WriteLine();
Console.WriteLine("Finished. Press any key...");
Console.ReadKey(true);
t.InvokeExit();
}
static void Main(string[] args) {
new Program().Run();
}
}
I've shown the anonymous delegate syntax, which can be compiled with C# v.2 or later. With C# v.3, the lambda form of an anonymous delegate can also be used:
t.Invoke(
new Action<string>((value) => {
Console.WriteLine(string.Format("Method #3, parameter: {0}", value));
}), "third");
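Since lambdas and anonymous delegates capture local variables, a possible simplification is to let the parameter travel inside the closure, so that the queue holds plain Action items. The following is a sketch of that variation, not the design shown above: ActionThread and Exit are hypothetical names, and BlockingCollection<T> (.NET Framework 4 or later) is assumed in place of DataQueue for brevity.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical variation: the queue holds parameterless actions, and each
// caller captures whatever arguments it needs in a closure.
class ActionThread {
    readonly BlockingCollection<Action> queue = new BlockingCollection<Action>();
    readonly Thread thread;

    public ActionThread() { thread = new Thread(Body); }
    public void Start() { thread.Start(); }

    // Queues the action and returns immediately; it runs later on the worker thread.
    public void Invoke(Action action) { queue.Add(action); }

    // Lets the worker finish what is already queued, then waits for it to exit.
    public void Exit() {
        queue.CompleteAdding();
        thread.Join();
    }

    void Body() {
        foreach (Action action in queue.GetConsumingEnumerable()) {
            try {
                action();
            } catch (Exception e) {
                // One failing action should not take the whole thread down.
                Console.WriteLine("{0}: {1}", e.GetType().Name, e.Message);
            }
        }
    }
}

// Usage, with the parameter captured by the lambda:
//   string value = "third";
//   t.Invoke(() => Console.WriteLine("Method #3, parameter: {0}", value));
```

The trade-off is that the captured variable is read when the action runs, not when it is queued, so the caller should not modify it in the meantime.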
This idea could further be used to combine more than one queue in the same target thread. A warning: it is not possible to use more than one instance of DataQueue in the same thread as is; they will block each other if blocking operation is used. The modifications should go inside the class DataQueue. At the same time, such a generalization is easy using the techniques I offer.
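One way such a generalization might look is sketched below, under stated assumptions. Each queue never blocks on its own; instead, it exposes a wait handle that is signaled exactly while the queue holds items, and the thread blocks once, on all handles together, using WaitHandle.WaitAny. The classes SignaledQueue and TwoQueueThread and their members are hypothetical; this is not a modification of DataQueue itself.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical queue that never blocks; its handle is signaled while it holds items.
class SignaledQueue<T> {
    readonly Queue<T> items = new Queue<T>();
    readonly ManualResetEvent nonEmpty = new ManualResetEvent(false);
    public WaitHandle NonEmpty { get { return nonEmpty; } }

    public void Submit(T item) {
        lock (items) { items.Enqueue(item); nonEmpty.Set(); }
    }
    public bool TryTake(out T item) {
        lock (items) {
            if (items.Count == 0) { item = default(T); return false; }
            item = items.Dequeue();
            if (items.Count == 0) nonEmpty.Reset();
            return true;
        }
    }
}

class TwoQueueThread {
    public readonly SignaledQueue<string> Text = new SignaledQueue<string>();
    public readonly SignaledQueue<int> Numbers = new SignaledQueue<int>();
    public readonly List<string> Log = new List<string>();
    readonly ManualResetEvent stop = new ManualResetEvent(false);
    readonly Thread thread;

    public TwoQueueThread() { thread = new Thread(Body); }
    public void Start() { thread.Start(); }
    public void Stop() { stop.Set(); thread.Join(); }

    void Body() {
        WaitHandle[] handles = { Text.NonEmpty, Numbers.NonEmpty, stop };
        while (true) {
            // Blocks until at least one handle is signaled and returns the
            // lowest signaled index, so "stop" wins only when both queues are empty.
            int index = WaitHandle.WaitAny(handles);
            string s;
            int n;
            if (index == 0) {
                if (Text.TryTake(out s)) Log.Add("text " + s);
            } else if (index == 1) {
                if (Numbers.TryTake(out n)) Log.Add("number " + n);
            } else {
                break;
            }
        }
    }
}
```

Note that the order of the handles sets a priority: as long as the first queue keeps receiving items, the second one waits, which may or may not be what a given application needs.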
Thank you,
Sergey A Kryukov