
Implementing an Asynchronous Named Pipe Server - Part 2

How to implement a named pipe server for communicating with client apps
This article completes Part 1 and describes how to implement the IO worker pool for the named pipe server component. There are several possible ways in which this can be implemented. This is just one of them.

Introduction

In the previous article, I described how a named pipe server can be built in a way in which all connection handling is done asynchronously, in a dedicated thread. In this article, I describe how IO can be performed in a pool of IO worker threads.

Background

In the previous article, I explained the basics of named pipes, as well as Windows Events and Wait operations. Not surprisingly, that same information is relevant here as well, in particular the use of Auto Reset events.

As I mentioned, there are several ways to perform asynchronous IO. Windows comes with thread pool APIs, IO completion ports, C++ parallelization support, and so on. While all of these are perfectly valid, I decided to take a slightly different approach. In the previous article, I used OVERLAPPED structures and Events for the asynchronous parts, and I wanted to reuse those concepts.

My code will have a pool of threads which all do the exact same thing: wait for an event that is associated with the IO Read operation that is executed on every active client connection. The key to this concept is that all active threads are waiting for all IO events simultaneously. This seems counterintuitive at first. However, an Auto Reset event will satisfy exactly 1 wait operation, leaving the other threads free to receive other events. Even if 1 thread is triggered by an IO completion, the wait operations involving the same event handle in different threads are unaffected and may be satisfied by a future IO completion.
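
To make this concrete, here is a minimal stand-alone sketch (not part of the server code): several threads wait on the same auto reset event, and each SetEvent call releases exactly one of them.

C++
#include <windows.h>
#include <iostream>

DWORD WINAPI Waiter(void* ev)
{
    WaitForSingleObject((HANDLE)ev, INFINITE);
    std::cout << "Thread " << GetCurrentThreadId() << " was released" << std::endl;
    return 0;
}

int main()
{
    //FALSE for bManualReset makes this an auto reset event
    HANDLE ev = CreateEvent(NULL, FALSE, FALSE, NULL);
    HANDLE threads[2] = {
        CreateThread(NULL, 0, Waiter, ev, 0, NULL),
        CreateThread(NULL, 0, Waiter, ev, 0, NULL)
    };
    SetEvent(ev);  //satisfies exactly 1 wait; the other thread keeps waiting
    Sleep(100);
    SetEvent(ev);  //satisfies the remaining wait
    WaitForMultipleObjects(2, threads, TRUE, INFINITE);
    CloseHandle(threads[0]); CloseHandle(threads[1]); CloseHandle(ev);
    return 0;
}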

Conceptually, there is a maximum of X concurrent client connections, and a threadpool that can act on Y messages concurrently.

The IO Buffer

When performing pipe IO, we have to start an IO Read operation using ReadFile. Unfortunately, we don't know ahead of time how large the messages will be. The buffer sizes supplied to the pipe relate only to low-level reservations; they neither limit nor guarantee the length of the messages a client will send, or that the server needs to send back.

This means that while we can associate a buffer with a HANDLE for the purpose of executing ReadFile, there is no guarantee that this is enough. There is a real chance that the buffer needs to be extended at some point. Additionally, when we start a possible follow-up IO request to read the remainder of the message, the data has to be filled in right where the previous operation ended. So we need write pointer / position tracking. Luckily, I already have a buffer class CBuffer<T>, which was covered in an earlier article and which implements dynamic memory allocation.

Starting from that, we can build CIOBuffer with minimal effort.

C++
CIOBuffer::CIOBuffer(DWORD defaultSize) :
    CBuffer<BYTE>(defaultSize),
    m_DefaultSize(defaultSize),
    m_WriteOffset(0) { }

CIOBuffer::CIOBuffer() :
    CBuffer<BYTE>(0),
    m_DefaultSize(0),
    m_WriteOffset(0) { }

//Return the buffer to its default size and rewind the write pointer
void CIOBuffer::Reset() {
    Resize(m_DefaultSize);
    m_WriteOffset = 0;
}

//Change the default size, then reset
void CIOBuffer::Reset(DWORD defaultSize) {
    m_DefaultSize = defaultSize;
    Reset();
}

//Grow the buffer, e.g., when a message is larger than the current size
void CIOBuffer::AddSize(DWORD additionalSize) {
    Resize(Size() + additionalSize);
}

//The write position doubles as the amount of data received so far
DWORD CIOBuffer::Offset() {
    return m_WriteOffset;
}

void CIOBuffer::SetOffset(DWORD newOffset) {
    m_WriteOffset = newOffset;
}

//Address where the next read operation should deposit its data
void* CIOBuffer::WritePtr() {
    return (BYTE*) m_buffer + m_WriteOffset;
}

//Room left between the write pointer and the end of the buffer
DWORD CIOBuffer::SizeRemaining() {
    return Size() - m_WriteOffset;
}

void* CIOBuffer::Ptr() {
    return m_buffer;
}

DWORD CIOBuffer::Size() {
    return CBuffer<BYTE>::Size();
}

As you can see, it is basically a very thin wrapper around a piece of dynamic memory, with an additional variable that tracks the write position and, from that, the size remaining in the buffer. This also provides a way for the data consumer to know how much data was actually received.
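
As an illustration, here is a hypothetical usage sequence (not taken from the server code) showing how the write pointer moves as data arrives:

C++
CIOBuffer buf(256);                //start with a 256 byte default size
//...suppose a first read delivered 256 bytes:
buf.SetOffset(buf.Offset() + 256); //the write pointer is now at the end of the buffer
buf.AddSize(512);                  //the message turned out to be bigger: grow the buffer
//a follow-up read fills in right where the previous one ended:
//ReadFile(pipe, buf.WritePtr(), buf.SizeRemaining(), &bytesRead, ...);
buf.Reset();                       //back to 256 bytes, write pointer rewound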

The Client IO Context

Every connected client has a set of information associated with it: a pipe handle, an OVERLAPPED structure, an Event, ... so it makes a lot of sense to combine those things in a class because there is a 1-1 relationship.

C++
class CIOContext
{
public:
    CIOContext();
    CHandle IOHandle;            //The handle for the object on which the IO is done
    CAutoResetEvent IOEvent;     //completion event for the object
    OVERLAPPED Overlapped;       //overlapped completion structure
    CIOBuffer InputBuffer;       //buffer for input IO
    CIOBuffer OutputBuffer;      //buffer for output IO
};

There is almost no code associated with this class; it acts as a glorified struct. The only code is in the constructor, which default-initializes the members. We can do this because all members can be initialized without needing names (the kernel objects, such as the event, can be anonymous).
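
The article does not show the constructor body, but one important detail is implied: the OVERLAPPED structure must be wired to the event so that IO completions signal it. A plausible sketch (the actual code may differ):

C++
CIOContext::CIOContext() :
    IOHandle(), IOEvent(), Overlapped(), InputBuffer(), OutputBuffer()
{
    //Completion notifications for overlapped IO go to our per-connection event
    Overlapped.hEvent = IOEvent;
}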

The CNamedPipeWorkerPool class

A little bit of context is needed before showing the code.

As I mentioned, this implementation uses OVERLAPPED events to receive notifications. All threads wait for all events, which means that when we use WaitForMultipleObjects, we are limited by the maximum number of concurrent wait objects (MAXIMUM_WAIT_OBJECTS), which is 64. Additionally, 1 event has to be reserved for the internal shutdown event, which is used in a similar fashion to the shutdown in the connection handler. So the maximum number of concurrent clients is 63.

The pool has the following internal state information (methods removed for clarity):

C++
class CNamedPipeWorkerPool
{

public:
    static const DWORD MAX_INSTANCES = MAXIMUM_WAIT_OBJECTS - 1;

private:
    static const DWORD NUM_WAIT_OBJECTS = MAXIMUM_WAIT_OBJECTS;
    static const DWORD MAX_THREADS = 16;
    static const DWORD SHUTDOWN_EVT_INDEX = MAX_INSTANCES;

    CHandle m_ServerThread[MAX_THREADS];
    CIOContext m_IOContext[MAX_INSTANCES];
    CManualResetEvent m_ShutdownEvent;

    HANDLE m_WaitObjects[NUM_WAIT_OBJECTS];

    PipeWorkerMessageHandler m_MessageHandler;
    void* m_Context;

    DWORD m_NumThreads;
};

Whenever possible, I avoid dynamic sizing of arrays. To gain a couple of bytes of memory, we would need to add programmatic complexity (range checking, error handling, ...) which not only costs time but also increases executable size, so there is not much to gain. Especially since we're talking about just a few hundred bytes.

The value for MAX_INSTANCES is one less than the maximum number of wait objects because every instance needs 1 event associated with it, and 1 additional shutdown event is reserved. The max number of wait objects is also a constant NUM_WAIT_OBJECTS. I made that decision because if I change the mechanism for asynchronous operations, the MAXIMUM_WAIT_OBJECTS macro may not be appropriate anymore.

There are various ways to set a maximum number of threads. A sensible rule is that the number of threads should never exceed the number of CPU cores that are available. However, that number cannot be known at compile time. I could have gone for 63 (the same as the maximum number of connections), but that many threads makes little sense. 16 seems a sensible maximum. If need be, it can easily be increased later.
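
If a runtime cap tied to the core count is ever desired, it is easy to query. A small sketch (not part of the class):

C++
//Derive a worker thread count from the machine instead of hardcoding it
SYSTEM_INFO si;
GetSystemInfo(&si);
DWORD numThreads = si.dwNumberOfProcessors;
if (numThreads > 16)    //16 is the pool's MAX_THREADS
    numThreads = 16;
//numThreads can then be passed as the numWorkerThreads constructor argument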

There is an IO Context for every possible connection. There is a separate array which contains raw handles because that's what WaitForMultipleObjects expects.

Now, let's take a look at the constructor:

C++
CNamedPipeWorkerPool::CNamedPipeWorkerPool(
        DWORD numWorkerThreads,
        PipeWorkerMessageHandler messageHandler,
        void* context,
        DWORD nInBufferSize,
        DWORD nOutBufferSize) :
        m_ShutdownEvent(),
        m_MessageHandler(messageHandler),
        m_NumThreads(numWorkerThreads),
        m_Context(context),
        m_IOContext()

    {
        m_WaitObjects[SHUTDOWN_EVT_INDEX] = m_ShutdownEvent;

        for (DWORD i = 0; i < MAX_INSTANCES; i++)
        {
            m_WaitObjects[i] = m_IOContext[i].IOEvent;
            m_IOContext[i].InputBuffer.Reset(nInBufferSize);
            m_IOContext[i].OutputBuffer.Reset(nOutBufferSize);
        }

        try
        {
            for (DWORD i = 0; i < m_NumThreads; i++) {
                m_ServerThread[i] = 
                  CreateThread(NULL, 0, WorkerThreadFunc, this, 0, NULL);
                if (!m_ServerThread[i].IsValid()) {
                    throw ExWin32Error
                    (L"CNamedPipeWorkerPool::CNamedPipeWorkerPool CreateThread");
                }
            }

        }
        catch (...)
        {
            for (DWORD i = 0; i < MAX_THREADS; i++) {
                if (m_ServerThread[i].IsValid()) {
#pragma warning( push )
#pragma warning( disable : 6258 )
                    TerminateThread(m_ServerThread[i], -1);
#pragma warning( pop )
                }
            }
            throw;
        }
    }

The IO context values are all initialized in the initializer list, after which we use those values to manually initialize the array of wait objects. It doesn't matter that the HANDLE values are unmanaged copies of the managed ones. They'll be cleaned up when the object is destroyed, and at that point, no one is using them anymore.

Initializing the threads is a little more involved. We need to create a certain number of threads, which is something that can fail even if it is unlikely. Say we need to create 8 threads and at thread 5, there is an error. It is vital that those threads get terminated. However, strictly speaking, we cannot do that using anything tied to the CNamedPipeWorkerPool object. We're still in the constructor. If we throw here, the destructor will not be executed. This means that those threads might still exist, and touch the CNamedPipeWorkerPool object after it is deleted.

This means that any action we want to take, we need to take it right there. Conceivably, we could trigger the shutdown event and wait for the living threads to finish and wait on their handles. However, in this particular case, we can take a simpler approach. We know that CNamedPipeWorkerPool is still being constructed. It is guaranteed that those threads are not yet doing anything that would require cleanup. In this particular scenario, it is safe to use TerminateThread to simply remove them from existence.

TerminateThread is not always safe to use. Threads that are terminated like this do not perform cleanup and do not unwind the stack in a RAII-compatible manner. Because of that, it causes a compiler warning. I explicitly disable the compiler warning for that particular line of code to indicate that I'm aware of it and that ignoring it is a conscious decision. Just silencing that warning without comment is generally a bad idea, because anyone else reading the code will be left wondering about it.

After the threads are created, the object is ready to start serving IO and receiving client requests.

Adding Connections

I mentioned before that I wanted to implement this class without using locks or other synchronization mechanisms. At first, that seems like a big ask, given that we're talking about something that gets used and updated in several threads at the same time. However, the following things are true:

  1. Connections get added only in 1 thread, which is the connection handler thread.
  2. Connections get removed only in the IO worker pool.
  3. An IO completion event (whether successful or because of failure) triggers only 1 IO thread into action.
  4. The arrays holding all information are fixed length and do not need to be resized at runtime.

So let's look at adding connections.

C++
void CNamedPipeWorkerPool::AddClientConnection(CHandle connection)
{
    CIOContext* ioContext = NULL;
    for (DWORD i = 0; i < MAX_INSTANCES; i++)
    {
        if (!m_IOContext[i].IOHandle.IsValid()) {
            ioContext = &m_IOContext[i];
            break;
        }
    }

    if (ioContext == NULL)
        throw AppException
        ("CNamedPipeWorker::AddClientConnection no more empty slots");

    ioContext->IOHandle = connection;
    try
    {
        InitiateRead(connection, ioContext->InputBuffer, &ioContext->Overlapped);
    }
    catch (exception& ex)
    {
        DisconnectNamedPipe(ioContext->IOHandle);
        ioContext->IOHandle = NULL;
        throw;
    }
}

The first order of business is to find an IO context that is available. Remember that the OVERLAPPED structures, events, and buffers are all pre-allocated. We just need to find a set that doesn't have a connection handle associated with it. There are as many slots as there are possible connections, so the fact that a client was able to connect means there has to be a slot available. However, we have a safety catch in case there isn't.

We assign the handle to that slot first, and then initiate the asynchronous read operation to await a message. Should that fail for whatever reason, we disconnect the client and close the handle.

The IO Handling Thread

The IO threads themselves are relatively simple. They all do the same thing. For the sake of clarity, I've cut this function into pieces. The thread is a continuous loop which only reacts when a Windows event has happened.

C++
DWORD CNamedPipeWorkerPool::WorkerThreadFunc(void* ptr)
{
    CNamedPipeWorkerPool* worker = (CNamedPipeWorkerPool*)ptr;
    while (!worker->IsShuttingDown())
    {
        DWORD eventIndex = WaitForMultipleObjects
        (NUM_WAIT_OBJECTS, worker->m_WaitObjects, FALSE, INFINITE);

        //If a shutdown was signalled, the thread must end
        if (eventIndex == SHUTDOWN_EVT_INDEX)
            return 0;

        //return value if an event was triggered is the index of the event.
        //If the return value is not a valid index,
        //something is catastrophically wrong and we exit.
        if (eventIndex >= MAX_INSTANCES) {
            DWORD error = GetLastError();
            return -1;
        }

        CIOContext* ioContext = &worker->m_IOContext[eventIndex];

WaitForMultipleObjects finishes only if an IO event completes or a shutdown is triggered. Its return value is the index of the event which satisfied the wait. When it is time to shut down, the threads themselves do nothing in terms of cleanup, because they would all try to do the same thing, which would require a locking mechanism to implement safely.

It's also obvious now why the shutdown event is last in the array of wait events. This way, the index in m_WaitObjects is a 1-1 match with the corresponding index in the m_IOContext array. It's not that it would be a big problem to recalculate, but avoiding the potential for 'off by 1' errors is always a good idea. Note that this is also why the while loop uses IsShuttingDown as its loop condition. If a flood of messages is swamping the IO threads, they would keep processing IO completion events and only process the shutdown event when the IO stops.
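
IsShuttingDown itself is not shown in the article. Presumably, it is a non-blocking check of the shutdown event, along these lines:

C++
bool CNamedPipeWorkerPool::IsShuttingDown()
{
    //A zero timeout makes this a poll: signaled means shutdown was requested
    return WaitForSingleObject(m_ShutdownEvent, 0) == WAIT_OBJECT_0;
}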

Once we have the IOContext* corresponding with the event, we can get to the meat of the communication.

C++
        try
        {
            if (!ioContext->IOHandle.IsValid())
                continue;

            worker->ProcessReadCompletion(
                ioContext->IOHandle,
                ioContext->InputBuffer,
                &ioContext->Overlapped);

            worker->m_MessageHandler(
                worker->m_Context,
                ioContext->IOHandle,
                ioContext->InputBuffer,
                ioContext->OutputBuffer );

            if (GetCurrentThreadToken() != NULL) {
                if (!RevertToSelf()) {
                    SetEvent(worker->m_ShutdownEvent);
                    return GetLastError();
                }
            }

            if (ioContext->OutputBuffer.Offset() > 0) {
                worker->SendReturnMessage(
                    ioContext->IOHandle,
                    ioContext->OutputBuffer);
            }

            worker->InitiateRead(
                ioContext->IOHandle,
                ioContext->InputBuffer,
                &ioContext->Overlapped);
        }
        catch (exception& ex)
        {
            //will cause the handle to be closed
            ioContext->IOHandle = NULL;
        }
    }

    return 0;
}

First, we check if the handle is valid. It should always be; I can see no realistic reason why it wouldn't be, but protecting against an invalid handle is never a bad idea. The next step is to complete the IO (more on that below).

With the incoming message complete, we can finally invoke the message handler that was supplied by the application. This is the heart of what we are trying to implement: this is where the application has said "Whenever an incoming message is received, execute this function on the data". The context parameter is the information that the application wants to send into the message handler (similar to the argument supplied to a newly created thread). The handle is supplied because the message handler may require connection information such as the identity of the client. And finally, the input and output buffers are for receiving data into the message handler, and preparing data for sending back.

The next part may seem weird but it's essential. Named pipes are one of the IPC mechanisms that support impersonation. In other words, the message handler may call an API to switch the thread to the identity of the caller in order to perform certain actions such as a database update, for example. And when it is done, it should undo the impersonation. However, if it doesn't, the thread would remain running under that identity, which is a bad thing. To make sure that doesn't happen, we check if impersonation is still active (the current thread token is not NULL) and revert to our own identity if it is. The documentation says that if RevertToSelf fails, that is a critical error and the process should end. That is a bit drastic, but it is a good idea to shut down all IO handling. More elaborate error handling can be implemented if need be.
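
For illustration, a hypothetical message handler fragment that impersonates the caller could use the standard ImpersonateNamedPipeClient API. If it forgot the RevertToSelf call, the safety check above would undo the impersonation:

C++
if (ImpersonateNamedPipeClient(handle)) {
    //...perform work under the client's identity, e.g., an access-checked file read...
    RevertToSelf(); //restore the worker thread's own identity
}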

Assuming we get this far, we send a response message if the message handler has composed one in the buffer, and finally we initiate a new IO Read request which may or may not be satisfied immediately. If a message is already there and data is read, the IO processing may already have started on another thread by the time this thread goes back to waiting.

If at any point in this set of activities there is an error, an exception is thrown. The exception signifies an issue with the IO connection that is being dealt with, not with the thread. So the correct course of action is to disconnect the client and close the connection. As it happens, both actions are achieved simply by closing the handle.

Initiating the Read Request

So what goes into a read request? We already discussed the fact that the IO is performed asynchronously: we wait for it. However, here too, we have to anticipate race conditions. Even though the time between connecting a client and initiating a read request is minuscule, there is a non-zero chance that the client has sent a message that's already available and waiting for us.

If that happens, ReadFile succeeds and the data is retrieved synchronously. This is not a problem. The overlapped event will still be triggered, and any data handling will be taken care of in the thread pool, where the wait will fall through. When this happens, the read operation may be completed in a different thread than the one where the read request originated. That is why we do not process any data here.

C++
void CNamedPipeWorkerPool::InitiateRead
     (HANDLE pipe, CIOBuffer& buffer, LPOVERLAPPED overlapped)
{
    DWORD bytesRead = 0;
    buffer.Reset();
    if (ReadFile(pipe, buffer.WritePtr(), buffer.SizeRemaining(),
                 &bytesRead, overlapped))
    {
        ; //Data already available. Event will be triggered
    }
    else {
        if (GetLastError() == ERROR_IO_PENDING) {
            //There was no data waiting for the read operation. This is normal
        }
        else if (GetLastError() == ERROR_MORE_DATA) {
            //Data arrived synchronous but not all data was received.
        }
        else {
            throw ExWin32Error(L"Read named pipe error");
        }
    }
}

If the data was not yet available, there are 2 error results that are considered normal. The first is ERROR_IO_PENDING: there was no data waiting for the read operation. When data arrives, it will trigger the overlapped event and the data will be read in the thread pool by one of the worker threads. The second is ERROR_MORE_DATA: data was already available, but the full message was not read (for example, because the read buffer wasn't big enough). This just means that when the worker is processing the overlapped event, it will detect that it needs to retrieve more data.

Any other error is considered catastrophic.

Completing a Read Request

When the ReadFile operation completes, there are three possibilities.

C++
void CNamedPipeWorkerPool::ProcessReadCompletion
     (HANDLE pipe, CIOBuffer & ioBuffer, LPOVERLAPPED overlapped)
{
    DWORD bytesRead = 0;
    if (GetOverlappedResult(pipe, overlapped, &bytesRead, FALSE))
    {
        ioBuffer.SetOffset(ioBuffer.Offset() + bytesRead);
    }
    else
    {
        //Failure may indicate that there is more data to read.
        //The pipe can tell us how much data
        //is waiting in the message that is currently being read.
        if (GetLastError() == ERROR_MORE_DATA) {
            ioBuffer.SetOffset(ioBuffer.Offset() + bytesRead);

            DWORD bytesRemainingInMessage = 0;
            if (!PeekNamedPipe(pipe, NULL, 0, NULL, NULL, &bytesRemainingInMessage))
            {
                throw ExWin32Error
                  ("PeekNamedPipe failed after GetLastError() == ERROR_MORE_DATA");
            }

            bytesRead = 0;
            ioBuffer.AddSize(bytesRemainingInMessage);
            if (!ReadFile(pipe, ioBuffer.WritePtr(), ioBuffer.SizeRemaining(),
                 &bytesRead, NULL)) {
                throw ExWin32Error("ReadFile failed for remaining bytes in pipe");
            }
            //Account for the extra data so the consumer sees the full message size
            ioBuffer.SetOffset(ioBuffer.Offset() + bytesRead);
        }
        else {
            throw ExWin32Error("Overlapped ReadFile failed.");
        }
    }
}

The first possibility is if the operation completes successfully. In that case, the data is already in the buffer and the only thing we need to do is update the offset of the write pointer so that the code processing the data knows exactly how much data was received.

If there was an error, this could mean that the buffer size was simply too small for the total message. However, because we're using a message based pipe, we have the guarantee that the complete message is available. We can use PeekNamedPipe to find out how many bytes there are left in the message, and then grow the buffer and perform a synchronous read to get the remainder of the message. Because we're not doing an overlapped read, there will not be an IO event and instead, we just get the data there and then. A wait would be pointless because we already know the data is there.

If there is any other error, it simply means that the IO has failed for any number of reasons that could indicate something benign (the client has closed the connection) or catastrophic. Whatever the case may be, it means that the connection is no longer considered valid and we throw an exception to let the caller deal with it.

Sending a Response Message

Sending the response message is trivial but for the sake of completeness, I'll cover it.

C++
void CNamedPipeWorkerPool::SendReturnMessage(HANDLE pipe, CIOBuffer& buffer)
{
    //lpNumberOfBytesWritten may only be NULL for overlapped writes, so supply one
    DWORD bytesWritten = 0;
    if (!WriteFile(pipe, buffer.Ptr(), buffer.Offset(), &bytesWritten, NULL)) {
        throw ExWin32Error("CNamedPipeWorker::SendReturnMessage WriteFile");
    }
}

This operation is also processed synchronously. There is no real optimization to be had by doing this asynchronously.

Multithreading Considerations

There are a couple of possible scenarios for thread races which need to be considered.

Why is There Always a Free Slot?

Why do I say there should always be a free spot? Isn't it possible for a client to disconnect, and a new client to connect before the completion event for the previous connection has fired and the cleanup is finished? As it turns out, no. Even when a client disconnects, that doesn't close the server end of the pipe that is represented by our handle. And it's the number of server endpoints that is limited. A disconnected server endpoint is still a named pipe instance. It is only really closed if the last handle to it is closed.

So consider what happens if there are 63 connections in use and one of them simply disconnects. From the named pipe's point of view, there are still 63 connections and a new one will not be created until the cleanup for the disconnected client is performed and the handle is closed. Since closing the handle is the very last thing that is done, implicitly this means that if we're looking for a free slot, it is because the named pipe was allowed to open another instance, which means we are certain that there is a free slot for it.

What Happens if We Look for a Free Slot While Another Thread Is Cleaning Up?

And what happens if, in the meantime, 2 more threads experience a client disconnect and are closing their handles while we are looking for a free slot? The answer again is: nothing, since we are looking for a slot with an invalid handle. Another thread can only be closing a handle that is valid, so it will not interfere. Even if the check is performed at the very same time the handle is being closed, the code for closing the handle is this:

C++
void CHandle::CloseHandle() {
    if (IsValid()) {
        ::CloseHandle(m_handle);
        m_handle = INVALID_HANDLE_VALUE;
    }
}

The internal variable is set invalid after the handle is fully closed, which means that while it is theoretically possible for a handle to be considered 'in use' when it is about to be freed up, the reverse is never a possibility, because there is only 1 thread ever looking for a free slot, and only 1 thread accessing any particular handle for cleanup.

What Happens if There Are Outstanding IO Requests When Shutdown is Triggered?

When we initiate the shutdown, there can be many IO requests outstanding, and several threads may be handling messages. As tempting as it is to close them all there and then, we cannot do that. The entire premise of being able to add and remove connections safely without locks is that our design guarantees that only 1 thread is modifying a connection slot at any given time. Doing any of that from a different thread violates this principle.

In order to make it safe, we'd need to implement locking, which is something I didn't want. Instead, we expose a method to terminate all connections. This method is called by the connection handler thread when all IO worker threads have ended. In that case, we are guaranteed that no other threads are touching those slots, and it is safe to close all connections.
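
The article does not list that method. A sketch of what it might look like is below; the method name is my assumption. By the time it runs, no worker threads are alive, so touching every slot is safe.

C++
void CNamedPipeWorkerPool::TerminateConnections()
{
    for (DWORD i = 0; i < MAX_INSTANCES; i++) {
        if (m_IOContext[i].IOHandle.IsValid()) {
            DisconnectNamedPipe(m_IOContext[i].IOHandle);
            m_IOContext[i].IOHandle.CloseHandle(); //also cancels any pending IO
        }
    }
}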

Note that, technically, we don't even need to do this: those handles would all be closed when the IO worker pool object is destroyed and all CHandle instances are destroyed with it. However, there is the matter of possible outstanding IO requests. Every ReadFile operation has a buffer associated with it. Suppose the buffers of an IO context get deallocated before the handle is closed, and in the microsecond between these two actions, a message arrives. If that happens, there could be a crash because the read operation is writing the message into a buffer that doesn't exist anymore.

To prevent this from happening, the CIOContext class has a destructor that closes the handle. This guarantees that the previous scenario does not come to pass.
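
That destructor can be as simple as this sketch. Closing the handle also cancels any IO that is still pending on it, and it must run before the buffers are destroyed:

C++
CIOContext::~CIOContext()
{
    //CHandle::CloseHandle is a no-op if the handle is already invalid
    IOHandle.CloseHandle();
}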

We still disconnect all clients and cancel all IO when the IO threads are shut down. Not because this is needed to avoid the problems described above, but because if the named pipe server is part of a large and complex application, there may be a significant time between the shutdown of the IO and the destructor call that would close the connections. It is cleaner for the clients to be disconnected when our IO is shut down.

What Happens if IO is Coming in While a Previous IO Request is Being Handled?

Suppose a thread is triggered by an IO completion and, while we are handling it, the client sends another request. What happens then? Nothing. For an IO event to be set, there needs to be an IO Read request in progress. Otherwise, the incoming message is simply buffered by the Windows IO subsystem until we initiate a read request to deal with it.

Processing Messages Concurrently

One thing that deserves some more attention is the handling of the messages themselves. Consider, for example, a SQL server which receives queries through named pipes. Our named pipe implementation is perfectly safe for multithreaded use. What we cannot do is make the message handling thread safe. Technically we could, by wrapping a mutex around each invocation of the message handler and stalling the IO, but that approach is horrible from a performance point of view.

This means that the application providing the message handler function pointer needs to ensure that the handler implements the appropriate locking, so that when 2 or more messages are processed at the same time (because 2 or more clients are sending messages), the locking is correct and fine-grained enough not to hamper performance more than absolutely necessary.

Using the CNamedPipeServer in a Real Application

Now that we've covered everything, it's time to show how to use the component in a real pipe server. For testing purposes, we have a message handler which receives a message, prints it to the console, and compiles a return message.

As explained in the previous section, we have to look at what kind of locking is needed. In this case, the only section of code that is sensitive to thread races is the printing of the message. If we don't do anything, multiple messages will garble each other. For that reason, the application passes a pointer to a mutex as the context.

If the application is something complex like a SQL server, for example, then instead of a simple mutex it is probably a better idea to provide a complete state machine, handling all possible scenarios. This is a topic outside the scope of this article.

C++
void pipeWorkerMessageHandler(
    void* context,
    w32::CHandle& handle,
    CIOBuffer& input,
    CIOBuffer& output)
{
    std::mutex* pmutex = (std::mutex*) context;
    DWORD threadId = GetCurrentThreadId();
    Sleep(1000); //simulate work
    LPWSTR text = (LPWSTR)input.Ptr();
    {
        std::lock_guard<std::mutex> lock(*pmutex);
        std::wcout << L"Thread " << threadId << L" received: " << text << std::endl;
    }

    wstringstream stream;
    stream << L"Message answered by thread " << threadId;
    wstring message = stream.str();
    DWORD bytes = (message.size() + 1 ) * sizeof(wstring::value_type);
    memcpy(
        output.Ptr(),
        (void*)message.c_str(),
        bytes);
    output.SetOffset(bytes);
}

The Server Application

Our test server is very simple. It creates an instance of the pipe server and assigns a message handler function, with a mutex pointer for context. At that point, it simply waits for the user to start the connection handling or issue the shutdown command.

C++
try
{
    std::mutex g_mutex;
    wstring name(L"\\\\.\\Pipe\\NamedPipe");

    CNamedPipeServer server(name,
        pipeWorkerMessageHandler,
        &g_mutex,
        256,
        256,
        10);

    cout << "Make a choice:" << endl;
    cout << "==============" << endl;
    cout << "s: start serving" << endl;
    cout << "q: quit" << endl << endl;
    cout << "Choice: ";

    char choice = 0;
    do
    {
        choice = getchar();
        if (choice == 'q') {
            cout << "Initiating shutdown" << endl;
            server.Shutdown();
            cout << "Waiting until everything is shutdown" << endl;
            server.WaitUntilFinished(INFINITE);
            cout << "Shutdown finished" << endl;
        }
        else if (choice == 's') {
            cout << "Starting server connections" << endl;
            server.StartServing();
        }
    } while (choice != 'q');

}
catch (exception& ex)
{
    cout << ex.what() << endl;
}

The Client Application

The client application is similarly simple. It opens a named pipe, and switches the pipe connection to message based, as was explained in Part 1 of this article.

C++
try
{
    wstring name(L"\\\\.\\Pipe\\NamedPipe");
    w32::CHandle pipeHandle = CreateFileW(name.c_str(),
        GENERIC_READ | GENERIC_WRITE, 0, NULL,
                       OPEN_ALWAYS, FILE_FLAG_OVERLAPPED, NULL);
    if (!pipeHandle.IsValid())
        throw ExWin32Error(L"CreateFileW "+ name);

    DWORD dwMode = PIPE_READMODE_MESSAGE;
    if(!SetNamedPipeHandleState(pipeHandle, &dwMode, NULL, NULL))
        throw ExWin32Error(L"SetNamedPipeHandleState " + name);

    w32::CHandle readerThread =
         CreateThread(NULL, 0, ReaderLoop, pipeHandle, 0, NULL);
    cout << "Make a choice:" << endl;
    cout << "==============" << endl;
    cout << "q: quit" << endl;
    cout << "1: send 1 message" << endl;
    cout << "5: send 5 messages" << endl;
    cout << "50: send 50 messages" << endl;
    cout << "Choice: ";

    string  choice;
    do
    {

        getline(cin, choice);
        if (choice == "q") {
            shutdowninitiated = true;
            pipeHandle.CloseHandle();
            WaitForSingleObject(readerThread, INFINITE);

        }
        else if (choice == "1") {
            SendPipeMessage(pipeHandle);
        }
        else if (choice == "5") {
            for (int i = 0; i < 5; i++) {
                SendPipeMessage(pipeHandle);
            }

        }
        else if (choice == "50") {
            for (int i = 0; i < 50; i++) {
                SendPipeMessage(pipeHandle);
            }

        }
    } while (choice != "q");

}
catch (exception& ex)
{
    cout << ex.what() << endl;
}

The interface allows you to send 1 message, 5 messages, or 50 messages. You can run multiple instances of the client application side by side to observe how concurrent messages are handled.
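
The ReaderLoop function itself is not listed above. Because the client opens the pipe with FILE_FLAG_OVERLAPPED, its reads need an OVERLAPPED structure. Below is a minimal sketch of what such a reader might look like; the details are my assumptions, not the article's actual code.

C++
DWORD WINAPI ReaderLoop(void* ptr)
{
    HANDLE pipe = (HANDLE)ptr;
    OVERLAPPED ov = {};
    ov.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
    wchar_t buffer[256];
    DWORD bytesRead = 0;
    while (!shutdowninitiated)
    {
        //The handle was opened with FILE_FLAG_OVERLAPPED, so pass an OVERLAPPED
        if (!ReadFile(pipe, buffer, sizeof(buffer) - sizeof(wchar_t), NULL, &ov) &&
            GetLastError() != ERROR_IO_PENDING)
            break;                 //pipe closed or broken
        //Wait for the read to complete; fails when the main thread closes the handle
        if (!GetOverlappedResult(pipe, &ov, &bytesRead, TRUE))
            break;
        buffer[bytesRead / sizeof(wchar_t)] = L'\0';
        std::wcout << buffer << std::endl;
    }
    CloseHandle(ov.hEvent);
    return 0;
}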

Testing the Applications

Finally, it's time to do some testing. For this test, I started 1 pipe server and 3 client applications, all sending messages concurrently.

[Image: the pipe server and three client applications exchanging messages concurrently]

As you can see, everything does what it is supposed to do. The source code is included so you can modify the test applications if you want to make them more interesting and have them do other things.

Conclusion

Named pipes are an incredibly useful and flexible technology. Integrating them properly with your server can be tricky, but with this server implementation, I've taken most of the sting out of it. The code is MIT licensed so if you want to reuse it, have fun with it.

This article took a lot of time to write and covers a lot of details. I hope I didn't skip over anything important or make a mistake somewhere. If you think I messed up something or something isn't clear, post a comment and I'll check it out.

History

  • 22nd December, 2022: First version of the article with working test applications and source code

License

This article, along with any associated source code and files, is licensed under The MIT License