
Avoiding Deadlocks with System.IO.Stream BeginRead/BeginWrite

10 Apr 2013
Using the default System.IO.Stream implementations of BeginRead() and BeginWrite() may result in unexpected deadlocks.

Introduction

Microsoft's System.IO.Stream abstract class provides default implementations of the BeginRead() and BeginWrite() methods which, if not overridden, call your own implementations of Read() and Write(), respectively, on background threads. If your stream is full duplex, that is, reading and writing are expected to work in parallel and independently of each other, the default implementation may deadlock. These deadlocks are by design: Microsoft's implementation deliberately serializes asynchronous operations on the same stream.

Background

I wrote a class that works on a stream in which reading and writing are expected to proceed in parallel, and which uses asynchronous operations such as BeginRead() and EndRead(). Such a stream may be a pipe with asynchronous reading and writing in both directions, a network-based stream, or a hardware-based communication stream such as a serial port (see my SerialPortStream on CodePlex). To test the implementation, I built a class that creates two streams that can "talk" to each other via shared memory: a write to the "client" stream provides data that can be read by the "server" stream, and vice versa. The two streams use events and locking to ensure that reads block until data from the remote end-point arrives.

The test stream implemented the basic methods:

  • CanTimeout = true; CanRead = true; CanWrite = true; CanSeek = false;
  • ReadTimeout; WriteTimeout;
  • Read(); Write(); Flush() as a no-op;

The following members were not supported by the test stream:

  • Length; Position; Seek; SetLength
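The article does not show the test stream itself. As a rough sketch only, a pair of such streams could be connected through shared in-memory queues guarded by Monitor, with Read() blocking until the remote end writes or the read timeout elapses. LoopbackStream, ByteChannel and CreatePair() are hypothetical names, not the author's code; writes never block in this sketch, so WriteTimeout is stored but unused.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;

// Hypothetical helper: one direction of the in-memory "wire" between the two streams.
internal sealed class ByteChannel
{
    private readonly Queue<byte> _data = new Queue<byte>();

    public void Write(byte[] buffer, int offset, int count)
    {
        lock (_data)
        {
            for (int i = 0; i < count; i++) _data.Enqueue(buffer[offset + i]);
            Monitor.PulseAll(_data); // wake any reader waiting for data
        }
    }

    public int Read(byte[] buffer, int offset, int count, int millisecondsTimeout)
    {
        lock (_data)
        {
            // Wait for the remote end to write; each wait is bounded by the timeout.
            while (_data.Count == 0)
            {
                if (!Monitor.Wait(_data, millisecondsTimeout))
                    throw new TimeoutException("No data received from the remote end.");
            }
            int n = Math.Min(count, _data.Count);
            for (int i = 0; i < n; i++) buffer[offset + i] = _data.Dequeue();
            return n;
        }
    }
}

// Hypothetical test stream: CreatePair() returns two endpoints that talk to each other.
public sealed class LoopbackStream : Stream
{
    private readonly ByteChannel _rx; // data written by the remote end
    private readonly ByteChannel _tx; // data we send to the remote end
    private int _readTimeout = Timeout.Infinite;
    private int _writeTimeout = Timeout.Infinite; // unused: writes never block here

    private LoopbackStream(ByteChannel rx, ByteChannel tx) { _rx = rx; _tx = tx; }

    public static void CreatePair(out LoopbackStream client, out LoopbackStream server)
    {
        var clientToServer = new ByteChannel();
        var serverToClient = new ByteChannel();
        client = new LoopbackStream(serverToClient, clientToServer);
        server = new LoopbackStream(clientToServer, serverToClient);
    }

    public override bool CanRead { get { return true; } }
    public override bool CanWrite { get { return true; } }
    public override bool CanSeek { get { return false; } }
    public override bool CanTimeout { get { return true; } }
    public override int ReadTimeout { get { return _readTimeout; } set { _readTimeout = value; } }
    public override int WriteTimeout { get { return _writeTimeout; } set { _writeTimeout = value; } }

    public override int Read(byte[] buffer, int offset, int count)
    {
        return _rx.Read(buffer, offset, count, _readTimeout);
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        _tx.Write(buffer, offset, count);
    }

    public override void Flush() { } // no-op: written data is visible to the remote end immediately

    public override long Length { get { throw new NotSupportedException(); } }
    public override long Position
    {
        get { throw new NotSupportedException(); }
        set { throw new NotSupportedException(); }
    }
    public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
    public override void SetLength(long value) { throw new NotSupportedException(); }
}
```

A write on one endpoint becomes readable on the other; a read with nothing to receive fails with a TimeoutException once ReadTimeout elapses.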

Each unit test case was executed multiple times in quick succession to stress the implementation, writing to the test stream and reading the data back from it to confirm the results.

The Problem

The test case exhibited deadlocks, which surfaced as timeout exceptions in the unit test. A timeout is not expected unless the test class is unable to send data. Pausing the test in the debugger at the time of the deadlock showed that the thread responsible for writing the response to the client was blocked: BeginWrite() had been called, but the .NET Framework never called the Write() implementation, even though MSDN documents that it does. The call stack showed that the thread was blocked inside the framework, waiting on a semaphore:

mscorlib.dll!System.Threading.Monitor.Wait(obj, millisecondsTimeout, exitContext) + 0x16 bytes

mscorlib.dll!System.Threading.SemaphoreSlim.WaitUntilCountOrTimeout(
        millisecondsTimeout = -1, startTime = 0, cancellationToken) + 0x60 bytes

mscorlib.dll!System.Threading.SemaphoreSlim.Wait(millisecondsTimeout, cancellationToken) + 0x178 bytes

mscorlib.dll!System.IO.Stream.BeginWriteInternal(buffer = {byte[26]}, offset = 0, count = 26, 
  callback = {Method = ??}, state = {RedJam.InetServices.StreamWriteString}, serializeAsynchronously) + 0x99 bytes

mscorlib.dll!System.IO.Stream.BeginWrite(buffer, offset, count, callback, state) + 0x16 bytes

> InetService.dll!RedJam.InetServices.StreamWriteString.SendNextLine(
     sw = {RedJam.InetServices.StreamWriteString}) Line 144 + 0x59 bytes C#

Analysis

To see symbols from within the .NET Framework, first configure Visual Studio to download symbols from the Microsoft Public Symbol Server. Microsoft also provides reference source code for the .NET Framework (see http://referencesource.microsoft.com/netframework.aspx). Combined with tools such as ILSpy, this makes it possible to understand what is happening inside the framework. The reference source also contains the original developers' comments, which can give useful insight into the design (which is why I believe the deadlocks are by design).

Asynchronous Reading and Writing

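The bulk of the work occurs in BeginWriteInternal() and BeginReadInternal(). Microsoft's source for the write path is given below for reference; note the first comment and the semaphore. BeginReadInternal() obtains the same per-stream semaphore from EnsureAsyncActiveSemaphoreInitialized(), so reads and writes on one stream share a single semaphore that admits one operation at a time. BeginWrite() takes the synchronous semaphore.Wait() path, as the call stack above shows (RunReadWriteTaskWhenReady() is a method used only on the asynchronous-wait path), and the semaphore is released only when the outstanding operation is ended with EndRead() or EndWrite(). So while a BeginRead() is waiting for data from the remote end, a BeginWrite() on the same stream blocks, and if the remote end only answers after receiving that write, neither side can make progress. This is the cause of the deadlock.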

internal IAsyncResult BeginWriteInternal(byte[] buffer, int offset, 
         int count, AsyncCallback callback, Object state, bool serializeAsynchronously)
{ 
  Contract.Ensures(Contract.Result<IAsyncResult>() != null); 
  if (!CanWrite) __Error.WriteNotSupported();
 
  // To avoid a race with a stream's position pointer & generating race
  // conditions with internal buffer indexes in our own streams that 
  // don't natively support async IO operations when there are multiple
  // async requests outstanding, we will block the application's main 
  // thread if it does a second IO request until the first one completes. 
  var semaphore = EnsureAsyncActiveSemaphoreInitialized();
  Task semaphoreTask = null; 
  if (serializeAsynchronously) {
    semaphoreTask = semaphore.WaitAsync(); // kick off the asynchronous wait, but don't block
  } else { 
    semaphore.Wait(); // synchronously wait here 
  }
 
  // Create the task to asynchronously do a Write.  This task serves both
  // as the asynchronous work item and as the IAsyncResult returned to the user.
  var asyncResult = new ReadWriteTask(false /*isRead*/, delegate { 
    // The ReadWriteTask stores all of the parameters to pass to Write.
    // As we're currently inside of it, we can get the current task 
    // and grab the parameters from it. 
    var thisTask = Task.InternalCurrent as ReadWriteTask;
    Contract.Assert(thisTask != null, "Inside ReadWriteTask, InternalCurrent should be the ReadWriteTask"); 
 
    // Do the Write
    thisTask._stream.Write(thisTask._buffer, thisTask._offset, thisTask._count);
    thisTask.ClearBeginState(); // just to help alleviate some memory pressure 
    return 0; // not used, but signature requires a value be returned
  }, state, this, buffer, offset, count, callback); 
 
  // Schedule it
  if (semaphoreTask != null) 
    RunReadWriteTaskWhenReady(semaphoreTask, asyncResult);
  else
    RunReadWriteTask(asyncResult);
 
  return asyncResult; // return it
}

Hence, although the MSDN documentation does not state that read and write operations may not run in parallel, this is exactly what happens. By carefully tracing every time my test stream entered and exited BeginRead/Read/EndRead and BeginWrite/Write/EndWrite, I was able to confirm that the Read and Write methods never ran in parallel.
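The serialization can be reproduced in isolation. The following is a minimal probe under stated assumptions: WaitForWriteStream and SerializationProbe are hypothetical names, the stream overrides only Read() and Write() so that BeginRead()/BeginWrite() fall back to the default implementation, and Read() blocks until Write() has been called, like a peer that only answers after receiving our request. On runtimes whose default Stream implementation matches the reference source above, we would expect BeginWriteReturns() to report false.

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stream that overrides only Read() and Write(), so BeginRead() and
// BeginWrite() use the default System.IO.Stream implementation.
public sealed class WaitForWriteStream : Stream
{
    private readonly ManualResetEventSlim _written = new ManualResetEventSlim(false);

    public override int Read(byte[] buffer, int offset, int count)
    {
        _written.Wait(); // block until some data has been "sent"
        return 0;
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        _written.Set(); // unblock any pending Read()
    }

    public override void Flush() { }
    public override bool CanRead { get { return true; } }
    public override bool CanWrite { get { return true; } }
    public override bool CanSeek { get { return false; } }
    public override long Length { get { throw new NotSupportedException(); } }
    public override long Position
    {
        get { throw new NotSupportedException(); }
        set { throw new NotSupportedException(); }
    }
    public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
    public override void SetLength(long value) { throw new NotSupportedException(); }
}

public static class SerializationProbe
{
    // Starts a BeginRead() that cannot finish until a Write() happens, then reports
    // whether BeginWrite() on the same stream returns within the timeout.
    public static bool BeginWriteReturns(Stream stream, int millisecondsTimeout)
    {
        IAsyncResult read = stream.BeginRead(new byte[1], 0, 1, null, null);

        // Call BeginWrite() on a pool thread so that, if it blocks, this thread does not.
        Task<IAsyncResult> write = Task.Run(() => stream.BeginWrite(new byte[1], 0, 1, null, null));
        if (!write.Wait(millisecondsTimeout))
            return false; // BeginWrite() is still blocked while the read is outstanding

        stream.EndWrite(write.Result);
        stream.EndRead(read);
        return true;
    }
}
```

When the probe reports false, the blocked pool threads are simply abandoned; this is acceptable for a diagnostic, not for production code.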

Such a design decision makes sense when the backing store is seekable (e.g. a synchronous FileStream implementation), because it prevents concurrent operations from corrupting the shared file position. In the case described here, Seek is not supported, so this limitation on concurrency is unnecessary.

The Solution

The solution is relatively simple: provide our own implementations of BeginRead/EndRead and BeginWrite/EndWrite that do not have this concurrency limitation. In .NET 4.0, delegates solve this problem elegantly. With these overrides in place, the test case passed without blocking.

using System;
using System.IO;
using System.Runtime.Remoting.Messaging; // AsyncResult (.NET Framework only)
public class MyStream : Stream
{
  // ...
  // Constructor, Read(), Write() and other methods
  // ...
 
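  // Delegate matching Read(). BeginInvoke() runs Read() on a thread pool thread
  // without the per-stream semaphore used by the default Stream.BeginRead().
  delegate int ReadDelegate(byte[] buffer, int offset, int count);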
 
  public override IAsyncResult BeginRead(byte[] buffer, int offset, 
         int count, AsyncCallback callback, object state) {
    ReadDelegate read = this.Read;
    return read.BeginInvoke(buffer, offset, count, callback, state);
  }
 
  public override int EndRead(IAsyncResult asyncResult) {
    // BeginInvoke() returned a remoting AsyncResult; recover the delegate to call EndInvoke().
    AsyncResult result = (AsyncResult)asyncResult;
    ReadDelegate caller = (ReadDelegate)result.AsyncDelegate;
    return caller.EndInvoke(asyncResult);
  }
 
  // Delegate matching Write(); invoked asynchronously in the same way as ReadDelegate.
  delegate void WriteDelegate(byte[] buffer, int offset, int count);
 
  public override IAsyncResult BeginWrite(byte[] buffer, int offset, 
                  int count, AsyncCallback callback, object state) {
    WriteDelegate write = this.Write;
    return write.BeginInvoke(buffer, offset, count, callback, state);
  }
 
  public override void EndWrite(IAsyncResult asyncResult) {
    AsyncResult result = (AsyncResult)asyncResult;
    WriteDelegate caller = (WriteDelegate)result.AsyncDelegate;
    caller.EndInvoke(asyncResult);
  }
}
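The delegate-based overrides rely on delegate BeginInvoke(), which is only available on the .NET Framework; on .NET Core and .NET 5 or later it throws PlatformNotSupportedException. A sketch of the same idea for those runtimes (and for .NET Framework 4.5 or later) can use Task, which implements IAsyncResult and can therefore be returned directly from the Begin methods. FullDuplexStream and WaitForWriteDuplexStream are hypothetical names; this is an alternative technique, not the author's implementation.

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

// Sketch: APM overrides that run Read()/Write() on the thread pool without taking the
// per-stream semaphore used by the default System.IO.Stream implementation, so a
// pending read does not block a write.
public abstract class FullDuplexStream : Stream
{
    public override IAsyncResult BeginRead(byte[] buffer, int offset, int count,
        AsyncCallback callback, object state)
    {
        // Passing 'state' to StartNew makes it the Task's AsyncState, as APM callers expect.
        Task<int> task = Task.Factory.StartNew<int>(_ => Read(buffer, offset, count), state,
            CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
        if (callback != null)
            task.ContinueWith(t => callback(t), TaskScheduler.Default);
        return task;
    }

    public override int EndRead(IAsyncResult asyncResult)
    {
        // Waits for Read() to finish; GetResult() rethrows its exception unwrapped.
        return ((Task<int>)asyncResult).GetAwaiter().GetResult();
    }

    public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count,
        AsyncCallback callback, object state)
    {
        Task task = Task.Factory.StartNew(_ => Write(buffer, offset, count), state,
            CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
        if (callback != null)
            task.ContinueWith(t => callback(t), TaskScheduler.Default);
        return task;
    }

    public override void EndWrite(IAsyncResult asyncResult)
    {
        ((Task)asyncResult).GetAwaiter().GetResult();
    }
}

// Hypothetical concrete stream for the usage check: Read() blocks until Write() is called.
public sealed class WaitForWriteDuplexStream : FullDuplexStream
{
    private readonly ManualResetEventSlim _written = new ManualResetEventSlim(false);

    public override int Read(byte[] buffer, int offset, int count) { _written.Wait(); return 0; }
    public override void Write(byte[] buffer, int offset, int count) { _written.Set(); }
    public override void Flush() { }
    public override bool CanRead { get { return true; } }
    public override bool CanWrite { get { return true; } }
    public override bool CanSeek { get { return false; } }
    public override long Length { get { throw new NotSupportedException(); } }
    public override long Position
    {
        get { throw new NotSupportedException(); }
        set { throw new NotSupportedException(); }
    }
    public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
    public override void SetLength(long value) { throw new NotSupportedException(); }
}
```

With these overrides, a BeginWrite() issued while a BeginRead() is pending runs immediately, and the write is what lets the pending read complete.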

Summary

This article was motivated by a unit test case. However, the principles apply to any stream (test or production) in which reading and writing are independent of each other.

An alternative solution is to design the stream class to be asynchronous from the outset, and then implement Read() by calling BeginRead() immediately followed by EndRead(). I recommend designing your classes this way up front. However, if you have existing code that needs full support for asynchronous reading and writing, the technique described in this article can be applied.
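A minimal sketch of that async-first design, assuming a hypothetical in-memory stream (AsyncFirstLoopbackStream) whose reads are completed by its own writes, standing in for a device that signals when data arrives. BeginRead() does not occupy a thread while waiting: it returns a pending Task from a TaskCompletionSource<int>, and Read() is simply EndRead(BeginRead(...)). To keep it short, only one outstanding read is supported and there is no read timeout.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;

// Hypothetical async-first stream: reads are satisfied by data passed to Write().
// BeginRead() is the primary operation; Read() is built on top of it.
public sealed class AsyncFirstLoopbackStream : Stream
{
    private sealed class PendingRead
    {
        public byte[] Buffer;
        public int Offset;
        public int Count;
        public TaskCompletionSource<int> Completion;
    }

    private readonly object _lock = new object();
    private readonly Queue<byte> _data = new Queue<byte>();
    private PendingRead _pending; // at most one outstanding read in this sketch

    public override IAsyncResult BeginRead(byte[] buffer, int offset, int count,
        AsyncCallback callback, object state)
    {
        var tcs = new TaskCompletionSource<int>(state); // Task.AsyncState == state
        int n = -1;
        lock (_lock)
        {
            if (_pending != null)
                throw new InvalidOperationException("Only one pending read is supported.");
            if (_data.Count > 0)
                n = Drain(buffer, offset, count); // data already buffered: complete now
            else
                _pending = new PendingRead { Buffer = buffer, Offset = offset, Count = count, Completion = tcs };
        }
        if (n >= 0) tcs.SetResult(n);
        if (callback != null)
            tcs.Task.ContinueWith(t => callback(t), TaskScheduler.Default);
        return tcs.Task;
    }

    public override int EndRead(IAsyncResult asyncResult)
    {
        return ((Task<int>)asyncResult).GetAwaiter().GetResult(); // waits if still pending
    }

    // The synchronous read is just an asynchronous read that we wait for.
    public override int Read(byte[] buffer, int offset, int count)
    {
        return EndRead(BeginRead(buffer, offset, count, null, null));
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        PendingRead toComplete = null;
        int n = 0;
        lock (_lock)
        {
            for (int i = 0; i < count; i++) _data.Enqueue(buffer[offset + i]);
            if (_pending != null && _data.Count > 0)
            {
                toComplete = _pending;
                _pending = null;
                n = Drain(toComplete.Buffer, toComplete.Offset, toComplete.Count);
            }
        }
        if (toComplete != null) toComplete.Completion.SetResult(n); // complete outside the lock
    }

    private int Drain(byte[] buffer, int offset, int count) // caller holds _lock
    {
        int n = Math.Min(count, _data.Count);
        for (int i = 0; i < n; i++) buffer[offset + i] = _data.Dequeue();
        return n;
    }

    public override void Flush() { }
    public override bool CanRead { get { return true; } }
    public override bool CanWrite { get { return true; } }
    public override bool CanSeek { get { return false; } }
    public override long Length { get { throw new NotSupportedException(); } }
    public override long Position
    {
        get { throw new NotSupportedException(); }
        set { throw new NotSupportedException(); }
    }
    public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
    public override void SetLength(long value) { throw new NotSupportedException(); }
}
```

Because the only blocking point is EndRead(), the synchronous and asynchronous paths share one implementation and there is no separate semaphore to contend for.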

History

  • 2013-04-10: Initial document

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt, please contact the author.