
Asynchronous I/O with ThreadPool.BindHandle

7 Jan 2013 | Ms-PL | 17 min read
Describes the usage of Asynchronous I/O with I/O Completion ports with .NET

Introduction

For Windows developers, high performance I/O is a challenge usually left to C and C++ programmers. The .NET Framework also provides for high performance I/O in the form of thread pools bound to I/O completion ports. Documentation exists, but it is sparse, both on the Internet and in the MSDN API catalogue. This article explains the usage of I/O completion ports from within .NET to allow you to implement high performance I/O.

The final product is a class that is used to start a process and to communicate with that process using redirection for StdIn, StdOut and StdErr. The class in itself is useful and can be used without understanding anything in this article. The goal is to describe the mechanisms behind the IPC that allow for high performance I/O. Similar concepts can be applied to all overlapped I/O.

Background

The reader is expected to be familiar with Streams for .NET and their usage for asynchronous I/O (a particularly good example is the FileStream class); usage of WinAPI within .NET via the P/Invoke mechanisms and a little about thread pools in .NET.

Using the code

The class provides the following benefits:

  • Unlimited buffering for StdIn, StdOut and StdErr from a process. Windows typically has a buffer size of only a few kilobytes, which can cause processes to block when writing to their Std* handles.
  • Buffering is paged, to allow for flexible memory management within the .NET GC.

Starting a Process

The class should be as simple to use as possible. To start a new process which your main process can then monitor, use the following code:

C#
Process p = Process.Execute("notepad.exe", false);

A new Process object is returned, which can be used to terminate the process, wait for it to end and monitor redirected handles. The second parameter allows you to wait for the process to end before continuing.

If you run a process as a "helper", a small process that does a specific job and then exits, you should use the C# using statement to ensure error free behaviour (and that underlying handles are freed):

C#
using (Process p = Process.Execute("cscript /nologo .\\script.vbs", true)) {
  Console.WriteLine("Exit Code {0}", p.ExitCode);
}

The second parameter in this case is true, indicating that control continues only when the script has ended. The object remains valid, allowing the exit code to be obtained. Any output from the process can be parsed with the p.StdOut or p.StdErr properties, usually via the ReadLine() methods that they present. As the data is captured immediately on output independent of your thread, the information can still be parsed after the program has ended.

Terminating a Process

The class only supports terminating a process which it has started itself. While technically possible (see the unit test case Process_CreateNotepadDispose in the source provided), wanting to end a process which wasn't started programmatically by the main program should be uncommon.

C#
using (Process p = Process.Execute("notepad.exe", false)) {
  // Wait one second for the program to end.
  bool result = p.Wait(1000);
  if (!result) {
    p.Terminate(-1);
  }
  Console.WriteLine("Notepad Exit Code {0}", p.ExitCode);
}

This starts the notepad.exe process and waits one second for it to end (usually because the user closes the program manually). If it does not end within this time, the process is explicitly terminated with the exit code of -1 (so that the ExitCode property should normally return -1).

The exit code can be obtained from the ExitCode property. This value is only valid if the process has ended, which is true when:

  • Wait() returns;
  • Wait(int timeout) returns true;
  • Terminate(int exitcode) has been called.

Asynchronous I/O

This is where the most interesting part of the article begins: the mechanisms used to retrieve output (StdOut and StdErr) from the process, as well as feeding the process with input if required. The techniques here have a practical application in the implementation of the Process class, but are not limited to such a simple example. While this article concentrates on receiving StdOut (and StdErr), the concepts are the same for StdIn.

Communication between the process and the Process class is done using pipes. Three pipes are created, one each for StdOut, StdErr and StdIn. For StdOut and StdErr, the write ends of the pipes are passed to the subprocess as part of the StartInfo object given in CreateProcess(); for StdIn, the read end of the third pipe is passed.

Reading I/O Patterns

The first task is to choose how to get the information from the stream. There are three convenient mechanisms:

  1. One thread per file handle with blocking ReadFile() calls, implementing a typical producer/consumer pattern
  2. A single thread for all file handles, using overlapped I/O and Events
  3. I/O completion ports bound to a ThreadPool with asynchronous callbacks

These three mechanisms are covered lightly, with the last given the most detail. The actual implementation is done in the Process.AsyncConsoleReader class. This class is so designed that it's easy to implement patterns 1 and 3.

Blocking I/O

This is arguably the simplest way of implementing I/O. One thread (the producer) is responsible for reading the data from the pipe and writing that data to a queue. A second thread (the consumer) which is usually the main thread reads data from the queue. There is one producer thread per file handle, resulting in a total of three producer threads plus the main thread.

It is important to note that for the implementation of the Process class, this could be considered a valid pattern. The Windows method CreatePipe() doesn't support flags, therefore doesn't support FILE_FLAG_OVERLAPPED.
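
The producer/consumer arrangement described above can be sketched with ordinary .NET streams. This is a minimal illustration only, not the code in Process.AsyncConsoleReader: the class BlockingReaders and its methods are hypothetical names, and a BlockingCollection stands in for the queue.

```csharp
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading;

// Hypothetical helper for illustration; not part of the article's source.
public static class BlockingReaders {
  // Starts one producer thread that copies chunks from 'source' into 'queue'.
  public static Thread StartProducer(Stream source, BlockingCollection<byte[]> queue) {
    Thread t = new Thread(() => {
      byte[] buffer = new byte[4096];
      int read;
      // Read() blocks until data arrives; 0 means the stream has ended.
      while ((read = source.Read(buffer, 0, buffer.Length)) > 0) {
        byte[] chunk = new byte[read];
        Buffer.BlockCopy(buffer, 0, chunk, 0, read);
        queue.Add(chunk);
      }
    });
    t.IsBackground = true;
    t.Start();
    return t;
  }

  // Reads every stream on its own thread; the caller is the consumer.
  // Returns the total number of bytes consumed from the queue.
  public static long Drain(params Stream[] sources) {
    using (BlockingCollection<byte[]> queue = new BlockingCollection<byte[]>()) {
      Thread[] producers = new Thread[sources.Length];
      for (int i = 0; i < sources.Length; i++) {
        producers[i] = StartProducer(sources[i], queue);
      }

      // Marks the queue as complete once every producer has finished.
      Thread closer = new Thread(() => {
        foreach (Thread p in producers) p.Join();
        queue.CompleteAdding();
      });
      closer.Start();

      long total = 0;
      foreach (byte[] chunk in queue.GetConsumingEnumerable()) {
        total += chunk.Length;
      }
      closer.Join();
      return total;
    }
  }
}
```

Calling BlockingReaders.Drain() with three streams creates three producer threads plus the calling (consumer) thread, which is the thread count discussed above.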

Advantages:

  • This mechanism is simple to understand;
  • It's portable across multiple operating systems with minimal effort;
  • It's easy to add extra file handles by creating new threads/instances of the object.

Disadvantages:

  • Threads require resources for usage. While we only have three threads and this is not a problem, scaling is not practical.
  • There is a high number of context switches. For x86 processors on Windows, this is less of a concern, but other architectures are not so efficient (this may be more relevant with ARM based architectures).

Overlapped I/O with Events

The disadvantages of Blocking I/O arise because a separate thread runs for each active file handle. Performance suffers through the time required for context switches, and with a large number of threads this can lead to so-called "thread thrashing": even if a thread is ready to run, it might not be able to run. Secondly, there is no policy to specify which thread should run, which could theoretically lead to starvation of I/O for particular file handles, as the Windows kernel scheduler is designed for CPU performance and not file performance, and is non-deterministic.

To overcome this, one can implement a single thread to handle all file handles. There is a performance improvement because there is less context switching: if a file handle is ready to be serviced, it can utilise the CPU immediately. By choosing the order in which handles are tested, it is possible to implement deterministic behaviour independent of the scheduler (insofar as the scheduler allows the I/O thread to run).
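
The effect of ordering can be sketched with WaitHandle.WaitAny(), which reports the lowest array index among the handles that are signalled. In a real implementation each event would be the hEvent of an OVERLAPPED structure for one file handle; here plain events stand in for them, and the class name HandlePriority is hypothetical.

```csharp
using System;
using System.Threading;

// Hypothetical helper for illustration only.
public static class HandlePriority {
  // Waits on all completion events with a single thread. The array order
  // is the priority order: when several events are signalled at once,
  // WaitAny() returns the smallest index. Returns -1 on timeout.
  public static int NextReady(WaitHandle[] completionEvents, int timeoutMs) {
    int index = WaitHandle.WaitAny(completionEvents, timeoutMs);
    return index == WaitHandle.WaitTimeout ? -1 : index;
  }
}
```

The I/O thread would loop over NextReady(), service the handle at the returned index, reset its event and start the next overlapped operation for that handle.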

An example of such a threading mechanism is provided in the open source project SerialPortStream at http://serialportstream.codeplex.com, also written by the author of this article. Refer to the implementation in NativeSerialPort_CommOverlappedIo.cs.

Advantages over Blocking I/O:

  • Reduced resource overhead by having a single thread for all I/O;
  • Deterministic behaviour, independent of the kernel scheduler.

Disadvantages:

  • Doesn't scale with the CPUs in the system;
  • Difficult to implement correct code.

I/O Completion Ports and .NET ThreadPools

This is by far the best available mechanism for implementing asynchronous I/O within the .NET Framework on Windows. It is also designed to be the highest performing model on Windows, as documented in Windows Internals, Sixth Edition, Chapter 8, "I/O Completion Ports".

Unfortunately, there is very little documentation and there are few code examples on how to use I/O completion ports within .NET. The MSDN documentation is also quite limited. Documentation is provided here. Details follow on how it is used and on the pitfalls.

Advantages over previous models:

  • Similar to existing concepts with asynchronous streams (e.g. the FileStream class);
  • Can be used to implement your own asynchronous streams, as the programming models are very similar;
  • Scales well with the number of CPUs in your machine;
  • High performance.

Disadvantages:

  • Poor documentation on MSDN, and few (sometimes incorrect) examples on the Web;
  • Memory leaks may be difficult to find;
  • Memory corruption may occur.

I/O Completion Ports and .NET ThreadPools

We now cover the last option in more detail. It is recommended to use a .NET decompiler such as ILSpy or Reflector to study Microsoft's implementation of the SerialPort or FileStream classes. They also use I/O completion ports and ThreadPools for their internal implementation.

There are four steps to using I/O Completion Ports. While this article is similar to the blog at BeefyCode, it is not compatible in the case of using the ReadFile() and WriteFile() APIs. Clarification is given later in the section on using the Win32 API, "Initiating Asynchronous I/O".

Creating Overlapped I/O

The methods CreateFile() and CreateNamedPipe() provide a flags option to specify FILE_FLAG_OVERLAPPED. The method CreatePipe() does not allow these flags to be specified, however, and as such cannot be used for overlapped I/O. FILE_FLAG_OVERLAPPED must be specified for any kind of overlapped I/O (whether using Events for notification, or I/O completion ports).

As a workaround for the Process class, a port was made of code from Dave Hart. The C# equivalent can be found in the downloaded code as the method Win32.CreatePipeEx(). It creates a named pipe using CreateNamedPipe() for inbound binary data, and opens the write end with the CreateFile() API.

Methods and their prototypes are specified below, adapted from PInvoke.NET. The enums can be found in the Native.cs file in the downloadable source.

C#
internal static class UnsafeNativeMethods {
  [DllImport("kernel32.dll", SetLastError = true, CharSet = CharSet.Unicode)]
  public static extern SafeFileHandle CreateFile(
    string lpFileName,
    [MarshalAs(UnmanagedType.U4)] NativeMethods.FileAccess dwDesiredAccess,
    [MarshalAs(UnmanagedType.U4)] NativeMethods.FileShare dwShareMode,
    ref NativeMethods.SECURITY_ATTRIBUTES lpSecurityAttributes,
    [MarshalAs(UnmanagedType.U4)] NativeMethods.CreationDisposition dwCreationDisposition,
    [MarshalAs(UnmanagedType.U4)] NativeMethods.FileAttributes dwFlagsAndAttributes,
    IntPtr hTemplateFile);
  [DllImport("kernel32.dll", SetLastError = true)]
  public static extern SafeFileHandle CreateNamedPipe(string lpName, 
    [MarshalAs(UnmanagedType.U4)] NativeMethods.PipeOpenMode dwOpenMode,
    [MarshalAs(UnmanagedType.U4)] NativeMethods.PipeMode dwPipeMode, 
    uint nMaxInstances, uint nOutBufferSize, uint nInBufferSize,
    uint nDefaultTimeOut, ref NativeMethods.SECURITY_ATTRIBUTES lpSecurityAttributes);
}

Creating a file handle that can use overlapped I/O may be done as so:

C#
SafeFileHandle hReadFile;

string pipeName = string.Format("\\\\.\\Pipe\\{0:X}.{1}",
  SafeNativeMethods.GetCurrentProcessId(), Guid.NewGuid().ToString("D"));

hReadFile = UnsafeNativeMethods.CreateNamedPipe(pipeName, 
  NativeMethods.PipeOpenMode.PIPE_ACCESS_INBOUND | 
    (NativeMethods.PipeOpenMode)NativeMethods.FileAttributes.FILE_FLAG_OVERLAPPED,
  NativeMethods.PipeMode.PIPE_TYPE_BYTE | NativeMethods.PipeMode.PIPE_WAIT,
  1, nSize, nSize, 120 * 1000, ref lpPipeAttributes);

if (hReadFile.IsInvalid) return false;

Equivalently when using the CreateFile() API:

C#
hWriteFile = UnsafeNativeMethods.CreateFile(pipeName, 
  NativeMethods.FileAccess.GENERIC_WRITE,
  NativeMethods.FileShare.FILE_SHARE_NONE, 
  ref lpPipeAttributes, 
  NativeMethods.CreationDisposition.OPEN_EXISTING,
  NativeMethods.FileAttributes.FILE_ATTRIBUTE_NORMAL | 
    NativeMethods.FileAttributes.FILE_FLAG_OVERLAPPED, 
  IntPtr.Zero);

It is important to provide the flag FILE_FLAG_OVERLAPPED as one of the options when creating the file handle.

Binding the Handle and I/O Completion Ports

This is by far the most mysterious part of the article. For more information, refer to Windows Internals, Sixth Edition, Chapter 8, "I/O Completion Ports". This is the same as Windows Internals, Fifth Edition, Chapter 7.

An I/O completion port is an object exposed by the Windows executive which can be associated with multiple file handles. That is, one completion port, multiple file handles. When an overlapped operation completes on a file handle that is associated with an I/O completion port, Windows queues a completion packet to that port. Multiple threads may wait on the same completion port. Completion ports also control concurrency: in general, only n of the threads waiting on a completion port are allowed to run at any one time.

The .NET runtime specifies the number of concurrent threads (n) based on the number of CPU threads in the system (e.g. a 4 core CPU with hyper-threading has 8 CPU threads). This is specified when creating the completion port.

The Windows function CreateIoCompletionPort() is used to create a completion port as well as to associate file handles with a completion port. That is done by the ThreadPool.BindHandle(SafeHandle handle) method in the .NET Framework. Using Rohitab's API Monitor, we can see the following:

Module | API | Return Value
clr.dll | CreateIoCompletionPort ( 0xffffffffffffffff, NULL, 0, 8 ) | 0x0000000000000204
KERNELBASE.dll | NtCreateIoCompletion ( 0x00000000010ae528, IO_COMPLETION_ALL_ACCESS, NULL, 8 ) | STATUS_SUCCESS
clr.dll | CreateIoCompletionPort ( 0x00000000000001f0, 0x0000000000000204, 8792377772672, 8 ) | 0x0000000000000204
KERNELBASE.dll | NtSetInformationFile ( 0x00000000000001f0, 0x00000000010ae600, 0x00000000010ae5f0, 16, FileCompletionInformation ) | STATUS_SUCCESS
clr.dll | CreateIoCompletionPort ( 0x00000000000001f4, 0x0000000000000204, 8792377772672, 8 ) | 0x0000000000000204
KERNELBASE.dll | NtSetInformationFile ( 0x00000000000001f4, 0x00000000010ae600, 0x00000000010ae5f0, 16, FileCompletionInformation ) | STATUS_SUCCESS

There are two calls to ThreadPool.BindHandle(), one with handle 0x1f0 and the second with handle 0x1f4. It can be observed with the first call that the .NET Framework calls CreateIoCompletionPort() twice: once to create the completion port, and once to bind the handle to the completion port 0x204.

So, once the file handle has been created, it should be bound to a completion port as found in Process.AsyncConsoleReader.ctor(). If this step is missed, it won't be possible for the .NET framework to later issue a callback to indicate an asynchronous operation has completed.

C#
public unsafe AsyncConsoleReader(SafeFileHandle streamHandle, string name) : base(name) {
  m_StreamHandle = streamHandle;
  ThreadPool.BindHandle(streamHandle);

  ConsoleAsyncResult ar = new ConsoleAsyncResult(this);
  DoReadOperation(ar);
}

The unsafe keyword is required as the constructor initiates a read operation, which uses pointers.

Creating the Overlapped Structure

The Windows APIs that support overlapped I/O generally have a parameter LPOVERLAPPED lpOverlapped. This parameter can be formed from the .NET Overlapped class via the Pack() method.

Let's say you have an array byte[] buffer, into which data should be read asynchronously. You should use the second form, Overlapped.Pack(IOCompletionCallback, Object), passing the buffer as the second argument. The MSDN documentation states explicitly why:

The runtime pins the buffer or buffers specified in buffer for the duration of the I/O operation. If the application domain is unloaded, the runtime keeps the memory pinned until the I/O operation completes.

Therefore, it is incorrect not to pass the buffer that is being modified by the I/O operation to the Pack() method. While it is possible to use the GC to pin the buffer yourself, if the application domain is unloaded while the operation is still pending (e.g. your program ends during an asynchronous I/O operation) you may still have data corruption. This method of creating the NativeOverlapped structure is simpler and safer. The buffer remains pinned until Overlapped.Unpack() is called, and memory for the NativeOverlapped structure is maintained until Overlapped.Free() is called.

Before you create the NativeOverlapped structure, you need to first create an Overlapped object. For the implementation of the Process class, we don't need to use Events (which might be required if you're implementing a Stream instead that supports BeginRead() and EndRead()). The most generic form of the Overlapped constructor should be used which allows one to provide a generic object of type IAsyncResult.

The IAsyncResult is necessary to allow generic data to be passed from the caller, which initiates the asynchronous I/O, to the callback, which should be implemented as a static method (and therefore has no access to the this object).
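
The round trip of state through the Overlapped class can be sketched without any real I/O. The following is an illustration only: DemoAsyncResult and OverlappedLifecycle are hypothetical names (the article's own type is ConsoleAsyncResult), no file handle is involved, and the code must be compiled with unsafe code enabled.

```csharp
using System;
using System.Threading;

// Hypothetical minimal IAsyncResult that carries state to the callback.
public sealed class DemoAsyncResult : IAsyncResult {
  public DemoAsyncResult(object state) { AsyncState = state; }
  public object AsyncState { get; private set; }
  public WaitHandle AsyncWaitHandle { get { return null; } }
  public bool CompletedSynchronously { get { return false; } }
  public bool IsCompleted { get; set; }
}

public static class OverlappedLifecycle {
  // Packs 'state' and 'buffer' into a NativeOverlapped, then recovers the
  // state again in the same way a completion callback would.
  public static unsafe object RoundTrip(object state, byte[] buffer) {
    DemoAsyncResult ar = new DemoAsyncResult(state);

    // Pack() allocates the NativeOverlapped and takes care of the buffer.
    NativeOverlapped* native =
      new Overlapped(0, 0, IntPtr.Zero, ar).Pack(Callback, buffer);
    try {
      // A real program would now pass 'native' to ReadFile() and perform
      // the remaining steps inside the completion callback.
      Overlapped managed = Overlapped.Unpack(native);
      return managed.AsyncResult.AsyncState;
    } finally {
      // Every Pack() must be matched by exactly one Free().
      Overlapped.Free(native);
    }
  }

  // Static callback with the IOCompletionCallback signature; never invoked
  // in this sketch because no I/O is started.
  private static unsafe void Callback(uint errorCode, uint numBytes,
      NativeOverlapped* native) {
  }
}
```

Because the callback is static, the object returned by AsyncState is the only route by which it can find the instance that started the operation.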

As an example:

C#
private static unsafe void DoReadOperation(ConsoleAsyncResult ar) {
  AsyncConsoleReader acr = (AsyncConsoleReader)ar.AsyncState;
  // Buffer.EndArray is pinned automatically by the Pack() method.
  NativeOverlapped* noverlapped =
    new Overlapped(0, 0, IntPtr.Zero, ar).Pack(ReadCompletionCallback, acr.Buffer.EndArray);
  // ...
}

Initiating Asynchronous I/O

Once the NativeOverlapped structure has been created, it can be passed to methods that then perform asynchronous I/O, such as the ReadFile() function. To make marshalling simpler, the fixed keyword is used, which requires unsafe in C#.

The prototype for the ReadFile() is defined as:

C#
[DllImport("kernel32.dll", SetLastError = true)]
public unsafe static extern bool ReadFile(SafeFileHandle hFile, byte* lpBuffer,
  uint nNumberOfBytesToRead, IntPtr lpNumberOfBytesRead, NativeOverlapped* lpOverlapped);

This allows memory buffers to be used efficiently: ReadFile() writes directly into the queue, avoiding a copy operation. The implementation always writes to the next available byte in the queue, instead of writing to a separate buffer and later copying into the queue. By implementing the queue as a linked list of arrays, we limit the amount of memory that is pinned at any one time and allow the GC to manage memory efficiently. See the PagedQueue<> implementation in the source code provided.
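
To make the idea of a queue built from a linked list of arrays concrete, here is a minimal sketch. It is not the PagedQueue<> from the download: the class PagedByteQueue is hypothetical, it is not thread safe, and only the member names EndArray, End and WriteLength are borrowed from the calls seen in this article.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch; the PagedQueue<> in the article's source may differ.
public sealed class PagedByteQueue {
  private readonly LinkedList<byte[]> m_Pages = new LinkedList<byte[]>();
  private readonly int m_PageSize;
  private int m_Start;  // Read offset within the first page
  private int m_End;    // Write offset within the last page

  public PagedByteQueue(int pageSize) {
    if (pageSize <= 0) throw new ArgumentOutOfRangeException("pageSize");
    m_PageSize = pageSize;
    m_Pages.AddLast(new byte[pageSize]);
  }

  // Number of bytes available for reading.
  public int Length { get; private set; }

  // The page and offset where the next write lands. An I/O call can write
  // here directly, so only this one page needs to be pinned.
  public byte[] EndArray { get { return m_Pages.Last.Value; } }
  public int End { get { return m_End; } }
  public int WriteLength { get { return m_PageSize - m_End; } }

  // Commits 'count' bytes that were written into EndArray at offset End.
  public void Produce(int count) {
    if (count < 0 || count > WriteLength) throw new ArgumentOutOfRangeException("count");
    m_End += count;
    Length += count;
    if (m_End == m_PageSize) {
      // The page is full; later writes go to a fresh page.
      m_Pages.AddLast(new byte[m_PageSize]);
      m_End = 0;
    }
  }

  // Copies up to 'count' bytes out of the queue, releasing emptied pages.
  public int Read(byte[] dest, int offset, int count) {
    int copied = 0;
    while (copied < count && Length > 0) {
      bool isLastPage = m_Pages.First == m_Pages.Last;
      int available = (isLastPage ? m_End : m_PageSize) - m_Start;
      int n = Math.Min(available, count - copied);
      Buffer.BlockCopy(m_Pages.First.Value, m_Start, dest, offset + copied, n);
      m_Start += n;
      copied += n;
      Length -= n;
      if (m_Start == m_PageSize) {
        m_Pages.RemoveFirst();  // Fully consumed; the GC can reclaim it
        m_Start = 0;
      }
    }
    return copied;
  }
}
```

A read operation would target EndArray at offset End for at most WriteLength bytes, and the completion callback would then call Produce() with the number of bytes actually transferred.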

The read operation, which writes directly into the queue, is then started with:

C#
private static unsafe void DoReadOperation(ConsoleAsyncResult ar) {
  // ... (acr and noverlapped are created as shown in the previous section)

  bool result;
  fixed (byte* pBuf = acr.Buffer.EndArray) {
    result = UnsafeNativeMethods.ReadFile(acr.m_StreamHandle, pBuf + acr.Buffer.End, 
      (uint)acr.Buffer.WriteLength, IntPtr.Zero, noverlapped);
  }
 
  if (!result) {
    int error = Marshal.GetLastWin32Error();
    if (error != 997) {
      ReadCompletionCallback((uint)error, 0, noverlapped);
    }
  }
  // else, the callback is still executed
}

In case of an error that does not indicate ERROR_IO_PENDING (error code 997), the NativeOverlapped structure should be freed. This is done by calling the ReadCompletionCallback (see the next section) with the provided error. This allows for a central location for error handling.

When implementing an asynchronous Stream, everything described above would be done as part of the BeginRead() method.

Pitfalls

A special note must be made, as there is no obvious documentation describing the behaviour of the ReadFile() operation, and only a hint in Windows Internals.

The ReadFile() method returns true if the operation completed synchronously and successfully. In that case it is not necessary to do anything: the I/O completion port still receives an I/O completion packet and the callback is executed. It is an error in this case to free the structure as described at BeefyCode.

The first implementation called Overlapped.Unpack() and Overlapped.Free() in case of success and then called ReadFile() again to start a new operation. However, it was observed (at least on Windows 8) that no more callbacks would occur after a synchronous read operation.

If you are implementing your own Stream, you should set the appropriate fields in your IAsyncResult to indicate that the operation was synchronous and successful. The callback will still be called.

As to the hint in Windows Internals: the SetFileCompletionNotificationModes() API could change this behaviour. This function is not used here and has not been tested by the author, as it is only available on Windows Vista and later (e.g. it is not available on Windows XP). In addition, Windows Internals and MSDN do not agree in their documentation of it.
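
The decision described in this section can be written down as a small function. This is a sketch of the rule as observed by the author with the default notification mode, not a Windows API: the names ReadStartResult and ReadFileOutcome are hypothetical.

```csharp
// Hypothetical names, for illustration of the rule only.
public enum ReadStartResult {
  CallbackWillRun,   // Leave the NativeOverlapped alone; the callback frees it
  FailedImmediately  // No completion packet is coming; clean up now
}

public static class ReadFileOutcome {
  public const int ERROR_IO_PENDING = 997;

  // 'readFileReturned' is the return value of ReadFile() and
  // 'lastWin32Error' is Marshal.GetLastWin32Error() taken straight after.
  public static ReadStartResult Classify(bool readFileReturned, int lastWin32Error) {
    // Synchronous success: a completion packet is still queued, so the
    // structure must not be unpacked or freed here.
    if (readFileReturned) return ReadStartResult.CallbackWillRun;

    // The operation is in flight; the callback runs when it completes.
    if (lastWin32Error == ERROR_IO_PENDING) return ReadStartResult.CallbackWillRun;

    // Any other error: the callback will never run for this operation.
    return ReadStartResult.FailedImmediately;
  }
}
```

Only the FailedImmediately case calls for cleaning up at the call site, which DoReadOperation() does by invoking ReadCompletionCallback() itself.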

Callback Processing

When the asynchronous I/O operation has completed, an I/O completion packet is queued. A .NET thread associated with the I/O completion port (from the .NET ThreadPool) receives notification and executes the callback given to the Pack() method. All relevant information is provided in the form of an error code, the number of bytes read (or written) and the NativeOverlapped structure.

From the NativeOverlapped structure, one can obtain the original Overlapped structure with the Unpack() method. This unpins the buffer provided in the Pack() method. From the Overlapped structure, one obtains the AsyncState object which is of type IAsyncResult.

If implementing an asynchronous Stream, the Stream would call the user's callback, allowing them to call EndRead() and potentially initiate a new call to BeginRead(). As the Process class manages asynchronous I/O completely internally, presenting data outside as a synchronous stream, the AsyncState object created by the Process class contains all the information required to initiate a new read operation.

It is important for every asynchronous operation to ensure that Overlapped.Unpack(nativeOverlapped) is called, followed by Overlapped.Free(nativeOverlapped). The Microsoft SerialPort implementation does this by calling Unpack() in the callback and Free() when the user calls EndRead(). This is why Microsoft states that memory leaks can occur if EndRead() is not called for asynchronous streams.

The code for the completion callback in the Process class is shown for completeness:

C#
private static unsafe void ReadCompletionCallback(uint errorCode, uint numBytes, 
    NativeOverlapped* nativeOverlapped) {
  ConsoleAsyncResult ar;
  try {
    // Unpack the NativeOverlapped structure to recover the state given to Pack()
    ar = (ConsoleAsyncResult)Overlapped.Unpack(nativeOverlapped).AsyncResult;
    // Declared here so that the error branch below can also use it
    AsyncConsoleReader cr = (AsyncConsoleReader)ar.AsyncState;
 
    if (errorCode == 0) {
      if (numBytes > 0) {
        cr.WriteBuffer((int)numBytes);
      }
    } else {
      //   6 - ERROR_INVALID_HANDLE
      // 109 - ERROR_BROKEN_PIPE
      // 995 - ERROR_OPERATION_ABORTED
      if (errorCode == 109 || errorCode == 6 || errorCode == 995) {
        cr.m_StreamHandle.Close();
      } else {
        System.Diagnostics.Trace.WriteLine("ReadCompletionCallback: error " + errorCode + " for " + cr.Name);
      }
    }
  } finally {
    Overlapped.Free(nativeOverlapped);
  }
  if (errorCode == 0) DoReadOperation(ar);
}

We see that we always Unpack() the structure and free the memory associated with the NativeOverlapped structure. If there was no error, we initiate a new read operation, essentially reading until the remote pipe is closed, or until I/O is cancelled.

Observations

Within the ReadCompletionCallback() method is a hidden call to a user event, as part of cr.WriteBuffer((int)numBytes). As a user delegate is called, this user delegate can prevent further I/O from occurring, as the next DoReadOperation() will only occur when the user delegate has finished.

This was initially an issue for the ReadLine(int timeout) method when called from within a DataReceived event handler (the user delegate). The delegate could, for example, call ReadLine(1000). If a complete line isn't available, it should wait for more data until the timeout of 1000ms expires. It can't do this, however, if there is no more I/O. Hence, OnDataReceived raises the event on a thread in the .NET ThreadPool. If new data arrives while the handler is executing, this is remembered and a new event is triggered if data is still available.
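
One way to obtain this "remember and trigger again" behaviour is a small state machine driven by Interlocked operations. The sketch below shows the technique only and is not the article's OnDataReceived implementation: the class CoalescingEvent is a hypothetical name.

```csharp
using System;
using System.Threading;

// Hypothetical sketch of an event that never blocks the I/O callback.
public sealed class CoalescingEvent {
  private readonly Action m_Handler;
  // 0 = idle, 1 = handler running, 2 = running and more data has arrived
  private int m_State;

  public CoalescingEvent(Action handler) {
    if (handler == null) throw new ArgumentNullException("handler");
    m_Handler = handler;
  }

  // Called from the I/O completion callback; returns immediately.
  public void Signal() {
    while (true) {
      int state = Volatile.Read(ref m_State);
      if (state == 0) {
        if (Interlocked.CompareExchange(ref m_State, 1, 0) == 0) {
          ThreadPool.QueueUserWorkItem(Run);
          return;
        }
      } else if (state == 2 ||
                 Interlocked.CompareExchange(ref m_State, 2, 1) == 1) {
        return;  // The running handler will be invoked once more
      }
      // The state changed underneath us; look again.
    }
  }

  private void Run(object unused) {
    while (true) {
      m_Handler();
      // 1 -> 0: nothing arrived in the meantime, so we are finished.
      if (Interlocked.CompareExchange(ref m_State, 0, 1) == 1) return;
      // The state was 2: note that we are running again and repeat.
      Volatile.Write(ref m_State, 1);
    }
  }
}
```

With this arrangement the handler is free to block (for example inside ReadLine(1000)) while further read operations continue, and any number of signals that arrive while it runs collapse into one extra invocation.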

Cancelling I/O

Disposing the Process object results in a call to CancelIoEx() to ensure that any asynchronous I/O operation is cancelled for closure. This results in the callback being called with the error 995 ERROR_OPERATION_ABORTED.

SafeHandles

SafeHandles are essential to the usage of asynchronous I/O. They protect us in the case that a handle is closed (by using hFile.Close() instead of CloseHandle()). If the handle is currently in use by a Windows API call, it is not immediately closed, due to internal reference counting. If a closed handle is passed to a Windows function, an exception is raised indicating that the handle is already closed.
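
The reference counting can be observed with a SafeHandle subclass that records when it is released. This is a sketch for illustration: TracingHandle and SafeHandleDemo are hypothetical names, the handle wraps no real operating system resource, and the explicit DangerousAddRef()/DangerousRelease() calls stand in for what the P/Invoke marshaller does around a native call.

```csharp
using System;
using System.Runtime.InteropServices;

// Hypothetical handle type; it wraps no real OS resource.
public sealed class TracingHandle : SafeHandle {
  public TracingHandle() : base(IntPtr.Zero, true) {
    SetHandle(new IntPtr(1));  // Any value other than the invalid one
  }

  public bool Released { get; private set; }

  public override bool IsInvalid {
    get { return handle == IntPtr.Zero; }
  }

  protected override bool ReleaseHandle() {
    Released = true;  // A real handle would call CloseHandle() here
    return true;
  }
}

public static class SafeHandleDemo {
  // Returns three flags: released while still in use, released once the
  // last user has finished, and whether a closed handle was rejected.
  public static bool[] Run() {
    TracingHandle h = new TracingHandle();

    bool inUse = false;
    h.DangerousAddRef(ref inUse);    // A native call starts using the handle
    h.Close();                       // Another thread closes it meanwhile
    bool releasedEarly = h.Released;

    h.DangerousRelease();            // The native call returns
    bool releasedLate = h.Released;

    bool rejected = false;
    try {
      bool again = false;
      h.DangerousAddRef(ref again);  // Attempt to use the closed handle
    } catch (ObjectDisposedException) {
      rejected = true;
    }
    return new[] { releasedEarly, releasedLate, rejected };
  }
}
```

The release is deferred until the outstanding reference is dropped, and a later attempt to use the handle fails with ObjectDisposedException rather than reaching Windows with a stale handle value.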

Further Information

As mentioned in the main text, one can learn a lot by decompiling Microsoft's own implementations.

Follow Up

The following items are available for follow up:

  1. How can one send an I/O packet to indicate that a thread should abort, as indicated in Windows Internals?
  2. What is the most effective way to cancel I/O? The Dispose() methods in AsyncConsoleReader and AsyncConsoleWriter call CancelIoEx() on a handle which may have already been closed elsewhere (and indeed, the try { } catch { } blocks cover this case).
    1. MS Serial Port typically checked for error code 6, ERROR_INVALID_HANDLE.

Points of Interest

It was quite difficult to find information on proper usage. Handling the case when ReadFile() indicated synchronous success (it returned true) took about two days, a lot of sniffing and, in the end, blind luck to work out why no more read operations would occur.

History

  • 07-Jan-2013: Initial Version

License

This article, along with any associated source code and files, is licensed under The Microsoft Public License (Ms-PL)