Introduction
Microsoft's System.IO.Stream abstract class provides default implementations of the BeginRead() and BeginWrite() methods which, if not overridden, call your own implementations of Read() and Write(), respectively, on background threads. If your stream is full duplex, that is, reading and writing are expected to work in parallel and independently of each other, deadlocks may occur when using the default implementation. These deadlocks are by design in Microsoft's implementation.
Background
A class was written that works on a stream where reading and writing are expected to work in parallel and where asynchronous operations such as BeginRead() and EndRead() are used. Such a stream may be a pipe with asynchronous reading and writing in both directions, a network-based stream, or a hardware-based communication stream such as a serial port (see my SerialPortStream on CodePlex). To test the implementation, a class was built that creates two streams that can "talk" to each other via shared memory. A write to the "client" stream provides data that can be read from the "server" stream, and vice versa. The two streams use events and locking to ensure that reads block until data arrives from the remote endpoint.
The test stream implemented the basic members:
- CanTimeout = true; CanRead = true; CanWrite = true; CanSeek = false;
- ReadTimeout; WriteTimeout;
- Read(); Write(); Flush() as a no-op;
The members not supported in the test stream are:
- Length; Position; Seek; SetLength
The unit tests execute each test case multiple times in quick succession to stress the implementation, writing to the test stream and reading back from it to confirm the results.
The Problem
The test case exhibited deadlocks, which surfaced as timeout exceptions in the unit test. A timeout is not expected unless the test class is unable to send data. When the test case was paused at the time of the deadlock, the thread responsible for writing the response to the client was blocked. I could see that BeginWrite() was being called, but the .NET Framework was not calling the Write() implementation as documented in MSDN. The debugger showed that the thread was blocked inside the framework, waiting on a semaphore:
  mscorlib.dll!System.Threading.Monitor.Wait(obj, millisecondsTimeout, exitContext) + 0x16 bytes
  mscorlib.dll!System.Threading.SemaphoreSlim.WaitUntilCountOrTimeout(
      millisecondsTimeout = -1, startTime = 0, cancellationToken) + 0x60 bytes
  mscorlib.dll!System.Threading.SemaphoreSlim.Wait(millisecondsTimeout, cancellationToken) + 0x178 bytes
  mscorlib.dll!System.IO.Stream.BeginWriteInternal(buffer = {byte[26]}, offset = 0, count = 26,
      callback = {Method = ??}, state = {RedJam.InetServices.StreamWriteString}, serializeAsynchronously) + 0x99 bytes
  mscorlib.dll!System.IO.Stream.BeginWrite(buffer, offset, count, callback, state) + 0x16 bytes
> InetService.dll!RedJam.InetServices.StreamWriteString.SendNextLine(
      sw = {RedJam.InetServices.StreamWriteString}) Line 144 + 0x59 bytes C#
Analysis
To see the symbols from within the .NET Framework, one first needs to configure Visual Studio to download symbols from the Microsoft Public Symbol Server. Microsoft also provides reference source code for the .NET Framework; see http://referencesource.microsoft.com/netframework.aspx. Combined with tools such as ILSpy, this makes it possible to understand what is happening inside the .NET Framework. In addition, the reference source code contains the comments of the original developers, which may provide useful insight into the design (this is why I believe the deadlocks are by design).
Asynchronous Reading and Writing
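The bulk of the work occurs in BeginWriteInternal and BeginReadInternal. The source from Microsoft for writing is given below for reference. Note the semaphore acquired near the top of the method (the reference source carries a comment at this point explaining the intent). The same semaphore is also used in the implementation of BeginReadInternal and is passed on to a method called RunReadWriteTaskWhenReady(sema, ar). This is the cause of the deadlock: while one operation holds the semaphore, the next BeginRead or BeginWrite call blocks in semaphore.Wait(), as seen in the stack trace above.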
internal IAsyncResult BeginWriteInternal(byte[] buffer, int offset,
    int count, AsyncCallback callback, Object state, bool serializeAsynchronously)
{
    Contract.Ensures(Contract.Result<IAsyncResult>() != null);
    if (!CanWrite) __Error.WriteNotSupported();
    var semaphore = EnsureAsyncActiveSemaphoreInitialized();
    Task semaphoreTask = null;
    if (serializeAsynchronously) {
        semaphoreTask = semaphore.WaitAsync();
    } else {
        semaphore.Wait();
    }
    var asyncResult = new ReadWriteTask(false, delegate {
        var thisTask = Task.InternalCurrent as ReadWriteTask;
        Contract.Assert(thisTask != null,
            "Inside ReadWriteTask, InternalCurrent should be the ReadWriteTask");
        thisTask._stream.Write(thisTask._buffer, thisTask._offset, thisTask._count);
        thisTask.ClearBeginState();
        return 0;
    }, state, this, buffer, offset, count, callback);
    if (semaphoreTask != null)
        RunReadWriteTaskWhenReady(semaphoreTask, asyncResult);
    else
        RunReadWriteTask(asyncResult);
    return asyncResult;
}
Hence, although the MSDN documentation does not state that a read and a write operation may not run in parallel, this is exactly what happens. By carefully tracing every time my test stream entered and exited BeginRead/Read/EndRead and BeginWrite/Write/EndWrite, I was able to confirm that at no time were the Read and Write methods running in parallel.
Such a design decision is required if persistent storage is used for the backend (e.g. a synchronous implementation of a FileStream), to prevent corruption of the file pointer. In the case described here, Seek is not supported, so this limitation on concurrency is not required.
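The serialization described above can be reproduced in isolation. The following is a minimal sketch, not the original test code: DefaultAsyncStream and DefaultAsyncRepro are hypothetical names introduced for illustration, and the stream deliberately does not override BeginRead() or BeginWrite(), so the default Stream implementations are used. Its Read() blocks until Write() has been called, which stands in for waiting on the remote end.

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stream for illustration only. It relies on the DEFAULT
// BeginRead()/BeginWrite() implementations inherited from Stream.
public class DefaultAsyncStream : Stream
{
    private readonly ManualResetEventSlim _written = new ManualResetEventSlim(false);

    public override bool CanRead { get { return true; } }
    public override bool CanWrite { get { return true; } }
    public override bool CanSeek { get { return false; } }
    public override long Length { get { throw new NotSupportedException(); } }
    public override long Position
    {
        get { throw new NotSupportedException(); }
        set { throw new NotSupportedException(); }
    }
    public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
    public override void SetLength(long value) { throw new NotSupportedException(); }
    public override void Flush() { }

    // Blocks until Write() has been called (stand-in for "wait for the remote end").
    public override int Read(byte[] buffer, int offset, int count)
    {
        _written.Wait();
        return 0;
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        _written.Set();
    }
}

public static class DefaultAsyncRepro
{
    // Returns true if BeginWrite() returned within the timeout while a read was pending.
    public static bool BeginWriteReturns(TimeSpan timeout)
    {
        var stream = new DefaultAsyncStream();
        stream.BeginRead(new byte[16], 0, 16, null, null);   // the read is now pending

        // Call BeginWrite() on another thread so this method can time out instead of hanging.
        Task writer = Task.Run(() => { stream.BeginWrite(new byte[1], 0, 1, null, null); });
        return writer.Wait(timeout);
    }
}
```

Calling DefaultAsyncRepro.BeginWriteReturns(TimeSpan.FromSeconds(2)) from a console application should, given the framework behaviour analysed in this article, return false: the pending read holds the semaphore, so BeginWrite() never reaches Write(), and Read() in turn never sees the data it is waiting for.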
The Solution
The solution is relatively simple: we must provide our own implementations of BeginRead/EndRead and BeginWrite/EndWrite that do not have this concurrency limitation. In .NET 4.0 we can use delegates to solve this problem elegantly. With this implementation in place, the test case passed without blocking.
using System;
using System.IO;
using System.Runtime.Remoting.Messaging;
public class MyStream : Stream
{
    // ...
    // ... Constructor, Read(), Write() and other methods
    // ...
    delegate int ReadDelegate(byte[] buffer, int offset, int count);
    // Run Read() asynchronously through a delegate instead of the base class implementation.
    public override IAsyncResult BeginRead(byte[] buffer, int offset,
        int count, AsyncCallback callback, object state) {
        ReadDelegate read = this.Read;
        return read.BeginInvoke(buffer, offset, count, callback, state);
    }
    // Recover the delegate from the IAsyncResult and collect the result of Read().
    public override int EndRead(IAsyncResult asyncResult) {
        AsyncResult result = (AsyncResult)asyncResult;
        ReadDelegate caller = (ReadDelegate)result.AsyncDelegate;
        return caller.EndInvoke(asyncResult);
    }
    delegate void WriteDelegate(byte[] buffer, int offset, int count);
    // Same pattern for writing; it is independent of any pending read.
    public override IAsyncResult BeginWrite(byte[] buffer, int offset,
        int count, AsyncCallback callback, object state) {
        WriteDelegate write = this.Write;
        return write.BeginInvoke(buffer, offset, count, callback, state);
    }
    public override void EndWrite(IAsyncResult asyncResult) {
        AsyncResult result = (AsyncResult)asyncResult;
        WriteDelegate caller = (WriteDelegate)result.AsyncDelegate;
        caller.EndInvoke(asyncResult);
    }
}
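The delegate approach above targets the .NET Framework. Delegate.BeginInvoke() is not supported on .NET Core and later (it throws PlatformNotSupportedException), so the following is a sketch of the same idea built on tasks instead, which relies on Task implementing IAsyncResult. This is a swapped-in technique, not the code used in the original test. TaskBackedDuplexStream is a hypothetical name, and its Read() and Write() bodies are placeholders that only make the example self-contained.

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical full-duplex stream for illustration: Read() blocks until Write() is called.
public class TaskBackedDuplexStream : Stream
{
    private readonly ManualResetEventSlim _written = new ManualResetEventSlim(false);

    public override bool CanRead { get { return true; } }
    public override bool CanWrite { get { return true; } }
    public override bool CanSeek { get { return false; } }
    public override long Length { get { throw new NotSupportedException(); } }
    public override long Position
    {
        get { throw new NotSupportedException(); }
        set { throw new NotSupportedException(); }
    }
    public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
    public override void SetLength(long value) { throw new NotSupportedException(); }
    public override void Flush() { }

    // Placeholder implementations (assumptions made for this sketch).
    public override int Read(byte[] buffer, int offset, int count) { _written.Wait(); return 0; }
    public override void Write(byte[] buffer, int offset, int count) { _written.Set(); }

    // Each operation gets its own task; no semaphore is shared between reads and writes.
    public override IAsyncResult BeginRead(byte[] buffer, int offset, int count,
        AsyncCallback callback, object state)
    {
        Task<int> task = Task<int>.Factory.StartNew(
            _ => Read(buffer, offset, count), state,
            CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
        if (callback != null) task.ContinueWith(t => callback(t), TaskScheduler.Default);
        return task;   // 'state' is exposed through IAsyncResult.AsyncState
    }

    public override int EndRead(IAsyncResult asyncResult)
    {
        // Waits for completion and rethrows any exception thrown by Read().
        return ((Task<int>)asyncResult).GetAwaiter().GetResult();
    }

    public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count,
        AsyncCallback callback, object state)
    {
        Task task = Task.Factory.StartNew(
            _ => Write(buffer, offset, count), state,
            CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
        if (callback != null) task.ContinueWith(t => callback(t), TaskScheduler.Default);
        return task;
    }

    public override void EndWrite(IAsyncResult asyncResult)
    {
        ((Task)asyncResult).GetAwaiter().GetResult();
    }
}
```

With these overrides, a BeginWrite() issued while a BeginRead() is still pending is expected to run immediately, and the pending read then completes once Write() has delivered its data. Note that a blocking Read() occupies a thread-pool thread for as long as it waits; whether that is acceptable depends on how many reads are outstanding at once.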
Summary
This article was written around the specific case of a unit test. However, the principles apply to any stream (testing or production) where reading and writing are independent of each other.
An alternative solution would be to design the stream class to be asynchronous up front, and then to provide a Read() method that calls BeginRead() immediately followed by EndRead(). My recommendation is to design your classes in this manner from the start. However, if you have existing code to which you need to add full support for asynchronous reading and writing, the technique described in this article can be applied.
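As a rough illustration of the asynchronous-first design, the sketch below makes BeginRead() the real implementation and reduces Read() to BeginRead() followed by EndRead(). Everything in it is an assumption made for the example: AsyncFirstStream is a hypothetical in-memory stream, it supports only one outstanding read at a time, and it uses TaskCompletionSource so that a pending read does not occupy a thread while it waits.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;

// Hypothetical in-memory stream, asynchronous first: BeginRead() does the real work.
public class AsyncFirstStream : Stream
{
    private readonly object _gate = new object();
    private readonly Queue<byte> _data = new Queue<byte>();
    private TaskCompletionSource<int> _pending;   // at most one outstanding read in this sketch
    private byte[] _pendingBuffer;
    private int _pendingOffset, _pendingCount;

    public override bool CanRead { get { return true; } }
    public override bool CanWrite { get { return true; } }
    public override bool CanSeek { get { return false; } }
    public override long Length { get { throw new NotSupportedException(); } }
    public override long Position
    {
        get { throw new NotSupportedException(); }
        set { throw new NotSupportedException(); }
    }
    public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
    public override void SetLength(long value) { throw new NotSupportedException(); }
    public override void Flush() { }

    public override IAsyncResult BeginRead(byte[] buffer, int offset, int count,
        AsyncCallback callback, object state)
    {
        var tcs = new TaskCompletionSource<int>(state);
        bool completeNow;
        int n = 0;
        lock (_gate)
        {
            if (_pending != null)
                throw new InvalidOperationException("Only one pending read is supported.");
            completeNow = _data.Count > 0 || count == 0;
            if (completeNow)
            {
                n = Drain(buffer, offset, count);
            }
            else
            {
                // No data yet: park the request until Write() delivers some.
                _pending = tcs;
                _pendingBuffer = buffer;
                _pendingOffset = offset;
                _pendingCount = count;
            }
        }
        if (callback != null) tcs.Task.ContinueWith(t => callback(t), TaskScheduler.Default);
        if (completeNow) tcs.SetResult(n);
        return tcs.Task;
    }

    public override int EndRead(IAsyncResult asyncResult)
    {
        return ((Task<int>)asyncResult).GetAwaiter().GetResult();
    }

    // The synchronous method is just BeginRead() immediately followed by EndRead().
    public override int Read(byte[] buffer, int offset, int count)
    {
        return EndRead(BeginRead(buffer, offset, count, null, null));
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        TaskCompletionSource<int> toComplete = null;
        int n = 0;
        lock (_gate)
        {
            for (int i = 0; i < count; i++) _data.Enqueue(buffer[offset + i]);
            if (_pending != null && _data.Count > 0)
            {
                n = Drain(_pendingBuffer, _pendingOffset, _pendingCount);
                toComplete = _pending;
                _pending = null;
                _pendingBuffer = null;
            }
        }
        // Complete the parked read outside the lock so continuations never run under it.
        if (toComplete != null) toComplete.SetResult(n);
    }

    // Copies up to 'count' queued bytes into 'buffer'. Must be called while holding _gate.
    private int Drain(byte[] buffer, int offset, int count)
    {
        int n = Math.Min(count, _data.Count);
        for (int i = 0; i < n; i++) buffer[offset + i] = _data.Dequeue();
        return n;
    }
}
```

In this design a write never waits for a pending read, because Write() and BeginRead() share only a short-lived lock around the queue. A production stream would also need timeouts, disposal handling, and a policy for multiple concurrent reads, all of which are left out here.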
History
- 2013-04-10: Initial document