
Concurrent Programming - Investigating Task Messaging To Achieve Synchronization Free Inter-Task Communication

7 Jan 2008
Further studies of Parallel FX.

Mandelbrot2

Part I - Concurrent Programming - A Primer

Introduction

My purpose in writing the previous article, Concurrent Programming - A Primer, was to investigate Microsoft's Parallel FX library (PFX). As was made clear by various experts, PFX does not address the issue of application state and shared memory synchronization. One of the features of Erlang that enamored me was the concept of message passing between tasks. Specifically, the thread receiving the message gets a local copy of the message being sent by the sender thread. This completely eliminates synchronization issues. The purpose of this article is to investigate a simple TaskMessageManager (TMM) implementation that works with PFX to add serialized messaging between tasks.

About The Code

If you download the code, you will also need to install the Parallel FX library. Also, and this is very important: if you compile the code in Debug mode, uncheck the "Define DEBUG constant" option in the project properties. Otherwise, the debug messages that the TMM outputs will kill the performance of the application when testing the TMM mode. Visual Studio 2008 is required.

Basic Architecture

Tasks register themselves with the TMM, specifying the message queue on which they are listening. The task blocks until a message is placed into the message queue. Tasks can share the same message queue so that work can be distributed among tasks. Rather than use the SelfReplicating flag option (see PFX Problems below), the application must set up tasks to run concurrently against a specific message queue. Messages are retrieved from the queue as the task requests them, meaning that it has completed its current work. A couple of built-in messages (cancel and done) are used, respectively, to tell the task to terminate and to indicate to the task that all its work is complete. In both cases, the task terminates. If the queue is shared by several tasks, they all terminate (a sort of broadcast to all tasks listening to the message queue).
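To make the blocking behaviour concrete, here is a minimal sketch of a queue that several consumers can share. It is not the TMM's actual code: the class name BlockingMessageQueue and its Post and Get methods are hypothetical, and it uses only Monitor and Queue<T> rather than anything from PFX.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in for a single TMM message queue (illustration only).
public class BlockingMessageQueue<T> where T : class
{
    private readonly Queue<T> items = new Queue<T>();
    private readonly object gate = new object();

    // Adds a message and wakes one consumer that is waiting in Get.
    public void Post(T message)
    {
        lock (gate)
        {
            items.Enqueue(message);
            Monitor.Pulse(gate);
        }
    }

    // Blocks until a message is available or the timeout elapses.
    // Returns null on timeout; pass Timeout.Infinite (-1) to wait forever.
    public T Get(int timeoutMs)
    {
        lock (gate)
        {
            DateTime deadline = DateTime.UtcNow.AddMilliseconds(timeoutMs);

            while (items.Count == 0)
            {
                int remaining = Timeout.Infinite;

                if (timeoutMs != Timeout.Infinite)
                {
                    remaining = (int)(deadline - DateTime.UtcNow).TotalMilliseconds;
                    if (remaining <= 0)
                    {
                        return null;
                    }
                }

                // Releases the lock while waiting, reacquires it before returning.
                Monitor.Wait(gate, remaining);
            }

            return items.Dequeue();
        }
    }
}
```

Any number of threads can call Get on the same instance, and whichever one is free takes the next message, which is how work gets distributed. In this sketch a null return plays the role that the built-in NoMessage plays in the TMM.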

What The TaskMessageManager Does

The TMM sets up message queues through which the main application and its tasks can communicate with each other, initiating work based on the message type and contents. Furthermore, messages are copied, so that the receiver thread does not directly reference the sender thread's message. The TMM is a thin API which the developer can use to reduce the setup work necessary to achieve the same functionality.

What The TaskMessageManager Does Not Do

The TMM is an experiment, one that I plan to continue further. It is not intended to be used in production code. There are clearly performance issues that will probably never be solvable in C#. There are also core architectural issues that I imagine can only be solved with a VM, like Erlang's, that facilitates moving of data between tasks as values and doesn't even include constructs for synchronization. On the other hand, the careful use of the TMM may result in an application benefiting from synchronization-less task communication.

And of course, the TMM doesn't eliminate synchronization issues between tasks. It merely moves the problem from one the developer has to deal with to one the TMM deals with for the developer. There are lots of lock statements in the TMM to control access to the message queues, and of course, there's the serialization process as well, which is rife with performance and usability issues. This is true as well with PFX--it makes it easier for the developer to add concurrency into an application by taking the thread management problem off of the developer's hands and putting it into PFX.

Implementation

Interfaces

There are two key interfaces, IMessage and IClonableMessage. All messages sent to the tasks must implement IMessage. If the message implements IClonableMessage:

C#
public interface IClonableMessage
{
  IMessage DeepClone();
}

then the DeepClone method is called rather than using the brute force .NET serialization. If you don't implement IClonableMessage, then your message class must be serializable. The IMessage interface is simply a stub at the moment.
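The point of copying is that the sender can keep mutating its own message without affecting what the receiver sees. The sketch below illustrates this with the DeepClone path only. NamesMessage and MessageCopier are hypothetical names invented for the illustration, and the serialization fallback is deliberately left out.

```csharp
using System;

public interface IMessage { }

public interface IClonableMessage
{
    IMessage DeepClone();
}

// Hypothetical message type, used only for this illustration.
public class NamesMessage : IMessage, IClonableMessage
{
    public string[] Names;

    public NamesMessage(string[] names)
    {
        Names = names;
    }

    public IMessage DeepClone()
    {
        // Copy the array so that sender and receiver share no mutable state.
        // The strings themselves are immutable, so copying the array is enough.
        return new NamesMessage((string[])Names.Clone());
    }
}

public static class MessageCopier
{
    // Hypothetical helper: returns the copy the receiver would be handed.
    // Only the IClonableMessage path is shown; a real implementation would
    // fall back to serialization for messages that are not clonable.
    public static IMessage CopyForReceiver(IMessage message)
    {
        IClonableMessage clonable = message as IClonableMessage;

        if (clonable == null)
        {
            throw new NotSupportedException("This sketch only handles IClonableMessage.");
        }

        return clonable.DeepClone();
    }
}
```

If the sender changes Names[0] after CopyForReceiver has been called, the receiver's copy still holds the original value. A DeepClone that only copied the array reference would not give that guarantee.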

Usage

A Simple Example

In this simple example, two tasks are created, and the application waits for the tasks to register (see PFX Problems below):

C#
task1 = Task.Create(ATask, null, TaskManager.Default, TaskCreationOptions.None, "Task1");
task2 = Task.Create(ATask, null, TaskManager.Default, TaskCreationOptions.None, "Task2");
while (!tmm.IsRegistered(task1)) {Thread.Sleep(10);}
while (!tmm.IsRegistered(task2)) { Thread.Sleep(10); }

Each task registers itself and starts listening on the same message queue, specifying a 100 ms timeout.

C#
static void ATask(object obj)
{
  tmm.RegisterTask(Task.Current, "TaskQueue");
  bool stopped = false;

  while (!stopped)
  {
    TaskMessage tm = tmm.GetMessage(100);
...

If a task specifies a timeout and no message arrives before it elapses, the GetMessage call returns the built-in NoMessage. If the task specifies no timeout (-1), the TMM internally still monitors the task to see if a cancel request has been issued by the application or the PFX TaskManager, in which case the TMM throws the TaskCanceledException [1]. Because the task might be informed by the application that it is no longer needed, via the StopMessage, a typical task should also check for this message. In our simple example, once the task receives one of our two messages, it terminates itself:

C#
while (!stopped)
{
  TaskMessage tm = tmm.GetMessage(100);
  Debug.WriteLine("!tmm: " + Task.Current.Name + ": "+tm.Message.ToString());

  if (tm.Message is SayHelloMessage)
  {
    // executes in either task 1 or task 2.
    Console.WriteLine("Hello!");
    stopped = true;
  }
  else if (tm.Message is RequestHelloMessage)
  {
    // executes in whichever task dequeues the request first.
    // Post to the message queue rather than to a specific task.
    tmm.PostMessage("TaskQueue", new SayHelloMessage());
    stopped = true;
  }
  else if (tm.Message is StopMessage)
  {
    stopped = true;
  }
  else if (tm.Message is NoMessage)
  {
  }
}

The application posts RequestHelloMessage to this queue. One of the tasks will be released, and it posts a SayHelloMessage to the same queue, which usually is processed by the second task before the first one gets around to checking for more messages. In our simple test, the application does this:

C#
tmm.PostMessage("TaskQueue", new RequestHelloMessage());
tmm.Wait();

The message queue monitors the Task.Completed event, and removes tasks from its internal maps, so we can wait until all tasks have completed.

By looking at the trace of this application when it is run, you will see the two tasks working:

Image 2

A different run shows that Task1 received the message first:

Image 3

Concurrent Application Patterns

At this point, it's useful to start identifying concurrent programming patterns.

Process Work Then Terminate

One kind of concurrent application pattern is a "Process Work Then Terminate" pattern, which is basically just a task that waits for a message, acts on that message, then terminates itself. The above example illustrates this pattern.

Process Messages Then Terminate

A slightly more complex version processes all the messages in its queue, then terminates. This assumes that the queue has been fully loaded before the task begins, otherwise the task is racing against the loader. This pattern is currently not possible (see PFX Problems below).

Process Work Until Stopped

A more advanced pattern than the previous requires that the last message added to the queue is StopMessage. The task only terminates when it sees this message. Obviously, this assumes that messages are dequeued sequentially by all the tasks listening to that queue. Furthermore, it requires that the TMM automatically place a StopMessage into the queue for each of the tasks listening to the queue, so that all listening tasks terminate. This pattern, as implemented by the TMM, currently fails when tasks dynamically add themselves to a message queue after the StopMessage has been posted.
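The "one stop message per listener" idea can be sketched with plain threads. Everything here is an assumption made for illustration: the StopBroadcastDemo class, its Run method, and the Stop sentinel object standing in for StopMessage are not part of the TMM.

```csharp
using System.Collections.Generic;
using System.Threading;

public static class StopBroadcastDemo
{
    // Sentinel object standing in for the TMM's StopMessage.
    private static readonly object Stop = new object();

    // Starts `listeners` workers on one shared queue, posts `workItems`
    // messages followed by one Stop per listener, and returns how many
    // work messages were processed.
    public static int Run(int listeners, int workItems)
    {
        Queue<object> queue = new Queue<object>();
        object gate = new object();
        int processed = 0;

        ThreadStart worker = delegate
        {
            while (true)
            {
                object msg;

                lock (gate)
                {
                    while (queue.Count == 0)
                    {
                        Monitor.Wait(gate);
                    }

                    msg = queue.Dequeue();
                }

                if (object.ReferenceEquals(msg, Stop))
                {
                    // A well-behaved task never asks for another message
                    // after seeing Stop, so each Stop ends exactly one task.
                    return;
                }

                Interlocked.Increment(ref processed);
            }
        };

        Thread[] threads = new Thread[listeners];
        for (int i = 0; i < listeners; i++)
        {
            threads[i] = new Thread(worker);
            threads[i].Start();
        }

        lock (gate)
        {
            for (int i = 0; i < workItems; i++)
            {
                queue.Enqueue(i);
            }

            // One Stop per listener, queued behind all the work.
            for (int i = 0; i < listeners; i++)
            {
                queue.Enqueue(Stop);
            }

            Monitor.PulseAll(gate);
        }

        foreach (Thread t in threads)
        {
            t.Join();
        }

        return processed;
    }
}
```

Because the queue is first in, first out, every work message is dequeued before any Stop. The sketch also shows the weakness: a listener that joined after the Stop messages were counted and enqueued would never receive one.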

A More Complex Example

Going back to the Mandelbrot example [5] I used in the first article, I'm going to show how the code is changed yet again, illustrating tasks and task messaging this time. What you will immediately notice about this architecture is that the original code has been broken up into small, autonomous units. In this version though, there is still a global set of variables that is referenced by the tasks:

C#
readonly double xstep;
readonly double ystep;
readonly double escapeRadius;
readonly double logEscapeRadius;
readonly int width;
readonly int height;
readonly byte[] argb;
readonly Color[] colors;
readonly ComplexNumber p1;
readonly ComplexNumber p2;
readonly int maxIteration;
readonly ProgressBar progress;

On my todo list is moving these variables into their respective tasks and using the task parameter to initialize them when the task is created. For the moment, they are designated readonly to emphasize that they are immutable.

The Messages

There are three messages.

C#
/// <summary>
/// The initial task is primed with width messages, each an x coordinate.
/// </summary>
public class XCoordMessage : IMessage, IClonableMessage
{
  public int x;

  public XCoordMessage(int x)
  {
    this.x = x;
  }

  public IMessage DeepClone()
  {
    return new XCoordMessage(x);
  }
}

/// <summary>
/// The coordinate message is sent to the task that computes
/// the iterations.
/// </summary>
public class CoordMessage : IMessage, IClonableMessage
{
  public int x;
  public int y;

  public CoordMessage(int x, int y)
  {
    this.x = x;
    this.y = y;
  }

  public IMessage DeepClone()
  {
    return new CoordMessage(x, y);
  }
}

/// <summary>
/// This message is sent to the task responsible for updating
/// the bitmap.
/// </summary>
public class CoordColorMessage : IMessage, IClonableMessage
{
  public int x;
  public int y;
  public int colorIndex;

  public CoordColorMessage(int x, int y, int colorIndex)
  {
    this.x = x;
    this.y = y;
    this.colorIndex = colorIndex;
  }

  public IMessage DeepClone()
  {
    return new CoordColorMessage(x, y, colorIndex);
  }
}

The first message sets up the task to generate the full (x,y) coordinate that is then passed to the iteration task. The iteration task, when the iteration is determined, sends a message to the task that is responsible for updating the bitmap.

The Tasks

There are three tasks. Each task checks for a StopMessage and terminates itself if that message is returned by the TMM. If not, it checks that the message is the expected message type, and performs the task based on that message.

PostCoordinates

This task posts the (x, y) coordinates of the pixel to compute to the ComputeIterations task.

C#
public void PostCoordinates(object obj)
{
  // When we get the x coord
  // and vertical height, post the complete coordinate.
  tmm.RegisterTask(Task.Current, "CoordQueue");
  bool stopped = false;

  while (!stopped)
  {
    TaskMessage tm = tmm.GetMessage();
  
    if (tm.Message is StopMessage)
    {
      stopped = true;
    }
    else if (tm.Message is XCoordMessage)
    {
      XCoordMessage xch = (XCoordMessage)tm.Message;
  
      for (int y = 0; y < height; y++)
      {
        tmm.PostMessage("IterationQueue", new CoordMessage(xch.x, y));
      }
    }
  }
}

ComputeIterations

This task computes the number of iterations before z escapes. It operates on the (x, y) coordinate received in the message for its queue.

C#
public void ComputeIterations(object obj)
{
  // When we get the coordinate,
  // compute the # of iterations before escape and 
  // post the result.
  tmm.RegisterTask(Task.Current, "IterationQueue");
  bool stopped = false;

  while (!stopped)
  {
    TaskMessage tm = tmm.GetMessage();

    if (tm.Message is StopMessage)
    {
      stopped = true;
    }
    else if (tm.Message is CoordMessage)
    {
      CoordMessage cm = (CoordMessage)tm.Message;
      ComplexNumber z = p2;
      z.Re = p2.Re + (cm.x * xstep);
      z.Im = p1.Im-cm.y*ystep;
      ComplexNumber C = z;
      int iteration = 0;

      while ( (z.Modulus < escapeRadius) && (iteration < maxIteration) )
      {
        z = z * z + C;
        iteration++;
      }

      int colorIndex = 0;

      if (iteration < maxIteration)
      {
        z = z * z + C; iteration++;
        z = z * z + C; iteration++;
        double mu = iteration - (Math.Log(Math.Log(z.Modulus))) / logEscapeRadius;
        colorIndex = (int)(mu / maxIteration * 768);
      }

      tmm.PostMessage("BitmapQueue", new CoordColorMessage(cm.x, cm.y, colorIndex));
    }
  }
}

UpdateBitmap

This task receives the (x, y) coordinate of the pixel as well as the color index computed by the task above. It is responsible for posting the color to the bitmap.

C#
public void UpdateBitmap(object obj)
{
  // When we get the computed iterations
  // for the coordinate, put it in the bitmap.
  tmm.RegisterTask(Task.Current, "BitmapQueue");
  bool stopped = false;

  while (!stopped)
  {
    TaskMessage tm = tmm.GetMessage();

    if (tm.Message is StopMessage)
    {
      stopped = true;
    }
    else if (tm.Message is CoordColorMessage)
    {
      CoordColorMessage cm = (CoordColorMessage)tm.Message;
      int colorIndex=cm.colorIndex;

      if ((colorIndex < 0) || (colorIndex >= 768))
      {
        colorIndex = 0;
      }

      int index = (cm.y * width + cm.x) * 4;
      argb[index] = colors[colorIndex].B;
      argb[index + 1] = colors[colorIndex].G;
      argb[index + 2] = colors[colorIndex].R;
      argb[index + 3] = 255;

      // See comments in UpdateProgress for why we do this rather than
      // another task that updates the progress bar.
      Interlocked.Increment(ref Tasks.progressValue);
      // tmm.PostMessage("UpdateQueue", UpdateProgressMessage.msg);
    }
  }
}

The Progress Bar Update Process

The progress bar is updated in the main application thread. Because more than one UpdateBitmap task may be running, the task progressValue must be incremented atomically, hence the Interlocked.Increment statement. Attempts to post the count and use BeginInvoke on the progress bar's container form were a disaster--updating the UI from task instances is Not A Good Idea, it would seem. In the main application thread, there is a callback that the TMM is provided when it waits for task completion. This effectively marshals the progress bar update onto the main application thread where it can be safely updated:

C#
protected void UpdateUI()
{
  // Reading a 32-bit int is atomic, so no lock is needed for this read.
  int val = Tasks.progressValue; 
  progress.Value = val;
  Application.DoEvents();
  Thread.Sleep(100);
}

The upshot though is that this violates the model of keeping data local to the task. I am leaving this issue for a future enhancement.
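The counter technique can be tried in isolation from the UI. In the sketch below, several worker threads increment a shared counter with Interlocked.Increment while the calling thread polls it, much as UpdateUI does. The class ProgressCounterDemo and its Run method are hypothetical and exist only for this illustration.

```csharp
using System.Threading;

public static class ProgressCounterDemo
{
    // Shared counter, incremented by workers and read by the polling thread.
    private static int progressValue;

    // Starts `workers` threads that each increment the counter
    // `incrementsPerWorker` times, polls until the total is reached,
    // and returns the last value the polling thread observed.
    public static int Run(int workers, int incrementsPerWorker)
    {
        progressValue = 0;
        int total = workers * incrementsPerWorker;

        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++)
        {
            threads[i] = new Thread(delegate()
            {
                for (int j = 0; j < incrementsPerWorker; j++)
                {
                    // Atomic read-modify-write: no increments are lost
                    // even when several threads hit the counter at once.
                    Interlocked.Increment(ref progressValue);
                }
            });
            threads[i].Start();
        }

        int seen;
        do
        {
            // CompareExchange(ref x, 0, 0) leaves x unchanged and returns
            // its current value, giving an atomic, fenced read.
            seen = Interlocked.CompareExchange(ref progressValue, 0, 0);
            Thread.Sleep(1);
        }
        while (seen < total);

        foreach (Thread t in threads)
        {
            t.Join();
        }

        return seen;
    }
}
```

With a plain progressValue++ in the workers, two threads could read the same value and both write back the same result, so the final count could fall short of the total and the polling loop might never finish.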

Task Initialization

A compiler option (the DualTask symbol) selects how tasks are initialized: either as one Task instance per task, or as n Task instances per task, where n is the number of processors available. You can play with the performance difference by defining this symbol in the code. The multi-instance mode simulates task self-replication. Ideally, we would want the TaskManager to manage self-replication.

C#
public void Initialize()
{
  Task task;

#if DualTask
  for (int i = 0; i < System.Environment.ProcessorCount; i++)
#endif
  {
#if DualTask
#else
    int i = 0;
#endif
    task = Task.Create(PostCoordinates, null, TaskManager.Default, 
           TaskCreationOptions.None, "Coordinates"+i);
    while (!tmm.IsRegistered(task)) { Thread.Sleep(10); }
    task = Task.Create(ComputeIterations, null, TaskManager.Default, 
           TaskCreationOptions.None, "Iterations"+i);
    while (!tmm.IsRegistered(task)) { Thread.Sleep(10); }
    task = Task.Create(UpdateBitmap, null, TaskManager.Default, 
           TaskCreationOptions.None, "Bitmap" + i);
    while (!tmm.IsRegistered(task)) { Thread.Sleep(10); }
  }
}

The Main Application Thread

Rather than running the two nested loops that calculate the iterations for each point of the fractal, the main application primes the first task with x coordinate messages and then waits for all the tasks to complete:

C#
Tasks tasks = new Tasks(xStep, yStep, escapeRadius, logEscapeRadius, p.Width, p.Height, 
                        maxIteration, P1, P2, argb, Colors, progress);
tasks.Initialize();
tasks.SendX();

// Wait for the coord queue to flush.
TaskMessageManager.Default.Wait("CoordQueue", UpdateUI);
TaskMessageManager.Default.PostMessage("CoordQueue", StopMessage.Default);
// Wait next for the iteration queue to flush.
TaskMessageManager.Default.Wait("IterationQueue", UpdateUI);
TaskMessageManager.Default.PostMessage("IterationQueue", StopMessage.Default);
// Then wait for the bitmap queue to flush.
TaskMessageManager.Default.Wait("BitmapQueue", UpdateUI);
TaskMessageManager.Default.PostMessage("BitmapQueue", StopMessage.Default);
// Then wait for the update queue to flush (disabled; see the commented-out post in UpdateBitmap).
//TaskMessageManager.Default.Wait("UpdateQueue", UpdateUI);
//TaskMessageManager.Default.PostMessage("UpdateQueue", StopMessage.Default);
TaskMessageManager.Default.Wait(UpdateUI);

The salient point here is that the application posts a StopMessage to the queue when the queue is empty, and finally calls Wait to wait for the last tasks to complete. This really does work, even if it's incredibly inelegant, because the queue of task 1 cannot be emptied without having at least posted messages into the queue of task 2.
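The ordering argument can be checked with a two-stage pipeline built on plain threads. This is a simplified sketch, not the TMM's mechanism: the StagedShutdownDemo class and its Post, Take and Run helpers are hypothetical, a sentinel object stands in for StopMessage, and Thread.Join stands in for the TMM's Wait on a queue.

```csharp
using System.Collections.Generic;
using System.Threading;

public static class StagedShutdownDemo
{
    // Sentinel object standing in for the TMM's StopMessage.
    private static readonly object Stop = new object();

    private static void Post(Queue<object> q, object msg)
    {
        lock (q)
        {
            q.Enqueue(msg);
            Monitor.Pulse(q);
        }
    }

    private static object Take(Queue<object> q)
    {
        lock (q)
        {
            while (q.Count == 0)
            {
                Monitor.Wait(q);
            }

            return q.Dequeue();
        }
    }

    // Feeds 1..count through two stages and returns the sum of the
    // doubled values computed by the second stage.
    public static int Run(int count)
    {
        Queue<object> first = new Queue<object>();
        Queue<object> second = new Queue<object>();
        int sum = 0;

        Thread stage1 = new Thread(delegate()
        {
            for (object m = Take(first); !object.ReferenceEquals(m, Stop); m = Take(first))
            {
                Post(second, (int)m * 2);
            }
        });

        Thread stage2 = new Thread(delegate()
        {
            for (object m = Take(second); !object.ReferenceEquals(m, Stop); m = Take(second))
            {
                sum += (int)m;
            }
        });

        stage1.Start();
        stage2.Start();

        for (int i = 1; i <= count; i++)
        {
            Post(first, i);
        }

        Post(first, Stop);
        stage1.Join();       // Stage 1 has now forwarded everything it received.
        Post(second, Stop);  // Only now is it safe to stop stage 2.
        stage2.Join();

        return sum;
    }
}
```

If the Stop for the second queue were posted before stage 1 had finished, stage 2 could exit while results were still on their way, which is why the stops have to follow the direction of the data.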

Performance Results

The resulting performance is illustrated quite dramatically by this chart [4]:

Image 4

Surprisingly, parallelizing the outer loop with PFX, as described in my previous article, results in worse performance than the single threaded application. (Using Parallel.For on the inner loop instead, not together with the outer loop, is considerably worse still, as predicted in my first article.) This is despite the fact that I can clearly see the dual cores running at only 50% utilization in single threaded execution and at 100% utilization in the PFX version. As expected, the TMM version is considerably slower, but here at least, setting up multiple tasks results in a performance improvement. Incidentally, in all cases, removing the code that updates the progress bar results in a 10%-20% improvement, which just goes to show that updating the UI is an expensive operation.

Now, the interesting thing is that, depending on what area of the fractal one is rendering, the performance is sometimes better with PFX. Perhaps, by luck, I found an area of the fractal where PFX performs worse. This is definitely something that deserves a lot more investigation.

PFX Problems

The most important result of this experiment was the problems I discovered with PFX. Hopefully, they will be addressed in future releases, but if not, replacing the PFX task manager and Task classes is straightforward enough, though it would result in decoupling PLINQ. However, there are alternatives for that as well. In the meantime though, I'm bucking my modus operandi of always re-inventing the wheel and trying to work with PFX as much as possible.

Automatic Task Startup

The first problem is that tasks, once created, immediately start. This means that I cannot register the Task instance after creating it because the task will immediately start asking for messages from the TMM, and it may not have been registered yet by the task creator. Conversely, if the task registers itself (which is how I implemented it), the creator has to wait until the task is registered before posting messages. The first scenario could be accommodated with some sort of auto-register process of the task. The second scenario can be accommodated (and this is my preference, but is not implemented) by first creating the queue. This would allow the task creator to prime the queue with messages, then create the task.

Ideally though, I would think that the Task.Create method would allow you to create the task in a suspended state, and start (or resume) it at a time of your choosing. For completeness, the application should be able to suspend a task. The PFX TaskManager ought to be smart enough to assign a task to the core that was previously dedicated to the now suspended task.

Self-Replication Extra Events

When self-replicating, the Task.Completed event fires even though the task was never started. For this example, I removed the creation of task2. From this trace:

Image 5

you can see that Task1 is registered twice (the PFX TaskManager sees that there are two cores, so it duplicates this task with the same name, which is fine). However, you can also see that there are four "Task Completed" event calls! I do not think that Task.Completed should be called for threads that were never even started.

Task Instance Re-use?

With self replication enabled, I have seen this exception occur:

C#
if (taskQueueMap.ContainsKey(task))
{
  throw new TaskMessageManagerException("The task " + task.Name + 
       " is already registered.");
}

I cannot reliably replicate this problem, but it indicates that a Task instance is possibly re-used by the PFX TaskManager. I am not clear why the taskQueueMap still contained this task, and the exception may have been thrown due to a bug in my code. Nonetheless, knowing whether Task instances are re-used or not is important, and bears further investigation of the System.Threading assembly. If Task instances are re-used, the task had better make darn sure it is initialized to a known state (and not its last state) when the task starts.

PFX Bugs Of Concern

Because a task may be waiting a long time for a message, this "known correctness bug" (I guess saying simply "bug" isn't in vogue anymore) is of concern: "Tasks blocked for significant periods of time may cause runaway thread injection and out of memory conditions when the default policy is used." [2] Hopefully, this will be fixed.

Core vs. Thread Utilization

Tasks are assigned to cores, and in self-replication, tasks are replicated when a core becomes available [3]. To me, this is not ideal. It assumes that a task will consume 100% of its core, which is not always the case. What if the task is waiting for an I/O completion event? With regard to the TMM, what if a task is waiting for a message? Can no other Task instance, with actual work to do, utilize this core in the meantime? So far, I haven't found any information regarding whether PFX handles cases where tasks do not consume 100% of the core.

Performance

I fully expected that the Mandelbrot rendering with PFX would be better than the single threaded operation, and I'm actually dismayed that it's worse! It would be great if someone from the PFX team could explain this result.

TMM Problems

Task Message Manager Instances

Only one TMM is permitted. The task always interacts with the default TMM. The whole concept of task queues might be better managed by allowing for multiple TMMs, each managing a single queue.

PostMessage Overloads

PostMessage has overloads for posting to the queue or to a task. However, posting to a task is a misnomer, because it posts to that task's queue, which is like posting to a queue, in that any available task listening on that queue gets that message. This is not obvious, and the method signature is misleading, as the developer might incorrectly think that the message is being posted to that specific task.

Self-Replicating Tasks

The TMM does not work with self-replicating tasks. A whole section of the code has been commented out regarding this issue and the "smart" removal of tasks that are no longer needed because there's nothing in the queue to process. This may be mixing apples and oranges regarding task management, and it probably conflicts with PFX's TaskManager as well. Another reason self-replicating tasks don't work is the PFX problem of more Task.Completed events firing than the number of Task instances started. And finally, there is the TMM problem of stopping tasks, described next.

Stopping Tasks

In the "Process Work Until Stopped" pattern, the TMM sends a StopMessage to each task listening on the designated queue. For this to successfully stop all tasks, the task itself must behave correctly--it cannot request another message after the StopMessage is received. Also, since this message is enqueued when it is issued (rather than when it is encountered, which doesn't really solve anything since a task can be added while the other tasks stop), any tasks that add themselves after the message is enqueued will not receive the stop message. This is a serious problem which must be resolved in order for self-replication to work.

Boxing

Using structs for messages is essentially pointless, I would think, because they are boxed when referenced via the interface (though I need to really check on this). This is another reason that the .NET environment is unsuitable in certain scenarios for inter-task messaging. Those scenarios can be identified as having tasks that are so short-lived that serialization of the message is a measurable percentage of the task itself.
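The boxing behaviour is easy to confirm: converting a struct to an interface type it implements creates a boxed copy on the heap. The short sketch below shows this. PixelMessage and BoxingDemo are hypothetical names used only for the check.

```csharp
public interface IMessage { }

// Hypothetical struct message, used only to demonstrate boxing.
public struct PixelMessage : IMessage
{
    public int X;

    public PixelMessage(int x)
    {
        X = x;
    }
}

public static class BoxingDemo
{
    // True if the interface reference holds its own copy of the struct.
    public static bool InterfaceReferenceIsABoxedCopy()
    {
        PixelMessage original = new PixelMessage(1);
        IMessage viaInterface = original;   // boxing: a copy is placed on the heap
        original.X = 2;                     // does not affect the boxed copy

        PixelMessage unboxed = (PixelMessage)viaInterface;
        return unboxed.X == 1;
    }

    // True if converting the same struct twice yields two distinct boxes.
    public static bool EachConversionBoxesAgain()
    {
        PixelMessage message = new PixelMessage(1);
        IMessage a = message;
        IMessage b = message;

        return !object.ReferenceEquals(a, b);
    }
}
```

Both methods return true. So a struct message does get copied when it is handed over as an IMessage, but at the cost of a heap allocation per conversion, which removes most of the reason for choosing a struct in the first place.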

Multiple Queues Per Task Message Manager

As alluded to above, multiple queues per TMM cause unnecessary locking when adding and removing messages to a particular queue, as the entire queue collection is being locked, rather than just the queue itself. This will block other tasks that are trying to retrieve a message from a different queue altogether. A future performance improvement is to utilize only one queue per TMM, which will necessitate creating TMM instances.
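One TMM per queue is one way to fix this. Another, sketched below under my own assumptions, is to keep a single manager but hold the collection lock only long enough to look up the queue, then lock the individual queue for the enqueue or dequeue. QueueRegistry and its members are hypothetical and are not taken from the TMM source.

```csharp
using System.Collections.Generic;

// Hypothetical registry of named queues with one lock per queue.
public class QueueRegistry
{
    private readonly object mapLock = new object();
    private readonly Dictionary<string, Queue<object>> queues =
        new Dictionary<string, Queue<object>>();

    // Holds the collection lock only for the lookup (or creation) of the queue.
    private Queue<object> Find(string name)
    {
        lock (mapLock)
        {
            Queue<object> queue;

            if (!queues.TryGetValue(name, out queue))
            {
                queue = new Queue<object>();
                queues.Add(name, queue);
            }

            return queue;
        }
    }

    public void Post(string name, object message)
    {
        Queue<object> queue = Find(name);

        // Only this queue is locked; other queues stay available.
        lock (queue)
        {
            queue.Enqueue(message);
        }
    }

    public bool TryGet(string name, out object message)
    {
        Queue<object> queue = Find(name);

        lock (queue)
        {
            if (queue.Count == 0)
            {
                message = null;
                return false;
            }

            message = queue.Dequeue();
            return true;
        }
    }
}
```

With this layout, a task working on one queue contends with a task on another queue only during the brief dictionary lookup. Whether that is enough to matter would need measuring against the one-queue-per-TMM design.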

Other Observations

Debug Messages

Debug messages in the TMM really slow it down.

Too Many Active Threads Per Core

If a core is task switching between numerous active threads, the task switching itself starts to bog down the core. This requires further investigation.

Understand Your Application's Tasks

Working with the TMM, it's clear that a careful understanding of the parallel tasks is required in order to take advantage of concurrent programming.

Understand Your Concurrent Programming Toolset

It is also clear that, in addition to understanding your application requirements, one needs to clearly understand the workings of any library that is facilitating the management of tasks and coordination (such as messaging) between tasks. I would hope that the PFX team produces high quality documentation so that the developer can clearly understand how to best take advantage of PFX and concurrent programming. The pitfall would be to believe that PFX makes concurrent programming easy. It does not--the CTP is little more than syntactic sugar over managing threads yourself. The performance test above shows that parallelizing the outer loop does not improve performance--in fact, it degrades it. If anyone on the PFX team can explain this behavior and post a comment on this article about it, I would greatly appreciate it.

Future Improvements

  • Allow for multiple TaskMessageManager instances.
  • One queue per TMM.
  • Task "heap" initialization by passing initialization parameters to the task, so that there truly is no global referencing of variables.
  • Coexistence with PFX's self-replication algorithms.
  • Task balancing (tasks that don't utilize 100% of the core).
  • Create an insert-or-update function in the TMM that replaces an existing message rather than adding another message. This would be a one-slot "queue", as it were.

Conclusion

What I have attempted to do here is put together a use case for PFX that probably is outside of the scope of what the designer of PFX had planned for. Also, I wanted to experiment with synchronization free inter-task communication to study the advantages and disadvantages of such an approach. It may turn out that PFX is not suitable for this kind of work, regardless of other more fundamental issues such as serialization performance (which may make .NET languages, in general, unsuitable for this approach). However, I do hope that the creators of PFX will look at this work and consider some of my suggestions. I plan on continuing this investigation in future articles. Performance measurement and optimization is one such topic that intrigues me--how to measure performance changes with PFX, or TMM, and so forth, as well as performance optimization within a core, since quite frankly, I expect to be working in .NET languages for quite a while.

References and Notes

1 - Canceling a task

2 - Known Correctness Bugs with PFX

3 - First Look at Parallel FX and self-replicating tasks

4 - These tests were done on a Sony Vaio VGN-FE890 laptop, running 32-bit Vista Business with 2 GB RAM and a T5500 processor at 1.66 GHz. The Mandelbrot configuration was:

C#
maxIteration = 300;
P1 = new ComplexNumber(-0.669912107426552, 0.451398357629374);
P2 = new ComplexNumber(-0.672973630864052, 0.449948162316874);

and the drawing area was 1254w x 594h.

5 - livibetter's Mandelbrot set with smooth drawing

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL).