Introduction
This article shows how to use the Concurrency and Coordination Runtime (CCR) to implement the Pipes and Filters design pattern. The idea came from the interview Bob Familiar had with Stephen Tarmey.
Disclaimer: Please note that I'm not a concurrency expert, and this is my first attempt with the CCR.
Background
The Pipes and Filters design pattern describes a pipeline in which messages are created by source objects, transformed by filter objects, and consumed by sink objects. Classic multi-threaded programming is imperative: the developer decides how threads are assigned (a thread per object, a thread per message, or an asynchronous thread-pool scheme). For example, an imperative implementation would use Queue<> for pipes, guard access to each queue with a ReaderWriter lock, and run the object processing as a standard asynchronous operation on the thread pool.
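To make the imperative style concrete, here is a minimal sketch of such a pipe and filter. LockedPipe, ImperativeFilter, and the doubling "transformation" are hypothetical names invented for illustration, and ReaderWriterLockSlim is assumed as the reader/writer lock; none of this comes from the demo project.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical imperative "pipe": a Queue<T> guarded by a reader/writer lock.
class LockedPipe<T>
{
    private readonly Queue<T> _queue = new Queue<T>();
    private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim();

    public void Post(T message)
    {
        _lock.EnterWriteLock();
        try { _queue.Enqueue(message); }
        finally { _lock.ExitWriteLock(); }
    }

    public bool TryTake(out T message)
    {
        // Dequeuing mutates the queue, so it needs the write lock as well.
        _lock.EnterWriteLock();
        try
        {
            if (_queue.Count > 0) { message = _queue.Dequeue(); return true; }
            message = default(T);
            return false;
        }
        finally { _lock.ExitWriteLock(); }
    }

    public int Count
    {
        get
        {
            _lock.EnterReadLock();
            try { return _queue.Count; }
            finally { _lock.ExitReadLock(); }
        }
    }
}

static class ImperativeFilter
{
    // The developer explicitly chooses where the work runs: here, the thread pool.
    public static void ProcessAsync(LockedPipe<int> input, LockedPipe<int> output, ManualResetEvent done)
    {
        ThreadPool.QueueUserWorkItem(delegate
        {
            int value;
            while (input.TryTake(out value))
            {
                output.Post(value * 2); // hypothetical transformation
            }
            done.Set();
        });
    }
}
```

Every piece of threading and locking is spelled out by hand here, which is exactly the bookkeeping the declarative approach tries to remove.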
Declarative programming, on the other hand, defines "what" to do but not "how" to do it. For example, LINQ lets you filter an array by a condition without specifying how the array is traversed. Similarly, the Concurrency and Coordination Runtime (CCR) lets the developer declare "what" should happen in each object, without worrying about threads, locks, and that kind of stuff (well, almost).
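As a small illustration of the LINQ comparison, the sketch below filters an array by stating only the condition. LinqFilterExample and EvenNumbers are hypothetical names chosen for this example.

```csharp
using System;
using System.Linq;

static class LinqFilterExample
{
    // Declares "what" we want (the even numbers) and leaves the iteration to LINQ.
    public static int[] EvenNumbers(int[] numbers)
    {
        return numbers.Where(n => n % 2 == 0).ToArray();
    }
}
```

A call such as LinqFilterExample.EvenNumbers(new[] { 1, 2, 3, 4 }) contains no loop, index, or temporary list in the calling code.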
Let's dive into some basic CCR declarations:
using (Dispatcher dispatcher = new Dispatcher(0, "CCR Threads"))
{
    DispatcherQueue dq = new DispatcherQueue("Pipeline DispatcherQueue", dispatcher);
}
The dispatcher is the object that holds a thread pool. The first parameter is the number of threads to use, where 0 means the default: Max(2, NumberOfProcessors) * ThreadPerCPU. The dispatcher queue holds the list of pending delegates that are ready to run and are waiting for a thread to become available. For more information, see the article Concurrency and Coordination Runtime.
Here is a snippet that registers three methods listening on three ports. A port holds messages and holds methods interested in processing these messages.
public void Initialize(
    DispatcherQueue dispatcherQueue,
    PortSet<CorrectPinMessage, WrongPinMessage> inputPort,
    PortSet<OpenAlarm> outputPort)
{
    ...
    Arbiter.Activate(
        dispatcherQueue,
        Arbiter.Interleave(
            new TeardownReceiverGroup(),
            new ExclusiveReceiverGroup(
                Arbiter.Receive(true, _timeoutPort, HandleTimeout)),
            new ConcurrentReceiverGroup(
                Arbiter.Receive(true, InputPort.P0, HandleSuccessMessage),
                Arbiter.Receive(true, InputPort.P1, HandleFailureMessage))));
}
Read the code like this:
- Each time InputPort.P0 (a Port<CorrectPinMessage>) receives a CorrectPinMessage object, call HandleSuccessMessage.
- Each time InputPort.P1 (a Port<WrongPinMessage>) receives a WrongPinMessage object, call HandleFailureMessage.
- Each time _timeoutPort (a Port<DateTime>) receives a DateTime object, call the HandleTimeout method.
The first parameter of Arbiter.Receive (persist = true) indicates that the method keeps receiving messages after handling the first one.
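The persistent receiver can be tried in isolation with a single port. The following is a minimal sketch, assuming the Microsoft.Ccr.Core assembly is referenced; PersistExample and CountHandled are hypothetical names, and the caller is assumed to pass a messageCount greater than zero (otherwise the wait never completes).

```csharp
using System;
using System.Threading;
using Microsoft.Ccr.Core;

static class PersistExample
{
    // Posts messageCount integers to one port that has a persistent receiver
    // and returns how many of them the handler processed.
    public static int CountHandled(int messageCount)
    {
        int handled = 0;
        using (var done = new ManualResetEvent(false))
        using (var dispatcher = new Dispatcher(0, "CCR Threads"))
        using (var queue = new DispatcherQueue("Example queue", dispatcher))
        {
            var port = new Port<int>();

            // persist = true: the receiver stays registered after each message.
            Arbiter.Activate(queue,
                Arbiter.Receive(true, port, delegate(int item)
                {
                    // Handlers for different messages may run in parallel,
                    // so the shared counter is updated with Interlocked.
                    if (Interlocked.Increment(ref handled) == messageCount)
                    {
                        done.Set();
                    }
                }));

            for (int i = 0; i < messageCount; i++)
            {
                port.Post(i);
            }
            done.WaitOne();
        }
        return handled;
    }
}
```

With persist = false, the receiver would be removed after the first message and later posts would simply wait in the port until another receiver is activated.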
The Arbiter.Interleave declaration defines the locking policy that the CCR applies to the handlers. HandleSuccessMessage and HandleFailureMessage are defined in the concurrent group, so they may run in parallel with each other (in essence, similar to ReaderWriterLock.AcquireReaderLock()).
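The reader/writer analogy can be sketched with plain .NET locks. This is only an illustration of the analogy, not how the CCR is implemented; PinCounters and its members are hypothetical, and ReaderWriterLockSlim is used in place of ReaderWriterLock.

```csharp
using System;
using System.Threading;

class PinCounters
{
    private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim();
    private int _correctPinCount;
    private int _wrongPinCount;

    // "Concurrent group": many threads may hold the read lock at once,
    // so the shared counters still need Interlocked.
    public void OnCorrectPin()
    {
        _lock.EnterReadLock();
        try { Interlocked.Increment(ref _correctPinCount); }
        finally { _lock.ExitReadLock(); }
    }

    public void OnWrongPin()
    {
        _lock.EnterReadLock();
        try { Interlocked.Increment(ref _wrongPinCount); }
        finally { _lock.ExitReadLock(); }
    }

    // "Exclusive group": the write lock keeps every other handler out,
    // so plain field access is safe here.
    public void ReadCounts(out int correct, out int wrong)
    {
        _lock.EnterWriteLock();
        try
        {
            correct = _correctPinCount;
            wrong = _wrongPinCount;
        }
        finally { _lock.ExitWriteLock(); }
    }
}
```

The difference in the CCR version is that no thread blocks while waiting: the interleave holds back the delegates and schedules them when the policy allows.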
The concurrent handlers implementation has to be thread safe (achieved by using interlocked increment):
void HandleFailureMessage(WrongPinMessage wrongPinMessage)
{
    if (Interlocked.Increment(ref _wrongPinCount) == 1)
    {
        _firstWrongPinMessage = wrongPinMessage;
        _dispatcherQueue.EnqueueTimer(_pollingPeriod, _timeoutPort);
    }
}
void HandleSuccessMessage(CorrectPinMessage correctPinMessage)
{
    Interlocked.Increment(ref _correctPinCount);
}
Notice that the timer is started when the first WrongPinMessage is received. Once _pollingPeriod has elapsed, the _timeoutPort receives a DateTime object, which triggers the HandleTimeout method call.
Since HandleTimeout is defined in the exclusive group (in essence, similar to ReaderWriterLock.AcquireWriterLock()), the handler implementation is not required to be thread safe. It accesses the _wrongPinCount member without using any locks or interlocked calls:
void HandleTimeout(DateTime time)
{
    CardStatus newStatus = _currentCardStatus;
    if (_correctPinCount > 0)
    {
        newStatus = CardStatus.Ok;
    }
    else if (_wrongPinCount > 3)
    {
        newStatus = CardStatus.Stolen;
    }
    else if (_wrongPinCount > 0)
    {
        newStatus = CardStatus.Warning;
    }
    ...
}
The HandleTimeout method "assumes" that the polling period has passed since the first WrongPinMessage was received. It uses the total number of messages received in that period of time to change the internal state of the state machine.
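The decision rule in HandleTimeout can be isolated as a pure function, which makes the thresholds easy to check. This is a sketch only: CardStatusRules and Next are hypothetical names, and the enum lists just the three values that appear in the snippet above (the real CardStatus may have more).

```csharp
using System;

// Only the values visible in the article's snippet; the real enum may differ.
enum CardStatus { Ok, Warning, Stolen }

static class CardStatusRules
{
    // Same branch order as HandleTimeout: a correct PIN wins, then more than
    // three wrong PINs means Stolen, then any wrong PIN means Warning.
    public static CardStatus Next(CardStatus current, int correctPinCount, int wrongPinCount)
    {
        if (correctPinCount > 0) return CardStatus.Ok;
        if (wrongPinCount > 3) return CardStatus.Stolen;
        if (wrongPinCount > 0) return CardStatus.Warning;
        return current; // nothing was received: keep the current status
    }
}
```

Note that exactly three wrong PINs still yields Warning, because the comparison is strictly greater than 3.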
For more information on CCR basic concepts, see the video CCR Programming - Jeffrey Richter and George Chrysanthakopoulos, and the OOPSLA/SCOOL paper on the CCR. Ccr.Core.dll is available for download as part of the Microsoft Robotics Studio.
Using the code
The code in this article defines a pipeline abstraction made of a source (which has only an output port, shown in green), a filter (which has both an input port and an output port, shown in blue), and a sink (which has only an input port, shown in yellow).
The demo project describes a worker who has a card with a secret PIN (personal identification number). CorrectPinMessage describes the event of a worker typing the correct PIN, and similarly, WrongPinMessage represents an incorrect PIN. The "worker mock source" object creates new messages, and the state machine consolidates similar events by counting them. Each state change results in an OpenAlert message or a CloseAlert message, which the alarm sink displays in the console window.
Each pipeline object implements one of IMessageSource, IMessageFilter, or IMessageSink. The Initialize method of each interface is used to call the CCR Arbiter.Activate() method. The Start method is called after all objects have been initialized, to start generating messages.
interface IMessageSink<TInputPort> where TInputPort : IPort
{
    TInputPort InputPortSet { get; }
    void Initialize(DispatcherQueue dispatcherQueue, TInputPort inputPortSet);
}
interface IMessageSource<TOutputPort> where TOutputPort : IPort
{
    TOutputPort OutputPortSet { get; }
    void Initialize(DispatcherQueue dispatcherQueue, TOutputPort outputPortSet);
    void Start();
}
interface IMessageFilter<TInputPort, TOutputPort>
{
    TInputPort InputPort { get; }
    TOutputPort OutputPort { get; }
    void Initialize(DispatcherQueue dispatcherQueue,
        TInputPort inputPort, TOutputPort outputPort);
}
Extension methods are used to provide serial and parallel connection of these objects. A serial connection is the natural source ==> filter ==> sink message flow connection. A parallel connection allows N sources to post messages to the same output port.
public static IMessageSource<TOutputPort> ConnectTo<TInputPort, TOutputPort>(
    this IMessageSource<TInputPort> source,
    IMessageFilter<TInputPort, TOutputPort> filter)
    where TInputPort : IPort, new()
    where TOutputPort : IPort
{ ... }
public static IMessageSource<TInputPort> ConnectInParallel<TInputPort>(
    this IMessageSource<TInputPort> source1,
    IMessageSource<TInputPort> source2)
    where TInputPort : IPort
{ ... }
When a source is connected to a sink, a complete pipeline is created.
interface IMessagePipeLine
{
    void Start(DispatcherQueue dispatcherQueue);
}
public static IMessagePipeLine ConnectTo<TPort>(
    this IMessageSource<TPort> source,
    IMessageSink<TPort> sink)
    where TPort : IPort, new()
{ ... }
State Machine per Worker
To keep a separate state machine for each worker, we need to de-multiplex the messages and route each worker's messages to that worker's own state machine, creating it on first use.
Disclaimer: The following implementation is not optimal. I'd be happy to hear your comments about it.
The first filter converts messages of type T into KeyValuePair<TKey,T> messages. The key is the name of the worker that typed the PIN.
class WorkerKeyValueFilter :
    IMessageFilter<PortSet<CorrectPinMessage>,
                   Port<KeyValuePair<string,CorrectPinMessage>>>
{
    void HandleMessage<T>(T message) where T : WorkCardSwipeMessageBase
    {
        OutputPort.Post(new KeyValuePair<string,T>(message.Name, message));
    }
}
The second filter uses the key to post the message to the dedicated port.
class DemuxMessageFilter<TMessageFilter, TInputPort, TOutputPort, TKey>
    : IMessageFilter<TInputPort, TOutputPort>
    where TMessageFilter : IMessageFilter<TInputPort, TOutputPort>, new()
    where TInputPort : IPortSet, new()
    where TOutputPort : IPort
{
    void HandleMessage(KeyValuePair<TKey,object> message)
    {
        var messageFilter = GetMessageFilter(message.Key);
        messageFilter.InputPort.PostUnknownType(message.Value);
    }
    TMessageFilter GetMessageFilter(TKey key)
    {
        TMessageFilter filter;
        if (!_messageFilters.TryGetValue(key, out filter))
        {
            filter = new TMessageFilter();
            filter.Initialize(_dispatcherQueue, new TInputPort(), OutputPort);
            _messageFilters.Add(key, filter);
        }
        return filter;
    }
}
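The lookup-or-create step in GetMessageFilter is not thread safe on its own, which matters if HandleMessage is registered in a concurrent receiver group (an assumption; the registration is not shown above). The sketch below shows one way to guard that step with a lock, using only plain .NET types. KeyedDemux and WorkerCounter are hypothetical stand-ins for the filter dictionary and the per-worker state machine.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the per-key filter lookup in DemuxMessageFilter.
class KeyedDemux<TKey, TTarget> where TTarget : new()
{
    private readonly Dictionary<TKey, TTarget> _targets = new Dictionary<TKey, TTarget>();
    private readonly object _sync = new object();

    // Returns the target for this key, creating it on first use.
    // The lock makes check-then-add atomic, so two threads cannot
    // create two targets for the same key.
    public TTarget GetOrCreate(TKey key)
    {
        lock (_sync)
        {
            TTarget target;
            if (!_targets.TryGetValue(key, out target))
            {
                target = new TTarget();
                _targets.Add(key, target);
            }
            return target;
        }
    }

    public int Count
    {
        get { lock (_sync) { return _targets.Count; } }
    }
}

// Hypothetical per-worker state, standing in for a state machine filter.
class WorkerCounter
{
    public int WrongPins;
}
```

An alternative that stays within the CCR model would be to register the de-multiplexing handler in an exclusive receiver group, so that no explicit lock is needed.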
History
- 10-Oct-2008: Initial version.