Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles / Languages / C#

Pipes and Filters concurrent Design Pattern using the Concurrency and Coordination Runtime

3.67/5 (3 votes)
8 Oct 2008Ms-PL4 min read 1   373  
A modular approach for concurrent message passing.

Image 1

Introduction

This article shows the usage of the Concurrency and Coordination Runtime (CCR) to implement the Pipes and Filters Design Pattern. The idea came from the interview Bob Familiar had with Stephen Tarmey.

Disclaimer: Please note that I'm not a concurrency expert, and this is my first attempt with the CCR.

Background

The Pipe and Filters Design Pattern is used to describe a pipeline where messages are created by source objects, transformed by filter objects, and consumed by sink objects. Classic multi-threaded programming is imperative programming, where the developer assigns threads (thread per object, thread per message, or an asynchronous thread-pool scheme). For example, an imperative implementation would use Queue<> for pipes, lock access by a ReaderWriter lock, and perform the object processing as a standard asynchronous operation on the thread pool.

On the other hand, declarative programming defines "what" to do, but not how to do it. For example, LINQ allows to filter an array using a condition without worrying about the specific order the array is traversed. Similarly, the Concurrency and Coordination Runtime (CCR) allows the developer to declare "what" should happen in each object, without worrying about threads, locks, and that kind of stuff (well, almost).

Let's dive into some basic CCR declerations:

C#
using (Dispatcher dispatcher = new Dispatcher(0, "CCR Threads"))
{
   DispatcherQueue dq = new DispatcherQueue("Pipeline DispatcherQueue", dispatcher);
}

The dispatcher is the object that holds a thread pool. The first parameter is the number of threads to be used, where 0 means set the number of threads to Max(2,NumberOfProcessors) * ThreadPerCPU. The dispatcher queue holds the list of pending delegates that can be executed immediately and are waiting for a thread to become available. For more information, see the article Concurrency and Coordination Runtime.

Here is a snippet that registers three methods listening on three ports. A port holds messages and holds methods interested in processing these messages.

C#
public void Initialize(
               DispatcherQueue dispatcherQueue,
               PortSet<correctpinmessage> inputPort,
               PortSet<openalarm> outputPort)
{
    ...
    Arbiter.Activate(
        dispatcherQueue,
        Arbiter.Interleave(
            new TeardownReceiverGroup(),
            new ExclusiveReceiverGroup(
                Arbiter.Receive(true, _timeoutPort, HandleTimeout)),
            new ConcurrentReceiverGroup(
                Arbiter.Receive(true, InputPort.P0, HandleSuccessMessage),
                Arbiter.Receive(true, InputPort.P1, HandleFailureMessage))));
}

Read the code like this:

  • Each time inputPort.Port0<correctpinmessage> receives a CorrectPinMessage object, call HandleSuccessMessage.
  • Each time inputPort.Port1<wrongpinmessage> receives a WrongPinMessage object, call HandleFailureMessage.
  • Each time _timeoutPort<datetime> receives a DateTime object, call the HandleTimeout method.

The Arbiter.Receive first parameter (persist = true) indicates that the method continues receiving messages after handling the first message.

The Arbiter.Interleave declaration defines the locking mechanism that the CCR uses. HandleSuccessMessage and HandleFailureMessage are defined in the concurrent group (in essence, similar to ReaderWriterLock.AcquireReaderLock()).

The concurrent handlers implementation has to be thread safe (achieved by using interlocked increment):

C#
void HandleFailureMessage(WrongPinMessage wrongPinMessage)
{
    if (Interlocked.Increment(ref _wrongPinCount) == 1)
    {
       _firstWrongPinMessage = wrongPinMessage;
       _dispatcherQueue.EnqueueTimer(_pollingPeriod, _timeoutPort);
    }
}

void HandleSuccessMessage(CorrectPinMessage correctPinMessage)
{
    Interlocked.Increment(ref _correctPinCount);
}

Notice that when the first WrongPinMessage is received, the timer is started. A few milliseconds later, the _timeoutPort receives a DateTime object which triggers the HandleTimeout method call.

Since the HandleTimeout is defined in the exclusive group (in essence, similar to ReaderWriterLock.AcquireWriterLock()), the handler implementation is not required to be thread safe. It accesses the _wrongPinCount member without using any locks or interlocked calls:

C#
void HandleTimeout(DateTime time)
{
   CardStatus newStatus = _currentCardStatus;

   if (_correctPinCount > 0)
   {
       newStatus = CardStatus.Ok;
   }
   else if (_wrongPinCount > 3)
   {
        newStatus = CardStatus.Stolen;
   }
   else if (_wrongPinCount > 0)
   {
        newStatus = CardStatus.Warning;
   }
   ...
}

