Introduction
The DebugStream class is a simple in-memory implementation of .NET's System.IO.Stream API.
It allows projects that communicate over Streams to run locally, inside a single process,
without pumping data through a pipe or over TCP/IP.
Background
I was working on a project that used serial communication between two PCs. I wanted to be able to test and debug the project without having serial cables hooked up
to my development machine all of the time. Using physical cables was particularly cumbersome because I use a laptop for development.
Using the code
The publicly accessible class is called DebugStream. However, DebugStream is not itself a Stream.
In order to allow asynchronous reading and writing with System.IO.Stream's pre-written BeginXxx and EndXxx
methods, the read stream and the write stream have to be separate classes. The read
stream can be accessed through the DebugStream.ReadStream property. Likewise, the write stream
is accessible through the DebugStream.WriteStream property.
To use the stream, simply instantiate a DebugStream and access its ReadStream and WriteStream properties.
// Create the in-memory pipe with its default buffer size, then take its two ends.
DebugStream ds = new DebugStream();
System.IO.Stream writeStream = ds.WriteStream;  // producer side: write-only
System.IO.Stream readStream = ds.ReadStream;    // consumer side: read-only
After doing so, you could copy a file through the DebugStream, with one thread writing into it
while another thread concurrently reads from it, as follows:
// Placeholders: substitute real paths. They must refer to two DIFFERENT files,
// because the destination is opened with FileMode.Create while the source is read.
string srcPath = "sourceFilePath";
string destPath = "destinationFilePath";
// NOTE(review): Parallel.Invoke does not promise that the two actions really run
// concurrently; if they were ever executed one after the other, the reader would
// wait forever for data. Dedicated threads or tasks are the safer choice in real code.
System.Threading.Tasks.Parallel.Invoke(
    () =>
    {
        // Consumer: drains the pipe into the destination file. CopyTo returns once
        // Read reports 0 bytes, which happens after the write end has been closed
        // and the internal buffer is empty.
        using (var dest = new FileStream(destPath, FileMode.Create))
            readStream.CopyTo(dest);
    },
    () =>
    {
        // Producer: pushes the source file into the pipe.
        try
        {
            using (var src = new FileStream(srcPath, FileMode.Open))
                src.CopyTo(writeStream);
        }
        finally
        {
            // Always close the write end, even if opening or copying the source
            // failed; otherwise the consumer above never sees end-of-stream and
            // blocks forever.
            writeStream.Close();
        }
    }
);
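I could not get the code to upload as a .zip file, so here is the full source code of DebugStream. Note that the listing is wrapped in #if DEBUG_STREAM, so the DEBUG_STREAM symbol must be defined for it to be compiled: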
#if DEBUG_STREAM
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics;
using System.IO;
using System.Threading;
namespace DebugStream
{
/// <summary>
/// An in-memory, fixed-capacity byte pipe. Bytes written to <see cref="WriteStream"/>
/// are stored in a circular buffer and handed out, in order, by <see cref="ReadStream"/>.
/// The two ends are separate <see cref="Stream"/> objects that share one buffer and
/// one lock, so one thread can write while another reads.
/// </summary>
/// <remarks>
/// Disposing or closing either end marks the whole pipe as closed: a reader then
/// drains whatever is still buffered and afterwards gets 0 from Read, while a writer
/// gets an <see cref="ObjectDisposedException"/>.
/// </remarks>
public class DebugStream
{
    private const int DefaultBufferSize = 4096;

    // Non-zero once either end has been disposed. Every access happens under _bufferLock.
    private int _disposing;
    // Guards all ring-buffer state below and doubles as the condition variable
    // (Monitor.Wait / Monitor.PulseAll) that readers and writers block on.
    private readonly object _bufferLock = new object();
    private readonly byte[] _buffer;
    // Index at which the next written byte is stored.
    private int _head = 0;
    // Index from which the next byte is read.
    private int _tail = 0;
    // Distinguishes "empty" from "full", both of which have _head == _tail.
    private bool _empty = true;
    private Stream _readStream = null;
    private Stream _writeStream = null;

    /// <summary>Creates a pipe with the default buffer size (4096 bytes).</summary>
    public DebugStream() : this(DefaultBufferSize) {}

    /// <summary>Creates a pipe whose internal buffer holds <paramref name="bufferSize"/> bytes.</summary>
    /// <param name="bufferSize">Capacity of the internal buffer; must be greater than zero.</param>
    /// <exception cref="ArgumentException"><paramref name="bufferSize"/> is zero or negative.</exception>
    public DebugStream(int bufferSize) {
        if (bufferSize <= 0)
            throw new ArgumentException("bufferSize must be positive and nonzero.",
                "bufferSize");
        _buffer = new byte[bufferSize];
    }

    /// <summary>The read-only end of the pipe. Created on first access and then reused.</summary>
    public Stream ReadStream {
        get {
            if (_readStream == null) {
                _readStream = new InternalReadStream(this);
            }
            return _readStream;
        }
    }

    /// <summary>The write-only end of the pipe. Created on first access and then reused.</summary>
    public Stream WriteStream {
        get {
            if (_writeStream == null) {
                _writeStream = new InternalWriteStream(this);
            }
            return _writeStream;
        }
    }

    // Throws ObjectDisposedException once the pipe has been closed.
    private void _checkDisposed() {
        if (_disposing != 0)
            throw new ObjectDisposedException(GetType().FullName);
    }

    /// <summary>
    /// Marks the pipe as closed and wakes every thread blocked in Read or Write so it
    /// can observe the closed state. Called by both ends from their own Dispose.
    /// </summary>
    /// <param name="disposing">Not used; the pipe only holds managed state.</param>
    protected void Dispose(bool disposing) {
        lock (_bufferLock) {
            Thread.VolatileWrite(ref _disposing, 1);
            Monitor.PulseAll(_bufferLock);
        }
    }

    // Shared argument checks for Read and Write on both ends of the pipe.
    private static void ValidateTransferArguments(byte[] buffer, int offset, int count) {
        if (buffer == null)
            throw new ArgumentNullException("buffer", "buffer is null");
        if (count < 0 || offset < 0)
            throw new ArgumentException("offset or count is negative.");
        // Written as a subtraction so that a huge offset cannot overflow the sum
        // and slip past the check. Both operands are non-negative here.
        if (count > buffer.Length - offset)
            throw new IndexOutOfRangeException("The sum of offset and count is larger than the buffer length.");
    }

    /// <summary>Read-only view over the parent's circular buffer.</summary>
    private class InternalReadStream : Stream
    {
        private readonly DebugStream _parent;

        internal InternalReadStream(DebugStream parent) {
            _parent = parent;
        }

        public override bool CanRead {
            get { return true; }
        }

        public override bool CanSeek {
            get { return false; }
        }

        public override bool CanWrite {
            get { return false; }
        }

        /// <summary>Does nothing: a read-only stream has no pending output to flush.</summary>
        public override void Flush() {
        }

        public override long Length {
            get {
                throw new NotSupportedException("Cannot get the length of this Stream.");
            }
        }

        public override long Position {
            get {
                throw new NotSupportedException("Cannot get the position of this Stream.");
            }
            set {
                throw new NotSupportedException("Cannot set the position of this Stream.");
            }
        }

        /// <summary>
        /// Blocks until at least one byte is buffered or the pipe has been closed, then
        /// copies up to <paramref name="count"/> buffered bytes into
        /// <paramref name="buffer"/> starting at <paramref name="offset"/>.
        /// </summary>
        /// <returns>
        /// The number of bytes copied; 0 when the pipe has been closed and the buffer is empty.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
        /// <exception cref="ArgumentException"><paramref name="offset"/> or <paramref name="count"/> is negative.</exception>
        /// <exception cref="IndexOutOfRangeException">
        /// <paramref name="offset"/> plus <paramref name="count"/> exceeds the length of <paramref name="buffer"/>.
        /// </exception>
        public override int Read(byte[] buffer, int offset, int count) {
            ValidateTransferArguments(buffer, offset, count);
            lock (_parent._bufferLock) {
                // Wait for data, or for either end to be closed.
                while (_parent._head == _parent._tail && _parent._empty && (_parent._disposing == 0))
                    Monitor.Wait(_parent._bufferLock);
                // Closed and fully drained: report end-of-stream.
                if (_parent._disposing != 0 && _parent._head == _parent._tail && _parent._empty)
                    return 0;
                int firstRead;   // bytes taken from _tail towards the end of the ring
                int secondRead;  // bytes taken from index 0 after wrapping around
                int buffDiff = _parent._head - _parent._tail;
                if (buffDiff > 0) {
                    // Data is one contiguous run [_tail, _head).
                    firstRead = Math.Min(buffDiff, count);
                    secondRead = 0;
                } else {
                    // Data wraps (or the buffer is full): [_tail, end) followed by [0, _head).
                    firstRead = Math.Min(_parent._buffer.Length - _parent._tail, count);
                    secondRead = Math.Min(count - firstRead, _parent._head);
                }
                if (firstRead > 0) {
                    Buffer.BlockCopy(_parent._buffer, _parent._tail, buffer, offset, firstRead);
                    if (secondRead > 0) {
                        // The wrapped part continues right after the first part in the
                        // caller's buffer, i.e. at offset + firstRead (not at firstRead).
                        Buffer.BlockCopy(_parent._buffer, 0, buffer, offset + firstRead, secondRead);
                    }
                    _parent._tail = (_parent._tail + firstRead + secondRead);
                    if (_parent._tail >= _parent._buffer.Length)
                        _parent._tail -= _parent._buffer.Length;
                    if (_parent._tail == _parent._head)
                        _parent._empty = true;
                    // Space was freed: wake any writer waiting on a full buffer.
                    Monitor.PulseAll(_parent._bufferLock);
                }
                return firstRead + secondRead;
            }
        }

        public override long Seek(long offset, SeekOrigin origin) {
            throw new NotSupportedException("Cannot seek on this Stream.");
        }

        public override void SetLength(long value) {
            throw new NotSupportedException("Cannot set the length of this Stream.");
        }

        public override void Write(byte[] buffer, int offset, int count) {
            throw new NotSupportedException("Cannot write to this Stream.");
        }

        /// <summary>Closes the whole pipe (both ends), then releases this stream.</summary>
        protected override void Dispose(bool disposing) {
            _parent.Dispose(disposing);
            base.Dispose(disposing);
        }
    }

    /// <summary>Write-only view over the parent's circular buffer.</summary>
    private class InternalWriteStream : Stream
    {
        private readonly DebugStream _parent;

        internal InternalWriteStream(DebugStream parent) {
            _parent = parent;
        }

        public override bool CanRead {
            get { return false; }
        }

        public override bool CanSeek {
            get { return false; }
        }

        public override bool CanWrite {
            get { return true; }
        }

        public override long Length {
            get {
                throw new NotSupportedException("Cannot get the length of this Stream.");
            }
        }

        public override long Position {
            get {
                throw new NotSupportedException("Cannot get the position of this Stream.");
            }
            set {
                throw new NotSupportedException("Cannot set the position of this Stream.");
            }
        }

        /// <summary>Does nothing: Write stores bytes in the shared buffer immediately.</summary>
        public override void Flush() {
        }

        public override int Read(byte[] buffer, int offset, int count) {
            throw new NotSupportedException("Cannot read from this Stream.");
        }

        public override long Seek(long offset, SeekOrigin origin) {
            throw new NotSupportedException("Cannot seek on this Stream.");
        }

        public override void SetLength(long value) {
            throw new NotSupportedException("Cannot set the length of this Stream.");
        }

        /// <summary>
        /// Copies <paramref name="count"/> bytes from <paramref name="buffer"/>, starting at
        /// <paramref name="offset"/>, into the pipe. Blocks while the internal buffer is
        /// full, so a call only returns once every byte has been stored.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
        /// <exception cref="ArgumentException"><paramref name="offset"/> or <paramref name="count"/> is negative.</exception>
        /// <exception cref="IndexOutOfRangeException">
        /// <paramref name="offset"/> plus <paramref name="count"/> exceeds the length of <paramref name="buffer"/>.
        /// </exception>
        /// <exception cref="ObjectDisposedException">
        /// Either end of the pipe was closed before all bytes could be stored.
        /// </exception>
        public override void Write(byte[] buffer, int offset, int count) {
            ValidateTransferArguments(buffer, offset, count);
            int remaining = count;
            lock (_parent._bufferLock) {
                while (remaining > 0) {
                    // Buffer full: wait for a reader to free space, or for the pipe to close.
                    while (_parent._tail == _parent._head && !_parent._empty && (_parent._disposing == 0)) {
                        Monitor.Wait(_parent._bufferLock);
                    }
                    _parent._checkDisposed();
                    int firstWrite;   // bytes stored from _head towards the end of the ring
                    int secondWrite;  // bytes stored from index 0 after wrapping around
                    if (_parent._empty) {
                        // Nothing buffered: restart at index 0 so the write is contiguous.
                        _parent._tail = _parent._head = 0;
                        firstWrite = Math.Min(remaining, _parent._buffer.Length);
                        secondWrite = 0;
                    } else if (_parent._tail > _parent._head) {
                        // Free space is the single gap [_head, _tail).
                        firstWrite = Math.Min(remaining, _parent._tail - _parent._head);
                        secondWrite = 0;
                    } else {
                        // Free space is [_head, end) followed by [0, _tail).
                        firstWrite = Math.Min(remaining, _parent._buffer.Length - _parent._head);
                        secondWrite = Math.Max(0, Math.Min(remaining - firstWrite, _parent._tail));
                    }
                    if (firstWrite > 0) {
                        // count - remaining is how far into the caller's data we already are.
                        Buffer.BlockCopy(buffer, offset + (count - remaining), _parent._buffer, _parent._head, firstWrite);
                        if (secondWrite > 0) {
                            Buffer.BlockCopy(buffer, offset + (count - remaining) + firstWrite, _parent._buffer, 0, secondWrite);
                        }
                        _parent._head = (_parent._head + firstWrite + secondWrite);
                        if (_parent._head >= _parent._buffer.Length)
                            _parent._head -= _parent._buffer.Length;
                        _parent._empty = false;
                    }
                    remaining -= (firstWrite + secondWrite);
                    // Data became available: wake any reader waiting on an empty buffer.
                    Monitor.PulseAll(_parent._bufferLock);
                }
            }
        }

        /// <summary>Closes the whole pipe (both ends), then releases this stream.</summary>
        protected override void Dispose(bool disposing) {
            _parent.Dispose(disposing);
            base.Dispose(disposing);
        }
    }
}
}
#endif
Points of Interest
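The code is fairly straightforward, but there are some tricky synchronization issues that you should be wary of. Most importantly, a DebugStream
will not close (and its reader will not see end-of-stream) until you explicitly close it. If you call Write with more data than the internal
buffer has room for and do not correspondingly call Read from another thread, the Write call will block until
Close or Dispose is called on the stream. Likewise, a call to Read blocks while the buffer is empty, until data is written or the stream is closed.
This is the same behavior that a serial port or network stream exhibits.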
History
Published article.