Introduction
Named pipes are a great tool for inter-process communication (IPC) on Windows. Although the topic has been covered in many tutorials and forums, I could not find an example that is both easy to understand and able to serve multiple named pipe clients simultaneously. In this article, I start with a simple example that explains the basics (Demo1). I then extend it into the classes NamedPipeClient, NamedPipeServerInstance and NamedPipeServer. Finally, Demo2 shows how to use these classes.
Background
First, let's check Demo1.cs and see how a simple named pipe communication works. While this demo is technically "inter-thread communication", it can easily be adapted to real inter-process communication scenarios.
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipes;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace NamedPipeDemo
{
    class Demo1
    {
        private static string pipeName = "Demo1Pipe";

        public static void Run()
        {
            // Start the server on a thread-pool thread and give it time to create the pipe.
            Task.Run(() => Server());
            Task.Delay(300).Wait();
            Client();
        }

        static void Server()
        {
            using (var server = new NamedPipeServerStream(pipeName))
            {
                // Blocks until a client connects.
                server.WaitForConnection();
                var reader = new StreamReader(server);
                var writer = new StreamWriter(server);
                var received = reader.ReadLine();
                Console.WriteLine("Received from client: " + received);
                var toSend = "Hello, client.";
                writer.WriteLine(toSend);
                writer.Flush();
            }
        }

        static void Client()
        {
            using (var client = new NamedPipeClientStream(pipeName))
            {
                // Wait up to 100 ms for the server to become available.
                client.Connect(100);
                var writer = new StreamWriter(client);
                var request = "Hello, server.";
                writer.WriteLine(request);
                writer.Flush();
                var reader = new StreamReader(client);
                var response = reader.ReadLine();
                Console.WriteLine("Response from server: " + response);
            }
        }
    }
}
Like many other IPC mechanisms, named pipes use the client-server model, so the server has to be started first. In Demo1, the server is started by Task.Run(() => Server()) and runs on a thread-pool thread rather than the main thread. The method Server shows how little code a named pipe server needs: it creates a NamedPipeServerStream and calls WaitForConnection, which blocks the server thread until a client connects. Once the server is waiting for an incoming connection, we start the client and connect it to the server. What follows is a typical client-server exchange: the client sends a request, the server reads it and sends a response back, and the client reads the response.
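As a rough sketch of the same round trip, the version below uses the async methods WaitForConnectionAsync and ConnectAsync (available since .NET Framework 4.6). It creates the server stream before the client connects, an assumption that removes the need for a start-up pause. The class Demo1Async and its method names are hypothetical and are not part of the article's download; the server here replies with an echo instead of a fixed greeting.

```csharp
using System;
using System.IO;
using System.IO.Pipes;
using System.Threading.Tasks;

// Hypothetical async variant of Demo1; all names are illustrative only.
public static class Demo1Async
{
    // Runs one request/response exchange over the given pipe and returns the response.
    public static async Task<string> RoundTripAsync(string pipeName, string request)
    {
        // The server stream is created first, so the pipe already exists
        // when the client tries to connect.
        using (var server = new NamedPipeServerStream(pipeName, PipeDirection.InOut, 1,
            PipeTransmissionMode.Byte, PipeOptions.Asynchronous))
        using (var client = new NamedPipeClientStream(".", pipeName, PipeDirection.InOut,
            PipeOptions.Asynchronous))
        {
            // Start waiting for the client without blocking a thread.
            Task serverTask = ServeOnceAsync(server);

            await client.ConnectAsync(1000);
            var writer = new StreamWriter(client) { AutoFlush = true };
            var reader = new StreamReader(client);
            await writer.WriteLineAsync(request);
            string response = await reader.ReadLineAsync();

            // Make sure the server side has finished before both streams are disposed.
            await serverTask;
            return response;
        }
    }

    // Server side: accept one client, read one line, answer with an echo.
    private static async Task ServeOnceAsync(NamedPipeServerStream server)
    {
        await server.WaitForConnectionAsync();
        var reader = new StreamReader(server);
        var writer = new StreamWriter(server) { AutoFlush = true };
        string received = await reader.ReadLineAsync();
        await writer.WriteLineAsync("Echo. " + received);
    }
}
```

In a console application, Demo1Async.RoundTripAsync("Demo1Pipe", "Hello, server.").Result should return "Echo. Hello, server.".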
Details
In this section, we are going to extend the example above into a reusable library that supports multiple clients. First, let's have a look at NamedPipeClient.cs. Basically, it wraps the method Client from the previous section and adds some basic error handling.
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipes;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace NamedPipeLib
{
    public class NamedPipeClient : IDisposable
    {
        private NamedPipeClientStream client;
        private StreamReader reader;
        private StreamWriter writer;

        public NamedPipeClient(string pipeName) : this(pipeName, 100) { }

        public NamedPipeClient(string pipeName, int timeOut)
        {
            client = new NamedPipeClientStream(pipeName);
            client.Connect(timeOut);
            reader = new StreamReader(client);
            writer = new StreamWriter(client);
        }

        public void Dispose()
        {
            writer.Dispose();
            reader.Dispose();
            client.Dispose();
        }

        public string SendRequest(string request)
        {
            if (request != null)
            {
                try
                {
                    writer.WriteLine(request);
                    writer.Flush();
                    return reader.ReadLine();
                }
                catch (Exception ex)
                {
                    return string.Format("{0}\r\nDetails:\r\n{1}", "Error on server communication.", ex.Message);
                }
            }
            else
            {
                return "Error. Null request.";
            }
        }
    }
}
Next, let's check PipeMsgEventArgs.cs. Nothing magic here. It just extends EventArgs with two properties, Request and Response.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace NamedPipeLib
{
    public class PipeMsgEventArgs : EventArgs
    {
        public string Request { get; set; }
        public string Response { get; set; }

        public PipeMsgEventArgs()
        {
        }

        public PipeMsgEventArgs(string request)
        {
            this.Request = request;
        }
    }
}
NamedPipeServerInstance is worth a detailed explanation. Like the example in the previous section, it contains a NamedPipeServerStream server. In addition, it has a bool disposeFlag that records whether the server has been disposed, a Task TaskCommunication that handles communication with the client, an EventHandler newServerInstanceEvent that is invoked when a client connects to this server, and an EventHandler<PipeMsgEventArgs> newRequestEvent that is invoked when the client sends a new request to the server.
In the constructor of NamedPipeServerInstance, we initialize the server with more parameters so that it is asynchronous. Instead of calling server.WaitForConnection() as in Demo1, we call server.BeginWaitForConnection(). This call does not block the thread, which makes better use of our limited thread resources. When a client connects to our server, the method OnConnected is invoked.
The method OnConnected may be invoked either when a new client connects or when the server is disposed, so it has to check disposeFlag. For a new client connection, it calls EndWaitForConnection, invokes newServerInstanceEvent, and starts the communication with the client.
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipes;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace NamedPipeLib
{
    class NamedPipeServerInstance : IDisposable
    {
        private NamedPipeServerStream server;
        // volatile: written by Dispose and read by the OnConnected callback on another thread.
        private volatile bool disposeFlag = false;

        public Task TaskCommunication { get; private set; }
        public event EventHandler newServerInstanceEvent = delegate { };
        public event EventHandler<PipeMsgEventArgs> newRequestEvent = delegate { };

        public NamedPipeServerInstance(string pipeName, int maxNumberOfServerInstances)
        {
            server = new NamedPipeServerStream(pipeName, PipeDirection.InOut, maxNumberOfServerInstances, PipeTransmissionMode.Message, PipeOptions.Asynchronous);
            // Returns immediately; OnConnected is called when a client connects or the server is disposed.
            server.BeginWaitForConnection(OnConnected, null);
        }

        public void Dispose()
        {
            disposeFlag = true;
            server.Dispose();
        }

        private void OnConnected(IAsyncResult result)
        {
            if (!disposeFlag)
            {
                server.EndWaitForConnection(result);
                newServerInstanceEvent.Invoke(this, EventArgs.Empty);
                TaskCommunication = Task.Factory.StartNew(Communication);
            }
        }

        private void Communication()
        {
            using (var reader = new StreamReader(server))
            {
                // EndOfStream becomes true once the client closes its end of the pipe.
                while (!reader.EndOfStream)
                {
                    var request = reader.ReadLine();
                    if (request != null)
                    {
                        var msgEventArgs = new PipeMsgEventArgs(request);
                        newRequestEvent.Invoke(this, msgEventArgs);
                        var response = msgEventArgs.Response + Environment.NewLine;
                        var bytes = Encoding.UTF8.GetBytes(response);
                        server.Write(bytes, 0, bytes.Length);
                    }
                }
            }
        }
    }
}
It's worth noting that a single NamedPipeServerInstance (more precisely, a single NamedPipeServerStream) can handle only one client connection. To support multiple client connections, we need multiple NamedPipeServerInstance objects; this is where NamedPipeServer comes into play.
A NamedPipeServer creates and disposes NamedPipeServerInstance objects via the methods NewServerInstance and CleanServers. The method NewServerInstance subscribes to the newServerInstanceEvent of each NamedPipeServerInstance it creates. When a new client connects to a NamedPipeServerInstance, newServerInstanceEvent is invoked and calls NewServerInstance again, which creates another NamedPipeServerInstance as long as the number of existing servers is below maxNumberOfServerInstances.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace NamedPipeLib
{
    public class NamedPipeServer : IDisposable
    {
        private readonly string pipeName;
        private readonly int maxNumberOfServerInstances;
        // Guards the servers list, which is also touched from thread-pool connection callbacks.
        private readonly object syncRoot = new object();
        private List<NamedPipeServerInstance> servers = new List<NamedPipeServerInstance>();

        public event EventHandler<PipeMsgEventArgs> newRequestEvent = delegate { };

        public NamedPipeServer(string pipeName) : this(pipeName, 20, 4) { }

        public NamedPipeServer(string pipeName, int maxNumberOfServerInstances, int initialNumberOfServerInstances)
        {
            this.pipeName = pipeName;
            this.maxNumberOfServerInstances = maxNumberOfServerInstances;
            for (int i = 0; i < initialNumberOfServerInstances; i++)
            {
                NewServerInstance();
            }
        }

        public void Dispose()
        {
            CleanServers(true);
        }

        private void NewServerInstance()
        {
            lock (syncRoot)
            {
                if (servers.Count < maxNumberOfServerInstances)
                {
                    var server = new NamedPipeServerInstance(pipeName, maxNumberOfServerInstances);
                    server.newServerInstanceEvent += (s, e) => NewServerInstance();
                    server.newRequestEvent += (s, e) => newRequestEvent.Invoke(s, e);
                    servers.Add(server);
                }
                CleanServers(false);
            }
        }

        private void CleanServers(bool disposeAll)
        {
            lock (syncRoot)
            {
                if (disposeAll)
                {
                    foreach (var server in servers)
                    {
                        server.Dispose();
                    }
                }
                else
                {
                    // Remove instances whose communication task has finished.
                    for (int i = servers.Count - 1; i >= 0; i--)
                    {
                        if (servers[i] == null)
                        {
                            servers.RemoveAt(i);
                        }
                        else if (servers[i].TaskCommunication != null &&
                            (servers[i].TaskCommunication.Status == TaskStatus.RanToCompletion ||
                            servers[i].TaskCommunication.Status == TaskStatus.Canceled ||
                            servers[i].TaskCommunication.Status == TaskStatus.Faulted))
                        {
                            servers[i].Dispose();
                            servers.RemoveAt(i);
                        }
                    }
                }
            }
        }
    }
}
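For comparison, the same idea (one server stream per client, with a fresh stream created each time a client connects) can be sketched with async/await instead of BeginWaitForConnection and an instance list. This is only a minimal sketch under stated assumptions, not a replacement for NamedPipeLib: the class AsyncPipeServerSketch, its method names and the handler delegate are hypothetical, a CancellationToken takes over the role of disposeFlag, and byte transmission mode is used instead of message mode.

```csharp
using System;
using System.IO;
using System.IO.Pipes;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch, not part of NamedPipeLib; all names are illustrative only.
public static class AsyncPipeServerSketch
{
    // Accepts clients until the token is cancelled. Every connection gets its own stream.
    public static async Task RunAsync(string pipeName, Func<string, string> handler, CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            var server = new NamedPipeServerStream(pipeName, PipeDirection.InOut,
                NamedPipeServerStream.MaxAllowedServerInstances,
                PipeTransmissionMode.Byte, PipeOptions.Asynchronous);
            try
            {
                await server.WaitForConnectionAsync(token);
            }
            catch (OperationCanceledException)
            {
                // Shutdown was requested while waiting, so release the unused stream.
                server.Dispose();
                break;
            }

            // Serve this client in the background and go straight back to listening.
            // Exceptions thrown by the handler are not observed in this sketch.
            Task ignored = ServeClientAsync(server, handler);
        }
    }

    // Reads requests line by line and writes one response line per request.
    private static async Task ServeClientAsync(NamedPipeServerStream server, Func<string, string> handler)
    {
        using (server)
        {
            try
            {
                var reader = new StreamReader(server);
                var writer = new StreamWriter(server) { AutoFlush = true };
                string request;
                // ReadLineAsync returns null once the client closes its end of the pipe.
                while ((request = await reader.ReadLineAsync()) != null)
                {
                    await writer.WriteLineAsync(handler(request));
                }
            }
            catch (IOException)
            {
                // The client disconnected in the middle of an exchange.
            }
        }
    }
}
```

A caller would start it with something like AsyncPipeServerSketch.RunAsync("Demo2Pipe", request => "Echo. " + request, cts.Token) and call cts.Cancel() to stop accepting new clients. Unlike NamedPipeServer, this sketch does not cap the number of simultaneous instances.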
Using the code
Similar to Demo1, Demo2 also starts with a Task that runs the server. A lambda expression, which subscribes to the newRequestEvent of the server, handles incoming requests from the clients: it builds the response by prepending "Echo. " to the request. After the server is created, we start 8 clients in parallel. Each client sends three different requests to the server, and the responses are logged to the console.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using NamedPipeLib;

namespace NamedPipeDemo
{
    class Demo2
    {
        private static string pipeName = "Demo2Pipe";

        public static void Run()
        {
            Task.Run(() => Server());
            Task.Delay(300).Wait();
            var clients = new List<string>()
            {
                "Client 1",
                "Client 2",
                "Client 3",
                "Client 4",
                "Client 5",
                "Client 6",
                "Client 7",
                "Client 8"
            };
            Parallel.ForEach(clients, (c) => Client(c));
        }

        static void Server()
        {
            var server = new NamedPipeServer(pipeName);
            server.newRequestEvent += (s, e) => e.Response = "Echo. " + e.Request;
            Task.Delay(10000).Wait();
            server.Dispose();
        }

        static void Client(string clientName)
        {
            using (var client = new NamedPipeClient(pipeName))
            {
                var request = clientName + " Request a";
                var response = client.SendRequest(request);
                Console.WriteLine(response);
                Task.Delay(100).Wait();
                var request1 = clientName + " Request b";
                var response1 = client.SendRequest(request1);
                Console.WriteLine(response1);
                Task.Delay(100).Wait();
                var request2 = clientName + " Request c";
                var response2 = client.SendRequest(request2);
                Console.WriteLine(response2);
            }
        }
    }
}
Points of Interest
I found it interesting to write a library like this, especially the way I manage named pipe server instances so that the server can support multiple clients. I hope you enjoy it as well.
History
July 30, 2017. Version 1.0.