The HandleTimeout method "assumes" that X milliseconds have passed since the first WrongPinMessage was called. It uses the total number of messages received in that period of time to change the internal state of the state machine.

For more information on CCR basic concepts, see the video CCR Programming - Jeffrey Richter and George Chrysanthakopoulos, and the OOPSLA/SCOOL paper on the CCR. Ccr.Core.dll is available for download as part of the Microsoft Robotics Studio.

Using the code

The code in this article defines a pipeline abstraction of a source (that has an output port - in green) of a filter (that has both an input port and output port - in blue) and a sink (that has only an input port - in yellow).

Image 2

The demo project describes a worker that has a card with a secret PIN (personal identification number). CorrectPinMessage describes the event of a worker that typed the correct PIN, and similarly, WrongPinMessage represents an incorrect PIN number. The "worker mock source" object creates new messages, and the state machine consolidates similar events by counting them. Each state change results in an OpenAlert message or a CloseAlert message which are displayed on the console window by the alarm sink.

Each pipeline object implements either IMessageSink or IMessageFilter or IMessageSink. The interface Initialize method is used to call the CCR Arbiter.Activate() method. The Start method is used after all objects have been initialized to start generating messages.

C#
interface IMessageSink<tinputport> where TInputPort : IPort
{
   TInputPort InputPortSet { get; }
   void Initialize(DispatcherQueue dispatcherQueue, TInputPort inputPortSet);
}

interface IMessageSource<toutputport> where TOutputPort: IPort
{
    TOutputPort OutputPortSet { get; }
    void Initialize(DispatcherQueue dispatcherQueue, TOutputPort outputPortSet);
    void Start();
}

interface IMessageFilter<tinputport,toutputport>
{
    TInputPort InputPort { get; }
    TOutputPort OutputPort { get; }
    void Initialize(DispatcherQueue dispatcherQueue, 
         TInputPort inputPort, TOutputPort outputPort);
}

Extension methods are used to provide serial and parallel connection of these objects. A serial connection is the natural source ==> filter ==> sink message flow connection. A parallel connection allows N sources to post messages to the same output port.

C#
static public IMessageSource<toutputport> ConnectTo<tinputport,>(
            this IMessageSource<tinputport> source,
            IMessageFilter<tinputport,> filter)
            where TInputPort : IPort,new()
            where TOutputPort : IPort
{ ... }

public static IMessageSource<tinputport> ConnectInParallel<tinputport>(
            this IMessageSource<tinputport> source1, 
            IMessageSource<tinputport> source2)
            where TInputPort : IPort
{ ... }

When a source is connected to a sink, a complete pipeline is created.

C#
interface IMessagePipeLine
{
     void Start(DispatcherQueue dispatcherQueue);
}
    
public static IMessagePipeLine ConnectTo<tport>(
            this IMessageSource<tport> source, 
            IMessageSink<tport> sink)
            where TPort : IPort,new()
{ ... }

State Machine per Worker

In order to save a state machine per worker, we need to de-multiplex the messages into new state machines.

Image 3

Disclaimer: The following implementation is not optimal. I'd be happy to hear your comments about it.

The first filter converts messages of type T into KeyValuePair<key,t> messages. The key is the name of the worker that typed the PIN.

C#
class WorkerKeyValueFilter : 
   IMessageFilter<PortSet<CorrectPinMessage>, 
   Port<KeyValuePair<string,CorrectPinMessage>>>
{
   void HandleMessage<T>(T message) where T : WorkCardSwipeMessageBase
   {
      OutputPort.Post(new KeyValuePair<string,T>(message.Name, message));
   }
}

The second filter uses the key to post the message to the dedicated port.

C#
class DemuxMessageFilter<TMessageFilter, TInputPort,TOutputPort,TKey> 
        : IMessageFilter<TInputPort,TOutputPort>
        where TMessageFilter : IMessageFilter<TInputPort,TOutputPort>, new() 
        where TInputPort : IPortSet,new() 
        where TOutputPort : IPort
{

   void HandleMessage(KeyValuePair<TKey,object> message) 
   { 
      var messageFilter = GetMessageFilter(message.Key); 
      messageFilter.InputPort.PostUnknownType(message.Value); 
   } 

   TMessageFilter GetMessageFilter(TKey key) 
   { 
      TMessageFilter filter; 
      if (!_messageFilters.TryGetValue(key, out filter)) 
      { 
         filter = new TMessageFilter(); 
         filter.Initialize(_dispatcherQueue, new TInputPort(), OutputPort); 
         _messageFilters.Add(key, filter); 
      } 
      return filter; 
   }
}

History

  • 10-Oct-2008: Initial version.

License

This article, along with any associated source code and files, is licensed under The Microsoft Public License (Ms-PL)