In this article, we will discuss issues related to stream operations and code implementations.
Introduction
Before we begin talking about streaming or any associated details, we would like to clarify the vocabulary used in this article, in order to avoid confusion.
- We will use the word "Stream" or "Streams" to talk about the underlying code-implementation (interfaces, associated classes, etc.) capable of:
- forming a pipeline by repetitive use in tandem
- mutating and forwarding
byte
-sequences to next such stream in chain
In fact, as we talk about these Stream implementations, we remain indifferent to the implementation complexity and the source of the implementation (i.e., part of the framework, an open-source library, or a home-made recipe) as long as the desired results can be obtained. This assumption is important because, otherwise, it would be impossible for us to obtain a truly context-agnostic API capable of supporting virtually any operation on the underlying byte-streams (as we will see below).
- We will use the word "Streaming", to associate the flow of byte-sequences as they goes through such a stream based pipeline.
In fact, as a special case of such declaration, when one of such streams in the pipeline is actually a network (HTTP) stream and the flowing byte contents are multimedia contents; we obtain the implementation of mutlimedia-streaming (or just streaming as it is widely known). Thus, it is important to remember that we do NOT restrict the scope of the word "streaming" to mutimedia-streaming, during our discussion.
Relevant Trivia
Streams are, undoubtedly, one-of-a-kind, special-purpose beasts. Though a stream can be thought of as a generator of byte sequences, it unfolds its true power through its dynamics. Let us explain. At runtime, any other object, be that a string, an array, data in a custom class/struct object, even the source code itself, is a sequence of bytes residing somewhere in memory. Even a simple integer value can be thought of as a sequence of 4 bytes (for int32 in .NET). In addition, it is possible to extract a sub-sequence from the byte representation of these objects to perform some delicate operations; yet such byte sequences lack the dynamics which streaming exhibits out of the box. And, for our discussion, the notion of such flow, associated with bytes and runtime processing, comes in handy as we start talking about our work on data-streaming. Going forward, we will NOT ONLY make an effort to explain this phenomenon in detail, BUT ALSO propose a completely novel API to deal with data-streaming requirements.
As soon as we hear the word "Streaming", many pictures come to mind: watching videos online, watching a live telecast of an event, listening to a favorite song online, etc. More or less, we almost immediately associate multimedia contents like audio & video with this word. Thus, it is important for us to switch gears and set a platform. In order to do so, first, we would like to personalize the definition of Streaming, expanding the Streaming universe in our definition, which can be written as plainly as:
"Sending/transferring data1, potentially as varying sized chunks of binary data, continuously; at the same time, permitting the receiving-end2 to continuously process those chunks, whenever possible, in independent fashion (i.e. without buffering data1)."
1The term "Data" is contextual here. For us, it is whatever defines as whole dataset, i.e., whole video or just a 1 second clip of that video or simply a "Hello World!
" string or a never ending data series.
2The term "receiving-end" is used to identify the next Stream in tandem.
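To make the definition concrete, below is a minimal sketch of our own (for illustration only; the processChunk callback is hypothetical) of a receiving end that processes chunks continuously without buffering the whole dataset:
// A minimal sketch of the definition above; requires: using System; using System.IO;
// "processChunk" is a hypothetical callback standing in for the next Stream in tandem.
static void ConsumeContinuously(Stream source, Action<byte[], int> processChunk)
{
    var buffer = new byte[4096]; // fixed size, reusable buffer
    int readLength;
    while ((readLength = source.Read(buffer, 0, buffer.Length)) > 0)
    {
        processChunk(buffer, readLength); // each chunk is processed as it arrives
    }
}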
With such a definition at hand:
- We are more interested in the BYTE format of data instead of the media type. Hence, we want to deal with any kind of data that is either promptly available as bytes or convertible (irrespective of the complexity of conversion) to bytes.
- We want to transfer data continuously, i.e., as it becomes available, and, potentially as chunks. Thus, we strive not to buffer the whole dataset, in memory, at any point during the streaming.
- We are agnostic to underlying protocols/APIs, as long as we are able to send those data-chunks continuously.
- We want to design a scheme/framework/mechanism that can support any such arbitrary data processing end-to-end.
- And, we are receiver-agnostic, as long as the receiver is able to accept such data-chunks (i.e., irrespective of its data-processing capabilities).
Implementation Notes
- From a theoretical point of view, the article is generic in nature and may remain valid for several languages/frameworks; however, we have implemented our thoughts in C# .NET and we will be pitching .NET code snippets throughout the discussion.
- Readers who want to compile the attached source code, as is, in Visual Studio should make sure that they have the .NET Framework 4.7.2 SDK installed and have C# language version 7.1 or above (as mentioned in the MSDN blog).
NOTE: Statistics presented in this article were obtained with the following system configuration:
Reasons First
Before we try to understand why we thought of such an implementation, we should first understand the existing tools we have. Consider that we have two Stream instances, _readableStream and _writableStream; as the names suggest, we can read from _readableStream and write to _writableStream. Further, assume we have a trivial task at hand which warrants us to copy data from _readableStream to _writableStream. Most languages/frameworks provide the following implementation (more or less) to achieve it:
byte[] buffer = new byte[bufferSize]; // a fixed size buffer, normally a few KB
int readLength;
while ((readLength = _readableStream.Read(buffer, 0, buffer.Length)) > 0)
{
    _writableStream.Write(buffer, 0, readLength);
}
From the above code snippet, we notice that by using a fixed size buffer (normally a few KB in size), we achieve such a stream-to-stream copy. Complexity is linear in the stream length and we don't consume much space; fair enough.
But wait a moment: we made an assumption here that the streams are associated with I/O devices (especially _writableStream), like a file, the network, etc. But what happens when our _writableStream turns out to be an in-memory stream (MemoryStream in C# .NET)? Then we immediately increase the space complexity. And what if both (_readableStream and _writableStream) are in-memory streams? Then the space requirement is doubled.
But why do we care so much about it? Simplistically speaking, it's sufficient to say that memory is CHEAP but neither FREE nor LIMITLESS; nonetheless, the reason is NOT that simple. Thus, without adding any further verbosity, the author invites readers to read an excellent article, titled "To Heap or not to Heap; That’s the Large Object Question?", written by Doug Duerner and Yeon-Chang Wang, to understand the details related to the increasing space complexity associated with large objects (such as strings, lists, or arrays in general).
In general, reducing runtime memory is our first reason. Along the same lines, our next reason is latency, which can be reduced by re-using the same buffer (allocated once) during copy operations, without the need to spend precious CPU time re-sizing/copying byte arrays (in memory) to buffer the entire data.
Though normally less talked about, our next reason is code organization (e.g., readability, testability, separation of concerns, etc.); our goal is to prepare an API for streaming operations which is intuitive and expressive. Furthermore, we want to embed some sort of intelligence in our APIs to bring runtime malleability (i.e., conditional plumbing of the pipeline) to our chain of streams. In the end, we want the liberty to build pipelines that perform arbitrary operations (WILDCARDs) on the running chunks of bytes without losing the associated benefits. As a matter of fact, we will build some specific streaming operations to demonstrate such wildcard capabilities.
Being Pragmatic
If you have followed us until here, you might argue that streaming is NOT that significantly used in a regular application, and that most applications do NOT go beyond file reading/writing. We cannot argue with that, as it is an experience-based argument. However, the following non-exhaustive list does show common uses of streaming:
- WebAPIs
- Base64 conversion
- Object Serialization
- Data Encryption
- Data Compression
- Hash computing
- File handling... so on and so forth...
Measuring Performance of a Trivial Task
Before going into details, let's start with a simple example. Assume we have the following task at hand:
Definition:
Given the path of a binary file, read all its bytes. First, decompress them using the GZip compression algorithm, then deserialize the data as a well-defined object array (i.e., List<T> where T is known) using a JSON serializer.
From the above statement, we can identify three (3) distinct operations, namely:
- Read all bytes from the given file
- Use GZip algorithm to decompress those bytes
- With a JSON serializer, create List<T> (whether T is known or a generic placeholder hardly matters) from the decompressed bytes
To maintain code readability, and neglecting any performance/code optimization (just for the moment), we consider the implementation of the following three (3) functions:
public byte[] PullAllBytesFrom(FileInfo file)
{
return File.ReadAllBytes(file.FullName);
}
public byte[] DecompressUsingGzip(byte[] compressedBytes)
{
var unzippedData = new MemoryStream();
using (var unzipper = new GZipStream(new MemoryStream(compressedBytes),
CompressionMode.Decompress, false))
{
unzipper.CopyTo(unzippedData);
}
return unzippedData.ToArray();
}
public T DeserializeAs<T>(byte[] data)
{
    return JsonConvert.DeserializeObject<T>(new UTF8Encoding().GetString(data));
}
We could have written the code another way; however, the reason that we created all three (3) operations separately is a subject for later discussion, and we will speak of it in detail there. For the moment, we just want to focus on the performance of the following code:
public List<T> DeserializeListFrom<T>(FileInfo compressedJsonFile)
{
var fileBytes = PullAllBytesFrom(compressedJsonFile);
var uncompressedBytes = DecompressUsingGzip(fileBytes);
return DeserializeAs<List<T>>(uncompressedBytes);
}
If you run similarly written code of "DeserializeListFrom" (if you have downloaded the source code attached to this article, you can run the PerfCompareNonStreamingWithStreamingAsync method), you will see performance graphs similar to the following from the Visual Studio Diagnostic Tools (NOTE: the API method is our implementation and the subject of this discussion, and DeserializeListFrom is a similarly written method as shown in the above snippet):
Looking at this image, we see there is a costly, memory-consuming operation going on during code execution, and perhaps the byte array was re-allocated several times (hence, recopied). Overall, it's evident that we have an opportunity here to win big on memory and significantly on CPU time too. Thus, knowing the issue, we can drill down further.
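If you cannot run the Diagnostic Tools, a rough console-based sketch (the file name and record type below are placeholders) can hint at the same story with Stopwatch and GC counters:
// A rough measurement sketch; requires: using System; using System.Diagnostics; using System.IO;
var watch = Stopwatch.StartNew();
var list = DeserializeListFrom<MyRecord>(new FileInfo("data.json.gz")); // MyRecord is a placeholder type
watch.Stop();
Console.WriteLine($"Elapsed: {watch.ElapsedMilliseconds} ms");
Console.WriteLine($"GC collections (Gen0/Gen1/Gen2): {GC.CollectionCount(0)}/{GC.CollectionCount(1)}/{GC.CollectionCount(2)}");
Such numbers are indicative only; the graphs above remain the better diagnostic.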
Defining Goals
Based on our discussion so far, we want to:
- avoid the usage of in-memory buffers to improve on runtime memory
- work only with the necessary fixed size buffers
- be able to create an efficient pipeline (chain of operations) end-to-end (source to target)
- create an API that offers:
- Composability: composition of operations
- Readability: compositions are declarative
- Maintainability: promotes single responsibility principle for each underlying composed operation
- Elasticity: open to any exotic and/or regular data processing requirement
- Reusability: permits run-time mutation in a composed chain in a deterministic manner
The rest of the article presents the work we have done to achieve the above listed goals.
Streams In General
On the surface, all streams look alike and it's hard to put them in different bins. However, to exploit streaming capabilities, we do need to understand the different characteristics of stream implementations.
Unidirectional Vs Bidirectional
Fortunately, in .NET, there exists a well-defined interface for streams (inside the System.IO namespace, Stream is defined as an abstract class) and all stream implementations inherit from it. We take a closer look at some of its capabilities as shown below:
public abstract class Stream : MarshalByRefObject, IDisposable
{
    public abstract bool CanRead { get; }
    public abstract bool CanWrite { get; }
    public abstract int Read(byte[] buffer, int offset, int count);
    public abstract void Write(byte[] buffer, int offset, int count);
    // ... remaining members elided for brevity ...
}
Thus, apart from the Read and Write methods, Stream exposes the CanRead and CanWrite truth values; if a stream supports the Read operation, it shall return true for CanRead, and similarly, if it supports the Write operation, it should return true for CanWrite. In fact, NOT all stream implementations return true for both of these properties. Thus, we can say that when a stream is either readable or writable (not both), it's unidirectional (e.g., FileStream with read access); similarly, when it's both readable and writable at the same time, it is bidirectional (e.g., MemoryStream with writable=true).
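As an illustration of how these capability flags are typically honored, here is a small defensive copy helper of our own (a sketch, not a framework method):
// A sketch: check CanRead/CanWrite before attempting a copy.
// Requires: using System; using System.IO;
public static void CopySafely(Stream source, Stream target, int bufferSize = 8192)
{
    if (!source.CanRead) throw new ArgumentException("source stream is not readable");
    if (!target.CanWrite) throw new ArgumentException("target stream is not writable");

    var buffer = new byte[bufferSize]; // fixed size, reused across iterations
    int readLength;
    while ((readLength = source.Read(buffer, 0, buffer.Length)) > 0)
    {
        target.Write(buffer, 0, readLength);
    }
}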
Open-Ended Vs Closed-Ended
Some stream implementations are in fact closed in the sense that they are bound to a target device; e.g., FileStream is bound to a physical location on a disk. On the other hand, some stream implementations are open (agnostic) to the target involved in reading or writing operations, i.e., they operate on an abstraction (e.g., the abstract Stream class in .NET). Such streams often require an instance of Stream at construction time (i.e., in the constructor call); e.g., the GZipStream constructor accepts another Stream instance for reading/writing operations during decompression/compression respectively, yet remains agnostic to whether the given Stream is a MemoryStream or a FileStream. Though the given explanation (and stream classification) looks trivial in nature, it enables us to make a chain (pipeline) during streaming.
In fact, as we will see later, based on this distinct characteristics of Streams, our proposed API is able to create a chain of streaming operations in tandem without relying on intermediate full data-buffering between two independent streaming operations.
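For instance, in the following minimal sketch (file paths are placeholders), GZipStream only ever sees the abstract Stream it wraps; the same code would work equally well if the target were a MemoryStream or a network stream:
// FileStream (closed-ended source) -> GZipStream (open-ended) -> FileStream (closed-ended target).
// Requires: using System.IO; using System.IO.Compression;
using (var source = File.OpenRead("data.json"))
using (var target = File.Create("data.json.gz"))
using (var gzip = new GZipStream(target, CompressionMode.Compress))
{
    source.CopyTo(gzip); // chunk-by-chunk copy; no full in-memory buffering
}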
Specifics of MemoryStream
NOTE: The concerns listed below apply, more or less equally, to Byte[] (byte arrays) and List<Byte> (lists of bytes).
MemoryStream is unique in its own way. Under the hood, it is a simple Byte array whose capacity is adjusted during write operations (in a similar way as if it were a List<Byte>, i.e., allocating a bigger array and recopying bytes from the existing array), and the array is traversed during read operations. Though the current implementation works just fine, those array (buffer) resizing operations do add some pressure on the CPU (memory allocation/data copying). Such operations can affect performance significantly if the involved data (total bytes) size is large. Though it would be hard to state the data size limit as a single number, once the array reaches 85,000 bytes in size, we would be touching the Large Object Heap (LOH), and any new call (during a write operation) to resize this array to a bigger capacity will only end up dealing with the LOH. In short, once MemoryStream is involved in any streaming related operation, one should be careful.
We have already seen above (in the pseudo code under the title "Reasons First") that stream-to-stream copy employs a fixed size buffer and reuses that buffer during its iterative copy operations (byte chunks limited to buffer capacity), instead of using a MemoryStream (read everything into memory from the source stream and then write everything to the target stream). Furthermore, we know that MemoryStream is NOT thread-safe and it is not possible to write to and read from it at the same time. Though it is entirely possible to create a new thread-safe version of an in-memory stream, such a painstaking effort might not necessarily bear fruit; especially when the writer writing to such a memory stream is far faster than the associated reader on the same stream (the internal array will eventually grow and may create a performance hit). Thus, we identify that any in-memory buffering of data (beyond the fixed size buffers required for regular streaming operations) is not helpful for a streaming oriented API. And, down the line, as we'll discuss, our approach avoids such usage of in-memory byte arrays.
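To make the contrast tangible, compare the following two sketches of ours: the first buffers the entire payload in a MemoryStream (whose internal array grows and, past ~85,000 bytes, lands on the LOH), whereas the second reuses a single fixed size buffer:
// Requires: using System.IO;
static void CopyViaMemoryStream(Stream source, Stream target)
{
    using (var memory = new MemoryStream())
    {
        source.CopyTo(memory); // whole payload materialized; internal array resized/recopied as it grows
        memory.Position = 0;
        memory.CopyTo(target);
    }
}

static void CopyDirectly(Stream source, Stream target)
{
    source.CopyTo(target, 8192); // one reusable 8 KB buffer, end to end
}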
Flow of Data
Roughly speaking, streaming data flow, based on the nature of source/target of the data, can be listed as:
- One kind of byte representation to another kind of byte representation (e.g., text data in a file to a compressed file)
- Memory data-structure to a byte representation, i.e., serialization + some additional stream processing (e.g., json serialization of a .NET class instance to an encrypted file on hard-disk)
- From a byte-representation to a memory data-structure, i.e., deserialization + some additional stream processing
Based on these flows, we identify the data structures which are most commonly encountered during streaming, and which are potentially responsible for those unwanted performance hits:
- string: Normally obtained during serialization, file reading, string concatenation, Base64 operations, etc.
- byte[]: Obtained normally from string encoding, use of in-memory streams, file reading, etc.
- MemoryStream: Normally appears due to a misaligned stream pipeline
- T[] or List<T> or any similar collection of objects, where T is a known serializable object: Normally the targets of serialization/deserialization operations
Furthermore, we identify the most common streaming operations (also available as part of the framework):
- File handling
- Byte encoding
- Compression
- Hash Computing
- Base64 Conversion
- Encryption/Decryption
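Several of these operations already ship as chainable streams in .NET. As a sketch (file paths are placeholders), the following single pass combines two of them, computing the SHA-256 hash of a file's original bytes while GZip-compressing it:
// One pass, two framework operations chained: hash computing + compression.
// Requires: using System.IO; using System.IO.Compression; using System.Security.Cryptography;
using (var sha256 = SHA256.Create())
using (var source = File.OpenRead("input.bin"))
using (var target = File.Create("input.bin.gz"))
using (var gzip = new GZipStream(target, CompressionMode.Compress))
using (var hashing = new CryptoStream(gzip, sha256, CryptoStreamMode.Write))
{
    source.CopyTo(hashing);    // bytes are hashed, then forwarded to GZip
    hashing.FlushFinalBlock(); // finalize the hash and flush pending bytes
    byte[] hash = sha256.Hash; // hash of the original (uncompressed) bytes
}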
Finally, we also recognize the following frequent requirements:
- Stream Fan-out: When a given stream needs to be fed to multiple targets; e.g., in order to maintain data availability by means of redundancy, the same data is copied to several file streams and/or sent to remote services, etc. (a naive sketch follows this list).
- Stream Length: When the interest is to obtain the data byte count based on the chosen encoding and treatment.
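As a taste of the fan-out requirement, here is a naive, write-only sketch of our own (hypothetical; error handling and async support omitted) that forwards every written chunk to several targets:
// A naive fan-out sketch; requires: using System; using System.IO;
public sealed class FanOutStream : Stream
{
    private readonly Stream[] _targets;
    public FanOutStream(params Stream[] targets) { _targets = targets; }

    public override bool CanRead => false;
    public override bool CanSeek => false;
    public override bool CanWrite => true;
    public override long Length => throw new NotSupportedException();
    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        foreach (var target in _targets)
            target.Write(buffer, offset, count); // same chunk goes to every target
    }

    public override void Flush()
    {
        foreach (var target in _targets) target.Flush();
    }

    public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
}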
Overall, if we accumulate all these thoughts to prepare a mind-map, we come up with the following naive illustration:
Academic interests aside, the presence of streaming makes sense when:
- Data is persisted (e.g., as a file on hard disk) and when persisted data is consumed
- Data is transferred to another entity/process (e.g., server to client)
Irrespective of the use-case, the data-flow can be modeled as if the data publisher side (i.e., the data producer) pushes the data at one end and the consumer side pulls the data at the other end. Depending upon the nature of the data exchange, the consumer can run either concurrently or sequentially. For example, in the case of HTTP communication, while the sender is writing data on the network during chunked transfer-encoding, the receiver may recover payload data simultaneously; whereas, when the sender writes the data to a file, the receiver can consume the file (in the absence of synchronization) only after the file is persisted. Secondly, during these data pushes, whenever the sender applies any data transformation, the consumer normally needs to apply the inverse transformations in reverse order to obtain the original data. These are the common streaming scenarios where performance can be optimized. The following diagram illustrates the same idea.
Thus, we see above that all data transformation operations (shown by OP-) at the sender side have corresponding inverse transformations (shown by INV-OP-) in reverse order (i.e., if the sender applies OP-1 before OP-2, then the receiver applies INV-OP-2 before INV-OP-1). For example, if the sender first serialized data as JSON and then applied GZip-compression, then, in order to get back the equivalent original in-memory representation of the data, the receiver first applies GZip decompression and then deserializes the JSON data.
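As a sketch of this symmetry (method and file names are ours, purely illustrative), the sender and receiver sides of that very JSON + GZip example could look like:
// Sender: OP-1 (serialize as JSON), then OP-2 (GZip compress).
// Receiver: INV-OP-2 (GZip decompress), then INV-OP-1 (deserialize JSON).
// Requires: using System.IO; using System.IO.Compression; using System.Text; using Newtonsoft.Json;
public static void SendAsCompressedJson<T>(T data, string filePath)
{
    using (var file = File.Create(filePath))
    using (var gzip = new GZipStream(file, CompressionMode.Compress))
    using (var writer = new StreamWriter(gzip, Encoding.UTF8))
    {
        new JsonSerializer().Serialize(writer, data);
    }
}

public static T ReceiveFromCompressedJson<T>(string filePath)
{
    using (var file = File.OpenRead(filePath))
    using (var gzip = new GZipStream(file, CompressionMode.Decompress))
    using (var reader = new StreamReader(gzip, Encoding.UTF8))
    using (var jsonReader = new JsonTextReader(reader))
    {
        return new JsonSerializer().Deserialize<T>(jsonReader);
    }
}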
NOTE:
Some data transformations are inherently non-reversible, i.e., once the data is transformed, it is theoretically not possible to obtain the original data; e.g., cryptographic HASH computing. But of course, if the intent is just to send the HASH of the data to the receiver, then it is already assumed that the original data is NOT required at the receiver's end. Thus, in these and similar cases, the reverse chain shown above won't be present at the receiver's side; nonetheless, streaming can still be used with all its benefits, e.g., to obtain the hash of the data in its target byte representation.
Towards Implementation
We noticed above that, in order to implement our trivial task (under the title "Measuring performance of a trivial task"), we wrote three (3) distinct functions. The idea behind having those dedicated functions was to achieve composability; e.g., if we have a new feature requirement that demands reading from a json file and obtaining a known object (not necessarily a list), the resultant code may look like:
public T DeserializeObjectFrom<T>(FileInfo uncompressedJsonFile)
{
var uncompressedBytes = PullAllBytesFrom(uncompressedJsonFile);
return DeserializeAs<T>(uncompressedBytes);
}
So, without rewriting/modifying/breaking existing code, we (almost) immediately delivered a feature. Such an innocent implementation, honestly speaking, fits SOLID (single responsibility) well. On the surface, these implementations look benign; however, as we start increasing the file byte size, we discover the associated issues. We realize that File.ReadAllBytes allocates a byte array proportional to the size of the file, but what is less remarked upon is that, internally, File.ReadAllBytes still uses a similar buffer copy loop, as shown in the pseudo code at the very beginning. Once we realize that those "fixed size buffer copying loops" are one of the strengths (and perhaps the least understood/appreciated) of streaming, we can appreciate all that comes from here onwards.
Aware of this issue, we might be tempted to change the code in the following way (in order to improve the performance):
public T DeserializeObjectFrom<T>(FileInfo uncompressedJsonFile)
{
    using (var fileStream = uncompressedJsonFile.OpenRead())
    {
        using (var textReader = new StreamReader(fileStream, Encoding.UTF8))
        {
            using (var jsonReader = new JsonTextReader(textReader))
            {
                // deserialize directly from the stream; no intermediate byte array
                return new JsonSerializer().Deserialize<T>(jsonReader);
            }
        }
    }
}
But, with this code, we realize that we again complicate the code. Not only have we reduced readability and punctured maintainability (in a way), but we have also added unwanted code redundancy; i.e., with such nested calls (of "using"), we need to re-write some (major) part of the code twice (once as listed above and once more with GZip compression). In fact, in the long run, we will notice that we create redundancy every time we write any stream related code (inside the same project and across multiple projects). Above all, such implementations are neither composable nor elastic nor reusable (note: reusable in the broad sense).
Visualizing Implementation
Reading/interpreting such verbose text is by no means easy; thus, here we make an effort to provide some illustrations to aid the understanding of the above discussion. Let's first try to understand what happens, at different moments in time, while our "Code Id 1" (from the "Measuring performance of a trivial task" title above) runs.
For simplicity, let's assume the following time-scale:
- At T = 0, which we call the initiation point, we assume memory usage is nil (zero) and code execution is just waiting to execute the "PullAllBytesFrom" line.
- At T = t1, the code "var fileBytes = PullAllBytesFrom(compressedJsonFile);" has been successfully executed and our fileBytes variable holds the reference to the byte array.
- At T = t2, the code "var uncompressedBytes = DecompressUsingGzip(fileBytes);" has been successfully executed and our uncompressedBytes variable holds the reference to the uncompressed byte array.
- At T = t3, the call to "DeserializeAs<List<T>>(uncompressedBytes);" is finished.
We make the following assumptions to avoid complexity in visualization:
- GC (Garbage Collector) is NOT running until t3
- Buffers used internally by the framework are comparatively insignificant in size and thus can be dropped from the visualization
- We can use a linear approximation for memory allocation/re-allocation
- File size is 1 MByte, while the decompressed data and the deserialized List each, hypothetically, require 2 MBytes
NOTE: At T = t3, we assume the final return statement will execute, GC will occur, and memory is reduced to 2 MBytes (held by the list).
Based on the above listed assumptions, the following is an approximate visualization:
Even in this simple image, which ignores memory wastage (due to reallocation/copying), we clearly observe that we have unnecessarily consumed up to 5 MBytes of memory (peak at t3: 1 MByte of file bytes + 2 MBytes of decompressed bytes + 2 MBytes of list), whereas the target state consumes only 2 MBytes of memory. Having realized this fact, it becomes easy to envision the ideal target state, which is consistent with the following visualization:
Comparing Image 4 and Image 5 immediately gives us insights into the gains we would like to make. Now, we are in a position to actually discuss the implementation details that help us achieve our goals.
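For intuition only (the proposed API is the subject of what follows), even a hand-rolled flat pipeline approximates the target profile of Image 5, since no intermediate byte array is ever materialized:
// A hand-rolled sketch (not the proposed API): file -> GZip -> text -> JSON, end-to-end,
// with fixed size buffers only. Requires: using System.Collections.Generic; using System.IO;
// using System.IO.Compression; using System.Text; using Newtonsoft.Json;
public List<T> DeserializeListFromStreamed<T>(FileInfo compressedJsonFile)
{
    using (var file = compressedJsonFile.OpenRead())
    using (var gzip = new GZipStream(file, CompressionMode.Decompress))
    using (var text = new StreamReader(gzip, Encoding.UTF8))
    using (var json = new JsonTextReader(text))
    {
        return new JsonSerializer().Deserialize<List<T>>(json);
    }
}
What such a hand-written pipeline still lacks, as argued above, is composability and reusability; that is precisely what the proposed API aims to restore.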
So Far, Not Far...
Until here, we have discussed the issues associated with stream operations and code implementations, and have identified our goals. In general, we have gathered the material based on which we would like to assert the choices we made during our implementation.
We decided to split the whole article into two parts. We did not want to just show the implementation, but wanted to elaborate on the "why" behind it. We are still working on Part 2 of the article to complete our discussion, and we would love to hear comments from our readers in order to improve the quality of the material.
We also invite our readers to check the attached example source code and the implementation ahead of time.