
Creating a Server Using Named Pipes

31 Jan 2015
In this article I describe how to create an inter-process communication library using named pipes.

Introduction

In this article I describe how I created a server and client implementation using named pipes in C# on .NET 4.
I used NamedPipeServerStream and NamedPipeClientStream, but soon realized that the name "server" was confusing. A NamedPipeServerStream instance can only handle one client at a time (see the Named Pipe Instances topic in MSDN), but I needed a server that could handle requests from multiple clients.
I could not find an online example that suited my needs, so I created my own server implementation on top of NamedPipeServerStream and NamedPipeClientStream.

Background

As defined by Microsoft: "A named pipe is a named, one-way or duplex pipe for communication between the pipe server and one or more pipe clients. All instances of a named pipe share the same pipe name, but each instance has its own buffers and handles, and provides a separate conduit for client/server communication. The use of instances enables multiple pipe clients to use the same named pipe simultaneously. Any process can access named pipes, subject to security checks, making named pipes an easy form of communication between related or unrelated processes."
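
The idea of several instances sharing one pipe name can be tried in isolation. The following is a minimal sketch, not taken from the attached code: the class name PipeInstancesSketch, the method ConnectClients and the generated pipe name are all hypothetical. It creates one server instance per client under the same name and connects a client to each.

```csharp
using System;
using System.Collections.Generic;
using System.IO.Pipes;
using System.Threading.Tasks;

public static class PipeInstancesSketch
{
    // Connects `clientCount` clients to server instances that all share one pipe name.
    // Returns the number of server instances that report a connected client.
    public static int ConnectClients(int clientCount)
    {
        string pipeName = "sketch-" + Guid.NewGuid();   // hypothetical pipe name
        var servers = new List<NamedPipeServerStream>();
        var clients = new List<NamedPipeClientStream>();
        try
        {
            for (int i = 0; i < clientCount; i++)
            {
                // A fresh instance with the same name; earlier instances stay busy with their clients.
                var server = new NamedPipeServerStream(pipeName, PipeDirection.InOut, clientCount,
                    PipeTransmissionMode.Byte, PipeOptions.Asynchronous);
                servers.Add(server);
                Task waiting = Task.Factory.FromAsync(
                    server.BeginWaitForConnection, server.EndWaitForConnection, null);

                var client = new NamedPipeClientStream(".", pipeName, PipeDirection.InOut);
                clients.Add(client);
                client.Connect(5000);   // only the newest instance is still listening
                waiting.Wait();
            }
            return servers.FindAll(s => s.IsConnected).Count;
        }
        finally
        {
            clients.ForEach(c => c.Dispose());
            servers.ForEach(s => s.Dispose());
        }
    }
}
```

Each client ends up with its own instance, which is the behavior the library described below builds on.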

Download Code

https://github.com/IfatChitin/Named-Pipes

Introduction to the code

PipeServer is in charge of creating and holding the named pipe streams, which are opened for each client. 
InternalPipeServer is a wrapper for NamedPipeServerStream. 
PipeClient is a wrapper for NamedPipeClientStream.

Main flows:

  1. PipeServer is created and started
    • A new pipe name is generated.
    • A new instance of InternalPipeServer is created and begins waiting for client connections. 
  2. PipeClient is created and started
    • A connection is made with InternalPipeServer. 
    • InternalPipeServer fires an event to let PipeServer know a connection was made.
      • PipeServer fires its own event, to let the world know that a client has connected. It then creates a new instance of InternalPipeServer and starts it so that it will begin waiting for new connections, while the first instance communicates with the first client.
    • InternalPipeServer begins an asynchronous read operation which completes when a client has sent a message, has been disconnected or when the pipe has been closed.
  3. PipeClient sends a message 
    • If the message is longer than InternalPipeServer's buffer, InternalPipeServer receives only the first part and initiates a new asynchronous read operation.
    • InternalPipeServer receives the rest of the message, appends it to the earlier parts, fires an event to let PipeServer know a new message has arrived, and initiates a new asynchronous read operation to wait for new messages.
      • PipeServer fires its own event to let the world know a new message has arrived from one of the clients. 
  4. PipeClient disconnects
    • InternalPipeServer's read operation ends with no bytes read, so InternalPipeServer assumes the client has disconnected. It fires an event to let PipeServer know about it.
    • PipeServer fires its own event to let the world know a client has been disconnected.
  5. PipeServer is stopped
    • PipeServer stops all its InternalPipeServer instances 
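
Step 3 of the flow above, where a message larger than the read buffer arrives over several reads, can be sketched with the raw stream classes. This is an illustration under assumptions rather than the library's code: MessageModeSketch and RoundTrip are hypothetical names, the reads are synchronous for brevity, and Message transmission mode is assumed to be available (it is a Windows feature).

```csharp
using System;
using System.IO;
using System.IO.Pipes;
using System.Text;
using System.Threading.Tasks;

public static class MessageModeSketch
{
    // Sends `text` as one message and reads it back through a deliberately small buffer.
    public static string RoundTrip(string text, int bufferSize)
    {
        string pipeName = "sketch-" + Guid.NewGuid();   // hypothetical pipe name
        using (var server = new NamedPipeServerStream(pipeName, PipeDirection.InOut, 1,
                   PipeTransmissionMode.Message, PipeOptions.Asynchronous))
        using (var client = new NamedPipeClientStream(".", pipeName, PipeDirection.InOut))
        {
            Task waiting = Task.Factory.FromAsync(
                server.BeginWaitForConnection, server.EndWaitForConnection, null);
            client.Connect(5000);
            waiting.Wait();

            // A single Write call becomes a single message on the pipe.
            // It runs on another thread in case the write has to wait for the reader.
            byte[] payload = Encoding.UTF8.GetBytes(text);
            Task writing = Task.Factory.StartNew(() => client.Write(payload, 0, payload.Length));

            var buffer = new byte[bufferSize];
            var received = new MemoryStream();
            do
            {
                int read = server.Read(buffer, 0, buffer.Length);
                received.Write(buffer, 0, read);      // collect this part of the message
            }
            while (!server.IsMessageComplete);         // false while more of the message is pending

            writing.Wait();
            return Encoding.UTF8.GetString(received.ToArray());
        }
    }
}
```

Calling MessageModeSketch.RoundTrip with a text longer than bufferSize makes the loop run more than once before IsMessageComplete turns true.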

Using the code

If you need to communicate with another process, use the attached code. 
Create a PipeServer in one process and a PipeClient in another. Then use PipeClient in order to send messages to the server.

InternalPipeServer

InternalPipeServer c'tor

NamedPipeServerStream is created in the InternalPipeServer's constructor.
The arguments passed to the NamedPipeServerStream constructor are:

  • pipeName: The name of the pipe that should be created. The client must be familiar with this name in order to connect to the pipe server.
  • PipeDirection.InOut: The pipe direction. 
  • maxNumberOfServerInstances: The maximum number of server instances that share the same name. An IOException is thrown if creating a NamedPipeServerStream would exceed this number.
  • PipeTransmissionMode.Message: The pipe transmission mode. I chose Message since it exposes a helpful flag, IsMessageComplete, which simplifies implementing communication over the stream. In order to use Message transmission mode, the pipe direction must be InOut.
  • PipeOptions.Asynchronous: This enables the async read and write operations.
C#
/// <summary>
/// Creates a new NamedPipeServerStream 
/// </summary>
public InternalPipeServer(string pipeName, int maxNumberOfServerInstances)
{
    _pipeServer = new NamedPipeServerStream(pipeName, PipeDirection.InOut, maxNumberOfServerInstances, PipeTransmissionMode.Message, PipeOptions.Asynchronous);
    Id = Guid.NewGuid().ToString();
}
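
The effect of maxNumberOfServerInstances can be observed with a small experiment. The sketch below is an illustration only: MaxInstancesSketch and ExceedingMaxThrows are hypothetical names, and Byte transmission mode is used merely to keep the example independent of message mode.

```csharp
using System;
using System.IO;
using System.IO.Pipes;

public static class MaxInstancesSketch
{
    // Returns true if creating one instance more than `max` throws an IOException.
    public static bool ExceedingMaxThrows(int max)
    {
        string pipeName = "sketch-" + Guid.NewGuid();   // hypothetical pipe name
        var instances = new NamedPipeServerStream[max];
        try
        {
            // Fill every allowed slot for this pipe name.
            for (int i = 0; i < max; i++)
            {
                instances[i] = new NamedPipeServerStream(pipeName, PipeDirection.InOut, max,
                    PipeTransmissionMode.Byte, PipeOptions.Asynchronous);
            }

            try
            {
                // One instance too many.
                using (new NamedPipeServerStream(pipeName, PipeDirection.InOut, max,
                           PipeTransmissionMode.Byte, PipeOptions.Asynchronous)) { }
                return false;
            }
            catch (IOException)
            {
                return true;
            }
        }
        finally
        {
            foreach (var instance in instances)
            {
                if (instance != null) instance.Dispose();
            }
        }
    }
}
```

A server that keeps creating instances, as PipeServer does later in this article, therefore needs a limit large enough for the number of clients it expects.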

Start

BeginWaitForConnection receives a callback to be called when the operation is completed and a user-defined object that will be passed to the callback. In this case, we send null.

C#
/// <summary>
/// This method begins an asynchronous operation to wait for a client to connect.
/// </summary>
public void Start()
{
    try
    {
        _pipeServer.BeginWaitForConnection(WaitForConnectionCallBack, null);
    }
    catch (Exception ex)
    {
        Logger.Error(ex);
        throw;
    }
}

WaitForConnectionCallBack

WaitForConnectionCallBack is called once a client has connected, but not only then. It is called also when the server is closed.

  • EndWaitForConnection must be called for each BeginWaitForConnection, with the returned asyncResult.
  • To avoid the exception that would be thrown if we called EndWaitForConnection on a closed server, we check the _isStopping flag, which we set to true when closing the server.
  • All the operations in this class are asynchronous, so a lock is taken before reading the flag.
  • However, since locking reduces performance, we first check the flag without the lock. We then check it again inside the lock, since it might have changed between the first check and acquiring the lock.
C#
/// <summary>
/// This callback is called when the async WaitForConnection operation is completed,
/// whether a connection was made or not. WaitForConnection can be completed when the server disconnects.
/// </summary>
private void WaitForConnectionCallBack(IAsyncResult result)
{
    if (!_isStopping)
    {
        lock (_lockingObject)
        {
            if (!_isStopping)
            {
                // Call EndWaitForConnection to complete the connection operation
                _pipeServer.EndWaitForConnection(result);

                OnConnected();

                BeginRead(new Info());
            }
        }
    }
}
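
The check, lock, check-again pattern used in the callback can be looked at on its own, without any pipe. The sketch below is a variation under assumptions: StoppableWorker, TryHandle and HandledCount are hypothetical names, the flag is declared volatile, and Stop takes the same lock, neither of which is shown in the code above.

```csharp
using System;

public sealed class StoppableWorker
{
    private readonly object _lockingObject = new object();
    private volatile bool _isStopping;   // volatile so the unlocked check sees the latest value

    public int HandledCount { get; private set; }

    // Plays the role of a completion callback: does nothing once stopping has begun.
    public bool TryHandle()
    {
        if (_isStopping) return false;        // cheap check without the lock

        lock (_lockingObject)
        {
            if (_isStopping) return false;    // re-check: Stop may have run in the meantime
            HandledCount++;                   // the protected work
            return true;
        }
    }

    public void Stop()
    {
        lock (_lockingObject)                 // waits for a handler that is inside the lock
        {
            _isStopping = true;
        }
    }
}
```

Because Stop takes the lock in this variation, a handler that already passed the second check finishes before the flag flips, and every later call returns early.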

OnConnected

Fires the event ClientConnectedEvent to any listeners that have subscribed to it.

C#
/// <summary>
/// This method fires ClientConnectedEvent
/// </summary>
private void OnConnected()
{
    if (ClientConnectedEvent != null)
    {
        ClientConnectedEvent(this, new ClientConnectedEventArgs { ClientId = Id });
    }
}

Info

The Info class holds the message information. A new instance of Info is created in WaitForConnectionCallBack before calling BeginRead.

C#
private class Info
{
    public readonly byte[] Buffer;
    public readonly StringBuilder StringBuilder;

    public Info()
    {
        Buffer = new byte[BufferSize];
        StringBuilder = new StringBuilder();
    }
}

BeginRead

BeginRead receives the following parameters:

  • buffer: The buffer to read data into.
  • offset: The byte offset in buffer at which to begin reading.
  • count: The maximum number of bytes to read.
  • callback: The method to call when the asynchronous read operation is completed.
  • state: A user-provided object that distinguishes this particular asynchronous read request from other requests.

The given Info is sent as the state to EndReadCallBack.

C#
/// <summary>
/// This method begins an asynchronous read operation.
/// </summary>
private void BeginRead(Info info)
{
    try
    {
        _pipeServer.BeginRead(info.Buffer, 0, BufferSize, EndReadCallBack, info);
    }
    catch (Exception ex)
    {
        Logger.Error(ex);
        throw;
    }
}

EndReadCallBack

EndReadCallBack is a very interesting method.

It is called when the BeginRead operation completes, which isn't necessarily only when the client writes a message to the pipe, but also when the client has been disconnected or the server has been closed.

  • The call to EndRead returns the number of read bytes.
  • If bytes were read (not zero) they will be extracted from the given info and the info's string builder will append the message.
    • If the message is not complete, another read operation is initiated by calling BeginRead with the same info.
    • If the message is complete, MessageReceivedEvent is fired and a new read operation is initiated with a fresh info.
  • If no bytes were read, it probably means the client has disconnected or the server has been closed. ClientDisconnectedEvent is fired and the InternalPipeServer is stopped.
C#
/// <summary>
/// This callback is called when the BeginRead operation is completed.
/// We can arrive here whether the connection is valid or not
/// </summary>
private void EndReadCallBack(IAsyncResult result)
{
    var readBytes = _pipeServer.EndRead(result);
    if (readBytes > 0)
    {
        var info = (Info)result.AsyncState;

        // Get the read bytes and append them
        info.StringBuilder.Append(Encoding.UTF8.GetString(info.Buffer, 0, readBytes));

        if (!_pipeServer.IsMessageComplete) // Message is not complete, continue reading
        {
            BeginRead(info);
        }
        else // Message is completed
        {
            // Finalize the received string and fire MessageReceivedEvent
            var message = info.StringBuilder.ToString().TrimEnd('\0');

            OnMessageReceived(message);

            // Begin a new reading operation
            BeginRead(new Info());
        }
    }
    else // When no bytes were read, it can mean that the client has been disconnected
    {
        if (!_isStopping)
        {
            lock (_lockingObject)
            {
                if (!_isStopping)
                {
                    OnDisconnected();
                    Stop();
                }
            }
        }
    }
}
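
One detail worth keeping in mind when decoding each chunk separately with Encoding.UTF8.GetString: a multi-byte UTF-8 character may be split across two reads, and each half then decodes to a replacement character. A possible alternative, sketched here under assumptions (ChunkDecodingSketch and DecodeInChunks are hypothetical names, not part of the attached code), is a stateful Decoder that carries incomplete bytes over to the next chunk.

```csharp
using System;
using System.Text;

public static class ChunkDecodingSketch
{
    // Decodes `bytes` in slices of `chunkSize`, the way successive reads would deliver them.
    public static string DecodeInChunks(byte[] bytes, int chunkSize)
    {
        Decoder decoder = Encoding.UTF8.GetDecoder();   // remembers incomplete trailing bytes
        var builder = new StringBuilder();
        var chars = new char[Encoding.UTF8.GetMaxCharCount(chunkSize)];

        for (int offset = 0; offset < bytes.Length; offset += chunkSize)
        {
            int count = Math.Min(chunkSize, bytes.Length - offset);
            int charCount = decoder.GetChars(bytes, offset, count, chars, 0);
            builder.Append(chars, 0, charCount);         // only complete characters are appended
        }

        return builder.ToString();
    }
}
```

In the read callback, the same idea would mean keeping one Decoder per message (for example inside Info) instead of calling GetString on every chunk. Collecting the raw bytes and decoding once when the message is complete would work as well.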

OnMessageReceived

C#
/// <summary>
/// This method fires MessageReceivedEvent with the given message
/// </summary>
private void OnMessageReceived(string message)
{
    if (MessageReceivedEvent != null)
    {
        MessageReceivedEvent(this,
            new MessageReceivedEventArgs
            {
                Message = message
            });
    }
}

OnDisconnected

C#
/// <summary>
/// This method fires ClientDisconnectedEvent
/// </summary>
private void OnDisconnected()
{
    if (ClientDisconnectedEvent != null)
    {
        ClientDisconnectedEvent(this, new ClientDisconnectedEventArgs { ClientId = Id });
    }
}

Stop

C#
/// <summary>
/// This method disconnects, closes and disposes the server
/// </summary>
public void Stop()
{
    _isStopping = true;

    try
    {
        if (_pipeServer.IsConnected)
        {
            _pipeServer.Disconnect();
        }
    }
    catch (Exception ex)
    {
        Logger.Error(ex);
        throw;
    }
    finally
    {
        _pipeServer.Close();
        _pipeServer.Dispose();
    }
}

PipeClient

PipeClient c'tor

NamedPipeClientStream is created in the PipeClient's constructor. The given serverId is the pipe name to connect to.

C#
public PipeClient(string serverId)
{
    _pipeClient = new NamedPipeClientStream(".", serverId, PipeDirection.InOut, PipeOptions.Asynchronous);
}

Start

Initiates a connection to the server within the given timeout. If the connection is not established within this time, a TimeoutException is thrown.

C#
/// <summary>
/// Starts the client. Connects to the server.
/// </summary>
public void Start()
{
    const int tryConnectTimeout = 5 * 60 * 1000; // 5 minutes
    _pipeClient.Connect(tryConnectTimeout);
}
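
The timeout behavior of Connect can be tried with a pipe name that no server has created. This is a small sketch with hypothetical names (ConnectTimeoutSketch, TryConnect), not code from the library.

```csharp
using System;
using System.IO.Pipes;

public static class ConnectTimeoutSketch
{
    // Returns true when a server accepted the connection, false when the wait timed out.
    // The client is disposed either way, so this is only a probe.
    public static bool TryConnect(string pipeName, int timeoutMilliseconds)
    {
        using (var client = new NamedPipeClientStream(".", pipeName, PipeDirection.InOut))
        {
            try
            {
                client.Connect(timeoutMilliseconds);
                return true;
            }
            catch (TimeoutException)
            {
                return false;   // no server instance became available in time
            }
        }
    }
}
```

A caller of PipeClient.Start could catch TimeoutException in the same way if a missing server should not be treated as fatal.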

SendMessage

Once connected, the client can start sending its messages over the pipe.
The SendMessage method returns a task, which will contain the asynchronous result or exception.

  • BeginWrite receives the following arguments:
    • buffer: The buffer that contains the data to write to the current stream.
    • offset: The zero-based byte offset in buffer at which to begin copying bytes to the current stream.
    • count: The maximum number of bytes to write.
    • callback: The method to call when the asynchronous write operation is completed.
    • state: A user-provided object that distinguishes this particular asynchronous write request from other requests.
  • The callback we use is an anonymous function which sets the result of EndWriteCallBack to the taskCompletionSource, or sets an exception to it.
  • The task of taskCompletionSource is the return value of the SendMessage method.
C#
public Task<TaskResult> SendMessage(string message)
{
    var taskCompletionSource = new TaskCompletionSource<TaskResult>();

    if (_pipeClient.IsConnected)
    {
        var buffer = Encoding.UTF8.GetBytes(message);
        _pipeClient.BeginWrite(buffer, 0, buffer.Length, asyncResult =>
        {
            try
            {
                taskCompletionSource.SetResult(EndWriteCallBack(asyncResult));
            }
            catch (Exception ex)
            {
                taskCompletionSource.SetException(ex);
            }

        }, null);
    }
    else
    {
        Logger.Error("Cannot send message, pipe is not connected");
        throw new IOException("pipe is not connected");
    }

    return taskCompletionSource.Task;
}
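
The Begin/End to Task bridge used by SendMessage works for any Stream, so it can be tried without a pipe. The following sketch makes a few assumptions of its own: ApmToTaskSketch and WriteMessageAsync are hypothetical names, the result type is a plain bool instead of TaskResult, and a failure of BeginWrite itself is reported through the returned task rather than thrown.

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;

public static class ApmToTaskSketch
{
    // Wraps Stream.BeginWrite/EndWrite in a Task. Any Stream works; a pipe stream is one example.
    public static Task<bool> WriteMessageAsync(Stream stream, string message)
    {
        var completion = new TaskCompletionSource<bool>();
        byte[] buffer = Encoding.UTF8.GetBytes(message);
        try
        {
            stream.BeginWrite(buffer, 0, buffer.Length, asyncResult =>
            {
                try
                {
                    stream.EndWrite(asyncResult);     // pairs with the BeginWrite above
                    completion.TrySetResult(true);
                }
                catch (Exception ex)
                {
                    completion.TrySetException(ex);   // surfaces through the returned task
                }
            }, null);
        }
        catch (Exception ex)
        {
            completion.TrySetException(ex);           // BeginWrite itself failed
        }
        return completion.Task;
    }
}
```

For example, ApmToTaskSketch.WriteMessageAsync(new MemoryStream(), "hello").Wait() completes once EndWrite has run. Task.Factory.FromAsync offers the same bridge with less code when no custom result type is needed.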

TaskResult

C#
public class TaskResult
{
    public bool IsSuccess { get; set; }
    public string ErrorMessage { get; set; }
}

EndWriteCallBack

EndWriteCallBack calls EndWrite and flushes the pipe. It is important to call each End method (here EndWrite) with the IAsyncResult that its corresponding Begin method (here BeginWrite) returned.

C#
/// <summary>
/// This callback is called when the BeginWrite operation is completed.
/// It can be called whether the connection is valid or not.
/// </summary>
/// <param name="asyncResult"></param>
private TaskResult EndWriteCallBack(IAsyncResult asyncResult)
{
    _pipeClient.EndWrite(asyncResult);
    _pipeClient.Flush();

    return new TaskResult { IsSuccess = true };
}

Stop

C#
/// <summary>
/// Stops the client. Waits for pipe drain, closes and disposes it.
/// </summary>
public void Stop()
{
    try
    {
        _pipeClient.WaitForPipeDrain();
    }
    finally
    {
        _pipeClient.Close();
        _pipeClient.Dispose();
    }
}

PipeServer

PipeServer c'tor

  • A new guid is created and will serve as the pipe name.
  • A synchronization context is obtained from AsyncOperationManager.SynchronizationContext, which returns the correct synchronization context for all application models supported by the .NET Framework. This means we'll receive the correct context whether we work in WPF, WinForms, etc. More information can be found in this nice article by Gabriel Schenker.
    The synchronization context is used a bit later.
C#
public PipeServer()
{
    _pipeName = Guid.NewGuid().ToString();
    _synchronizationContext = AsyncOperationManager.SynchronizationContext;
    _servers = new ConcurrentDictionary<string, ICommunicationServer>();
}

Start

C#
public void Start()
{
    StartNamedPipeServer();
}

StartNamedPipeServer

  • A new InternalPipeServer is created with the pipe name and the amount of server instances that we allow, defined in the MaxNumberOfServerInstances field.
  • The created server is stored in a thread safe dictionary. Its id serves as the key.
  • Handlers are registered to the server events.
  • Start is called in order to initiate the server
C#
/// <summary>
/// Starts a new NamedPipeServerStream that waits for connection
/// </summary>
private void StartNamedPipeServer()
{
    var server = new InternalPipeServer(_pipeName, MaxNumberOfServerInstances);
    _servers[server.Id] = server;

    server.ClientConnectedEvent += ClientConnectedHandler;
    server.ClientDisconnectedEvent += ClientDisconnectedHandler;
    server.MessageReceivedEvent += MessageReceivedHandler;
            
    server.Start();
}

ClientConnectedHandler

When an InternalPipeServer fires the ClientConnectedEvent, this handler is called.

  • OnClientConnected lets the world know about the new client connection.
  • StartNamedPipeServer prepares for a new client connection.
C#
/// <summary>
/// Handles a client connection. Fires the relevant event and prepares for new connection.
/// </summary>
private void ClientConnectedHandler(object sender, ClientConnectedEventArgs eventArgs)
{
    OnClientConnected(eventArgs);

    StartNamedPipeServer(); // Create an additional server in preparation for a new connection
}

OnClientConnected

This method can be called from a worker thread. Thus, before firing ClientConnectedEvent, we must marshal the call to the UI thread. If we don't, the handlers of the PipeServer listeners will also be called on a worker thread, and they will have to perform the synchronization themselves before accessing any UI control (for example, in order to display a message).
The synchronization is done by calling _synchronizationContext.Post.

C#
/// <summary>
/// Fires ClientConnectedEvent on the captured synchronization context
/// </summary>
/// <param name="eventArgs"></param>
private void OnClientConnected(ClientConnectedEventArgs eventArgs)
{
    _synchronizationContext.Post(e => ClientConnectedEvent.SafeInvoke(this, (ClientConnectedEventArgs)e), eventArgs);
}
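
What Post does can be seen with a tiny stand-in for a UI context. The sketch below is only an illustration: QueueSynchronizationContext, RunOne and PostSketch are hypothetical names, and a real WPF or WinForms context pumps its queue through the message loop rather than through an explicit call.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for a UI context: Post queues work, RunOne executes it on the caller's thread.
public sealed class QueueSynchronizationContext : SynchronizationContext
{
    private readonly BlockingCollection<Action> _queue = new BlockingCollection<Action>();

    public override void Post(SendOrPostCallback callback, object state)
    {
        _queue.Add(() => callback(state));   // returns immediately, nothing runs yet
    }

    // Blocks until one posted callback is available, then runs it on the calling thread.
    public void RunOne()
    {
        _queue.Take()();
    }
}

public static class PostSketch
{
    // Posts from a worker thread and reports whether the callback ran on the pumping thread.
    public static bool CallbackRunsOnPumpingThread()
    {
        var context = new QueueSynchronizationContext();
        int pumpingThreadId = Thread.CurrentThread.ManagedThreadId;
        int callbackThreadId = -1;

        // The worker only posts; it never executes the callback itself.
        Task.Factory.StartNew(() =>
            context.Post(state => callbackThreadId = Thread.CurrentThread.ManagedThreadId, null));

        context.RunOne();   // the "UI thread" picks the callback up
        return callbackThreadId == pumpingThreadId;
    }
}
```

When no UI context exists, for instance in a console application or a unit test, AsyncOperationManager.SynchronizationContext falls back to a default context whose Post runs the callback on a thread pool thread, so handlers should not assume a particular thread in that case.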

SafeInvoke

We use a very nice pattern that is explained in this great answer to a great question.

C#
/// <summary>
/// This method is a safe way to fire an event in a multithreaded process. 
/// Since there is a tiny chance that the event becomes null after the null check but before the invocation, 
/// we use this extension where the event is passed as an argument.
/// Why is this helpful? MulticastDelegates are immutable, so if you first assign a variable, null check against the variable and invoke through it, 
/// you are safe
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="event"></param>
/// <param name="sender"></param>
/// <param name="eventArgs"></param>
public static void SafeInvoke<T>(this EventHandler<T> @event, object sender, T eventArgs) where T : EventArgs
{
    if (@event != null)
    {
        @event(sender, eventArgs);
    }
}

MessageReceivedHandler

C#
/// <summary>
/// Handles a message that is received from the client. Fires the relevant event.
/// </summary>
private void MessageReceivedHandler(object sender, MessageReceivedEventArgs eventArgs)
{
    OnMessageReceived(eventArgs);
}

OnMessageReceived

C#
/// <summary>
/// Fires MessageReceivedEvent on the captured synchronization context
/// </summary>
/// <param name="eventArgs"></param>
private void OnMessageReceived(MessageReceivedEventArgs eventArgs)
{
    _synchronizationContext.Post(e => MessageReceivedEvent.SafeInvoke(this, (MessageReceivedEventArgs)e), eventArgs);
}

ClientDisconnectedHandler

C#
/// <summary>
/// Handles a client disconnection. Fires the relevant event and removes its server from the pool
/// </summary>
private void ClientDisconnectedHandler(object sender, ClientDisconnectedEventArgs eventArgs)
{
    OnClientDisconnected(eventArgs);

    StopNamedPipeServer(eventArgs.ClientId);
}

OnClientDisconnected

C#
/// <summary>
/// Fires ClientDisconnectedEvent on the captured synchronization context
/// </summary>
/// <param name="eventArgs"></param>
private void OnClientDisconnected(ClientDisconnectedEventArgs eventArgs)
{
    _synchronizationContext.Post(e => ClientDisconnectedEvent.SafeInvoke(this, (ClientDisconnectedEventArgs)e), eventArgs);
}

StopNamedPipeServer

C#
/// <summary>
/// Stops the server that belongs to the given id
/// </summary>
/// <param name="id"></param>
private void StopNamedPipeServer(string id)
{
    UnregisterFromServerEvents(_servers[id]);
    _servers[id].Stop();
    _servers.Remove(id);
}

UnregisterFromServerEvents

C#
/// <summary>
/// Unregisters from the given server's events
/// </summary>
/// <param name="server"></param>
private void UnregisterFromServerEvents(ICommunicationServer server)
{
    server.ClientConnectedEvent -= ClientConnectedHandler;
    server.ClientDisconnectedEvent -= ClientDisconnectedHandler;
    server.MessageReceivedEvent -= MessageReceivedHandler;
}

Stop

C#
public void Stop()
{
    foreach (var server in _servers.Values)
    {
        try
        {
            UnregisterFromServerEvents(server);
            server.Stop();
        }
        catch (Exception)
        {
            Logger.Error("Failed to stop server");
        }
    }

    _servers.Clear();
}

Usage

The attached solution includes unit tests which demonstrate the usage of the library server.

Points of Interest

Working on the client-server implementation was very educational. As part of my research I had the opportunity to read some very interesting articles online, which I referenced throughout this article.
Debugging in a multithreaded environment was both challenging and refreshing.

I would like to thank...

I would like to thank my co-workers and outstanding software engineers who helped me with their ideas, code reviews and deep knowledge: Avishay Ben Shabtai, Niv Lederer and Amit Bezalel.
Thank you guys!

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.
