0. Introduction
Microsoft has provided the .NET Framework with various helpful thread
synchronization primitives, ranging from monitors to reader-writer locks. What is missing is
the same on the inter-process level, as well as simple message passing
mechanisms useful e.g. for service (client/server or SOA) and producer/consumer
patterns on both thread and process level. I try to fill this gap with a simple
self-contained framework for both inter-thread and inter-process
synchronization and communication (IPC) using structures like semaphores,
mailboxes, memory-mapped files, blocking channels and simple message flow
controllers. The set of classes provided in this article is also available as
a library project (open source, BSD License) maintained on
www.cdrnet.net/projects/threadmsg/.
The goals of this framework are short and simple:
- Abstraction: A message processing thread should not need to care whether
  its messages are sent to the next thread or to another machine through an
  MSMQ message queue.
- Simplicity: Passing messages to other processes with a single method call.
Note: I removed all the XML Comments of the code samples in the article to
save space - check out the attached source code if you need more details about
the methods and their arguments.
1. Starting with a Sample
To demonstrate how simple inter-process message passing could be, I will start
with a small sample: A console application that can be started as either a
reader or a writer, depending on the command line arguments. In a writer
process you may enter some text and send it to a mailbox (return key), a reader
displays all messages received from the mailbox. You may start as many writers
and readers as you want, but every message will only be shown on exactly one
reader.
[Serializable]
struct Message
{
    public string Text;
}
class Test
{
    IMailBox mail;
    public Test()
    {
        mail = new ProcessMailBox("TMProcessTest",1024);
    }
    public void RunWriter()
    {
        Console.WriteLine("Writer started");
        Message msg;
        while(true)
        {
            msg.Text = Console.ReadLine();
            // ReadLine returns null at end of input (e.g. redirected stdin)
            if(msg.Text == null || msg.Text.Equals("exit"))
                break;
            mail.Content = msg;
        }
    }
    public void RunReader()
    {
        Console.WriteLine("Reader started");
        while(true)
        {
            Message msg = (Message)mail.Content;
            Console.WriteLine(msg.Text);
        }
    }
    [STAThread]
    static void Main(string[] args)
    {
        Test test = new Test();
        // any command line argument selects writer mode
        if(args.Length > 0)
            test.RunWriter();
        else
            test.RunReader();
    }
}
Once a mailbox (here: ProcessMailBox) is created using the constructor,
receiving a message is as easy as getting the Content property, sending as
easy as setting it. The getter blocks the thread if no data is available, and
the setter blocks if there is already content that no reader has fetched yet.
Thanks to this blocking, the application is completely interrupt-driven and
does not stress the CPU (no polling or spinning). Any type of message or
object is allowed as long as it is decorated with the SerializableAttribute.
However, what happens behind the scenes is a bit more complex: The messages
are transferred through one of the few remaining ways of sharing memory
between processes: memory-mapped files (MMF), in our case virtual files
existing only in the system page file. Access to this file is synchronized
using two Win32 semaphores. The messages are binary-serialized and written to
the file, which is why the SerializableAttribute is needed. Both MMF and Win32
semaphores require direct calls into the Win32 API (kernel32), but no unsafe
code is needed thanks to the handy Marshal class provided by the .NET
Framework. More details are discussed later in this article.
2. Inter-Thread and Inter-Process Synchronization in the .NET World
Communication between threads and processes requires either shared memory or a
built-in mechanism for transferring data into and out of the process/thread. In
the case of shared memory, there is also a set of synchronization primitives
needed to allow concurrent access.
All threads in a single process share a common logical address space (the
heap), but starting from Windows 2000 there's no way to share memory directly
between processes. However, processes are allowed to read and write to the
same file, and the Win32 API provides various functions to simplify mapping
files to the process' address space and to work with virtual files existing
only as kernel objects ("sections") pointing to a memory block in the system
page file. For both inter-thread shared heaps and inter-process shared files,
concurrent access may result in data inconsistency. We briefly discuss several
mechanisms that ensure the orderly execution of cooperating processes or
threads so that data consistency is maintained.
2.1 Thread Synchronization
The .NET Framework and C# offer very simple and straightforward thread
synchronization mechanisms using Monitors and the lock statement (we will not
discuss the .NET Framework's Mutex type in this article). Using lock is the
recommended way to go for thread synchronization, no matter what other
primitives this article provides.
void Work1()
{
    NonCriticalSection1();
    Monitor.Enter(this);
    try
    {
        CriticalSection();
    }
    finally
    {
        Monitor.Exit(this);
    }
    NonCriticalSection2();
}
void Work2()
{
    NonCriticalSection1();
    lock(this)
    {
        CriticalSection();
    }
    NonCriticalSection2();
}
Work1 and Work2 are equivalent. In C#, the second form is preferable because
it is shorter and less error-prone.
2.2 Inter-Thread Semaphores
One of the classic synchronization primitives (introduced by Edsger Dijkstra)
is the counting semaphore. Semaphores are objects with a counter and two
operations: Acquire (also called 'P' or 'Wait') and Release (also called 'V'
or 'Signal'). On Acquire, a semaphore decrements the counter and blocks (with
an optional timeout) if the counter was not positive before decrementing; on
Release, it increments the counter without blocking. Although semaphores are
simple structures, they are somewhat tricky to implement. Fortunately, the
blocking behavior of the built-in Monitor helps.
public sealed class ThreadSemaphore : ISemaphore
{
    // a negative counter means -counter threads are waiting
    private int counter;
    private readonly int max;
    public ThreadSemaphore() : this(0, int.MaxValue) {}
    public ThreadSemaphore(int initial) : this(initial, int.MaxValue) {}
    public ThreadSemaphore(int initial, int max)
    {
        this.counter = Math.Min(initial,max);
        this.max = max;
    }
    public void Acquire()
    {
        lock(this)
        {
            counter--;
            if(counter < 0 && !Monitor.Wait(this))
                throw new SemaphoreFailedException();
        }
    }
    public void Acquire(TimeSpan timeout)
    {
        lock(this)
        {
            counter--;
            if(counter < 0 && !Monitor.Wait(this,timeout))
            {
                // timed out: undo the decrement so no permit is lost
                counter++;
                throw new SemaphoreFailedException();
            }
        }
    }
    public void Release()
    {
        lock(this)
        {
            // releasing beyond the maximum indicates a programming error
            if(counter >= max)
                throw new SemaphoreFailedException();
            if(counter < 0)
                Monitor.Pulse(this);
            counter++;
        }
    }
}
Semaphores are useful for more complex blocking scenarios like the channels we
will discuss later. You could also use semaphores for mutual exclusion locking
of critical sections (Work3, see below), but it's highly recommended to use
the built-in lock keyword instead, as demonstrated above in Work2.
Please note that counting semaphores are potentially dangerous objects if not
used carefully. You are on the safe side, however, if you follow a basic rule:
Never call Release if Acquire fails, but call it if it succeeds, whatever
happens after the call. The finally statement in Work3 is one way to enforce
this rule, but note that the Acquire call has to be outside of the try
statement, as Release must not be called if Acquire fails.
ThreadSemaphore s = new ThreadSemaphore(1);
void Work3()
{
    NonCriticalSection1();
    s.Acquire();
    try
    {
        CriticalSection();
    }
    finally
    {
        s.Release();
    }
    NonCriticalSection2();
}
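The Acquire-outside-try rule can be tried out with a small self-contained program. This is only a sketch under assumptions: CountingSemaphore is a hypothetical, simplified stand-in written for this example (it is not the ThreadSemaphore class above), and the worker count and the sleep time are arbitrary.

```csharp
using System;
using System.Threading;

// Hypothetical minimal counting semaphore built on Monitor (illustration only).
public sealed class CountingSemaphore
{
    private readonly object sync = new object();
    private int permits;

    public CountingSemaphore(int initial) { permits = initial; }

    public void Acquire()
    {
        lock (sync)
        {
            // Re-check the condition after every wake-up.
            while (permits == 0)
                Monitor.Wait(sync);
            permits--;
        }
    }

    public void Release()
    {
        lock (sync)
        {
            permits++;
            Monitor.Pulse(sync);
        }
    }
}

public static class SemaphoreDemo
{
    // Runs `workers` threads through a section guarded by `permits` permits
    // and returns the highest number of threads seen inside at the same time.
    public static int MaxConcurrency(int workers, int permits)
    {
        CountingSemaphore s = new CountingSemaphore(permits);
        object stats = new object();
        int inside = 0, max = 0;
        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++)
        {
            threads[i] = new Thread(() =>
            {
                s.Acquire();          // outside try: no Release if this fails
                try
                {
                    lock (stats) { inside++; if (inside > max) max = inside; }
                    Thread.Sleep(20); // simulated work
                    lock (stats) { inside--; }
                }
                finally
                {
                    s.Release();      // always paired with a successful Acquire
                }
            });
            threads[i].Start();
        }
        foreach (Thread t in threads) t.Join();
        return max;
    }

    public static void Main()
    {
        Console.WriteLine("Max threads in section: " + MaxConcurrency(8, 2));
    }
}
```

With two permits, the reported maximum never exceeds two, no matter how many workers are started.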
2.3 Inter-Process Semaphores
To synchronize resource access between processes we need primitives like the
ones discussed above for the process level. Unfortunately, there is no
process-level Monitor class available in the .NET Framework. However, the
Win32 API provides semaphore kernel objects that may be used to synchronize
access between processes. Robin Galloway-Lunn shows how to map Win32
semaphores to the .NET world in "Using Win32 Semaphores in C#". Our
implementation looks similar:
// P/Invoke declarations (members of the NTKernel class).
// Handles are pointer-sized, so they are declared as IntPtr; Win32 BOOL
// return values are marshaled as UnmanagedType.Bool.
[DllImport("kernel32",EntryPoint="CreateSemaphore",
    SetLastError=true,CharSet=CharSet.Unicode)]
internal static extern IntPtr CreateSemaphore(
    SecurityAttributes auth, int initialCount,
    int maximumCount, string name);
[DllImport("kernel32",EntryPoint="WaitForSingleObject",
    SetLastError=true,CharSet=CharSet.Unicode)]
internal static extern uint WaitForSingleObject(
    IntPtr hHandle, uint dwMilliseconds);
[DllImport("kernel32",EntryPoint="ReleaseSemaphore",
    SetLastError=true,CharSet=CharSet.Unicode)]
[return : MarshalAs( UnmanagedType.Bool )]
internal static extern bool ReleaseSemaphore(
    IntPtr hHandle, int lReleaseCount, out int lpPreviousCount);
[DllImport("kernel32",EntryPoint="CloseHandle",SetLastError=true,
    CharSet=CharSet.Unicode)]
[return : MarshalAs( UnmanagedType.Bool )]
internal static extern bool CloseHandle(IntPtr hHandle);
public class ProcessSemaphore : ISemaphore, IDisposable
{
    private const uint WAIT_OBJECT_0 = 0;   // the wait succeeded
    private const uint WAIT_TIMEOUT = 258;  // the wait timed out
    private IntPtr handle;
    private readonly uint interruptReactionTime;
    public ProcessSemaphore(string name) : this(
        name,0,int.MaxValue,500) {}
    public ProcessSemaphore(string name, int initial) : this(
        name,initial,int.MaxValue,500) {}
    public ProcessSemaphore(string name, int initial, int max) : this(
        name,initial,max,500) {}
    public ProcessSemaphore(string name, int initial,
        int max, int interruptReactionTime)
    {
        this.interruptReactionTime = (uint)interruptReactionTime;
        this.handle = NTKernel.CreateSemaphore(null, initial, max, name);
        if(handle == IntPtr.Zero)
            throw new SemaphoreFailedException();
    }
    public void Acquire()
    {
        while(true)
        {
            uint res = NTKernel.WaitForSingleObject(handle,
                interruptReactionTime);
            // Sleep(0) gives a pending Thread.Interrupt() the chance to fire
            try {System.Threading.Thread.Sleep(0);}
            catch(System.Threading.ThreadInterruptedException)
            {
                // give the semaphore back if we had already acquired it
                if(res == WAIT_OBJECT_0)
                {
                    int previousCount;
                    NTKernel.ReleaseSemaphore(handle,1,out previousCount);
                }
                throw;
            }
            if(res == WAIT_OBJECT_0)
                return;
            if(res != WAIT_TIMEOUT)
                throw new SemaphoreFailedException();
        }
    }
    public void Acquire(TimeSpan timeout)
    {
        uint milliseconds = (uint)timeout.TotalMilliseconds;
        if(NTKernel.WaitForSingleObject(handle, milliseconds) != WAIT_OBJECT_0)
            throw new SemaphoreFailedException();
    }
    public void Release()
    {
        int previousCount;
        if(!NTKernel.ReleaseSemaphore(handle, 1, out previousCount))
            throw new SemaphoreFailedException();
    }
    #region IDisposable Member
    public void Dispose()
    {
        if(handle != IntPtr.Zero)
        {
            if(NTKernel.CloseHandle(handle))
                handle = IntPtr.Zero;
        }
    }
    #endregion
}
The important point is that the semaphore is named. This allows other processes
to create a handle to the same semaphore simply by passing the same name. To
make blocked threads interruptible we use a (dirty) workaround based on
timeouts and Sleep(0). We need interrupt support to shut down the threads
safely. It is recommended, however, to release the semaphore until no thread
is blocked anymore, allowing a clean application exit.
You may also have noticed that both the inter-thread and the inter-process
semaphore share the same interface. This pattern is applied to all classes,
leading to the abstraction mentioned in the introduction. Note however that for
performance reasons you should NOT use inter-process implementations for
inter-thread scenarios or inter-thread implementations for single-thread
scenarios.
3. Inter-Process Shared Memory: Memory Mapped Files
We have seen how to synchronize access to shared resources for both threads
and processes. What is missing for transferring messages is the shared resource
itself. For threads this is as easy as declaring a class member variable, but
for processes we need a technique called Memory Mapped Files (MMF) provided by
the Win32 API. Working with MMF is not much harder than working with the Win32
semaphores discussed above. What we need first is a handle to such a mapped
file, obtained with the CreateFileMapping function:
// Handles are pointer-sized (IntPtr) and the view size is a SIZE_T (UIntPtr),
// so the declarations also work in 64-bit processes.
[DllImport("Kernel32.dll",EntryPoint="CreateFileMapping",
    SetLastError=true,CharSet=CharSet.Unicode)]
internal static extern IntPtr CreateFileMapping(IntPtr hFile,
    SecurityAttributes lpAttributes, uint flProtect,
    uint dwMaximumSizeHigh, uint dwMaximumSizeLow, string lpName);
[DllImport("Kernel32.dll",EntryPoint="MapViewOfFile",
    SetLastError=true,CharSet=CharSet.Unicode)]
internal static extern IntPtr MapViewOfFile(IntPtr hFileMappingObject,
    uint dwDesiredAccess, uint dwFileOffsetHigh,
    uint dwFileOffsetLow, UIntPtr dwNumberOfBytesToMap);
[DllImport("Kernel32.dll",EntryPoint="UnmapViewOfFile",
    SetLastError=true,CharSet=CharSet.Unicode)]
[return : MarshalAs( UnmanagedType.Bool )]
internal static extern bool UnmapViewOfFile(IntPtr lpBaseAddress);
public static MemoryMappedFile CreateFile(string name,
    FileAccess access, int size)
{
    if(size < 0)
        throw new ArgumentException("Size must not be negative","size");
    // INVALID_HANDLE_VALUE (-1): back the mapping by the system page file
    IntPtr fileMapping = NTKernel.CreateFileMapping(new IntPtr(-1),null,
        (uint)access,0,(uint)size,name);
    if(fileMapping == IntPtr.Zero)
        throw new MemoryMappingFailedException();
    return new MemoryMappedFile(fileMapping,size,access);
}
We prefer virtual files directly in the system page file, so we pass -1
(INVALID_HANDLE_VALUE) as the file handle when creating our mapped file
handle. We also specify the required file size in bytes and a name to allow
other processes to access the same file concurrently. Having such a file, we
may map several parts of it (specified by offset and size in bytes) to our
local address space. We do this with the MapViewOfFile function:
public MemoryMappedFileView CreateView(int offset, int size,
    MemoryMappedFileView.ViewAccess access)
{
    if(this.access == FileAccess.ReadOnly && access ==
        MemoryMappedFileView.ViewAccess.ReadWrite)
        throw new ArgumentException(
            "Only read access to views allowed on files without write access",
            "access");
    if(offset < 0)
        throw new ArgumentException("Offset must not be negative","offset");
    if(size < 0)
        throw new ArgumentException("Size must not be negative","size");
    IntPtr mappedView = NTKernel.MapViewOfFile(fileMapping,
        (uint)access,0,(uint)offset,new UIntPtr((uint)size));
    // MapViewOfFile returns NULL on failure
    if(mappedView == IntPtr.Zero)
        throw new MemoryMappingFailedException();
    return new MemoryMappedFileView(mappedView,size,access);
}
In unsafe code, we could just take the returned pointer (mappedView) and cast
it to our data structures. Nevertheless, as we do not want unsafe code, we use
the Marshal class to read and write bytes and integers to it. The offset
parameter is used to specify where to start reading or writing to the mapped
file, relative to the view offset.
public byte ReadByte(int offset)
{
    return Marshal.ReadByte(mappedView,offset);
}
public void WriteByte(byte data, int offset)
{
    Marshal.WriteByte(mappedView,offset,data);
}
public int ReadInt32(int offset)
{
    return Marshal.ReadInt32(mappedView,offset);
}
public void WriteInt32(int data, int offset)
{
    Marshal.WriteInt32(mappedView,offset,data);
}
public void ReadBytes(byte[] data, int offset)
{
    for(int i=0;i<data.Length;i++)
        data[i] = Marshal.ReadByte(mappedView,offset+i);
}
public void WriteBytes(byte[] data, int offset)
{
    for(int i=0;i<data.Length;i++)
        Marshal.WriteByte(mappedView,offset+i,data[i]);
}
However, we want to write and read whole object trees to the file, so we need
more advanced accessors with automatic binary serialization support:
public object ReadDeserialize(int offset, int length)
{
    byte[] binaryData = new byte[length];
    ReadBytes(binaryData,offset);
    System.Runtime.Serialization.Formatters.Binary.BinaryFormatter formatter
        = new System.Runtime.Serialization.Formatters.Binary.BinaryFormatter();
    System.IO.MemoryStream ms = new System.IO.MemoryStream(
        binaryData,0,length,true,true);
    object data = formatter.Deserialize(ms);
    ms.Close();
    return data;
}
public void WriteSerialize(object data, int offset, int length)
{
    System.Runtime.Serialization.Formatters.Binary.BinaryFormatter formatter
        = new System.Runtime.Serialization.Formatters.Binary.BinaryFormatter();
    // fixed-size buffer: serialization fails if the data does not fit
    byte[] binaryData = new byte[length];
    System.IO.MemoryStream ms = new System.IO.MemoryStream(
        binaryData,0,length,true,true);
    formatter.Serialize(ms,data);
    ms.Flush();
    ms.Close();
    WriteBytes(binaryData,offset);
}
Please note that the serialized size of the object must not exceed the view
size. The serialized size is typically larger than the object itself, because
the formatter also writes type information. I have not tried binding the
memory stream directly to the mapped view instead of the byte array, but that
should work too, probably even with a small performance gain.
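The per-byte loops in ReadBytes and WriteBytes could presumably also be replaced by bulk copies with Marshal.Copy. The following sketch shows the idea on an ordinary unmanaged buffer allocated with Marshal.AllocHGlobal, so it runs without a mapped file. The class BulkCopyDemo, its method names and the 4-byte length prefix are assumptions made for this example; they are not part of the library, and IntPtr.Add requires .NET 4 or later.

```csharp
using System;
using System.Runtime.InteropServices;

// Hypothetical helper: length-prefixed blocks copied in bulk via Marshal.Copy.
public static class BulkCopyDemo
{
    // Writes a 4-byte length followed by the payload at `offset`.
    public static void WriteBlock(IntPtr view, int offset, byte[] data)
    {
        Marshal.WriteInt32(view, offset, data.Length);
        Marshal.Copy(data, 0, IntPtr.Add(view, offset + 4), data.Length);
    }

    // Reads the length prefix at `offset`, then exactly that many bytes.
    public static byte[] ReadBlock(IntPtr view, int offset)
    {
        int length = Marshal.ReadInt32(view, offset);
        byte[] data = new byte[length];
        Marshal.Copy(IntPtr.Add(view, offset + 4), data, 0, length);
        return data;
    }

    // Round trip through an unmanaged buffer standing in for a mapped view.
    public static byte[] RoundTrip(byte[] payload)
    {
        const int offset = 8; // arbitrary start position inside the buffer
        IntPtr buffer = Marshal.AllocHGlobal(offset + 4 + payload.Length);
        try
        {
            WriteBlock(buffer, offset, payload);
            return ReadBlock(buffer, offset);
        }
        finally
        {
            Marshal.FreeHGlobal(buffer);
        }
    }

    public static void Main()
    {
        byte[] result = RoundTrip(new byte[] { 1, 2, 3, 4, 5 });
        Console.WriteLine(result.Length); // prints 5
    }
}
```

A length prefix like this would also let a reader deserialize only the bytes that were actually written instead of the whole view.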
4. Mailbox: Passing Messages between Threads and Processes
A mailbox has nothing to do with either email or NT mailslots. It is a safe
shared memory structure that can hold only one object. The content is read and
written through a property. If the mailbox does not hold an object, a thread
reading the content is blocked until another thread writes some content. If it
already holds content, a thread trying to write to it is blocked until another
thread reads the content first. The content can only be read once: its
reference is automatically removed after reading. We have now developed
everything we need to build such mailboxes.
4.1 Inter-Thread Mailbox
A mailbox is very easy to build using two semaphores: one is signaled when the
box is empty, the other when it is full. To read from the mailbox one first
waits until the mailbox is full and signals the empty semaphore after reading.
To write one needs to wait until it's empty and signals the full semaphore
after writing. Note that the empty semaphore is signaled at the beginning.
public sealed class ThreadMailBox : IMailBox
{
    private object content;
    private ThreadSemaphore empty, full;
    public ThreadMailBox()
    {
        empty = new ThreadSemaphore(1,1);
        full = new ThreadSemaphore(0,1);
    }
    public object Content
    {
        get
        {
            full.Acquire();
            object item = content;
            // remove the reference so the content can be read only once
            content = null;
            empty.Release();
            return item;
        }
        set
        {
            empty.Acquire();
            content = value;
            full.Release();
        }
    }
}
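To see the empty/full handshake in action, here is a small self-contained sketch with one writer and one reader thread. It is an illustration under assumptions: DemoMailBox is a hypothetical stand-in that follows the same protocol but uses the SemaphoreSlim class of newer .NET versions instead of ThreadSemaphore, so it can be compiled on its own.

```csharp
using System;
using System.Threading;

// Hypothetical single-slot mailbox following the empty/full protocol,
// with SemaphoreSlim swapped in for ThreadSemaphore.
public sealed class DemoMailBox
{
    private readonly SemaphoreSlim empty = new SemaphoreSlim(1, 1);
    private readonly SemaphoreSlim full = new SemaphoreSlim(0, 1);
    private object content;

    public object Content
    {
        get
        {
            full.Wait();            // block until something was written
            object item = content;
            content = null;         // content can be read only once
            empty.Release();        // allow the next write
            return item;
        }
        set
        {
            empty.Wait();           // block until the previous content was read
            content = value;
            full.Release();         // wake up a reader
        }
    }
}

public static class MailBoxDemo
{
    // Sends all messages through one mailbox to a reader thread and
    // returns them in the order in which the reader received them.
    public static string[] Transfer(string[] messages)
    {
        DemoMailBox box = new DemoMailBox();
        string[] received = new string[messages.Length];
        Thread reader = new Thread(() =>
        {
            for (int i = 0; i < received.Length; i++)
                received[i] = (string)box.Content;
        });
        reader.Start();
        foreach (string m in messages)
            box.Content = m;        // blocks while the box is still full
        reader.Join();
        return received;
    }

    public static void Main()
    {
        foreach (string s in Transfer(new[] { "one", "two", "three" }))
            Console.WriteLine(s);
    }
}
```

Because the box holds a single item, the writer can never run more than one message ahead of the reader, and the messages arrive in the order they were sent.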
4.2 Inter-Process Mailbox
The inter-process version is nearly as simple as the inter-thread
implementation. The only difference is that we now use inter-process semaphores
and that we read and write to a memory-mapped file instead of a simple class
member variable. As serialization could fail, we provide a small rollback
exception handler to undo any changes made to the mailbox state. There are many
possible error sources (invalid handles, access denied, file size,
SerializableAttribute missing ...).
public sealed class ProcessMailBox : IMailBox, IDisposable
{
    private MemoryMappedFile file;
    private MemoryMappedFileView view;
    private ProcessSemaphore empty, full;
    public ProcessMailBox(string name,int size)
    {
        empty = new ProcessSemaphore(name+".EmptySemaphore.MailBox",1,1);
        full = new ProcessSemaphore(name+".FullSemaphore.MailBox",0,1);
        file = MemoryMappedFile.CreateFile(name+".MemoryMappedFile.MailBox",
            MemoryMappedFile.FileAccess.ReadWrite,size);
        view = file.CreateView(0,size,
            MemoryMappedFileView.ViewAccess.ReadWrite);
    }
    public object Content
    {
        get
        {
            full.Acquire();
            object item;
            try {item = view.ReadDeserialize();}
            catch(Exception)
            {
                // roll back: the mailbox is still full
                full.Release();
                throw;
            }
            empty.Release();
            return item;
        }
        set
        {
            empty.Acquire();
            try {view.WriteSerialize(value);}
            catch(Exception)
            {
                // roll back: the mailbox is still empty
                empty.Release();
                throw;
            }
            full.Release();
        }
    }
    #region IDisposable Member
    public void Dispose()
    {
        view.Dispose();
        file.Dispose();
        empty.Dispose();
        full.Dispose();
    }
    #endregion
}
Now we have all the tools needed for the IPC message-passing sample at the
beginning of the article. You may want to scroll back to the sample as it
demonstrates how the ProcessMailBox could be used.
5. Channels: Queued Message Transfer
An important point of mailboxes is that they can hold only one object at a
time. If a worker in a long processing chain (connected with mailboxes) needs
a bit more time than usual for a special command, the whole chain is blocked
immediately. Often it is preferable to have buffered message-passing channels
from which you can pick incoming messages whenever you have time left, without
blocking the sender. Such buffering is provided by channels, an alternative to
the simpler mailboxes. Again, we will discuss both an inter-thread and an
inter-process implementation.
5.1 Reliability
Another important difference between mailboxes and channels is that channels
have some reliability features. For example, they automatically dump messages
that could not be sent to the queue (because of a thread interrupt while
waiting for a lock) to an internal dump container. This means that
channel-processing threads can safely be shut down without losing any
messages. This is maintained by two abstract classes, ThreadReliability and
ProcessReliability; every channel implementation derives from one of them.
5.2 Inter-Thread Channel
The inter-thread channel is based on the mailbox but uses a synchronized queue
instead of a variable as a message buffer. Thanks to the counting semaphore
model the channel blocks receiving if the queue is empty and blocks sending if
the queue is full. You cannot run into any enqueue/dequeue failures. We achieve
this by initializing the empty semaphore with the channel size and the full
semaphore with zero. If a thread sending a message is interrupted while being
blocked in the empty semaphore, we copy the message to the dump container and
let the exception propagate. No dumping is required in the receive method as
you won't lose any message when being interrupted there. Note that a thread
can only be interrupted while being blocked, that is when calling Acquire() on
a semaphore.
public sealed class ThreadChannel : ThreadReliability, IChannel
{
    private Queue queue;
    private ThreadSemaphore empty, full;
    public ThreadChannel(int size)
    {
        queue = Queue.Synchronized(new Queue(size));
        empty = new ThreadSemaphore(size,size);
        full = new ThreadSemaphore(0,size);
    }
    public void Send(object item)
    {
        try {empty.Acquire();}
        catch(System.Threading.ThreadInterruptedException)
        {
            // interrupted while blocked: keep the message in the dump
            DumpItem(item);
            throw;
        }
        queue.Enqueue(item);
        full.Release();
    }
    public void Send(object item, TimeSpan timeout)
    {
        try {empty.Acquire(timeout);}
        ...
    }
    public object Receive()
    {
        full.Acquire();
        object item = queue.Dequeue();
        empty.Release();
        return item;
    }
    public object Receive(TimeSpan timeout)
    {
        full.Acquire(timeout);
        ...
    }
    protected override void DumpStructure()
    {
        lock(queue.SyncRoot)
        {
            foreach(object item in queue)
                DumpItem(item);
            queue.Clear();
        }
    }
}
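The blocking behavior is easiest to observe with a fast producer and a slow consumer. The following sketch is a simplified illustration, not the ThreadChannel class itself: DemoChannel is a hypothetical name, it uses SemaphoreSlim and a generic Queue from newer .NET versions, and it leaves out the dump and timeout features.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical bounded channel: `empty` counts free slots, `full` queued items.
public sealed class DemoChannel<T>
{
    private readonly Queue<T> queue = new Queue<T>();
    private readonly SemaphoreSlim empty;
    private readonly SemaphoreSlim full;

    public DemoChannel(int size)
    {
        empty = new SemaphoreSlim(size, size);
        full = new SemaphoreSlim(0, size);
    }

    public void Send(T item)
    {
        empty.Wait();                       // blocks while the queue is full
        lock (queue) queue.Enqueue(item);
        full.Release();
    }

    public T Receive()
    {
        full.Wait();                        // blocks while the queue is empty
        T item;
        lock (queue) item = queue.Dequeue();
        empty.Release();
        return item;
    }
}

public static class ChannelDemo
{
    // Sends 1..count through a channel of the given capacity to a slow
    // consumer thread and returns the sum of everything it received.
    public static int SumThroughChannel(int count, int capacity)
    {
        DemoChannel<int> channel = new DemoChannel<int>(capacity);
        int sum = 0;
        Thread consumer = new Thread(() =>
        {
            for (int i = 0; i < count; i++)
            {
                sum += channel.Receive();
                Thread.Sleep(1);            // slow consumer
            }
        });
        consumer.Start();
        for (int i = 1; i <= count; i++)
            channel.Send(i);                // blocks once `capacity` items wait
        consumer.Join();
        return sum;
    }

    public static void Main()
    {
        Console.WriteLine(SumThroughChannel(20, 4)); // prints 210
    }
}
```

The producer runs at most `capacity` messages ahead of the consumer; a mailbox corresponds to the special case of capacity one.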
5.3 Inter-Process Channel
Building an inter-process channel is a bit harder as you first need a way to
provide a buffer. One possible solution would be to use an inter-process
mailbox and queue the send or receive methods depending on the required
behavior. To avoid several drawbacks of this solution, we'll implement a queue
directly in the memory-mapped file instead. A first class, MemoryMappedArray,
splits the file into several pieces accessible using an index. A second class,
MemoryMappedQueue, builds a classic ring buffer around this array (see the
attached source code for more details). To allow both direct byte/integer
access and binary serialization, one first calls Enqueue/Dequeue and then uses
the write/read methods as needed (the queue automatically puts them in the
right place). Neither the array nor the queue implementation is thread- or
process-safe, so we have to enforce mutually exclusive access using a
mutex-emulating inter-process semaphore (we could also have wrapped a Win32
mutex). Apart from these two classes, the channel implementation is mostly the
same as the inter-process mailbox. Again we have to take care of thread
interrupts in the Send() methods as well as possible serialization failures.
public sealed class ProcessChannel : ProcessReliability, IChannel, IDisposable
{
    private MemoryMappedFile file;
    private MemoryMappedFileView view;
    private MemoryMappedQueue queue;
    private ProcessSemaphore empty, full, mutex;
    public ProcessChannel( int size, string name, int maxBytesPerEntry)
    {
        int fileSize = 64+size*maxBytesPerEntry;
        empty = new ProcessSemaphore(name+".EmptySemaphore.Channel",size,size);
        full = new ProcessSemaphore(name+".FullSemaphore.Channel",0,size);
        mutex = new ProcessSemaphore(name+".MutexSemaphore.Channel",1,1);
        file = MemoryMappedFile.CreateFile(name+".MemoryMappedFile.Channel",
            MemoryMappedFile.FileAccess.ReadWrite,fileSize);
        view = file.CreateView(0,fileSize,
            MemoryMappedFileView.ViewAccess.ReadWrite);
        queue = new MemoryMappedQueue(view,size,maxBytesPerEntry,true,0);
        if(queue.Length < size || queue.BytesPerEntry < maxBytesPerEntry)
            throw new MemoryMappedArrayFailedException();
    }
    public void Send(object item)
    {
        try {empty.Acquire();}
        catch(System.Threading.ThreadInterruptedException)
        {
            DumpItemSynchronized(item);
            throw;
        }
        try {mutex.Acquire();}
        catch(System.Threading.ThreadInterruptedException)
        {
            DumpItemSynchronized(item);
            empty.Release();
            throw;
        }
        queue.Enqueue();
        try {queue.WriteSerialize(item,0);}
        catch(Exception)
        {
            // serialization failed: undo the enqueue and free the slot
            queue.RollbackEnqueue();
            mutex.Release();
            empty.Release();
            throw;
        }
        mutex.Release();
        full.Release();
    }
    public void Send(object item, TimeSpan timeout)
    {
        try {empty.Acquire(timeout);}
        ...
    }
    public object Receive()
    {
        full.Acquire();
        mutex.Acquire();
        object item;
        queue.Dequeue();
        try {item = queue.ReadDeserialize(0);}
        catch(Exception)
        {
            // deserialization failed: put the entry back
            queue.RollbackDequeue();
            mutex.Release();
            full.Release();
            throw;
        }
        mutex.Release();
        empty.Release();
        return item;
    }
    public object Receive(TimeSpan timeout)
    {
        full.Acquire(timeout);
        ...
    }
    protected override void DumpStructure()
    {
        mutex.Acquire();
        try
        {
            byte[][] dmp = queue.DumpClearAll();
            for(int i=0;i<dmp.Length;i++)
                DumpItemSynchronized(dmp[i]);
        }
        finally
        {
            // release the mutex even if dumping fails
            mutex.Release();
        }
    }
    #region IDisposable Member
    public void Dispose()
    {
        view.Dispose();
        file.Dispose();
        empty.Dispose();
        full.Dispose();
        mutex.Dispose();
    }
    #endregion
}
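Since MemoryMappedArray and MemoryMappedQueue are only available in the attached source code, the following sketch illustrates the underlying idea: a ring buffer of fixed-size slots laid over one flat block of bytes. It is an assumption-laden illustration, not the library code: SlotRing is a hypothetical class, it uses a managed byte array instead of a mapped view, and it keeps its head index and count in fields rather than in the shared file header.

```csharp
using System;

// Hypothetical ring buffer of fixed-size slots over one flat byte array.
public sealed class SlotRing
{
    private readonly byte[] storage;
    private readonly int slots, bytesPerSlot;
    private int head;   // index of the oldest entry
    private int count;  // number of occupied slots

    public SlotRing(int slots, int bytesPerSlot)
    {
        this.slots = slots;
        this.bytesPerSlot = bytesPerSlot;
        storage = new byte[slots * bytesPerSlot];
    }

    public int Count { get { return count; } }

    public void Enqueue(byte[] entry)
    {
        if (entry.Length > bytesPerSlot)
            throw new ArgumentException("Entry too large", "entry");
        if (count == slots)
            throw new InvalidOperationException("Ring is full");
        int tail = (head + count) % slots;      // next free slot, wraps around
        int offset = tail * bytesPerSlot;       // slot index -> byte offset
        Array.Clear(storage, offset, bytesPerSlot);
        Array.Copy(entry, 0, storage, offset, entry.Length);
        count++;
    }

    public byte[] Dequeue()
    {
        if (count == 0)
            throw new InvalidOperationException("Ring is empty");
        byte[] entry = new byte[bytesPerSlot];
        Array.Copy(storage, head * bytesPerSlot, entry, 0, bytesPerSlot);
        head = (head + 1) % slots;              // advance, wraps around
        count--;
        return entry;
    }
}

public static class RingDemo
{
    public static void Main()
    {
        SlotRing ring = new SlotRing(3, 2);
        ring.Enqueue(new byte[] { 1, 1 });
        ring.Enqueue(new byte[] { 2, 2 });
        ring.Dequeue();                         // frees slot 0
        ring.Enqueue(new byte[] { 3, 3 });
        ring.Enqueue(new byte[] { 4, 4 });      // wraps around into slot 0
        Console.WriteLine(ring.Dequeue()[0]);   // prints 2
    }
}
```

In the channel, the empty and full semaphores guarantee that Enqueue is never called on a full ring and Dequeue never on an empty one, so the two exceptions above would only indicate a programming error.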
6. Message Routing
We've discussed how to synchronize resource access and how to pass messages
between threads and processes using mailboxes or channels. When working with
blocking channels, you may run into problems, e.g. when you need to listen to
more than one channel in the same thread. To solve such situations, there are
some small class modules available: a channel forwarder, a multiplexer and
demultiplexer, and a channel event gateway. You may define your own channel
processors the same way using the simple IRunnable pattern provided by the two
abstract classes SingleRunnable and MultiRunnable (check out the attached
source code for more details).
6.1 Channel Forwarder
A channel forwarder does nothing more than listen on a channel and forward
received messages to another channel. If needed, the forwarder may put each
received message into an envelope marked with a constant number before
forwarding (this feature is used in the multiplexer, see below).
public class ChannelForwarder : SingleRunnable
{
    private IChannel source, target;
    private readonly int envelope;
    public ChannelForwarder(IChannel source,
        IChannel target, bool autoStart, bool waitOnStop)
        : base(true,autoStart,waitOnStop)
    {
        this.source = source;
        this.target = target;
        this.envelope = -1;   // -1: forward messages without an envelope
    }
    public ChannelForwarder(IChannel source, IChannel target,
        int envelope, bool autoStart, bool waitOnStop)
        : base(true,autoStart,waitOnStop)
    {
        this.source = source;
        this.target = target;
        this.envelope = envelope;
    }
    protected override void Run()
    {
        if(envelope == -1)
            while(running)
                target.Send(source.Receive());
        else
        {
            MessageEnvelope env;
            env.ID = envelope;
            while(running)
            {
                env.Message = source.Receive();
                target.Send(env);
            }
        }
    }
}
6.2 Channel Multiplexer and Demultiplexer
A multiplexer listens on several input channels and forwards every received
message (in an envelope to identify the input channel) to a common output
channel. This may be used to listen to multiple channels at once. A
demultiplexer, on the other hand, listens on a common input channel and
forwards each message to one of several output channels depending on the
message envelope.
public class ChannelMultiplexer : MultiRunnable
{
    private ChannelForwarder[] forwarders;
    public ChannelMultiplexer(IChannel[] channels, int[] ids,
        IChannel output, bool autoStart, bool waitOnStop)
    {
        int count = channels.Length;
        if(count != ids.Length)
            throw new ArgumentException("Channel and ID count mismatch.","ids");
        // one enveloping forwarder per input channel
        forwarders = new ChannelForwarder[count];
        for(int i=0;i<count;i++)
            forwarders[i] = new ChannelForwarder(channels[i],
                output,ids[i],autoStart,waitOnStop);
        SetRunnables((SingleRunnable[])forwarders);
    }
}
public class ChannelDemultiplexer : SingleRunnable
{
    private HybridDictionary dictionary;
    private IChannel input;
    public ChannelDemultiplexer(IChannel[] channels, int[] ids,
        IChannel input, bool autoStart, bool waitOnStop)
        : base(true,autoStart,waitOnStop)
    {
        this.input = input;
        int count = channels.Length;
        if(count != ids.Length)
            throw new ArgumentException("Channel and ID count mismatch.","ids");
        dictionary = new HybridDictionary(count,true);
        for(int i=0;i<count;i++)
            dictionary.Add(ids[i],channels[i]);
    }
    protected override void Run()
    {
        while(running)
        {
            MessageEnvelope env = (MessageEnvelope)input.Receive();
            // look up the output channel registered for this envelope ID
            IChannel channel = (IChannel)dictionary[env.ID];
            channel.Send(env.Message);
        }
    }
}
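The envelope idea can be tried out without the framework classes. In the following sketch, which is an illustration under assumptions rather than the library's implementation, BlockingCollection from newer .NET versions stands in for IChannel, Envelope for MessageEnvelope, and plain threads for the SingleRunnable forwarders; the message texts and IDs are made up.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in for the MessageEnvelope structure.
public struct Envelope
{
    public int ID;
    public object Message;
}

public static class MuxDemo
{
    // Forwards `count` messages from source to target, tagging each with `id`.
    private static Thread StartForwarder(BlockingCollection<object> source,
        BlockingCollection<Envelope> target, int id, int count)
    {
        Thread t = new Thread(() =>
        {
            for (int i = 0; i < count; i++)
            {
                Envelope env;
                env.ID = id;
                env.Message = source.Take();   // blocks until a message arrives
                target.Add(env);
            }
        });
        t.Start();
        return t;
    }

    // Merges two input channels into one and lets a single listener
    // sort the received messages by envelope ID.
    public static Dictionary<int, List<object>> Run()
    {
        BlockingCollection<object> orders = new BlockingCollection<object>();
        BlockingCollection<object> payments = new BlockingCollection<object>();
        BlockingCollection<Envelope> merged = new BlockingCollection<Envelope>();
        Thread f1 = StartForwarder(orders, merged, 1, 2);
        Thread f2 = StartForwarder(payments, merged, 2, 1);

        orders.Add("order 1");
        payments.Add("payment 1");
        orders.Add("order 2");

        Dictionary<int, List<object>> byId = new Dictionary<int, List<object>>();
        for (int i = 0; i < 3; i++)
        {
            Envelope env = merged.Take();      // one blocking call for both inputs
            if (!byId.ContainsKey(env.ID))
                byId[env.ID] = new List<object>();
            byId[env.ID].Add(env.Message);
        }
        f1.Join();
        f2.Join();
        return byId;
    }

    public static void Main()
    {
        foreach (KeyValuePair<int, List<object>> pair in Run())
            Console.WriteLine(pair.Key + ": "
                + string.Join(", ", pair.Value.ToArray()));
    }
}
```

The listener blocks on a single channel only, yet receives from both inputs. Messages from the same input keep their order, while the interleaving between different inputs is not deterministic.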
6.3 Channel Event Gateway
The channel event gateway receives messages from a channel and fires an event
for each message received. This class may be useful for event-oriented
applications like GUIs, or to initiate minor activities using the system
ThreadPool. Remember however that in the case of WinForms you can't access any
UI controls directly from the event handler but have to use Invoke() instead,
as the handler is executed in the gateway thread, not in the UI thread.
public class ChannelEventGateway : SingleRunnable
{
    private IChannel source;
    public event MessageReceivedEventHandler MessageReceived;
    public ChannelEventGateway(IChannel source, bool autoStart,
        bool waitOnStop) : base(true,autoStart,waitOnStop)
    {
        this.source = source;
    }
    protected override void Run()
    {
        while(running)
        {
            object c = source.Receive();
            // copy the delegate to avoid a race with unsubscribing handlers
            MessageReceivedEventHandler handler = MessageReceived;
            if(handler != null)
                handler(this,new MessageReceivedEventArgs(c));
        }
    }
}
7. The Pizza Drive-in Demo
That's it; we have discussed the most important structures and techniques of
the framework (other classes like the Rendezvous and Barrier implementations
are not covered in this article). We end this article the same way we began
it: with a demonstration. This time we have a look at a small pizza drive-in
simulation. The screenshot at the top of this page shows this simulation in
action: four parallel processes talking to each other. The diagram below shows
how data/messages flow between the four processes using inter-process channels
and inside of the processes using faster inter-thread channels and mailboxes.
To set the ball rolling, a customer orders a pizza and something to drink. He
does this with a method call in the customer interface that posts an Order
message to the CustomerOrders channel. The order taker, always waiting for new
orders, posts two cook instructions (one for the pizza and one for the drink)
to the CookInstruction channel. At the same time, he forwards the order to the
cashier using the CashierOrder queue. The cashier asks the pricing server for
an adequate price and sends an Invoice to the customer in the hope of a fast
Payment reply. In the meantime, the cooks will have noticed the cook
instructions and sent the finished items to the packer. The packer waits until
an order is complete and forwards the packet to the customer.
To run this demo, open 4 command shells (cmd.exe) and start as many cooks as
you want using "PizzaDemo.exe cook", the backend using "PizzaDemo.exe backend"
and the facade process with the customer interface using "PizzaDemo.exe facade"
(replace 'PizzaDemo' with the name of your assembly). Note that some threads
(like the cooks) sleep for a few seconds each time to make the simulation more
realistic. Press return to stop and exit a process. If you press return while
messages are still being processed, you'll see that some messages are dumped
in the dump report at the end. In a real-world application, the dump container
would be stored to disk.
The demo uses several mechanisms introduced in this article. For example, the
cashier loads a ChannelMultiplexer to listen to both customer payments and
orders and uses two mailboxes for the pricing service. The shipment gateway is
a ChannelEventGateway, allowing the customer to be notified with an event as
soon as a food package is ready. All the processes should also work as Windows
NT Services and on Terminal Server installations.
8. Conclusion
We've discussed how a service-oriented architecture could be built and how
inter-process communication and synchronization could be implemented in C#.
However, this is not the only solution for such problems. For example, using
that many threads could be a serious problem in bigger projects. Completely
missing are transaction support and alternative channel/mailbox implementations
like named pipes and TCP sockets. There may also be some flaws in the
architecture; if you find any, please let me know.
9. References
10. History
- July 28, 2004: Initial article
- August 04, 2004: ThreadSemaphore now allows checking for illegal releases.
  If such a release is detected, an exception is thrown that usually indicates
  a serious (but sometimes hardly noticeable) programming error. All structures
  in the article now use checking. I also added a small comment in section 2.2
  to point out correct semaphore handling and updated both downloads.