Introduction
In this article I describe how I created a server and client implementation using named pipes in C# .Net 4.
I used NamedPipeServerStream and NamedPipeClientStream, but soon realized that the name "server" was confusing. NamedPipeServerStream could only handle one client at a time (see Named Pipe Instances topic in MSDN), but I needed a server that could handle multiple clients requests.
I could not find any online example suitable to my needs, therefore, I created my own implementation of a server using NamedPipeServerStream and NamedPipeClientStream.
Background
As defined by Microsoft, A named pipe is a named, one-way or duplex pipe for communication between the pipe server and one or more pipe clients. All instances of a named pipe share the same pipe name, but each instance has its own buffers and handles, and provides a separate conduit for client/server communication. The use of instances enables multiple pipe clients to use the same named pipe simultaneously. Any process can access named pipes, subject to security checks, making named pipes an easy form of communication between related or unrelated processes. More...
Download Code
https://github.com/IfatChitin/Named-Pipes
Introduction to the code
PipeServer is in charge of creating and holding the named pipe streams, which are opened for each client.
InternalPipeServer is a wrapper for NamedPipeServerStream.
PipeClient is a wrapper for NamedPipeClientStream.
Main flows:
- PipeServer is created and started
- A new pipe name is generated.
- A new instance of InternalPipeServer is created and begins waiting for client connections.
- PipeClient is created and started
- A connection is made with InternalPipeServer.
- InternalPipeServer fires an event to let PipeServer know a connection was made.
- PipeServer fires its own event, to let the world know that a client has connected. It then creates a new instance of InternalPipeServer and starts it so that it will begin waiting for new connections, while the first instance communicates with the first client.
- InternalPipeServer begins an asynchronous read operation which completes when a client has sent a message, has been disconnected or when the pipe has been closed.
- PipeClient sends a message
- InternalPipeServer receives part of the message since the message is longer than its buffer size, and initiates a new asynchronous read operation.
- InternalPipeServer receives the rest of the message, appends it to the first parts, fires an event to let PipeServer know a new message has arrived, and initiates a new asynchronous read operation to wait for new messages.
- PipeServer fires its own event to let the world know a new message has arrived from one of the clients.
- PipeClient disconnects
- InternalPipeServer's read operation ends with no bytes read, so InternalPipeServer assumes the client has disconnected. It fires an event to let PipeServer know about it.
- PipeServer fires its own event to let the world know a client has been disconnected.
- PipeServer is stopped
- PipeServer stops all its InternalPipeServer instances
Using the code
If you need to communicate with another process, use the attached code.
Create a PipeServer in one process and a PipeClient in another. Then use PipeClient in order to send messages to the server.
InternalPipeServer
InternalPipeServer c'tor
NamedPipeServerStream is created in the InternalPipeServer's constructor.
The arguments passed to the NamedPipeServerStream constructor are:
pipeName
: The name of the pipe that should be created. The client must be familiar with this name in order to connect to the pipe server. PipeDirection.InOut
: The pipe direction. maxNumberOfServerInstances
: The maximum number of server instances that share the same name. An I/O exception will be thrown when creating a NamedPipeServerStream if its creation reaches the max number. PipeTransmissionMode.Message
: The pipe transmission mode. I chose message since it exposes a helpful flag "IsMessageCompleted" which helps in the communication implementation over the stream. In order to use Message transmission mode, the pipe direction must be InOut. PipeOptions.Asynchronous
: This enables the async read and write operations.
public InternalPipeServer(string pipeName, int maxNumberOfServerInstances)
{
_pipeServer = new NamedPipeServerStream(pipeName, PipeDirection.InOut, maxNumberOfServerInstances, PipeTransmissionMode.Message, PipeOptions.Asynchronous);
Id = Guid.NewGuid().ToString();
}
Start
BeginWaitForConnection
receives a callback to be called when the operation is completed and a user-defined object that will be passed to the callback. In this case, we send null.
public void Start()
{
try
{
_pipeServer.BeginWaitForConnection(WaitForConnectionCallBack, null);
}
catch (Exception ex)
{
Logger.Error(ex);
throw;
}
}
WaitForConnectionCallBack
WaitForConnectionCallBack
is called once a client has connected, but not only then. It is called also when the server is closed.
EndWaitForConnection
must be called for each BeginWaitForConnection
, with the returned asyncResult. - In order to avoid the exeption that will be thrown in case we call
EndWaitForConnection
on a closed server, we are checking the _isStopping flag, which we set to True when closing the server. - All the operations in this class are asynchronous, therefore a lock is used before reading the flag.
- However, since locking reduces the performance, we check the flag before the lock. We then check it again after the lock since it might have been changed between the previous check and the lock.
private void WaitForConnectionCallBack(IAsyncResult result)
{
if (!_isStopping)
{
lock (_lockingObject)
{
if (!_isStopping)
{
_pipeServer.EndWaitForConnection(result);
OnConnected();
BeginRead(new Info());
}
}
}
}
OnConnected
Fires the event ClientConnectedEvent to any listeners that have subscribed to it.
private void OnConnected()
{
if (ClientConnectedEvent != null)
{
ClientConnectedEvent(this, new ClientConnectedEventArgs { ClientId = Id });
}
}
Info
The Info
class holds the message information. A new instance of Info was created in WaitForConnectionCallBack
before calling BeginRead
private class Info
{
public readonly byte[] Buffer;
public readonly StringBuilder StringBuilder;
public Info()
{
Buffer = new byte[BufferSize];
StringBuilder = new StringBuilder();
}
}
BeginRead
BeginRead
receives the following parameters:
buffer:
The buffer to read data into. offset:
The byte offset in buffer at which to begin reading. count:
The maximum number of bytes to read. callback:
The method to call when the asynchronous read operation is completed. state:
A user-provided object that distinguishes this particular asynchronous read request from other requests.
The given Info
is sent as the state to EndReadCallBack
.
private void BeginRead(Info info)
{
try
{
_pipeServer.BeginRead(info.Buffer, 0, BufferSize, EndReadCallBack, info);
}
catch (Exception ex)
{
Logger.Error(ex);
throw;
}
}
EndReadCallBack
EndReadCallBack
is a very interesting method.
It is called when the BeginRead
operation completes, which isn't necessarily only when the client writes a message to the pipe, but also when the client has been disconnected or the server has been closed.
- The call to
EndRead
returns the number of read bytes. - If bytes were read (not zero) they will be extracted from the given info and the info's string builder will append the message.
- If the mesaage is not complete, another read operation is initiated by calling
BeginRead
with the same info. - If the message is complete,
MessageReceivedEvent
is fired and a new read operation is initiated with a fresh new info.
- If no bytes were read, it probably means the client has been disconnected or the server is closed. The
ClienticonnectedEvent
will be fired and the InternalPipeServer will be stopped.
private void EndReadCallBack(IAsyncResult result)
{
var readBytes = _pipeServer.EndRead(result);
if (readBytes > 0)
{
var info = (Info)result.AsyncState;
info.StringBuilder.Append(Encoding.UTF8.GetString(info.Buffer, 0, readBytes));
if (!_pipeServer.IsMessageComplete)
{
BeginRead(info);
}
else
{
var message = info.StringBuilder.ToString().TrimEnd('\0');
OnMessageReceived(message);
BeginRead(new Info());
}
}
else
{
if (!_isStopping)
{
lock (_lockingObject)
{
if (!_isStopping)
{
OnDisconnected();
Stop();
}
}
}
}
}
OnMessageReceived
private void OnMessageReceived(string message)
{
if (MessageReceivedEvent != null)
{
MessageReceivedEvent(this,
new MessageReceivedEventArgs
{
Message = message
});
}
}
OnDisconnected
private void OnDisconnected()
{
if (ClientDisconnectedEvent != null)
{
ClientDisconnectedEvent(this, new ClientDisconnectedEventArgs { ClientId = Id });
}
}
Stop
public void Stop()
{
_isStopping = true;
try
{
if (_pipeServer.IsConnected)
{
_pipeServer.Disconnect();
}
}
catch (Exception ex)
{
Logger.Error(ex);
throw;
}
finally
{
_pipeServer.Close();
_pipeServer.Dispose();
}
}
PipeClient
PipeClient c'tor
NamedPipeClientStream is created in the PipeClient's constructor. The given serverId is the pipe name to connect to.
public PipeClient(string serverId)
{
_pipeClient = new NamedPipeClientStream(".", serverId, PipeDirection.InOut, PipeOptions.Asynchronous);
}
Start
Initiates a connection to the server within the given timeout. If not connected within this time, an exception will be thrown.
public void Start()
{
const int tryConnectTimeout = 5 * 60 * 1000;
_pipeClient.Connect(tryConnectTimeout);
}
SendMessage
Once connected, the client can start sending its messages over the pipe.
The SendMessage
method returns a task, which will contain the asynchronous result or exception.
BeginWrite
receives the following arguments:
buffer:
The buffer that contains the data to write to the current stream. offset:
The zero-based byte offset in buffer at which to begin copying bytes to the current stream. count:
The maximum number of bytes to write. callback:
The method to call when the asynchronous write operation is completed. state:
A user-provided object that distinguishes this particular asynchronous write request from other requests.
- The callback we use is an anonymous function which sets the result of
EndWriteCallBack
to the taskCompletionSource, or sets an exception to it. - The task of taskCompletionSource is the return value of the SendMessage method.
public Task<TaskResult> SendMessage(string message)
{
var taskCompletionSource = new TaskCompletionSource<TaskResult>();
if (_pipeClient.IsConnected)
{
var buffer = Encoding.UTF8.GetBytes(message);
_pipeClient.BeginWrite(buffer, 0, buffer.Length, asyncResult =>
{
try
{
taskCompletionSource.SetResult(EndWriteCallBack(asyncResult));
}
catch (Exception ex)
{
taskCompletionSource.SetException(ex);
}
}, null);
}
else
{
Logger.Error("Cannot send message, pipe is not connected");
throw new IOException("pipe is not connected");
}
return taskCompletionSource.Task;
}
TaskResult
public class TaskResult
{
public bool IsSuccess { get; set; }
public string ErrorMessage { get; set; }
}
EndWriteCallBack
EndWriteCallBack
calls EndWrite
and flushes the pipe. It is important to call EndMethod
with the result that its corresponding BeginMethod
has returned.
private TaskResult EndWriteCallBack(IAsyncResult asyncResult)
{
_pipeClient.EndWrite(asyncResult);
_pipeClient.Flush();
return new TaskResult { IsSuccess = true };
}
Stop
public void Stop()
{
try
{
_pipeClient.WaitForPipeDrain();
}
finally
{
_pipeClient.Close();
_pipeClient.Dispose();
}
}
PipeServer
PipeServer c'tor
- A new guid is created and will serve as the pipe name.
- A synchronization context is defined using
AsyncOperationManager.
SynchronizationContext
which returns the correct synchronization context for all application models supported by the .NET framework. This mean we'll receive the correct context whether we work in WPF, WinForms, etc. More information can be found in this nice article by Gabriel Schenker.
Synchronization context is used a bit later.
public PipeServer()
{
_pipeName = Guid.NewGuid().ToString();
_synchronizationContext = AsyncOperationManager.SynchronizationContext;
_servers = new ConcurrentDictionary<string, ICommunicationServer>();
}
Start
public void Start()
{
StartNamedPipeServer();
}
StartNamedPipeServer
- A new
InternalPipeServer
is created with the pipe name and the amount of server instances that we allow, defined in the MaxNumberOfServerInstances
field. - The created server is stored in a thread safe dictionary. Its id serves as the key.
- Handlers are registered to the server events.
Start
is called in order to initiate the server
private void StartNamedPipeServer()
{
var server = new InternalPipeServer(_pipeName, MaxNumberOfServerInstances);
_servers[server.Id] = server;
server.ClientConnectedEvent += ClientConnectedHandler;
server.ClientDisconnectedEvent += ClientDisconnectedHandler;
server.MessageReceivedEvent += MessageReceivedHandler;
server.Start();
}
ClientConnectedHandler
When an InternalPipeServer fires the ClientConnectedEvent, this handler is called.
- OnClientConnected lets the world know about the received message.
- StartNamedPipeServer prepares for a new client connection.
private void ClientConnectedHandler(object sender, ClientConnectedEventArgs eventArgs)
{
OnClientConnected(eventArgs);
StartNamedPipeServer();
}
OnClientConnected
This method can be called from a working thread. Thus, before firing ClientConnectedEvent, we must synchronize the thread to the UI thread. If we don't do that, the handlers of the PipeServer listeners will be also called in a working thread, and then they will have to perform the synchronization before being able to access any UI control (for example in order to display a message).
The synchronization is done by calling _synchronizationContext.Post.
private void OnClientConnected(ClientConnectedEventArgs eventArgs)
{
_synchronizationContext.Post(e => ClientConnectedEvent.SafeInvoke(this, (ClientConnectedEventArgs)e), eventArgs);
}
SafeInvoke
We use a very nice pattern that is explained in this great answer to a great question
public static void SafeInvoke<T>(this EventHandler<T> @event, object sender, T eventArgs) where T : EventArgs
{
if (@event != null)
{
@event(sender, eventArgs);
}
}
MessageReceivedHandler
private void MessageReceivedHandler(object sender, MessageReceivedEventArgs eventArgs)
{
OnMessageReceived(eventArgs);
}
OnMessageReceived
private void OnMessageReceived(MessageReceivedEventArgs eventArgs)
{
_synchronizationContext.Post(e => MessageReceivedEvent.SafeInvoke(this, (MessageReceivedEventArgs)e), eventArgs);
}
ClientDisconnectedHandler
private void ClientDisconnectedHandler(object sender, ClientDisconnectedEventArgs eventArgs)
{
OnClientDisconnected(eventArgs);
StopNamedPipeServer(eventArgs.ClientId);
}
OnClientDisconnected
private void OnClientDisconnected(ClientDisconnectedEventArgs eventArgs)
{
_synchronizationContext.Post(e => ClientDisconnectedEvent.SafeInvoke(this, (ClientDisconnectedEventArgs)e), eventArgs);
}
StopNamedPipeServer
private void StopNamedPipeServer(string id)
{
UnregisterFromServerEvents(_servers[id]);
_servers[id].Stop();
_servers.Remove(id);
}
UnregisterFromServerEvents
private void UnregisterFromServerEvents(ICommunicationServer server)
{
server.ClientConnectedEvent -= ClientConnectedHandler;
server.ClientDisconnectedEvent -= ClientDisconnectedHandler;
server.MessageReceivedEvent -= MessageReceivedHandler;
}
Stop
public void Stop()
{
foreach (var server in _servers.Values)
{
try
{
UnregisterFromServerEvents(server);
server.Stop();
}
catch (Exception)
{
Logger.Error("Fialed to stop server");
}
}
_servers.Clear();
}
Usage
The attached solution includes unit tests which demonstrate the usage of the library server.
Points of Interest
Working on the client server implementation was very educating. As part of my research I had the opportunity to read some very interesting articles online, which I referenced throughout this article.
Debugging in a multithreaded environment was both challanging and refreshing.
I would like to thank...
I would like to thank my co-workers and outstanding software engineers who helped me with their ideas, code reviews and deep knowledge: Avishay Ben Shabtai, Niv Lederer and Amit Bezalel.
Thank you guys!