
Data-controlled Processes Application Design

23 Nov 2009, CPOL, 10 min read
A ready-to-use data process manager is provided.

Introduction

Let's consider an activity diagram of the "borrow a book from a library" use case.

Image 1

What is wrong here?

  • Many direct calls between layers. This tightly couples the implementations and can produce a wide range of errors.
  • Every experienced developer knows that the number of bugs grows with the number of conditional statements, and here we have a lot of them. Moreover, you cannot precisely predict how the system will behave, because the calls between layers depend on the user's decisions.
  • When many users work with the system concurrently, an uncontrolled thread explosion may occur. Since every session calls internal methods on its own thread, the system can be overloaded by too many thread instances.
  • Resources are not fully utilized. Some of the processing is done on demand, so the user eventually has to wait for it.
  • Parallelism is hard to implement. As the diagram shows, the programmer has to specify threading scenarios explicitly.

What do we want to see?

  • No direct calls between layers
  • Utilizing all resources with preprocessing
  • Controllable pool of threads
  • Automatic parallelization of jobs
  • No conditional statements
  • Layers which are transparent to each other

What about a data-flow diagram?

Image 2
  • There are no direct calls to internal ("public") methods.
  • A data-oriented design. We send data to the system and it yields results asynchronously.
  • The system is not static. It is a single, concrete instance that receives data coming from different sources. With this approach, the number of threads can be managed easily.
  • Dependence on procedures is replaced by dependence on data types.
  • The system relies on a Data Process Manager (DPM), which distributes incoming data among the processes.
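To make "dependence on data types" concrete, here is a minimal, single-threaded sketch of type-driven dispatch. MiniProcess, MiniDispatcher and MiniDispatcherDemo are hypothetical names invented for this illustration; they are not part of the DPM, which additionally runs processes on a pool of threads and tracks each object's trace. A producer only sends data, and each process's predicate over the data decides which code runs next.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration of type-driven dispatch; not the DPM's API.
public sealed class MiniProcess
{
    public string Name { get; }
    public Func<object, bool> Accepts { get; }   // "can I take this data?"
    public Func<object, object> Run { get; }     // transforms accepted data

    public MiniProcess(string name, Func<object, bool> accepts, Func<object, object> run)
    {
        Name = name;
        Accepts = accepts;
        Run = run;
    }
}

public sealed class MiniDispatcher
{
    private readonly List<MiniProcess> _processes = new List<MiniProcess>();
    private readonly Queue<object> _pending = new Queue<object>();

    // Data that no process accepts (the counterpart of the DPM's Output event).
    public List<object> Outputs { get; } = new List<object>();

    public void Add(MiniProcess process) => _processes.Add(process);

    public void Send(object data) => _pending.Enqueue(data);

    // Single-threaded drain: each item goes to the first process whose predicate
    // accepts it, and the result is queued again. Assumes no process accepts its
    // own output; otherwise this loop would never end.
    public void RunToCompletion()
    {
        while (_pending.Count > 0)
        {
            object item = _pending.Dequeue();
            MiniProcess target = _processes.Find(p => p.Accepts(item));
            if (target == null)
                Outputs.Add(item);
            else
                _pending.Enqueue(target.Run(item));
        }
    }
}

public static class MiniDispatcherDemo
{
    public static List<object> Run()
    {
        var dispatcher = new MiniDispatcher();
        // Neither process calls the other; they are linked only by data types.
        dispatcher.Add(new MiniProcess("Parse", o => o is string, o => int.Parse((string)o)));
        dispatcher.Add(new MiniProcess("Square", o => o is int, o => (double)((int)o * (int)o)));
        dispatcher.Send("12");
        dispatcher.RunToCompletion();
        return dispatcher.Outputs;
    }
}
```

Parse never calls Square: the string becomes an int, the int is picked up by whichever process accepts ints, and the resulting double, which nobody accepts, ends up in Outputs. The "no process accepts its own output" assumption is the sketch's crude stand-in for the DPM's default of not allowing recursion.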

Using the Code

To apply the data process design pattern, you split your program into independent processes, each performing atomic operations on data. Programmatically speaking, you create one class implementing the IDataProcess interface per process.

To give a practical example of how to use the data process pattern, we will check the Pythagorean trigonometric identity:

Image 3: sin²x + cos²x = 1

Well, maybe not exactly practical, but it is good material for explaining things. As said above, the equation must be divided into independent processes. Here they are:

  1. sin² evaluator
  2. cos² evaluator
  3. Adder
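For comparison, here is the same computation as ordinary procedural C#. Baseline and its methods are names invented for this illustration. The wiring is explicit: Identity calls each function and decides what flows into Add. The data-driven version has to achieve the same wiring without any of these direct calls.

```csharp
using System;

// Illustrative names only; this is plain procedural code, not the DPM.
public static class Baseline
{
    // The three "processes" as ordinary functions.
    public static double Sin2(double x) { double s = Math.Sin(x); return s * s; }
    public static double Cos2(double x) { double c = Math.Cos(x); return c * c; }
    public static double Add(double a, double b) => a + b;

    // Explicit wiring: the caller decides the order and what goes into Add.
    public static double Identity(double x) => Add(Sin2(x), Cos2(x));
}
```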

After implementation, we will have to arrange them somehow so that they execute in the correct order. Since everything is controlled by data and all processes may be running concurrently, how do we ensure that the adder adds sin²x + cos²x and not, for example, cos²x + x? How do we ensure that the adder process receives exactly two numbers?

Implementation of Processes

A process definition consists of two parts:

  1. Input data specification. How many inputs are there and what requirements do they have to meet?
  2. Process body. A method which performs an operation on input data and produces output.

These two members are defined in the IDataProcess interface, which we are going to implement right now.

C#
class Sin2Process : IDataProcess
{
    public object[] Do(InputSet input)
    {
        var number = input.SingleData<Number>();
        number.X = Math.Sin(number.X);
        number.X *= number.X;
        return new[] { number };
    }

    public ProcessInputRequirements GetRequirements()
    {
        return new ProcessInputRequirements(o => o.Data is Number);
    }
}

The Do method receives a set of input objects, which may come from different processes as long as they meet both the explicitly defined and the implied requirements. The first line acquires an object of type Number. Next, we perform the mathematical operations and return an array containing one element: the modified number. But wait: why do we pass an instance of a Number wrapper class instead of a double? Because this process modifies (performs operations on) the input data and does not create new instances. If a new instance were created, the Data Process Manager (DPM) could not determine the data's "trace", which is basically the list of processes that have operated on it. Remember the rule:

Never create a new instance of data if not necessary.

Thus, data that travels from one process to another should be of a reference type (a class), that is, neither a struct nor a primitive type. You can modify the contents of passed objects, but do not create new ones unless it is indispensable.

The GetRequirements method is more intriguing. It returns a ProcessInputRequirements instance, which tells the DPM when it can call the Do method. There are various types of restrictions you can add, but for now let's stay with the syntax above: a lambda expression that acts as a predicate, deciding whether data can be accepted.
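The article does not show the Number class itself. Below is a minimal sketch, assuming it is a plain class with a public double field X (which is how Sin2Process uses it), together with a hypothetical struct counterpart that shows why value types break the rule above: unboxing a struct copies it, so returning it produces a new object, and the original identity (and with it the data's trace) is lost. IdentityDemo is an illustrative helper, not part of the library.

```csharp
using System;

// Assumed shape of the article's Number wrapper (not shown in the article):
// a class, i.e. a reference type, with a public field X.
public class Number
{
    public double X;
    public override string ToString() => X.ToString();
}

// A value-type counterpart, used only to show what goes wrong with structs.
public struct NumberStruct
{
    public double X;
}

public static class IdentityDemo
{
    // Same arithmetic as Sin2Process.Do: mutate the received instance and hand it back.
    public static object Sin2InPlace(object data)
    {
        var number = (Number)data;
        number.X = Math.Sin(number.X);
        number.X *= number.X;
        return number;                 // the same reference that came in
    }

    // With a struct, unboxing copies the value and returning it boxes a new object,
    // so the result is a different instance and the original box is unchanged.
    public static object Sin2Struct(object data)
    {
        var number = (NumberStruct)data;
        number.X = Math.Sin(number.X);
        number.X *= number.X;
        return number;
    }
}
```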

We can define a Cos2Process in a similar manner.

C#
class Cos2Process : IDataProcess
{
    public object[] Do(InputSet input)
    {
        var number = input.SingleData<Number>();
        number.X = Math.Cos(number.X);
        number.X *= number.X;
        return new[] { number };
    }

    public ProcessInputRequirements GetRequirements()
    {
        return new ProcessInputRequirements(o => o.Data is Number);
    }
}

An addition process is next in line:

C#
class AddProcess : IDataProcess
{
    public object[] Do(InputSet input)
    {
        Number[] numbers = input.AllData<Number>();
        double sum = numbers[0].X + numbers[1].X;
        return new[] { new Number { X = sum } };
    }

    public ProcessInputRequirements GetRequirements()
    {
        return new ProcessInputRequirements(a => a.Data is Number,
                                            b => b.Data is Number);
    }
}

As you can see, we can tell the DPM that a process needs more than one input by passing several lambda expressions separated by commas, one per input. It can also be done this way:

C#
return new ProcessInputRequirements
{
   {
       new PredicateRequirement
       {
           Predicate = a => a.Data is Number,

       }, 2
       }
};
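Whichever syntax you use, the DPM has to decide whether the data it has collected can fill all declared inputs. The following is a hypothetical sketch of such a check, not the library's implementation: it assumes each input slot must be filled by a different object instance and uses simple backtracking. SlotMatcher is an invented name, and for brevity the predicates receive the data directly rather than the wrapper object (with its Data property) used by the real lambdas.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical matcher, not the DPM's code: can the available data fill every
// input slot, using each object instance at most once?
public static class SlotMatcher
{
    public static bool TryFill(IReadOnlyList<Func<object, bool>> slots,
                               IReadOnlyList<object> pool,
                               out object[] assignment)
    {
        assignment = new object[slots.Count];
        return Fill(0, slots, pool, assignment);
    }

    // Backtracking: try every acceptable, not-yet-used instance for the current slot.
    private static bool Fill(int slot, IReadOnlyList<Func<object, bool>> slots,
                             IReadOnlyList<object> pool, object[] assignment)
    {
        if (slot == slots.Count) return true;
        foreach (object candidate in pool)
        {
            if (!slots[slot](candidate) || IsAssigned(candidate, assignment, slot)) continue;
            assignment[slot] = candidate;
            if (Fill(slot + 1, slots, pool, assignment)) return true;
        }
        assignment[slot] = null;
        return false;
    }

    // Identity check: the same instance offered twice still counts as one input.
    private static bool IsAssigned(object candidate, object[] assignment, int filled)
    {
        for (int k = 0; k < filled; k++)
            if (object.ReferenceEquals(assignment[k], candidate)) return true;
        return false;
    }
}
```

Under this assumption, a pool containing a single instance can never satisfy a two-slot requirement, no matter how many times that instance is offered.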

Well, that's it. Now we have to add the defined processes to a DPM and send test data.

Using the Data Process Manager (DPM)

The DPM is the engine that stores all process instances and distributes incoming data among them. It checks all constraints, manages the number of threads, queues incoming data, and tracks objects to give you control over the process flow.

First, we instantiate the DPM and add the processes using a collection initializer.

C#
var manager = new DataProcessManager
{
  new Sin2Process(),
  new Cos2Process(),
  new AddProcess()
};

This will add all three processes. Now, we have to handle output data.

C#
// Fires when output is produced that no process accepts as input.
manager.Output += manager_Output;

static void manager_Output(object sender, DataProcessManager.OutputEventArgs e)
{
    Console.WriteLine("Output: {0}", e.Output);
}

Now, we send test data and activate the manager.

C#
manager.SendData(new Number { X = 123d });

// Activate the manager to start processing.
// You can still call SendData while the manager is active, but then every
// call incurs locking overhead; sending the initial data before Activate()
// avoids it. Activating is more expensive than locking, though.
manager.Activate();

Finally, we would like to deactivate the manager when everything is done. As befits a very-simple-to-use library, there is an IsEverythingDone property:

C#
while (!manager.IsEverythingDone)
   Thread.Sleep(100);
manager.Deactivate();
Console.ReadKey();
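The polling loop above waits forever if the work never completes. As an alternative sketch, .NET's SpinWait.SpinUntil(Func<bool>, TimeSpan) can wait on the same condition with a timeout. WaitHelper is a hypothetical helper name; only IsEverythingDone and Deactivate come from the article.

```csharp
using System;
using System.Threading;

// Hypothetical helper, not part of the DPM library.
public static class WaitHelper
{
    // Waits until isDone() returns true or the timeout elapses.
    // Returns true if the condition was met, false on timeout.
    // SpinWait.SpinUntil spins briefly and then falls back to yielding the thread.
    public static bool WaitUntilDone(Func<bool> isDone, TimeSpan timeout)
    {
        if (isDone == null) throw new ArgumentNullException(nameof(isDone));
        return SpinWait.SpinUntil(isDone, timeout);
    }
}
```

With the manager, this would read `if (!WaitHelper.WaitUntilDone(() => manager.IsEverythingDone, TimeSpan.FromMinutes(1))) Console.WriteLine("Timed out.");` followed by `manager.Deactivate();`, so a stuck process flow is reported instead of hanging the application.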

Now compile and run. Of course, it does not work as intended.

What exactly happened:

  1. The data of type Number is sent.
  2. The DPM executes the Sin2 and Cos2 processes. It also adds the Number to the Add process's individual list of available data, where data is accumulated until all required inputs are collected.
  3. The Sin2 and Cos2 processes run in parallel and return a modified number, which is still the same instance of the Number class.
  4. Since we still have only one instance of data, the Add process never gets two different inputs and is therefore never executed.
  5. There are no processes that can use the available data, so the Output event is fired.

And the output is printed:

Output: 2,50373221144008E-05

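In this case the result is a composition such as sin²(cos²x) or cos²(sin²x), depending on the threading scenario; because both processes modify the same instance without synchronization, their steps may also interleave.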

Thinking rationally, we would expect an infinite loop: since Sin2 produces a Number, both Sin2 and Cos2 should run again. Why does that not happen? Because the DPM does not allow recursion by default. You can override this setting for an individual process:

C#
var manager = new DataProcessManager
{
  new AddProcessParams(new Sin2Process())
  {
      AllowRecursion = true
  },
  new Cos2Process(),
  new AddProcess()
};

Now no output is printed, because there is always a process that can accept the available data, and so sin²(sin²(...sin²(cos²(sin²x))...)) is calculated. Note that this is not traditional procedural recursion, so a stack overflow can never happen. However, you must always watch out for unintended recursion.
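How can a manager refuse recursion, and why is there no stack overflow when it allows it? The following is a hypothetical, single-threaded sketch (TracedProcess and TraceDemo are invented names, and this is not the DPM's actual code). It records the names of the processes that have touched one data instance, skips a process that is already in that trace unless AllowRecursion is set, and drives everything from a queue instead of nested calls. The maxSteps parameter exists only to stop the sketch when recursion is allowed.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical process description for this sketch only.
public sealed class TracedProcess
{
    public string Name { get; set; }
    public Func<object, bool> Accepts { get; set; }
    public Action<object> MutateInPlace { get; set; }   // modifies the instance, never replaces it
    public bool AllowRecursion { get; set; }
}

public static class TraceDemo
{
    // Feeds one data instance through the processes and returns its trace
    // (the names of the processes that operated on it, in order).
    public static List<string> Run(object data, IList<TracedProcess> processes, int maxSteps)
    {
        var trace = new List<string>();
        var queue = new Queue<object>();
        queue.Enqueue(data);

        // A work queue instead of nested calls: the call stack never grows,
        // no matter how many times the data is re-processed.
        while (queue.Count > 0 && trace.Count < maxSteps)
        {
            object item = queue.Dequeue();
            bool touched = false;
            foreach (var process in processes)
            {
                if (trace.Count >= maxSteps) break;
                if (!process.Accepts(item)) continue;
                // Recursion rule: a process that already operated on this data
                // is skipped unless it explicitly allows recursion.
                if (!process.AllowRecursion && trace.Contains(process.Name)) continue;
                process.MutateInPlace(item);
                trace.Add(process.Name);
                touched = true;
            }
            if (touched) queue.Enqueue(item);   // the same instance goes back for another round
        }
        return trace;
    }
}
```

With recursion disabled, the trace ends after one pass of Sin2 and Cos2, matching the single output seen earlier; with AllowRecursion on Sin2, only Sin2 keeps reapplying, which mirrors the nested sin² expression above.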

OK, back to our Pythagorean problem. How do we solve it? First of all, we need two instances of the Number class, so we add another process, a Number Generator, which accepts a double.

C#
class NumberGeneratorProcess : IDataProcess
{
    public object[] Do(InputSet input)
    {
        var number = input.SingleData<double>();
        return new[] { new Number { X = number } };
    }

    public ProcessInputRequirements GetRequirements()
    {
        return new ProcessInputRequirements(obj => obj.Data is double);
    }
}

Sequences, Groups, and Sets

I have provided an automatic way to control data flow by arranging processes into groups which have various properties. The complete and working solution of the problem above is shown here:

C#
var manager = new DataProcessManager
{
    new DataProcessSequence
    {
        new DataProcessLooseSet
        {
            new DataProcessSequence
            {
                new NumberGeneratorProcess(),
                new Sin2Process()
            },
            new DataProcessSequence
            {
                new NumberGeneratorProcess(),
                new Cos2Process()
            }
        },
        new AverageSumProcess(),
        new AverageProcess()
    }
};

I think this is close to optimal. The expensive sin and cos calculations are executed in parallel, while the Add process waits until it has data from both of them. Output:

Output: 0,999896776901505
Output: 0,999534958029976
Output: 0,999724379719487
Output: 1

Because we added an AverageProcess, we get a more accurate result with each iteration. In the end, every sin²x has its cos²x, and we arrive at the trigonometric identity. Note: rounding errors may occur.

During object initialization, this whole tree-like structure is flattened into a simple list, with additional constraints added. In this specific case, they are:

  • Number Generator - no restriction
  • Sin2Process/Cos2Process - the previous process must be a Number Generator; specifically, the concrete instance of that process that stands directly above the Sin2Process/Cos2Process in the listing above.
  • Add Process - the previous process for each input must be either Sin2Process or Cos2Process. Note that the initializer "intelligently" determines that the only possible "endings" of the LooseSet before Add are Sin2 and Cos2, because they are the finish points of the sequences.

There are three structures provided:

  • DataProcessSequence - Each process (except the first one) can operate on data coming from the previous one. However, if the item preceding process A is a set of processes, A can operate on data coming from any of that set's possible endings. I say "endings" rather than "any of them" because sometimes, as in the example above, the list of possible finish points can be narrowed down.
  • DataProcessParallel - A process cannot use input coming from other processes in the same set.
  • DataProcessLooseSet - Virtually does nothing, but it still stands as a set, so if put in a sequence, then it's treated as a single process.

You can create your own structures by implementing the DataProcessSet abstract class.
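The flattening described above can be sketched as a small recursive function. Node, Proc, Sequence, LooseSet and Flattener are hypothetical stand-ins for DataProcessSequence and DataProcessLooseSet, not the library's types. The sketch computes, for each process, which processes it may take input from (an empty set meaning no restriction) and returns the possible endings of each subtree. The DataProcessParallel rule (members may not consume each other's output) is not modelled.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical tree model for this sketch; not the library's types.
public abstract class Node { }

public sealed class Proc : Node
{
    public string Id { get; }
    public Proc(string id) { Id = id; }
}

public sealed class Sequence : Node
{
    public List<Node> Items { get; } = new List<Node>();
}

public sealed class LooseSet : Node
{
    public List<Node> Items { get; } = new List<Node>();
}

public static class Flattener
{
    // Records in 'allowed', for every process id, the ids of the processes whose
    // output it may consume (an empty set means "no restriction").
    // Returns the possible "endings" (finish points) of 'node'.
    public static HashSet<string> Flatten(Node node, HashSet<string> predecessors,
                                          Dictionary<string, HashSet<string>> allowed)
    {
        switch (node)
        {
            case Proc proc:
                allowed[proc.Id] = new HashSet<string>(predecessors);
                return new HashSet<string> { proc.Id };

            case Sequence sequence:
                // Each item may consume only what the previous item can end with.
                HashSet<string> current = predecessors;
                foreach (Node item in sequence.Items)
                    current = Flatten(item, current, allowed);
                return current;

            case LooseSet set:
                // All members see the same predecessors; the set's endings are the union.
                var endings = new HashSet<string>();
                foreach (Node item in set.Items)
                    endings.UnionWith(Flatten(item, predecessors, allowed));
                return endings;

            default:
                throw new ArgumentException("Unknown node type.", nameof(node));
        }
    }
}
```

Applied to the Pythagorean tree (with Gen1 and Gen2 as the two Number Generators and a single Add at the end), it yields Gen1 for Sin2, Gen2 for Cos2, and {Sin2, Cos2} for Add, matching the constraints listed above.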

Points of Interest

Performance

The DP design pattern is strongly related to parallelism. The DPM starts each process on a separate thread, so a process programmer must distribute work among threads properly, taking the target machine's configuration into account. The DPM is very inefficient on single-core machines; in fact, it is slow there. Managing the data flow carries a lot of overhead, and that overhead cannot be eliminated if we want a practical solution that does not force us to think about everything. In the following example, we modify the sin-cos problem to operate on ranges of numbers instead of single numbers. Now the processes exchange a Range object, and the NumberGenerator accepts a RangeSpecification.

C#
class RangeSpecification
{
    public double LowerBound = 0;
    public double UpperBound = 1000;
    public double Step = 0.00001;
    // Warning: takes about 2 GB of memory

    public int Count
    {
        get { return (int)((UpperBound - LowerBound) / Step); }
    }
}

class Range
{
    public double[] Array;
    public override string ToString()
    {
        return string.Format("{0} numbers", Array.Length);
    }
}

I assume you can imagine the rest of the code, but performance-wise I would like to draw your attention to one important optimization in NumberGeneratorProcess:

C#
class NumberGeneratorProcess : IDataProcess
{
    internal static readonly int RangeCount = 2 * Environment.ProcessorCount;

    public object[] Do(InputSet input)
    {
        var spec = input.SingleData<RangeSpecification>();

        var ranges = new Range[RangeCount];

        for (int i = 0; i < RangeCount; i++)
        {
            ranges[i] = new Range { Array = new double[spec.Count / RangeCount] };
            // Cast to double first: spec.Count * i could overflow int when RangeCount is large.
            double x = spec.LowerBound + (double)spec.Count * i * spec.Step / RangeCount;
            for (int j = 0; j < ranges[i].Array.Length; j++, x += spec.Step)
                ranges[i].Array[j] = x;
        }
        return ranges;
    }

    public ProcessInputRequirements GetRequirements()
    {
        return new ProcessInputRequirements(obj => obj.Data is RangeSpecification);
    }
}

What exactly was done? Instead of sending a single Range object, we send several of them. What is the result? My quad-core processor uses only about 20-40% of its power. By default, the DPM limits the number of threads to 2 × ProcessorCount + 1, which here gives eight threads to processes and one exclusively to the DPM. Running every Sin2 and Cos2 job at once would need 2 × RangeCount = 4 × ProcessorCount threads, so not all processes can run in parallel. As a result, the Cos2 jobs do not actually run in parallel: the available thread pool is emptied by the Sin2 jobs, because Sin2 comes first in the list of processes. Now, if we add a line:

C#
manager.ThreadLimit = 4 * Environment.ProcessorCount;

or remove the "*2" from NumberGeneratorProcess:

C#
internal static readonly int RangeCount = Environment.ProcessorCount;

then CPU utilization reaches 100%: all four cores work at full power. Always match the number of parallel jobs to the thread limit; you never want one process to wait for another. Although the DPM does its best, it is better to set the thread limit manually if you know more about the processes you use.
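The effect of a thread limit can be illustrated outside the DPM. In this sketch a SemaphoreSlim stands in for ThreadLimit (an assumption for illustration; the article does not say how the DPM enforces its limit), and ThrottleDemo is an invented name. At most threadLimit jobs run at once, and the method reports the peak concurrency it observed.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical illustration of a thread limit; not the DPM's mechanism.
public static class ThrottleDemo
{
    // Runs jobCount CPU-bound jobs, letting at most threadLimit of them execute at once,
    // and returns the highest number of jobs observed running simultaneously.
    public static int RunThrottled(int jobCount, int threadLimit)
    {
        if (threadLimit < 1) throw new ArgumentOutOfRangeException(nameof(threadLimit));

        using var gate = new SemaphoreSlim(threadLimit, threadLimit);
        int running = 0;
        int maxRunning = 0;

        Task[] jobs = Enumerable.Range(0, jobCount)
            .Select(_ => Task.Run(() =>
            {
                gate.Wait();                                   // take a slot
                int now = Interlocked.Increment(ref running);
                try
                {
                    // Record the peak concurrency without a lock.
                    int seen;
                    while (now > (seen = Volatile.Read(ref maxRunning)) &&
                           Interlocked.CompareExchange(ref maxRunning, now, seen) != seen)
                    {
                    }
                    Thread.SpinWait(100_000);                  // stand-in for real work
                }
                finally
                {
                    Interlocked.Decrement(ref running);
                    gate.Release();                            // free the slot
                }
            }))
            .ToArray();

        Task.WaitAll(jobs);
        return maxRunning;
    }
}
```

With 16 jobs and a limit of 2, the peak never exceeds 2; the remaining jobs simply wait for a slot, which is the kind of waiting you avoid by matching the number of jobs to the limit.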

Image 4

Remarks

  • To increase performance and responsiveness, the Data Process Manager (DPM) limits the number of threads to twice the number of available CPU cores. You can change or turn off thread limiting using the DataProcessManager.ThreadLimit property. The limit does not include the manager's thread.
  • You can also adjust the individual thread limits for each process by setting the ThreadLimit in the AddProcessParams parameter of the Add method.
  • More threads than CPUs => each result takes longer, but more results arrive at the same time.
  • As many threads as CPUs => optimal performance; results arrive in batches the size of the CPU count.
  • Fewer threads than CPUs => little or no gain from parallelization.
  • You do not have to stick to a single DataProcessManager instance. If you have groups of unrelated processes, spread them among several managers. Each manager fires its own Output event. Although OutputEventArgs has a Source property, splitting one big switch block into separate handler methods is recommended. Keep in mind, however, that a DPM is a fairly heavyweight object.
  • You cannot add new processes while a DPM is active. Use the Deactivate method before adding a new process. Even when deactivated, you can neither modify nor remove already added processes.
  • Changing what GetRequirements returns after a process has been added has no effect: the method is called only once, when the process is added to the DPM.
  • Output is not the only event you can use. Since processes are defined as classes, you can define custom events inside them and fire them whenever you want.
    C#
    MyProcessWithEvents myProcessWithEvents = new MyProcessWithEvents();
    myProcessWithEvents.MyEvent += MyProcess_Event;
    var manager = new DataProcessManager
                  {
                      myProcessWithEvents
                  };
  • Please use the message board at the bottom of this page to report bugs, suggest other solutions, or share interesting remarks.

History

  • 2009-11-02 - Original version posted
  • 2009-11-20 - Article update

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)