In this article, you will get an in-depth understanding of deserializing very large simple & complex JSON streams using both Newtonsoft.Json & System.Text.Json in C# and VB, based on a real-life application migration requirement.
Downloads: .NET 6.0 | .NET 7.0
NOTE: Both downloads contain the same files; only the compression method differs. Once downloaded, go to the Getting Started section to set up the data for use.
Introduction
This is the third and final part of this series of articles. We will be covering deserializing JSON streams using both Newtonsoft.Json and System.Text.Json. The data for streaming can come from the web, files, or other sources. We will also look at deserializing JSON files from within a zip file. Lastly, we will wrap the processes in library base classes to simplify the code required, and cover unit testing and benchmarking for performance monitoring.
What is a Stream?
Microsoft's explanation of a stream:
Quote:
A stream is an abstraction of a sequence of bytes, such as a file, an input/output device, an inter-process communication pipe, or a TCP/IP socket. The Stream class and its derived classes provide a generic view of these different types of input and output, and isolate the programmer from the specific details of the operating system and the underlying devices.
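To illustrate that abstraction, here is a minimal C# sketch (not part of the downloadable samples) in which one method reads a MemoryStream and a FileStream without knowing which one it was given. The CountBytesAsync helper, the tiny JSON payload, and the temporary file are assumptions made for the demo.

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;

byte[] payload = Encoding.UTF8.GetBytes("[{\"id\":1},{\"id\":2}]");

// Source 1: an in-memory stream.
long fromMemory;
await using (var memoryStream = new MemoryStream(payload))
{
    fromMemory = await CountBytesAsync(memoryStream);
}

// Source 2: a temporary file on disk, read through the very same method.
string path = Path.GetTempFileName();
await File.WriteAllBytesAsync(path, payload);
long fromFile;
await using (FileStream fileStream = File.OpenRead(path))
{
    fromFile = await CountBytesAsync(fileStream);
}
File.Delete(path);

Console.WriteLine($"Memory: {fromMemory} bytes, File: {fromFile} bytes");

// Hypothetical helper: works with any Stream (file, memory, network, pipe) because it
// only uses the abstract Stream API. It reads in chunks, so only one buffer is held at a time.
static async Task<long> CountBytesAsync(Stream stream)
{
    byte[] buffer = new byte[4096];
    long total = 0;
    int read;
    while ((read = await stream.ReadAsync(buffer, 0, buffer.Length)) > 0)
        total += read;
    return total;
}
```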
Why Use Streaming?
There are many reasons. The key benefits of working with streams are:
- We do not have to load the entire data, like a JSON object, into memory. This is more memory-efficient, and so improves overall performance.
What are the benefits of working with large JSON data as a stream?
- We can work with objects as they are deserialized; there is no need to keep all objects in memory
- If there is an issue with the JSON data, we can fail fast
- We can cancel mid-stream
As an example, one of the sample JSON data files used in this article is approximately 890MB. When loaded into memory and deserialized as a single object, it consumes over 6GB of memory! When deserialized as a stream, it uses less than 60MB per object.
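A minimal C# sketch of the first and last points, using System.Text.Json's JsonSerializer.DeserializeAsyncEnumerable: each element is processed as soon as it is deserialized (nothing is collected into a list), and the loop stops early with break after a fixed number of items. The generated in-memory data, the JsonElement item type (instead of a Contact model), and the limit of 10 are assumptions for the demo.

```csharp
using System;
using System.IO;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;

// Stand-in for a large file: a JSON array of 1,000 small objects held in a MemoryStream.
var sb = new StringBuilder("[");
for (int i = 1; i <= 1000; i++)
    sb.Append(i == 1 ? "" : ",").Append($"{{\"id\":{i}}}");
sb.Append(']');
await using var stream = new MemoryStream(Encoding.UTF8.GetBytes(sb.ToString()));

int processed = 0;
long idTotal = 0;
const int limit = 10;

// Each element is handed to us as it is deserialized; only a running total is kept.
await foreach (JsonElement item in JsonSerializer.DeserializeAsyncEnumerable<JsonElement>(stream))
{
    idTotal += item.GetProperty("id").GetInt32();
    processed++;

    // Breaking out of the loop ends the enumeration and disposes the enumerator;
    // with a large stream, the remaining data is never read from the source.
    if (processed == limit)
        break;
}

Console.WriteLine($"Processed {processed} items, id total {idTotal}");
```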
VB.NET Limitation (System.Text.Json)
As mentioned in the previous article, Working with System.Text.Json in C#, ref struct is not supported in VB.NET, so the Utf8JsonAsyncStreamReader can only be written in C#.
The other C#-only projects are the JsonSamples.Host project, which provides the Web API used for testing remote data, and the GetDataFiles project, which builds the JsonSamples folder.
Apart from this limitation, this article includes code for both C# & VB, and the included solutions contain C# (79 projects) and VB (79 projects).
Code and Data Used
There are a lot of parts to this article. I have written it so that you can pick the sections of information that you require. Rather than lumping everything into one fancy monolithic UI application, the code is split into a large number of small, targeted projects, each covering only the code needed. I trust that this will help with understanding the code for your own use.
All code included in the download covers both local (file system) and remote (web API) streaming, in both C# and VB, for Newtonsoft and System.Text.Json. Benchmarks and Unit Tests are included for both C# & VB.
The sample data used was either built using Mockaroo or taken from eBay's Sandbox data, not live data. The data files being used are approximately 900MB in size.
The custom Utf8JsonAsyncStreamReader JSON reader for System.Text.Json has been thoroughly tested and is production ready.
Note: This is not an exact replacement for the Newtonsoft JsonReader, as it is specific to asynchronous stream use only.
Streaming is supported by both synchronous and asynchronous APIs. This article focuses on multitasking/background task operation, so it exclusively targets asynchronous techniques. If you are not familiar with TPL / Async & Await, then please read Asynchronous programming with async and await - Microsoft Learn.
The solution structure for both C# and VB is:
- Prototypes: the bare minimum code required. The libraries were built based on the code in these prototypes.
- Applications: both DI & non-DI samples for the file system and Web API (.Remote)
- Libraries: wrappers for the Newtonsoft and System.Text.Json streaming APIs + supporting classes and extensions
- Unit Tests: for the Newtonsoft and System.Text.Json generic wrapper libraries, the custom Utf8JsonAsyncStreamReader, and supporting helper methods
- Benchmarks: measuring the performance of the default Newtonsoft and System.Text.Json methods + wrapper libraries
Both DI (Dependency Injection) and non-DI sample projects are included for Newtonsoft and System.Text.Json. There are also File System and Web API versions of each type. Each application is made up of multiple projects, segmented into individual class libraries. The application structure used is as follows:
Application
|
+-- <type>.<JsonLib>
| |
| + -- Common.Ebay
| | |
| | +-- Common
| |
| +--- Common.<JsonLib>
| | |
| | +-- Common
| |
| +--- Ebay.Resources (.zip)
|
+-------- <Application>.Shared
where:
- Common: contains code common to all projects
- Common.<JsonLib>: is specific to the <JsonLib> used, Newtonsoft or System.Text.Json. Contains the common stream deserializer handler code and custom readers + extension methods
- Common.Ebay: configuration/file lists
- Ebay.Resources: JSON data files; .zip with zipped files
- Ebay.<JsonLib>: contains the typed stream deserializer handlers and the complex object models wired up for the specific <JsonLib>
- Application: the core for managing the processing
NOTE: There are a number of projects in the solution, approximately 79 for each language, half for Newtonsoft.Json and half for System.Text.Json. There are also large data files. Compiling the complete solution will use a lot of disk space, so it is highly recommended to compile only the sample application(s) that you want to run.
As there are many projects for both C# and VB, the obj & bin folders for each project are moved to consolidated obj & bin folders in the root solution folder for easy management. Please leave a comment below if you wish for me to write a tip on how this is done. Take a peek at the .csproj & .vbproj files to see how I achieved this.
Definitions
There are two types of large JSON files that we can work with. I have defined them as follows:
- Simple JSON Collection Object
Quote:
A collection of objects of the same type held in the root JSON collection object.
- Complex JSON Objects
Quote:
A JSON object with one or more properties, where a property can be a collection of objects or an individual value. The properties &/or collection to be deserialized do not have to be in the root of the JSON object.
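As a rough illustration of the two definitions, the hypothetical C# helper below peeks at the first token with System.Text.Json's Utf8JsonReader: a root array suggests a Simple JSON Collection Object, and a root object suggests a Complex JSON Object. This is only a heuristic sketch; it assumes valid, non-empty JSON and is not part of the article's libraries.

```csharp
using System;
using System.Text;
using System.Text.Json;

string simple = ClassifyRoot(Encoding.UTF8.GetBytes("[{\"id\":1},{\"id\":2}]"));
string complex = ClassifyRoot(Encoding.UTF8.GetBytes("{\"categoryTreeId\":\"123\",\"categoryAspects\":[]}"));
Console.WriteLine(simple);  // Simple JSON Collection Object
Console.WriteLine(complex); // Complex JSON Object

// Hypothetical helper: reads only the first token of the document.
static string ClassifyRoot(byte[] utf8Json)
{
    var reader = new Utf8JsonReader(utf8Json);
    reader.Read(); // assumes valid, non-empty JSON
    return reader.TokenType switch
    {
        JsonTokenType.StartArray => "Simple JSON Collection Object",
        JsonTokenType.StartObject => "Complex JSON Object",
        _ => "Other"
    };
}
```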
Getting Started
When you download and unpack the project solution, you need to run the JsonSamples.Host web project. This is used by both the Prototype/Application projects and the Setup project. Once the JsonSamples.Host web project (DeserializingJsonStreams.Hosting - CSharp.sln solution) is running, run the Setup (GetDataFiles) project to build the required JsonSamples folder. The JsonSamples.Host web project will generate the required zip files and copy the required JSON sample files.
NOTE: If this process fails, it could be because the JsonSamples.Host web project hosting port address has changed. If so, go into the Config folder and update the Host property in the appsettings.json file. This file is used by all applications in the solution that require remote access to the Web API server.
{
"Host" : "localhost:7215"
}
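As a sketch of how the Host value can be used, the C# below reads it from a JSON string with the same content as appsettings.json and fills the {0} placeholder of a URL template such as the "https://{0}/download/MOCK1" used by the remote samples. The BuildUrl helper is hypothetical: the samples use their own Build() extension from Common.Settings, whose implementation is not shown here. In a real application you would read the file with File.ReadAllText instead of using an inline string.

```csharp
using System;
using System.Text.Json;

// Same content as Config/appsettings.json shown above.
string appSettingsJson = "{ \"Host\" : \"localhost:7215\" }";

string url = BuildUrl("https://{0}/download/MOCK1", appSettingsJson);
Console.WriteLine(url); // https://localhost:7215/download/MOCK1

// Hypothetical helper: reads "Host" and substitutes it into the URL template.
static string BuildUrl(string template, string settingsJson)
{
    using JsonDocument document = JsonDocument.Parse(settingsJson);
    string host = document.RootElement.GetProperty("Host").GetString()
                  ?? throw new InvalidOperationException("Host is not set.");
    return string.Format(template, host);
}
```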
Part 1: Working with Streams
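Streams implement the IDisposable interface (and, since .NET Core 3.0, IAsyncDisposable, which is what await using relies on). Therefore, we need to make sure that we release the resource to avoid leaks such as open file handles.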
Working with Files
await using (Stream stream = File.OpenRead(filename))
{
}
Using stream As Stream = File.OpenRead(filename)
End Using
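A minimal C# sketch of what disposal buys us: once the await using block ends, the FileStream is closed, CanRead reports false, and the file handle is released, so the file can be deleted straight away. The temporary file is an assumption for the demo.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

string path = Path.GetTempFileName();
await File.WriteAllTextAsync(path, "[]");

FileStream captured;
await using (FileStream stream = File.OpenRead(path))
{
    captured = stream;
    Console.WriteLine($"Inside the block, CanRead = {captured.CanRead}");
}

// DisposeAsync has run: the stream is closed and can no longer be read.
Console.WriteLine($"After the block, CanRead = {captured.CanRead}");

// Because the handle was released, the file can be deleted immediately.
File.Delete(path);
Console.WriteLine($"File still exists: {File.Exists(path)}");
```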
Working with Web APIs
await using (Stream httpStream = await new HttpClient().GetStreamAsync(this._url)
.ConfigureAwait(false))
{
}
Using stream = Await New HttpClient().GetStreamAsync(_url).ConfigureAwait(false)
End Using
Streaming with Newtonsoft
Working with streams using Newtonsoft is quite simple. Here is a sample snippet for setting up working with a FileStream:
using TextReader textReader = new StreamReader(stream);
using JsonReader jsonReader = new JsonTextReader(textReader);
await using (stream.ConfigureAwait(false))
{
}
Using textReader As TextReader = New StreamReader(stream)
Using jsonReader As JsonReader = New JsonTextReader(textReader)
End Using
End Using
Simple JSON Collection Object Deserializing
A typical Simple JSON collection is a list of Objects in an array:
[
{
"id":1,
"first_name":"Osbert",
"last_name":"Petcher"
},
{
"id":2,
"first_name":"Salvador",
"last_name":"Marmion"
},
{
"id":3,
"first_name":"Kellen",
"last_name":"Philbin"
},
{
"id":4,
"first_name":"Fred",
"last_name":"Thuillier"
}
]
As this is a collection of objects, the built-in JsonSerializer can deserialize the data one object at a time.
Here is the code to deserialize the above JSON from a FileStream with Newtonsoft.Json:
using Newtonsoft.Json;
this._jsonSerializerSettings = new();
this._serializer = JsonSerializer.Create(this._jsonSerializerSettings);
await using Stream stream = File.OpenRead(filename);
using TextReader textReader = new StreamReader(stream);
using JsonReader jsonReader = new JsonTextReader(textReader);
await using (stream.ConfigureAwait(false))
{
await jsonReader.ReadAsync().ConfigureAwait(false);
while (await jsonReader.ReadAsync().ConfigureAwait(false) &&
jsonReader.TokenType != JsonToken.EndArray)
{
Contact? contact = this._serializer!.Deserialize<Contact>(jsonReader);
Process(contact!);
}
}
Imports Newtonsoft.Json
_jsonSerializerSettings = New JsonSerializerSettings()
_serializer = JsonSerializer.Create(_jsonSerializerSettings)
Using stream As Stream = File.OpenRead(filename)
Using textReader As TextReader = New StreamReader(stream)
Using jsonReader As JsonReader = New JsonTextReader(textReader)
Await jsonReader.ReadAsync().ConfigureAwait(False)
While Await jsonReader.ReadAsync().ConfigureAwait(False) AndAlso
jsonReader.TokenType <> JsonToken.EndArray
Dim contact = _serializer.Deserialize(Of Contact)(jsonReader)
Process(contact)
End While
End Using
End Using
End Using
And the processing:
private void Process(Contact? item)
{
this._count++;
}
Private Sub Process(item As Contact)
_count += 1
End Sub
File Stream Example
Putting it all together, we end up with something like this:
using Common.Helpers;
using Contacts.NewtonSoft.Json.Models;
using Newtonsoft.Json;
internal class Program
{
#region Fields
private readonly IFilePathHelper _fileHelper = new FilePathHelper("Resources");
private JsonSerializer? _serializer;
private JsonSerializerSettings? _jsonSerializerSettings;
private int _count;
private readonly string _file = "Mock_Contacts1.json";
#endregion
#region Methods
private static async Task Main()
=> await new Program().Execute().ConfigureAwait(false);
private async Task Execute()
{
Console.WriteLine($"Reading {this._file}");
this._jsonSerializerSettings = new();
this._serializer = JsonSerializer.Create(this._jsonSerializerSettings);
await using FileStream stream =
File.OpenRead(this._fileHelper.Resolve(this._file));
Console.WriteLine($"Processing: {this._file}");
using TextReader textReader = new StreamReader(stream);
using JsonReader jsonReader = new JsonTextReader(textReader);
await using (stream.ConfigureAwait(false))
{
await jsonReader.ReadAsync().ConfigureAwait(false);
while (await jsonReader.ReadAsync().ConfigureAwait(false) &&
jsonReader.TokenType != JsonToken.EndArray)
{
Contact? contact = this._serializer!.Deserialize<Contact>(jsonReader);
Process(contact!);
}
}
Console.WriteLine($"Contacts: {this._count:N0}");
Console.WriteLine("Finished");
}
private void Process(Contact? item)
{
this._count++;
}
#endregion
}
Imports System.IO
Imports Common.Helpers
Imports Newtonsoft.Json
Imports NewtonsoftContact.Models
Module Program
#Region "Fields"
Private ReadOnly _fileHelper As FilePathHelper = New FilePathHelper("Resources")
Private _serializer As JsonSerializer
Private _jsonSerializerSettings As JsonSerializerSettings
Private _count As Integer
Private ReadOnly _file As String = "Mock_Contacts1.json"
#End Region
#Region "Methods"
Sub Main(args As String())
Console.WriteLine($"Reading {_file}")
ExecuteAsync(args).GetAwaiter.GetResult()
Console.WriteLine("Finished")
End Sub
Private Async Function ExecuteAsync(args As String()) As Task
_jsonSerializerSettings = New JsonSerializerSettings()
_serializer = JsonSerializer.Create(_jsonSerializerSettings)
Using stream As Stream = File.OpenRead(_fileHelper.Resolve(_file))
Using textReader As TextReader = New StreamReader(stream)
Using jsonReader As JsonReader = New JsonTextReader(textReader)
Await jsonReader.ReadAsync().ConfigureAwait(False)
While Await jsonReader.ReadAsync().ConfigureAwait(False) AndAlso
jsonReader.TokenType <> JsonToken.EndArray
Dim contact = _serializer.Deserialize(Of Contact)(jsonReader)
Process(contact)
End While
End Using
End Using
End Using
Console.WriteLine($"Contacts: {_count:N0}")
End Function
Private Sub Process(item As Contact)
_count += 1
End Sub
#End Region
End Module
NOTE: To see the code running, see the prototype \ local \ SimpleData \ NewtonsoftContact VB/C# project.
Web API Example
The Web API version is almost identical:
using Common.Settings;
using Contacts.NewtonSoft.Json.Models;
using Newtonsoft.Json;
internal class Program
{
#region Fields
private JsonSerializer? _serializer;
private JsonSerializerSettings? _jsonSerializerSettings;
private int _count;
private string _url = "https://{0}/download/MOCK1";
#endregion
#region Methods
private static async Task Main()
=> await new Program().Execute().ConfigureAwait(false);
private async Task Execute()
{
this._url = this._url.Build();
this._jsonSerializerSettings = new();
this._serializer = JsonSerializer.Create(this._jsonSerializerSettings);
Console.WriteLine($"Connecting to stream: {this._url}");
Stream? stream;
try
{
stream = await new HttpClient().GetStreamAsync(this._url);
}
catch (Exception)
{
Console.WriteLine($"Failed to open stream {this._url}. Please check that the remote server is active.");
return;
}
if (stream is null)
{
Console.WriteLine($"Failed to open stream {this._url}");
return;
}
Console.WriteLine($"Processing: {this._url}");
using TextReader textReader = new StreamReader(stream);
using JsonReader jsonReader = new JsonTextReader(textReader);
await using (stream.ConfigureAwait(false))
{
await jsonReader.ReadAsync().ConfigureAwait(false);
while (await jsonReader.ReadAsync().ConfigureAwait(false) &&
jsonReader.TokenType != JsonToken.EndArray)
{
Contact? contact = this._serializer!.Deserialize<Contact>(jsonReader);
Process(contact!);
}
}
stream.Close();
await stream.DisposeAsync().ConfigureAwait(false);
Console.WriteLine($"Contacts: {this._count:N0}");
Console.WriteLine("Finished");
}
private void Process(Contact? item)
{
this._count++;
}
#endregion
}
Imports System.IO
Imports System.Net.Http
Imports Common.Settings
Imports Newtonsoft.Json
Imports NewtonsoftContact.Remote.Models
Module Program
#Region "Fields"
Private _serializer As JsonSerializer
Private _jsonSerializerSettings As JsonSerializerSettings
Private _count As Integer
Private _url As String = "https://{0}/download/MOCK1"
#End Region
#Region "Methods"
Sub Main(args As String())
ExecuteAsync(args).GetAwaiter.GetResult()
Console.WriteLine("Finished")
End Sub
Private Async Function ExecuteAsync(args As String()) As Task
_url = _url.Build()
_jsonSerializerSettings = New JsonSerializerSettings()
_serializer = JsonSerializer.Create(_jsonSerializerSettings)
Console.WriteLine($"Connecting to stream: {_url}")
Dim stream As Stream
Try
stream = Await New HttpClient().GetStreamAsync(_url)
Catch ex As Exception
Console.WriteLine($"Failed to open stream {_url}.
Please check that the remote server is active.")
Return
End Try
If stream Is Nothing Then
Console.WriteLine($"Failed to open stream {_url}")
Return
End If
Console.WriteLine($"Processing: {_url}")
Using textReader As TextReader = New StreamReader(stream)
Using jsonReader As JsonReader = New JsonTextReader(textReader)
Await jsonReader.ReadAsync().ConfigureAwait(False)
While Await jsonReader.ReadAsync().ConfigureAwait(False) AndAlso
jsonReader.TokenType <> JsonToken.EndArray
Dim contact = _serializer.Deserialize(Of Contact)(jsonReader)
Process(contact)
End While
End Using
End Using
stream.Close()
Await stream.DisposeAsync().ConfigureAwait(False)
Console.WriteLine($"Contacts: {_count:N0}")
End Function
Private Sub Process(item As Contact)
_count += 1
End Sub
#End Region
End Module
NOTE: To see the code running, see the prototype \ remote \ SimpleData \ NewtonsoftContact VB/C# project.
Complex JSON Objects with Selective Deserializing
Complex JSON is made up of individual simple properties, objects, and collections.
Below is an example of a complex JSON data structure:
{
"categoryTreeId": "123",
"categoryTreeVersion": "1.234a",
"categoryAspects": [
{
"category": {
"categoryId": "111",
"categoryName": "Category 1"
},
"aspects": [
{
"localizedAspectName": "1:Aspect 1"
},
{
"localizedAspectName": "1:Aspect 2"
},
{
"localizedAspectName": "1:Aspect 3"
}
]
},
{
"category": {
"categoryId": "222",
"categoryName": "Category 2"
},
"aspects": [
{
"localizedAspectName": "2:Aspect 1"
},
{
"localizedAspectName": "2:Aspect 2"
},
{
"localizedAspectName": "2:Aspect 3"
}
]
}
]
}
We are only interested in the "categoryAspects" collection. The method used for the Simple JSON Collection cannot be used here: if we use it, the entire object will be loaded into memory rather than each CategoryAspect object in turn.
To help understand the code below, here is a definition of how to deserialize each CategoryAspect object by walking the structure manually:
- Check each property
- When we find the "categoryAspects" property, we can then extract each CategoryAspect object:
  - find the start of the object
  - walk and store the object graph until we hit the end of the object
  - deserialize the object graph
  - repeat until we hit the end of the array object
Here is the code to deserialize the above JSON from a FileStream with Newtonsoft.Json:
using Newtonsoft.Json;
JsonSerializer _serializer = new();
await using FileStream stream = File.OpenRead(_filename);
using TextReader textReader = new StreamReader(stream);
using JsonReader jsonReader = new JsonTextReader(textReader);
await using (stream.ConfigureAwait(false))
{
// Read every token; ProcessAsync only reacts to the property names it is interested in.
// (The root is an object, so we must not stop at the first EndArray token.)
while (await jsonReader.ReadAsync().ConfigureAwait(false))
await ProcessAsync(jsonReader).ConfigureAwait(false);
}
Imports Newtonsoft.Json
_jsonSerializerSettings = New JsonSerializerSettings()
_serializer = JsonSerializer.Create(_jsonSerializerSettings)
Using stream As Stream = File.OpenRead(_filename)
Using textReader As TextReader = New StreamReader(stream)
Using jsonReader As JsonReader = New JsonTextReader(textReader)
While Await jsonReader.ReadAsync().ConfigureAwait(False)
Await ProcessAsync(jsonReader).ConfigureAwait(False)
End While
End Using
End Using
End Using
The code to walk the JSON graph:
private async Task ProcessAsync(JsonReader jsonReader)
{
if (jsonReader.TokenType != JsonToken.PropertyName)
return;
if (jsonReader.GetString() == "categoryTreeVersion")
{
await jsonReader.ReadAsync().ConfigureAwait(false);
string? version = jsonReader.GetString();
Console.WriteLine($"Version: {version ?? "no value"}");
}
else if (jsonReader.GetString() == "categoryTreeId")
{
await jsonReader.ReadAsync().ConfigureAwait(false);
string? id = jsonReader.GetString();
Console.WriteLine($"Id: {id ?? "no value"}");
}
else if (jsonReader.GetString() == "categoryAspects")
{
await jsonReader.ReadAsync().ConfigureAwait(false);
while (await jsonReader.ReadAsync().ConfigureAwait(false) &&
jsonReader.TokenType != JsonToken.EndArray)
ProcessCollection(jsonReader);
}
}
Private Async Function ProcessAsync(jsonReader As JsonReader) As Task
If jsonReader.TokenType <> JsonToken.PropertyName Then
Return
End If
If jsonReader.GetString() = "categoryTreeVersion" Then
Await jsonReader.ReadAsync().ConfigureAwait(False)
Dim version = jsonReader.GetString()
Console.WriteLine($"Version: {If(version, "no value")}")
End If
If jsonReader.GetString() = "categoryTreeId" Then
Await jsonReader.ReadAsync().ConfigureAwait(False)
Dim Id = jsonReader.GetString()
Console.WriteLine($"Id: {If(Id, "no value")}")
End If
If jsonReader.GetString() = "categoryAspects" Then
Await jsonReader.ReadAsync().ConfigureAwait(False)
While Await jsonReader.ReadAsync().ConfigureAwait(False) AndAlso
jsonReader.TokenType <> JsonToken.EndArray
ProcessCollection(jsonReader)
End While
End If
End Function
NOTE: The above processing code is not limited to root nodes; it searches the entire JSON graph and processes only the nodes identified.
To deserialize each object, we use the same code as in the previous simple collection example:
private void ProcessCollection(JsonReader jsonReader)
{
CategoryAspect? categoryAspect =
_serializer!.Deserialize<CategoryAspect>(jsonReader);
_count++;
}
Private Sub ProcessCollection(jsonReader As JsonReader)
Dim categoryAspect = _serializer.Deserialize(Of CategoryAspect)(jsonReader)
_count += 1
End Sub
NOTE: To see the code running, see the prototype \ local \ ComplexData \ NewtonsoftEbay VB/C# project.
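For comparison, the same walking steps can be sketched with System.Text.Json's synchronous Utf8JsonReader. This C# sketch works over a complete in-memory byte array only; Utf8JsonReader does not refill its buffer from a stream by itself, which is the gap the article's custom Utf8JsonAsyncStreamReader fills. The trimmed-down data, the ReadCategoryNames helper, and the use of JsonElement instead of the CategoryAspect model are assumptions for the demo.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;

// Trimmed-down version of the complex sample above.
byte[] utf8Json = Encoding.UTF8.GetBytes(@"{
  ""categoryTreeId"": ""123"",
  ""categoryTreeVersion"": ""1.234a"",
  ""categoryAspects"": [
    { ""category"": { ""categoryId"": ""111"", ""categoryName"": ""Category 1"" },
      ""aspects"": [ { ""localizedAspectName"": ""1:Aspect 1"" } ] },
    { ""category"": { ""categoryId"": ""222"", ""categoryName"": ""Category 2"" },
      ""aspects"": [ { ""localizedAspectName"": ""2:Aspect 1"" } ] }
  ]
}");

List<string> categoryNames = ReadCategoryNames(utf8Json);
Console.WriteLine(string.Join(", ", categoryNames)); // Category 1, Category 2

// Hypothetical helper. Utf8JsonReader is a ref struct, so this method must be synchronous.
static List<string> ReadCategoryNames(byte[] json)
{
    var names = new List<string>();
    var reader = new Utf8JsonReader(json);

    while (reader.Read())
    {
        // Check each property until we find "categoryAspects" (at any depth).
        if (reader.TokenType != JsonTokenType.PropertyName ||
            !reader.ValueTextEquals("categoryAspects"))
            continue;

        reader.Read(); // move onto StartArray

        // For each object in the array, deserialize just that object graph.
        while (reader.Read() && reader.TokenType != JsonTokenType.EndArray)
        {
            // Reads from StartObject to the matching EndObject and leaves the reader there.
            JsonElement aspect = JsonSerializer.Deserialize<JsonElement>(ref reader);
            names.Add(aspect.GetProperty("category").GetProperty("categoryName").GetString()!);
        }
    }
    return names;
}
```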
Working with Zipped JSON Data Files
Zip files are good for compressing text-based JSON files, especially when working with very large files. We can stream-read zip files and the compressed JSON files within.
The code to read the data is the same as above; we just need to open the zip file instead and read its entries. Here, I assume that each file is of the correct type:
using ZipArchive zipArchive = new(File.OpenRead(_filename));
foreach (ZipArchiveEntry zipArchiveEntry in zipArchive.Entries)
{
Console.WriteLine($"Processing: {_file} > {zipArchiveEntry.FullName}");
await using Stream stream = zipArchiveEntry.Open();
using TextReader textReader = new StreamReader(stream);
using JsonReader jsonReader = new JsonTextReader(textReader);
}
Using zipArchive = New ZipArchive(File.OpenRead(_fileHelper.Resolve(_file)))
For Each zipArchiveEntry As ZipArchiveEntry In zipArchive.Entries
Console.WriteLine($"Processing: {_file} > {zipArchiveEntry.FullName}")
Using stream As Stream = zipArchiveEntry.Open()
Using textReader As TextReader = New StreamReader(stream)
Using jsonReader = New JsonTextReader(textReader)
End Using
End Using
End Using
Next
End Using
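Here is a self-contained C# sketch of the same idea using System.IO.Compression and System.Text.Json: it builds a small zip archive in memory (standing in for a zipped data file), then streams each entry through JsonSerializer.DeserializeAsyncEnumerable. The entry name, the data, and the use of JsonElement rather than a model class are assumptions for the demo.

```csharp
using System;
using System.IO;
using System.IO.Compression;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;

// Build a small zip archive in memory as a stand-in for a zipped JSON data file.
var zipBytes = new MemoryStream();
using (var archive = new ZipArchive(zipBytes, ZipArchiveMode.Create, leaveOpen: true))
{
    using Stream writeStream = archive.CreateEntry("contacts.json").Open();
    byte[] json = Encoding.UTF8.GetBytes("[{\"id\":1},{\"id\":2},{\"id\":3}]");
    writeStream.Write(json, 0, json.Length);
}
zipBytes.Position = 0;

int count = 0;
using (var zipArchive = new ZipArchive(zipBytes, ZipArchiveMode.Read))
{
    foreach (ZipArchiveEntry zipArchiveEntry in zipArchive.Entries)
    {
        Console.WriteLine($"Processing: {zipArchiveEntry.FullName}");

        // Open() returns a decompressing stream; the entry is inflated as it is read.
        await using Stream entryStream = zipArchiveEntry.Open();
        await foreach (JsonElement contact in JsonSerializer.DeserializeAsyncEnumerable<JsonElement>(entryStream))
            count++;
    }
}
Console.WriteLine($"Contacts: {count}");
```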
If you need to look for specific files, just check the name of each ZipArchiveEntry:
using ZipArchive zipArchive = new(File.OpenRead(_filename));
foreach (ZipArchiveEntry zipArchiveEntry in zipArchive.Entries)
{
if (zipArchiveEntry.Name == "file_name_goes_here")
{
Console.WriteLine($"Processing: {_file} > {zipArchiveEntry.FullName}");
await using Stream stream = zipArchiveEntry.Open();
using TextReader textReader = new StreamReader(stream);
using JsonReader jsonReader = new JsonTextReader(textReader);
}
}
Using zipArchive = New ZipArchive(File.OpenRead(_fileHelper.Resolve(_file)))
For Each zipArchiveEntry As ZipArchiveEntry In zipArchive.Entries
If zipArchiveEntry.Name = "file_name_goes_here" Then
Console.WriteLine($"Processing: {_file} > {zipArchiveEntry.FullName}")
Using stream As Stream = zipArchiveEntry.Open()
Using textReader As TextReader = New StreamReader(stream)
Using jsonReader = New JsonTextReader(textReader)
End Using
End Using
End Using
End If
Next
End Using
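Rather than looping over every entry, ZipArchive.GetEntry can look up a single entry directly. Note that GetEntry matches the full path inside the archive (FullName), whereas the loop above compares the file name only (Name). A minimal C# sketch with an in-memory archive; the entry paths are made up for the demo.

```csharp
using System;
using System.IO;
using System.IO.Compression;
using System.Text;

// In-memory archive with two entries, standing in for a real zip file.
var zipBytes = new MemoryStream();
using (var archive = new ZipArchive(zipBytes, ZipArchiveMode.Create, leaveOpen: true))
{
    foreach (string name in new[] { "data/contacts.json", "data/readme.txt" })
    {
        using Stream entryStream = archive.CreateEntry(name).Open();
        byte[] content = Encoding.UTF8.GetBytes(name.EndsWith(".json") ? "[]" : "notes");
        entryStream.Write(content, 0, content.Length);
    }
}
zipBytes.Position = 0;

using var zipArchive = new ZipArchive(zipBytes, ZipArchiveMode.Read);

// GetEntry matches on the full path inside the archive and returns null when absent.
ZipArchiveEntry? contactsEntry = zipArchive.GetEntry("data/contacts.json");
ZipArchiveEntry? missingEntry = zipArchive.GetEntry("contacts.json");

Console.WriteLine(contactsEntry?.Name ?? "not found"); // contacts.json
Console.WriteLine(missingEntry?.Name ?? "not found");  // not found
```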
File Stream Example
Putting it all together, we end up with something like this:
using System.IO.Compression;
using Common.Helpers;
using Ebay.NewtonSoft.Json.Models;
using Newtonsoft.Json;
internal class Program
{
#region Fields
private readonly IFilePathHelper _fileHelper =
new FilePathHelper("Resources");
private JsonSerializer? _serializer;
private JsonSerializerSettings? _jsonSerializerSettings;
private int _count;
private readonly string _file = "EBay CategoryAspects.zip";
#endregion
#region Methods
private static async Task Main()
=> await new Program().Execute().ConfigureAwait(false);
private async Task Execute()
{
Console.WriteLine($"Reading {this._file}");
this._jsonSerializerSettings = new();
this._serializer = JsonSerializer.Create(this._jsonSerializerSettings);
using ZipArchive zipArchive =
new(File.OpenRead(this._fileHelper.Resolve(this._file)));
foreach (ZipArchiveEntry zipArchiveEntry in zipArchive.Entries)
{
Console.WriteLine
($"Processing: {this._file} > {zipArchiveEntry.FullName}");
await using Stream stream = zipArchiveEntry.Open();
using TextReader textReader = new StreamReader(stream);
using JsonReader jsonReader = new JsonTextReader(textReader);
await using (stream.ConfigureAwait(false))
{
while (await jsonReader.ReadAsync().ConfigureAwait(false))
await this.ProcessAsync(jsonReader).ConfigureAwait(false);
}
}
Console.WriteLine($"CategoryAspects: {this._count:N0}");
Console.WriteLine("Finished");
}
private async Task ProcessAsync(JsonReader jsonReader)
{
if (jsonReader.TokenType != JsonToken.PropertyName) return;
if (jsonReader.GetString() == "categoryTreeVersion")
{
await jsonReader.ReadAsync().ConfigureAwait(false);
string? version = jsonReader.GetString();
Console.WriteLine($"Version: {version ?? "no value"}");
}
else if (jsonReader.GetString() == "categoryTreeId")
{
await jsonReader.ReadAsync().ConfigureAwait(false);
string? id = jsonReader.GetString();
Console.WriteLine($"Id: {id ?? "no value"}");
}
else if (jsonReader.GetString() == "categoryAspects")
{
await jsonReader.ReadAsync().ConfigureAwait(false);
while (await jsonReader.ReadAsync().ConfigureAwait(false) &&
jsonReader.TokenType != JsonToken.EndArray)
this.ProcessCollection(jsonReader);
}
}
private void ProcessCollection(JsonReader jsonReader)
{
CategoryAspect? categoryAspect =
this._serializer!.Deserialize<CategoryAspect>(jsonReader);
this._count++;
}
#endregion
}
Imports System.IO
Imports System.IO.Compression
Imports Common.Helpers
Imports Newtonsoft.Json
Imports NewtonSoftZippedEbay.Models
Module Program
#Region "Fields"
Private ReadOnly _fileHelper As FilePathHelper =
New FilePathHelper("Resources")
Private _serializer As JsonSerializer
Private _jsonSerializerSettings As JsonSerializerSettings
Private _count As Integer
Private ReadOnly _file As String = "EBay CategoryAspects.zip"
#End Region
#Region "Methods"
Sub Main(args As String())
Console.WriteLine($"Reading {_file}")
ExecuteAsync(args).GetAwaiter.GetResult()
Console.WriteLine("Finished")
End Sub
Private Async Function ExecuteAsync(args As String()) As Task
_jsonSerializerSettings = New JsonSerializerSettings()
_serializer = JsonSerializer.Create(_jsonSerializerSettings)
Using zipArchive = New ZipArchive(File.OpenRead(_fileHelper.Resolve(_file)))
For Each zipArchiveEntry As ZipArchiveEntry In zipArchive.Entries
Console.WriteLine($"Processing: {_file} > {zipArchiveEntry.FullName}")
Using stream As Stream = zipArchiveEntry.Open()
Using textReader As TextReader = New StreamReader(stream)
Using jsonReader = New JsonTextReader(textReader)
While Await jsonReader.ReadAsync().ConfigureAwait(False)
Await ProcessAsync(jsonReader).ConfigureAwait(False)
End While
End Using
End Using
End Using
Next
End Using
Console.WriteLine($"CategoryAspects: {_count:N0}")
End Function
Private Async Function ProcessAsync(jsonReader As JsonReader) As Task
If jsonReader.TokenType <> JsonToken.PropertyName Then
Return
End If
If jsonReader.GetString() = "categoryTreeVersion" Then
Await jsonReader.ReadAsync().ConfigureAwait(False)
Dim version = jsonReader.GetString()
Console.WriteLine($"Version: {If(version, "no value")}")
End If
If jsonReader.GetString() = "categoryTreeId" Then
Await jsonReader.ReadAsync().ConfigureAwait(False)
Dim Id = jsonReader.GetString()
Console.WriteLine($"Id: {If(Id, "no value")}")
End If
If jsonReader.GetString() = "categoryAspects" Then
Await jsonReader.ReadAsync().ConfigureAwait(False)
While Await jsonReader.ReadAsync().ConfigureAwait(False) AndAlso
jsonReader.TokenType <> JsonToken.EndArray
ProcessCollection(jsonReader)
End While
End If
End Function
Private Sub ProcessCollection(jsonReader As JsonReader)
Dim categoryAspect = _serializer.Deserialize(Of CategoryAspect)(jsonReader)
_count += 1
End Sub
#End Region
End Module
NOTE: To see the code running, see the prototype \ local \ ComplexZippedData \ NewtonSoftZippedEbay VB/C# project. There is also a Contacts version, and Web API versions for both Contacts & Ebay.
NOTE: At the time of writing this article (.NET 6.0), if you are using Web API streaming, .NET downloads the entire zip file before you can stream-read it: ZipArchive needs a seekable stream, so a non-seekable network stream is first copied into memory. The above code and project samples will therefore load the entire zip file into memory.
If you are working with very large zip files, you will need to stream to a cache file on disk before opening the archive for streaming. This keeps memory usage to a minimum. If you want to cache to a file and are not sure how to download a stream to a file, please look at the DowloadService in the GetDataFiles project to see how.
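A minimal C# sketch of the cache-to-disk approach: copy the incoming stream to a temporary file with Stream.CopyToAsync, which copies in buffered chunks, then open the cached file with ZipFile.OpenRead. A MemoryStream stands in for the HttpClient response stream so the sketch runs offline; the temporary cache path is an assumption, and this is not the DowloadService code from GetDataFiles.

```csharp
using System;
using System.IO;
using System.IO.Compression;
using System.Text;
using System.Threading.Tasks;

// Stand-in for the remote stream: in a real application this would be the stream
// returned by HttpClient.GetStreamAsync(url). Here we build a small archive in memory.
var remote = new MemoryStream();
using (var archive = new ZipArchive(remote, ZipArchiveMode.Create, leaveOpen: true))
{
    using Stream entry = archive.CreateEntry("contacts.json").Open();
    byte[] json = Encoding.UTF8.GetBytes("[{\"id\":1}]");
    entry.Write(json, 0, json.Length);
}
remote.Position = 0;

string cachePath = Path.Combine(Path.GetTempPath(), $"cache-{Guid.NewGuid():N}.zip");
int entryCount = 0;
try
{
    // 1. Copy the incoming stream to the cache file chunk by chunk; with a real
    //    network stream, the whole download is never held in memory at once.
    await using (FileStream cacheFile = File.Create(cachePath))
    {
        await remote.CopyToAsync(cacheFile);
    }

    // 2. Open the cached file; a FileStream is seekable, so ZipArchive reads it in place.
    using ZipArchive zipArchive = ZipFile.OpenRead(cachePath);
    foreach (ZipArchiveEntry zipArchiveEntry in zipArchive.Entries)
        Console.WriteLine($"Cached entry: {zipArchiveEntry.FullName} ({zipArchiveEntry.Length} bytes)");
    entryCount = zipArchive.Entries.Count;
}
finally
{
    // Remove the cache file once processing is done.
    File.Delete(cachePath);
}
```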
Sample Projects
If you want to see the above code in action, download the project and run the samples in the Prototypes\Local or Prototypes\Remote folders. There are four samples in each:
- File system: NewtonsoftContacts and NewtonsoftZippedContacts for Simple JSON Collection Object Deserializing, and NewtonsoftEbay and NewtonSoftZippedEbay for Complex JSON Objects with Selective Deserializing.
- Web API: NewtonsoftContacts.Remote and NewtonsoftZippedContacts.Remote for Simple JSON Collection Object Deserializing, and NewtonsoftEbay.Remote and NewtonSoftZippedEbay.Remote for Complex JSON Objects with Selective Deserializing.
NOTE: For the Web API sample projects, you will need to run the JsonSamples.Host web project before the .Remote sample projects.
Streaming with System.Text.Json
Out of the box, System.Text.Json supports streaming. It looks something like this:
using System.Text.Json;
await using FileStream stream = File.OpenRead(filename);
List<Contact> contacts = await JsonSerializer.DeserializeAsync<List<Contact>>(stream);
Imports System.Text.Json
Using stream As FileStream = File.OpenRead(filename)
Dim contacts = Await JsonSerializer.DeserializeAsync(Of List(Of Contact))(stream)
End Using
The downside is that the entire collection has to be deserialized into memory before we can work with any of it. In terms of memory, this is much the same as doing the following:
using System.Text.Json;
string rawJson = await File.ReadAllTextAsync(filename);
List<Contact> contacts = JsonSerializer.Deserialize<List<Contact>>(rawJson);
Imports System.Text.Json
Dim rawJson As String = Await File.ReadAllTextAsync(filename)
Dim contacts = JsonSerializer.Deserialize(Of List(Of Contact))(rawJson)
For a Simple JSON Collection Object, like the sample above, System.Text.Json does support deserializing one object at a time, which avoids loading the entire file into memory. Here is an example:
using System.Text.Json;
await using FileStream stream = File.OpenRead(filename);
await foreach (Contact? contact in JsonSerializer.DeserializeAsyncEnumerable<Contact>(stream))
{
}
Imports System.Text.Json
_options = New JsonSerializerOptions()
Using stream As Stream = File.OpenRead(_fileHelper.Resolve(_file))
Console.WriteLine($"Processing: {_file}")
Dim iterator = JsonSerializer.DeserializeAsyncEnumerable(Of Contact) _
(stream, _options).GetAsyncEnumerator()
Do While Await iterator.MoveNextAsync()
Dim item = iterator.Current
Process(item)
Loop
Await iterator.DisposeAsync()
End Using
NOTE: As you can see from the samples above, VB does not have C#'s await foreach asynchronous loop, so we need to walk the asynchronous collection manually:
Dim iterator = <method_goes_here>.GetAsyncEnumerator()
Do While Await iterator.MoveNextAsync()
Dim item = iterator.Current
Loop
Await iterator.DisposeAsync()
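For comparison, here is roughly what C#'s await foreach does behind the scenes, written out by hand. The sketch adds a try/finally so the enumerator is disposed even if processing throws; the VB loop above can use the same Try/Finally pattern. The in-memory data and the JsonElement item type are assumptions for the demo.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;

await using var stream = new MemoryStream(Encoding.UTF8.GetBytes("[{\"id\":1},{\"id\":2},{\"id\":3}]"));

// Manual walk: get the enumerator, call MoveNextAsync until it returns false,
// and always dispose the enumerator afterwards.
var iterator = JsonSerializer.DeserializeAsyncEnumerable<JsonElement>(stream).GetAsyncEnumerator();
var ids = new List<int>();
try
{
    while (await iterator.MoveNextAsync())
    {
        JsonElement item = iterator.Current;
        ids.Add(item.GetProperty("id").GetInt32());
    }
}
finally
{
    await iterator.DisposeAsync();
}
Console.WriteLine($"Items: {ids.Count}");
```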
The downside of the above example is that it does not work with Complex JSON Objects with Selective Deserializing; there is no out-of-the-box support for this. We have to write it ourselves with a custom stream reader. I have created one, and we will explore it next.
File Stream Example
Putting it all together, we end up with something like this:
using Common.Helpers;
using System.Text.Json;
using Contacts.System.Text.Json.Models;
internal class Program
{
#region Fields
private readonly IFilePathHelper _fileHelper =
new FilePathHelper("Resources");
private JsonSerializerOptions? _jsonSerializerOptions;
private int _count;
private readonly string _file = "Mock_Contacts1.json";
#endregion
#region Methods
private static async Task Main()
=> await new Program().Execute().ConfigureAwait(false);
private async Task Execute()
{
Console.WriteLine($"Reading {this._file}");
this._jsonSerializerOptions = new();
await using FileStream stream =
File.OpenRead(this._fileHelper.Resolve(this._file));
Console.WriteLine($"Processing: {this._file}");
await foreach (Contact? item in
JsonSerializer.DeserializeAsyncEnumerable<Contact>
(stream, this._jsonSerializerOptions))
Process(item);
Console.WriteLine($"Contacts: {this._count:N0}");
Console.WriteLine("Finished");
}
private void Process(Contact? item)
{
this._count++;
}
#endregion
}
Imports System.IO
Imports System.Text.Json
Imports System.Text.Json.Stream
Imports Common.Helpers
Imports SystemTextJsonContact.Models
Module Program
#Region "Fields"
Private ReadOnly _fileHelper As FilePathHelper =
New FilePathHelper("Resources")
Private _options As JsonSerializerOptions
Private _count As Integer
Private ReadOnly _file As String = "Mock_Contacts1.json"
#End Region
#Region "Methods"
Sub Main(args As String())
Console.WriteLine($"Reading {_file}")
MainAsync(args).GetAwaiter.GetResult()
Console.WriteLine("Finished")
End Sub
Private Async Function MainAsync(args As String()) As Task
_options = New JsonSerializerOptions()
Using stream As Stream = File.OpenRead(_fileHelper.Resolve(_file))
Console.WriteLine($"Processing: {_file}")
Dim iterator = JsonSerializer.DeserializeAsyncEnumerable(Of Contact) _
(stream, _options).GetAsyncEnumerator()
While Await iterator.MoveNextAsync()
Dim item = iterator.Current
Process(item)
End While
Await iterator.DisposeAsync()
End Using
Console.WriteLine($"Contacts: {_count:N0}")
End Function
Private Sub Process(item As Contact)
_count += 1
End Sub
#End Region
End Module
NOTE: To see the code running, see the prototype \ local \ SimpleData \ SystemTextJsonContact VB/C# project. For the Web API version, see the prototype \ remote \ SimpleData \ SystemTextJsonContact VB/C# project.
Part 2: Custom Utf8JsonAsyncStreamReader
The goal was to write a Stream Reader that works like NewtonSoft.Json.JsonTextReader with minimal changes to code. In the following section, Writing a Custom Stream Reader, I go into detail about how this was achieved.
How to Use the New Utf8JsonAsyncStreamReader
System.Text.Json reads UTF-8 bytes directly instead of going through a TextReader, so the upside is that the solution is only one line of code: the roles of both the TextReader & JsonTextReader are rolled into one class. This works with any stream type, so using it is as simple as:
using Utf8JsonAsyncStreamReader jsonReader = new Utf8JsonAsyncStreamReader(stream);
Using jsonReader As Utf8JsonAsyncStreamReader = New Utf8JsonAsyncStreamReader(stream)
End Using
NOTE: The Utf8JsonAsyncStreamReader is a drop-in replacement for both the TextReader & JsonTextReader, so it works the same way as the NewtonSoft sample code above.
Writing a Custom Stream Reader
NOTE: I won't dump all of the code here, only the parts that matter. The complete code for the Custom Stream Readers can be found in the project libraries \ System.Text.Json \ System.Text.Json.Stream. All code is fully commented, explaining how it works.
I did a lot of research on this subject, as I did not want to do the work if someone else had already found a solution.
There is one; however, it is synchronous: mtosh (original solution) on StackOverflow, and then evil-dr-nick on GitHub. NOTE: I've included an updated version in the downloadable code that fixes a couple of minor issues and modernizes the code.
We need an asynchronous solution. There currently isn't one... until now!
After a few tries, I came up with the following solution using the System.IO.Pipelines APIs. Why not Span<T> or Memory<T>, like the synchronous solution from mtosh above? The Dot Net 6.0 PipeReader class only supports ReadAtLeastAsync, which does not guarantee the exact number of bytes required to use ReadOnlySequenceSegment<T>.
The benefit of using a PipeReader to process the stream is that it manages the handling of the stream and returns a ReadOnlySequence<T> struct. This gives us fast, raw access to the bytes.
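// Tell the PipeReader how much of the previous buffer the JSON reader consumed
if (this._bytesConsumed > 0)
    this._reader.AdvanceTo(this._buffer.GetPosition(this._bytesConsumed));
// Wait until at least _bufferSize bytes are available (fewer once the stream has ended)
ReadResult readResult = await this._reader
    .ReadAtLeastAsync(this._bufferSize, cancellationToken)
    .ConfigureAwait(false);
this._bytesConsumed = 0;
this._buffer = readResult.Buffer;
this._endOfStream = readResult.IsCompleted;
// Try to read the next token; failure means bad JSON or a token larger than the buffer
if (this._buffer.Length - this._bytesConsumed > 0 &&
    !this.JsonReader(this._endOfStream))
    throw new Exception("Invalid Json or incomplete token or buffer undersized");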
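The PipeReader loop above depends on System.IO.Pipelines and on the reader's own JsonReader(...) helper, which is not shown here. As a simplified, BCL-only sketch of the same idea, the following reads a Stream into a small byte array, keeps the bytes Utf8JsonReader could not consume yet (BytesConsumed), carries JsonReaderState across reads, and only passes isFinalBlock = true once the stream is exhausted. Unlike the article's reader, which throws when a token does not fit, this sketch grows the buffer. ChunkedTokenCounterSketch and CountTokens are hypothetical names.

```csharp
using System;
using System.IO;
using System.Text.Json;

public static class ChunkedTokenCounterSketch
{
    // Counts JSON tokens while reading the stream through a small buffer.
    // Bytes the reader could not consume yet (a token split across two reads)
    // are moved to the front of the buffer and parsed again with the next read.
    public static int CountTokens(Stream stream, int bufferSize = 16)
    {
        if (bufferSize < 1)
            throw new ArgumentOutOfRangeException(nameof(bufferSize));

        byte[] buffer = new byte[bufferSize];
        int filled = 0;
        bool endOfStream = false;
        JsonReaderState state = default;
        int tokens = 0;

        while (!endOfStream)
        {
            int read = stream.Read(buffer, filled, buffer.Length - filled);
            endOfStream = read == 0;
            filled += read;

            // isFinalBlock is only true once the stream is exhausted.
            var reader = new Utf8JsonReader(buffer.AsSpan(0, filled), endOfStream, state);
            while (reader.Read())
                tokens++;

            // Carry the parser state and the unconsumed tail over to the next read.
            state = reader.CurrentState;
            int consumed = (int)reader.BytesConsumed;
            int leftover = filled - consumed;
            buffer.AsSpan(consumed, leftover).CopyTo(buffer); // Span.CopyTo is overlap-safe
            filled = leftover;

            // A single token fills the whole buffer: grow it instead of failing.
            if (filled == buffer.Length)
                Array.Resize(ref buffer, buffer.Length * 2);
        }
        return tokens;
    }
}
```

Counting with a 4-byte buffer and with a 1,024-byte buffer gives the same result, which is a quick way to check that no token is lost or duplicated at buffer boundaries.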
This code lives in the ValueTask<bool> ReadAsync method. When we are ready to deserialize the identified object, we start buffering the bytes into a MemoryStream. We use an '_isBuffering' flag in the ReadAsync method to manage the buffering:
if (this._isBuffering)
{
this.WriteToBufferStream();
this._bufferingStartIndex = 0;
}
The buffer is written with a PipeWriter. In my testing, writing manually was faster than using the built-in Write method, because the ReadOnlySequence<byte> slice first has to be copied into a temporary array with ToArray(). Using Write would look something like this:
this._writer!.Write(this._buffer.Slice(this._bufferingStartIndex,
this._bytesConsumed - this._bufferingStartIndex).ToArray());
Rather than inheriting the PipeWriter and writing my own custom Write method, as it is only required in one place, I simply do it inline, copying straight into the writer's memory with no intermediate array:
private void WriteToBufferStream()
{
int bytes = this._bytesConsumed - this._bufferingStartIndex;
this._buffer.Slice(this._bufferingStartIndex, bytes).CopyTo
(this._writer!.GetSpan(bytes));
this._writer.Advance(bytes);
}
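A self-contained sketch of the same inline copy, assuming ArrayBufferWriter<byte> as a stand-in for the PipeWriter (both implement IBufferWriter<byte>). DemoSegment is a hypothetical helper that only exists to build a ReadOnlySequence<byte> spanning two segments, so that the copy crosses a segment boundary.

```csharp
using System;
using System.Buffers;

public static class SequenceCopySketch
{
    // Copies `length` bytes starting at `start` from a (possibly multi-segment)
    // ReadOnlySequence<byte> directly into an IBufferWriter<byte>, without ToArray().
    public static void WriteSlice(
        ReadOnlySequence<byte> source, long start, int length, IBufferWriter<byte> writer)
    {
        Span<byte> destination = writer.GetSpan(length); // at least `length` bytes
        source.Slice(start, length).CopyTo(destination); // copies across segment boundaries
        writer.Advance(length);                          // commit exactly what was written
    }
}

// Hypothetical helper that links memory blocks into a multi-segment sequence.
public sealed class DemoSegment : ReadOnlySequenceSegment<byte>
{
    public DemoSegment(ReadOnlyMemory<byte> memory) => Memory = memory;

    public DemoSegment Append(ReadOnlyMemory<byte> memory)
    {
        var next = new DemoSegment(memory) { RunningIndex = RunningIndex + Memory.Length };
        Next = next;
        return next;
    }
}
```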
For the DeserializeAsync method, we need to walk the stream to find the end of the object. If we don't, the JsonSerializer.DeserializeAsync method will throw an error. Walking the stream is simply a matter of tracking the depth of the graph until we find the end of the object or array:
while (!cancellationToken.IsCancellationRequested)
{
if (this.TokenType is JsonTokenType.StartObject or JsonTokenType.StartArray)
depth++;
else if (this.TokenType is JsonTokenType.EndObject or JsonTokenType.EndArray)
depth--;
if (depth == 0)
break;
await this.ReadAsync(cancellationToken).ConfigureAwait(false);
}
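The depth-tracking walk can be tried in isolation with a synchronous Utf8JsonReader over an in-memory span. This is a sketch, not the article's async reader: it returns how many bytes the first complete object or array occupies (the range the article buffers between _bufferingStartIndex and _bytesConsumed), or -1 when more data is needed. ObjectBoundarySketch is a hypothetical name.

```csharp
using System;
using System.Text.Json;

public static class ObjectBoundarySketch
{
    // Returns the number of bytes taken by the first complete JSON object or array
    // in utf8Json (which must start with '{' or '['), or -1 if more data is needed.
    public static long FindEndOfFirstValue(ReadOnlySpan<byte> utf8Json)
    {
        // isFinalBlock = false: an incomplete value makes Read() return false instead of throwing.
        var reader = new Utf8JsonReader(utf8Json, false, default);
        int depth = 0;

        while (reader.Read())
        {
            switch (reader.TokenType)
            {
                case JsonTokenType.StartObject:
                case JsonTokenType.StartArray:
                    depth++;
                    break;
                case JsonTokenType.EndObject:
                case JsonTokenType.EndArray:
                    depth--;
                    if (depth == 0)
                        return reader.BytesConsumed; // end of the first value
                    break;
            }
        }
        return -1;
    }
}
```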
Once we have the complete object graph, we can clean up:
this.WriteToBufferStream();
await this._writer!.CompleteAsync().ConfigureAwait(false);
if (cancellationToken.IsCancellationRequested)
return false;
stream.Seek(0, SeekOrigin.Begin);
return true;
We move the stream pointer back to the start of the stream, ready for deserializing with the JsonSerializer.DeserializeAsync method:
if (!await this.GetJsonObjectAsync(stream, cancellationToken).ConfigureAwait(false))
return default;
TResult? result = await JsonSerializer
.DeserializeAsync<TResult>(stream, cancellationToken: cancellationToken)
.ConfigureAwait(false);
this._isBuffering = false;
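The buffer, rewind and deserialize sequence can also be sketched end to end. This version finds the end of the first object with Utf8JsonReader.TrySkip (a built-in alternative to counting depth by hand), copies just those bytes into a MemoryStream, rewinds it and calls JsonSerializer.DeserializeAsync. Utf8JsonReader is a ref struct, so it lives in a synchronous helper rather than in the async method. AspectSketch, BufferedObjectSketch and their members are hypothetical names.

```csharp
using System;
using System.IO;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical model used only for this sketch.
public sealed record AspectSketch(string Name, int Count);

public static class BufferedObjectSketch
{
    // Utf8JsonReader is a ref struct and cannot be used inside an async method,
    // so the boundary search lives in this synchronous helper.
    // Returns the byte length of the first complete object/array, or -1 if incomplete.
    private static int LengthOfFirstValue(ReadOnlySpan<byte> utf8Json)
    {
        var reader = new Utf8JsonReader(utf8Json, false, default);
        if (!reader.Read() || !reader.TrySkip()) // TrySkip moves to the matching end token
            return -1;
        return (int)reader.BytesConsumed;
    }

    public static async Task<T?> DeserializeFirstAsync<T>(
        ReadOnlyMemory<byte> utf8Json, CancellationToken token = default)
    {
        int length = LengthOfFirstValue(utf8Json.Span);
        if (length < 0)
            return default;

        // Buffer only the bytes that belong to the first object...
        using var buffer = new MemoryStream(length);
        await buffer.WriteAsync(utf8Json.Slice(0, length), token);

        // ...then rewind so the serializer sees exactly one JSON value.
        buffer.Seek(0, SeekOrigin.Begin);
        return await JsonSerializer.DeserializeAsync<T>(buffer, cancellationToken: token);
    }
}
```

Isolating the object first matters because the serializer rejects input that continues past a single JSON value with a JsonException.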
Check out the samples to see the code and how it works.
Part 3: Libraries to Simplify Working With Large JSON Object Streams
This next section is about using the libraries bundled with this article and the downloadable code. The libraries are based on projects that I have worked on and the need to move from Newtonsoft.Json to System.Text.Json. These libraries are not required to work with streams, but they help wrap the processes to reduce repetitive code.
Key design goals:
- Swappable between Newtonsoft.Json & System.Text.Json; file system and web API; JSON object & zipped JSON object; with an almost identical interface so that changing between implementations is a seamless process
- Work with File System & Web API streams
- Work with single or multiple raw JSON files or zipped raw JSON files of any size
- Abstract away all implementation of opening and processing of JSON Objects: only the identification and processing code is required
- Asynchronous operation, including fast-fail error handling
- Support for cancelling mid-stream: CancellationToken
- Minimal memory footprint: work with objects as they are deserialized, no need to keep all objects in memory
- Highly performant: as close to raw performance as possible
- DI/IOC support: not tied to any IOC container system
- ILogger support: not tied to any specific logger
- Custom data buffering size configuration: the default buffer sizes used by Newtonsoft and System.Text.Json are 1K (1,024 bytes) and 16K (16,384 bytes) respectively
- Testability & Benchmarking
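On the System.Text.Json side, the built-in setting for the serializer's read buffer is JsonSerializerOptions.DefaultBufferSize (16,384 bytes unless changed). Whether the library's own buffer size setting maps onto it is an assumption on my part; the sketch below only shows the option being set and used with DeserializeAsyncEnumerable. BufferSizeSketch is a hypothetical name.

```csharp
using System.IO;
using System.Text.Json;
using System.Threading.Tasks;

public static class BufferSizeSketch
{
    // DefaultBufferSize sets the initial size of the buffer the serializer reads
    // the stream into when deserializing asynchronously (must be at least 1).
    public static JsonSerializerOptions WithBufferSize(int bytes) =>
        new JsonSerializerOptions { DefaultBufferSize = bytes };

    // Streams a top-level JSON array of numbers using the given buffer size.
    public static async Task<int> CountNumbersAsync(Stream stream, int bufferSize)
    {
        int count = 0;
        await foreach (int value in
            JsonSerializer.DeserializeAsyncEnumerable<int>(stream, WithBufferSize(bufferSize)))
        {
            count++;
        }
        return count;
    }
}
```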
How to Use: Newtonsoft.Json - Simple Json Collection Objects
For Simple Json Collection Objects, we implement the JsonFileStreamObjectDeserializer<TConfiguration> base class:
public class ContactFileStreamDeserializer
: JsonFileStreamObjectDeserializer<IContactFilesConfiguration>
{
}
Public Class ContactFileStreamDeserializer
Inherits JsonFileStreamObjectDeserializer(Of IFilesConfiguration)
End Class
Then we implement the ProcessAsync method:
protected override async Task ProcessAsync
(JsonReader jsonReader, CancellationToken cancellationToken)
{
if (this.BatchSize > 1)
await this.DeserializeAsync<Contact>
(jsonReader, this.BatchProcessAsync, cancellationToken)
.ConfigureAwait(false);
else
await this.DeserializeAsync<Contact>
(jsonReader, this.ItemProcessAsync, cancellationToken)
.ConfigureAwait(false);
}
Protected Overrides Async Function ProcessAsync(
        jsonReader As JsonReader, cancellationToken As CancellationToken) As Task
    If BatchSize > 1 Then
        Await DeserializeAsync(Of Contact)(
            jsonReader, AddressOf BatchProcessAsync, cancellationToken).ConfigureAwait(False)
    Else
        Await DeserializeAsync(Of Contact)(
            jsonReader, AddressOf ItemProcessAsync, cancellationToken).ConfigureAwait(False)
    End If
End Function
As you can see, both batching and single object support are built in. See the property table below for more information on how to configure it.
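The BatchSize and BatchProcessAsync internals are not shown in this article, so the following is only a sketch of one way batching over an IAsyncEnumerable<T> could work: items are collected into lists of at most batchSize, the last batch may be shorter, and only the current batch is held in memory. BatchingSketch, BatchAsync and RangeAsync are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class BatchingSketch
{
    // Groups a stream of items into lists of at most batchSize items.
    // The final batch may be smaller; only the current batch is kept in memory.
    public static async IAsyncEnumerable<List<T>> BatchAsync<T>(
        IAsyncEnumerable<T> source,
        int batchSize,
        [EnumeratorCancellation] CancellationToken token = default)
    {
        if (batchSize < 1)
            throw new ArgumentOutOfRangeException(nameof(batchSize));

        var batch = new List<T>(batchSize);
        await foreach (T item in source.WithCancellation(token))
        {
            batch.Add(item);
            if (batch.Count == batchSize)
            {
                yield return batch;             // hand the full batch to the caller
                batch = new List<T>(batchSize); // and start a fresh one
            }
        }

        if (batch.Count > 0)
            yield return batch;                 // trailing partial batch
    }

    // Hypothetical source used only to exercise BatchAsync.
    public static async IAsyncEnumerable<int> RangeAsync(int count)
    {
        for (int i = 0; i < count; i++)
        {
            await Task.Yield();
            yield return i;
        }
    }
}
```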
The above ProcessAsync method code works for both raw and zipped files. The implementation is based on the inherited base class:
- Raw Json Files: JsonFileStreamObjectDeserializer<TConfiguration>
- Zipped Json Files: JsonZipFileStreamObjectDeserializer<TZipConfiguration, TConfiguration>
The same applies to the Web API:
- Raw Json Files: JsonHttpStreamObjectDeserializer<TConfiguration>
- Zipped Json Files: JsonZipHttpStreamObjectDeserializer<TZipConfiguration, TConfiguration>
To use the above implemented class:
IFilePathHelper fileHelper = new FilePathHelper(DataFolder);
IContactFilesConfiguration config = new ContactFilesConfiguration();
CancellationTokenSource cts = new();
var deserializer = new ContactFileStreamDeserializer(fileHelper, config)
{
FileId = "MOCK1",
FileAction = DeserializeActionType.Single,
FailIfFileNotFound = true,
CancellationTokenSource = cts
};
await deserializer.ProcessAsync().ConfigureAwait(false);
Dim fileHelper = New FilePathHelper(DataFolder)
Dim config = New ContactFilesConfiguration()
Dim cts As New CancellationTokenSource()
Dim deserializer = New ContactFileStreamDeserializer(fileHelper, config) With
{
.FileId = "MOCK1",
.FileAction = DeserializeActionType.Single,
.FailIfFileNotFound = True,
.CancellationTokenSource = cts
}
await deserializer.ProcessAsync().ConfigureAwait(False)
There is a configuration file that holds the location of the data. This configuration class can hold the names of all of the raw JSON files in a project with a keyed id access or just the file that needs to be worked on:
public class ContactFilesConfiguration : IContactFilesConfiguration
{
#region Constructors
public ContactFilesConfiguration()
{
this.Paths = new Dictionary<string, string>
{
["MOCK1"] = "Mock_Contacts1.json",
["MOCK2"] = "Mock_Contacts2.json",
["MOCK3"] = "Mock_Contacts3.json",
};
}
#endregion
#region Properties
public IDictionary<string, string> Paths { get; }
#endregion
}
Public Class ContactFilesConfiguration
Implements IContactFilesConfiguration
#Region "Constructors"
Public Sub New()
Paths = New Dictionary(Of String, String) From {
{"MOCK1", "Mock_Contacts1.json"},
{"MOCK2", "Mock_Contacts2.json"},
{"MOCK3", "Mock_Contacts3.json"}
}
End Sub
#End Region
#Region "Properties"
Public ReadOnly Property Paths As IDictionary(Of String, String) _
Implements Configuration.IDataConfiguration.Paths
#End Region
End Class
If you have multiple files of the same type to be processed, you can set the FileAction property to DeserializeActionType.Multiple, and the base class will automatically walk the files in the configuration file.
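A minimal sketch of the Single/Multiple selection, assuming the configuration exposes a Paths dictionary like ContactFilesConfiguration: Single looks up one key, Multiple walks every registered file. ActionTypeSketch and ConfigWalkSketch are hypothetical stand-ins for DeserializeActionType and the base class logic, which is not shown here.

```csharp
using System.Collections.Generic;

// Hypothetical stand-in for the article's DeserializeActionType.
public enum ActionTypeSketch { Single, Multiple }

public static class ConfigWalkSketch
{
    // Single: only the file registered under fileId.
    // Multiple: every file registered in the configuration.
    public static IReadOnlyList<string> SelectFiles(
        IDictionary<string, string> paths, ActionTypeSketch action, string? fileId)
    {
        if (action == ActionTypeSketch.Multiple)
            return new List<string>(paths.Values);

        if (fileId is null || !paths.TryGetValue(fileId, out string? file))
            throw new KeyNotFoundException($"No file is configured for key '{fileId}'.");

        return new[] { file };
    }
}
```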
Working with zipped raw JSON files is the same. We have a separate configuration file, like ContactFilesConfiguration above:
public class ContactZipFilesConfiguration : IContactZipFilesConfiguration
{
#region Constructors
public ContactZipFilesConfiguration()
{
this.Paths = new Dictionary<string, string>
{
["MOCK_ZIP"] = "Mock_Json_Files.zip",
};
}
#endregion
#region Properties
public IDictionary<string, string> Paths { get; }
#endregion
}
Public Class ContactZipFilesConfiguration
Implements IContactZipFilesConfiguration
#Region "Constructors"
Public Sub New()
Paths = New Dictionary(Of String, String) From {
{"MOCK_ZIP", "Mock_Json_Files.zip"}
}
End Sub
#End Region
#Region "Properties"
Public ReadOnly Property Paths As IDictionary(Of String, String) _
Implements Configuration.IDataConfiguration.Paths
#End Region
End Class
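Streaming JSON out of a zip file does not require extracting it to disk first. Here is a BCL-only sketch using System.IO.Compression.ZipArchive that streams every .json entry through DeserializeAsyncEnumerable. It is not the library's zip base class, and BuildZip is a hypothetical helper that creates an in-memory archive so the sketch can run without sample files.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

public static class ZipStreamSketch
{
    // Streams every *.json entry in the archive through DeserializeAsyncEnumerable,
    // one entry at a time, without extracting anything to disk.
    public static async Task<int> CountItemsAsync<T>(Stream zipStream, CancellationToken token = default)
    {
        int count = 0;
        using var archive = new ZipArchive(zipStream, ZipArchiveMode.Read, leaveOpen: true);
        foreach (ZipArchiveEntry entry in archive.Entries)
        {
            if (!entry.FullName.EndsWith(".json", StringComparison.OrdinalIgnoreCase))
                continue; // skip non-JSON entries

            // Open() returns a stream over the entry's decompressed content.
            await using Stream entryStream = entry.Open();
            await foreach (T? item in
                JsonSerializer.DeserializeAsyncEnumerable<T>(entryStream, cancellationToken: token))
            {
                if (item is not null)
                    count++;
            }
        }
        return count;
    }

    // Hypothetical helper: builds an in-memory zip so the sketch can run without files.
    public static MemoryStream BuildZip(IDictionary<string, string> files)
    {
        var ms = new MemoryStream();
        using (var archive = new ZipArchive(ms, ZipArchiveMode.Create, leaveOpen: true))
        {
            foreach (var (name, content) in files)
            {
                using var writer = new StreamWriter(archive.CreateEntry(name).Open());
                writer.Write(content);
            }
        }
        ms.Position = 0; // rewind so the archive can be read back
        return ms;
    }
}
```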
NOTE: To see the code running, see the applications \ local \ SimpleData \ NewtonsoftContacts, applications \ local \ SimpleZippedData \ NewtonsoftZippedContacts, applications \ remote \ SimpleData \ NewtonsoftContacts & applications \ remote \ SimpleZippedData \ NewtonsoftZippedContacts VB/C# projects for non Dependency Injection. For use with Dependency Injection, see the applications \ local \ SimpleData \ NewtonsoftContactsDI, applications \ local \ SimpleZippedData \ NewtonsoftZippedContactsDI, applications \ remote \ SimpleData \ NewtonsoftContactsDI & applications \ remote \ SimpleZippedData \ NewtonsoftZippedContactsDI VB/C# projects. Common code between the non-DI & DI projects is found in the Shared subfolder. You can also see the code in use in the UnitTest & Benchmark VB/C# projects.
How to Use: Newtonsoft.Json - Complex JSON Objects
For complex Json Objects, we need to implement the JsonFileStreamPropertyDeserializer<TConfiguration> base class:
public class EbayCategoryAspectFileStreamDeserializer
: JsonFileStreamPropertyDeserializer<IEbayCategoryAspectFilesConfiguration>
{
}
Public Class EbayCategoryAspectFileStreamDeserializer
Inherits JsonFileStreamPropertyDeserializer(Of IEbayCategoryAspectFilesConfiguration)
End Class
Then we implement the ProcessAsync method:
protected override async Task ProcessAsync
(JsonReader jsonReader, CancellationToken cancellationToken)
{
switch (jsonReader.GetString())
{
case "categoryAspects":
if (BatchSize > 1)
await DeserializeAsync<CategoryAspect>
(jsonReader, BatchProcessAsync, cancellationToken)
.ConfigureAwait(false);
else
await DeserializeAsync<CategoryAspect>
(jsonReader, ItemProcessAsync, cancellationToken)
.ConfigureAwait(false);
break;
case "categoryTreeVersion":
{
await jsonReader.ReadAsync(cancellationToken).ConfigureAwait(false);
string? version = jsonReader.GetString();
Logger?.Emit(LogLevel.Information, $"Version: {version ?? "no value"}");
break;
}
case "categoryTreeId":
{
await jsonReader.ReadAsync(cancellationToken).ConfigureAwait(false);
string? id = jsonReader.GetString();
Logger?.Emit(LogLevel.Information, $"Id: {id ?? "no value"}");
break;
}
}
}
Protected Overrides Async Function ProcessAsync(
        jsonReader As JsonReader, cancellationToken As CancellationToken) As Task
Select Case jsonReader.GetString()
Case "categoryAspects"
Await jsonReader.ReadAsync().ConfigureAwait(False)
If BatchSize > 1 Then
Await DeserializeAsync(Of CategoryAspect)(jsonReader,
AddressOf BatchProcessAsync, cancellationToken).
ConfigureAwait(False)
Else
Await DeserializeAsync(Of CategoryAspect)(jsonReader,
AddressOf ItemProcessAsync, cancellationToken).
ConfigureAwait(False)
End If
Case "categoryTreeVersion"
Await jsonReader.ReadAsync().ConfigureAwait(False)
Dim version = jsonReader.GetString()
_logger.Emit(LogLevel.Information, $"Version: {If(version, "no value")}")
Case "categoryTreeId"
Await jsonReader.ReadAsync().ConfigureAwait(False)
Dim Id = jsonReader.GetString()
_logger.Emit(LogLevel.Information, $"Id: {If(Id, "no value")}")
End Select
End Function
To use the above implemented class:
IFilePathHelper fileHelper = new FilePathHelper(DataFolder);
IEbayCategoryAspectFilesConfiguration config =
new EbayCategoryAspectFilesConfiguration();
CancellationTokenSource cts = new();
var deserializer = new EbayCategoryAspectFileStreamDeserializer(fileHelper, config)
{
MarketplaceId = "EBAY_US",
FileAction = DeserializeActionType.Single,
FailIfFileNotFound = true,
CancellationTokenSource = cts
};
await deserializer.ProcessAsync().ConfigureAwait(false);
Dim fileHelper = New FilePathHelper(DataFolder)
Dim config = New EbayCategoryAspectFilesConfiguration()
Dim cts As New CancellationTokenSource()
Dim deserializer =
New EbayCategoryAspectFileStreamDeserializer(fileHelper, config) With
{
.MarketplaceId = "EBAY_US",
.FileAction = DeserializeActionType.Single,
.FailIfFileNotFound = True,
.CancellationTokenSource = cts
}
await deserializer.ProcessAsync().ConfigureAwait(False)
NOTE: To see the code running, see the applications \ local \ SimpleData \ NewtonsoftEbay, applications \ local \ SimpleZippedData \ NewtonsoftZippedEbay, applications \ remote \ SimpleData \ NewtonsoftEbay & applications \ remote \ SimpleZippedData \ NewtonsoftZippedEbay VB/C# projects for non Dependency Injection. For use with Dependency Injection, see the applications \ local \ SimpleData \ NewtonsoftEbayDI, applications \ local \ SimpleZippedData \ NewtonsoftZippedEbayDI, applications \ remote \ SimpleData \ NewtonsoftEbayDI & applications \ remote \ SimpleZippedData \ NewtonsoftZippedEbayDI VB/C# projects. Common code between the non-DI & DI projects is found in the Shared subfolder. You can also see the code in use in the UnitTest & Benchmark VB/C# projects.
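The selective part of the ProcessAsync switch above (read a few scalar properties, deserialize the elements of one large array one at a time, skip everything else) can be sketched with the synchronous Utf8JsonReader over an in-memory buffer. This only illustrates the selection logic; it is not a streaming replacement, which is why the article needs a custom async reader. The JSON shape and CategoryAspectSketch are hypothetical simplifications, not the real eBay schema.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical, simplified element type (not the real eBay schema).
public sealed class CategoryAspectSketch
{
    public string? Name { get; set; }
}

public static class SelectiveReadSketch
{
    // Walks the root object's properties and only materialises what is needed:
    // one scalar is read directly, each element of "categoryAspects" is deserialized
    // on its own with JsonSerializer.Deserialize<T>(ref reader), everything else is skipped.
    public static (string? Version, List<T> Items) Read<T>(ReadOnlySpan<byte> utf8Json)
    {
        var reader = new Utf8JsonReader(utf8Json); // whole buffer, so isFinalBlock is true
        if (!reader.Read() || reader.TokenType != JsonTokenType.StartObject)
            throw new JsonException("Expected a JSON object at the root.");

        string? version = null;
        var items = new List<T>();

        while (reader.Read() && reader.TokenType == JsonTokenType.PropertyName)
        {
            string? name = reader.GetString();
            reader.Read(); // move onto the property's value

            switch (name)
            {
                case "categoryTreeVersion":
                    version = reader.GetString();
                    break;
                case "categoryAspects":
                    // On StartArray: deserialize one element at a time until EndArray.
                    while (reader.Read() && reader.TokenType != JsonTokenType.EndArray)
                    {
                        T? item = JsonSerializer.Deserialize<T>(ref reader);
                        if (item is not null)
                            items.Add(item);
                    }
                    break;
                default:
                    reader.Skip(); // no-op for scalars, jumps past nested objects/arrays
                    break;
            }
        }
        return (version, items);
    }
}
```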
How to Use: System.Text.Json - Simple Json Collection Objects
For Simple JSON Collection Objects, System.Text.Json has a newer deserializing method, DeserializeAsyncEnumerable, for enumerating a stream that holds a collection of JSON objects. So the base class implementation surfaces a Stream object instead of a StreamReader object. The implementation for the JsonFileStreamObjectDeserializer<TConfiguration> base class is the same:
public class ContactFileStreamDeserializer
: JsonFileStreamObjectDeserializer<IContactFilesConfiguration>
{
}
Public Class ContactFileStreamDeserializer
Inherits JsonFileStreamObjectDeserializer(Of IFilesConfiguration)
End Class
However, there is a change to how we implement the ProcessAsync method:
protected override async Task ProcessAsync
(Stream stream, CancellationToken cancellationToken)
{
if (this.BatchSize > 1)
await this.DeserializeAsync<Contact>
(stream, this.BatchProcessAsync, cancellationToken)
.ConfigureAwait(false);
else
await this.DeserializeAsync<Contact>
(stream, this.ItemProcessAsync, cancellationToken)
.ConfigureAwait(false);
}
Protected Overrides Async Function ProcessAsync(
        stream As Stream, cancellationToken As CancellationToken) As Task
If BatchSize > 1 Then
Await DeserializeAsync(Of Contact)(stream,
AddressOf BatchProcessAsync, cancellationToken).
ConfigureAwait(False)
Else
Await DeserializeAsync(Of Contact)(stream,
AddressOf ItemProcessAsync, cancellationToken).
ConfigureAwait(False)
End If
End Function
The usage of the above implementation is then the same as for Newtonsoft.Json:
IFilePathHelper fileHelper = new FilePathHelper(DataFolder);
IContactFilesConfiguration config = new ContactFilesConfiguration();
CancellationTokenSource cts = new();
var deserializer = new ContactFileStreamDeserializer(fileHelper, config)
{
FileId = "MOCK1",
FileAction = DeserializeActionType.Single,
FailIfFileNotFound = true,
CancellationTokenSource = cts
};
await deserializer.ProcessAsync().ConfigureAwait(false);
Dim fileHelper = New FilePathHelper(DataFolder)
Dim config = New ContactFilesConfiguration()
Dim cts As New CancellationTokenSource()
Dim deserializer = New ContactFileStreamDeserializer(fileHelper, config) With
{
.FileId = "MOCK1",
.FileAction = DeserializeActionType.Single,
.FailIfFileNotFound = True,
.CancellationTokenSource = cts
}
await deserializer.ProcessAsync().ConfigureAwait(False)
NOTE: To see the code running, see the applications \ local \ SimpleData \ SystemTextJsonContacts, applications \ local \ SimpleZippedData \ SystemTextJsonZippedContacts, applications \ remote \ SimpleData \ SystemTextJsonContacts & applications \ remote \ SimpleZippedData \ SystemTextJsonZippedContacts VB/C# projects for non Dependency Injection. For use with Dependency Injection, see the applications \ local \ SimpleData \ SystemTextJsonContactsDI, applications \ local \ SimpleZippedData \ SystemTextJsonZippedContactsDI, applications \ remote \ SimpleData \ SystemTextJsonContactsDI & applications \ remote \ SimpleZippedData \ SystemTextJsonZippedContactsDI VB/C# projects. Common code between the non-DI & DI projects is found in the Shared subfolder. You can also see the code in use in the UnitTest & Benchmark VB/C# projects.
How to Use: System.Text.Json - Complex JSON Objects
For complex JSON Objects, we use the custom Utf8JsonAsyncStreamReader class for stream reading and processing. The implementation for the JsonFileStreamPropertyDeserializer<TConfiguration> base class:
public class EbayCategoryAspectFileStreamDeserializer
: JsonFileStreamPropertyDeserializer<IEbayCategoryAspectFilesConfiguration>
{
}
Public Class EbayCategoryAspectFileStreamDeserializer
Inherits JsonFileStreamPropertyDeserializer(Of IEbayCategoryAspectFilesConfiguration)
End Class
Then we implement the ProcessAsync method:
protected override async Task ProcessAsync
(Utf8JsonAsyncStreamReader jsonReader, CancellationToken cancellationToken)
{
switch (jsonReader.GetString())
{
case "categoryAspects":
if (BatchSize > 1)
await DeserializeAsync<CategoryAspect>
(jsonReader, BatchProcessAsync, cancellationToken)
.ConfigureAwait(false);
else
await DeserializeAsync<CategoryAspect>
(jsonReader, ItemProcessAsync, cancellationToken)
.ConfigureAwait(false);
break;
case "categoryTreeVersion":
{
await jsonReader.ReadAsync(cancellationToken).ConfigureAwait(false);
string? version = jsonReader.GetString();
Logger?.Emit(LogLevel.Information, $"Version: {version ?? "no value"}");
break;
}
case "categoryTreeId":
{
await jsonReader.ReadAsync(cancellationToken).ConfigureAwait(false);
string? id = jsonReader.GetString();
Logger?.Emit(LogLevel.Information, $"Id: {id ?? "no value"}");
break;
}
}
}
Protected Overrides Async Function ProcessAsync(
        jsonReader As Utf8JsonAsyncStreamReader,
        cancellationToken As CancellationToken) As Task
Select Case jsonReader.GetString()
Case "categoryAspects"
Await jsonReader.ReadAsync().ConfigureAwait(False)
If BatchSize > 1 Then
Await DeserializeAsync(Of CategoryAspect)(jsonReader,
AddressOf BatchProcessAsync, cancellationToken).
ConfigureAwait(false)
Else
Await DeserializeAsync(Of CategoryAspect)(jsonReader,
AddressOf ItemProcessAsync, cancellationToken).
ConfigureAwait(false)
End If
Case "categoryTreeVersion"
Await jsonReader.ReadAsync().ConfigureAwait(False)
Dim version = jsonReader.GetString()
_logger.Emit(LogLevel.Information, $"Version: {If(version, "no value")}")
Case "categoryTreeId"
Await jsonReader.ReadAsync().ConfigureAwait(False)
Dim Id = jsonReader.GetString()
_logger.Emit(LogLevel.Information, $"Id: {If(Id, "no value")}")
End Select
End Function
Using the above implemented class is the same as for NewtonSoft:
IFilePathHelper fileHelper = new FilePathHelper(DataFolder);
IEbayCategoryAspectFilesConfiguration config =
new EbayCategoryAspectFilesConfiguration();
CancellationTokenSource cts = new();
var deserializer = new EbayCategoryAspectFileStreamDeserializer(fileHelper, config)
{
MarketplaceId = "EBAY_US",
FileAction = DeserializeActionType.Single,
FailIfFileNotFound = true,
CancellationTokenSource = cts
};
await deserializer.ProcessAsync().ConfigureAwait(false);
Dim fileHelper = New FilePathHelper(DataFolder)
Dim config = New EbayCategoryAspectFilesConfiguration()
Dim cts As New CancellationTokenSource()
Dim deserializer = New EbayCategoryAspectFileStreamDeserializer(fileHelper, config) With
{
.MarketplaceId = "EBAY_US",
.FileAction = DeserializeActionType.Single,
.FailIfFileNotFound = True,
.CancellationTokenSource = cts
}
await deserializer.ProcessAsync().ConfigureAwait(False)
NOTE: To see the code running, see the applications \ local \ SimpleData \ SystemTextJsonEbay, applications \ local \ SimpleZippedData \ SystemTextJsonZippedEbay, applications \ remote \ SimpleData \ SystemTextJsonEbay & applications \ remote \ SimpleZippedData \ SystemTextJsonZippedEbay VB/C# projects for non Dependency Injection. For use with Dependency Injection, see the applications \ local \ SimpleData \ SystemTextJsonEbayDI, applications \ local \ SimpleZippedData \ SystemTextJsonZippedEbayDI, applications \ remote \ SimpleData \ SystemTextJsonEbayDI & applications \ remote \ SimpleZippedData \ SystemTextJsonZippedEbayDI VB/C# projects. Common code between the non-DI & DI projects is found in the Shared subfolder. You can also see the code in use in the UnitTest & Benchmark VB/C# projects.
Library Implementation
The base classes were designed to work with both Newtonsoft.Json and System.Text.Json, so the base class is broken into three parts:
- Common base implementation: Common.Json > JsonStreamDeserializer class
- Newtonsoft.Json base implementation: Common.NewtonSoft.Json > JsonStreamPropertyDeserializer & JsonZipStreamPropertyDeserializer base common classes. Then there are separate base classes for the File System & Web API implementations:
  File System
  - Simple: JsonFileStreamObjectDeserializer, JsonZipFileStreamObjectDeserializer
  - Complex: JsonFileStreamPropertyDeserializer, JsonZipFileStreamPropertyDeserializer
  Web API
  - Simple: JsonHttpStreamObjectDeserializer, JsonZipHttpStreamObjectDeserializer
  - Complex: JsonHttpStreamPropertyDeserializer, JsonZipHttpStreamPropertyDeserializer
- System.Text.Json base implementation: Common.System.Text.Json > JsonStreamPropertyDeserializer, JsonZipStreamPropertyDeserializer, JsonStreamObjectDeserializer & JsonZipStreamObjectDeserializer base common classes. Then there are separate base classes for the File System & Web API implementations:
  File System
  - Simple: JsonFileStreamObjectDeserializer, JsonZipFileStreamObjectDeserializer
  - Complex: JsonFileStreamPropertyDeserializer, JsonZipFileStreamPropertyDeserializer
  Web API
  - Simple: JsonHttpStreamObjectDeserializer, JsonZipHttpStreamObjectDeserializer
  - Complex: JsonHttpStreamPropertyDeserializer, JsonZipHttpStreamPropertyDeserializer
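The System.Text.Json implementation has two additional Object common base classes (JsonStreamObjectDeserializer & JsonZipStreamObjectDeserializer). This is due to the differences between Newtonsoft.Json & System.Text.Json: for simple collections, System.Text.Json works on the raw Stream via DeserializeAsyncEnumerable rather than through a JSON reader.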
If you are using the library in your own projects, the projects that you require are as follows:
- Newtonsoft.Json: Common.Json & Common.NewtonSoft.Json
- System.Text.Json: Common.Json & Common.System.Text.Json (Common.SystemText.Json for VB due to compiler name collisions)
Configuration Properties
Property | Description | Default |
FileId | Lookup key in Configuration file | not set |
ZipFileId | Lookup key in Configuration zip file | not set |
FileAction | Single or Multiple configuration file entries | Single |
ZipFileAction | Single or Multiple lookup key(s) in the configuration zip file | Single |
BatchSize | Number of objects to process at a time | 1 |
BufferSize | Number of bytes read and processed from the stream at a time | 8,192 |
FailIfFileNotFound | Throw an exception (true) or fail silently (false) if a file is not found | true |
CancellationTokenSource | (optional) | default |
JsonSerializerSettings | Newtonsoft only | default |
JsonSerializerOptions | System.Text.Json only | default |
I am not going to discuss the code for these classes, as there is a lot of it and this article is already long. I recommend looking at the code in the download.
What I will point out is how I handle the decision logic that selects the processing method. I use a keyed dictionary based on the FileAction:
Common base:
protected string GetActionKey(DeserializeActionType fileType)
=> $"{fileType}";
protected abstract Dictionary<string, Func<Task>> ActionDelegatesFactory();
protected virtual async ValueTask ExecuteActionDelegateAsync(string key)
{
Dictionary<string, Func<Task>> ActionDelegates = this.ActionDelegatesFactory();
if (!ActionDelegates.ContainsKey(key))
{
KeyNotFoundException exception =
    new($"The '{this.FileAction}' Action was not found!");
this.Logger?.Emit(LogLevel.Error, "Invalid Action!", exception);
throw exception;
}
await ActionDelegates[key]().ConfigureAwait(false);
}
Protected Function GetActionKey(fileType As DeserializeActionType) As String
Return $"{fileType}"
End Function
Protected MustOverride Function ActionDelegatesFactory() _
As Dictionary(Of String, Func(Of Task))
Protected Overridable Async Function _
ExecuteActionDelegateAsync(key As String) As Task
Dim ActionDelegates = ActionDelegatesFactory()
If Not ActionDelegates.ContainsKey(key) Then
Dim exception = New KeyNotFoundException(
    $"The '{Me.FileAction}' Action was not found!")
Me._logger.Emit(LogLevel.Error, "Invalid Action!", exception)
Throw exception
End If
Await ActionDelegates(key)().ConfigureAwait(False)
End Function
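A compact sketch of the same keyed-dispatch idea, assuming a hypothetical two-value enum: the dictionary is built once rather than on every call, and TryGetValue replaces the ContainsKey check plus indexer, so each dispatch does a single lookup. DispatchActionSketch and ActionDispatcherSketch are illustrative names, not the library's.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical two-value action type for the sketch.
public enum DispatchActionSketch { Single, Multiple }

public sealed class ActionDispatcherSketch
{
    private readonly Dictionary<string, Func<Task>> _actions;

    public ActionDispatcherSketch(Func<Task> single, Func<Task> multiple)
    {
        // Built once, instead of on every ExecuteAsync call.
        _actions = new Dictionary<string, Func<Task>>
        {
            [Key(DispatchActionSketch.Single)] = single,
            [Key(DispatchActionSketch.Multiple)] = multiple,
        };
    }

    // Same key scheme as GetActionKey above: the enum value's name.
    private static string Key(DispatchActionSketch action) => $"{action}";

    public Task ExecuteAsync(DispatchActionSketch action)
    {
        // A single lookup replaces ContainsKey followed by the indexer.
        if (!_actions.TryGetValue(Key(action), out Func<Task>? handler))
            throw new KeyNotFoundException($"The '{action}' Action was not found!");
        return handler();
    }
}
```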
Common.<lib> base for files/web API (where <lib> is NewtonSoft.Json or System.Text.Json):
public override async ValueTask ProcessAsync()
=> await this.ExecuteActionDelegateAsync(this.GetActionKey(this.FileAction))
.ConfigureAwait(false);
protected abstract Task ProcessAsync(JsonReader jsonReader,
CancellationToken cancellationToken);
protected override Dictionary<string, Func<Task>> ActionDelegatesFactory()
=> new()
{
[this.GetActionKey(DeserializeActionType.Single)] = ()
=> this.ProcessActionAsync
(this._configuration!.Paths[this.ConfigurationFileKey!],
this.ProcessAsync,
this.CancellationTokenSource?.Token ?? default),
[this.GetActionKey(DeserializeActionType.Multiple)] = ()
=> this.ProcessActionAsync
(this._configuration!, this.ProcessAsync,
this.CancellationTokenSource?.Token ?? default),
};
Public Overrides Async Function ProcessAsync() As Task
Await Me.ExecuteActionDelegateAsync(
    Me.GetActionKey(Me.FileAction)).ConfigureAwait(False)
End Function
Protected MustOverride Overloads Function ProcessAsync(
    jsonReader As JsonReader, cancellationToken As CancellationToken) As Task
Protected Overrides Function ActionDelegatesFactory() _
As Dictionary(Of String, Func(Of Task))
Return New Dictionary(Of String, Func(Of Task)) From {
{Me.GetActionKey(DeserializeActionType.Single), _
AddressOf Me.SingleShimAsync},
{Me.GetActionKey(DeserializeActionType.Multiple), _
AddressOf Me.MultipleShimAsync}
}
End Function
#Region "ActionDelegatesFactory Shims"
Private Async Function SingleShimAsync() As Task
Await Me.ProcessActionAsync(
Me._configuration.Paths(Me.ConfigurationFileKey),
AddressOf Me.ProcessAsync,
If(Me.CancellationTokenSource Is Nothing, _
Nothing, Me.CancellationTokenSource.Token)) _
.ConfigureAwait(False)
End Function
Private Async Function MultipleShimAsync() As Task
Await Me.ProcessActionAsync(
Me._configuration,
AddressOf Me.ProcessAsync,
If(Me.CancellationTokenSource Is Nothing, _
Nothing, Me.CancellationTokenSource.Token)) _
.ConfigureAwait(False)
End Function
#End Region
Common.<lib> base for zipped files/web API (where <lib> is NewtonSoft.Json or System.Text.Json):
public override async ValueTask ProcessAsync()
=> await this.ExecuteActionDelegateAsync
(this.GetActionKey(this.ZipFileAction, this.FileAction))
.ConfigureAwait(false);
#region Processors
private string GetActionKey(DeserializeActionType zipFileType,
DeserializeActionType fileType)
=> $"{zipFileType}{this.GetActionKey(fileType)}";
protected override async ValueTask ExecuteActionDelegateAsync(string key)
{
Dictionary<string, Func<Task>> actionDelegates = this.ActionDelegatesFactory();
if (!actionDelegates.TryGetValue(key, out Func<Task>? actionDelegate))
{
KeyNotFoundException exception =
new KeyNotFoundException
($"The zip '{this.ZipFileAction}' or file '{this.FileAction}' Action(s) not found!");
this.Logger?.Emit(LogLevel.Error, "Invalid Action!", exception);
throw exception;
}
await actionDelegate().ConfigureAwait(false);
}
protected override Dictionary<string, Func<Task>> ActionDelegatesFactory()
=> new()
{
[this.GetActionKey(DeserializeActionType.Single,
DeserializeActionType.Single)] = ()
=> this.ProcessZipActionAsync
(this._zipConfiguration.Paths[this.ConfigurationZipFileKey!],
this._configuration!.Paths[this.ConfigurationFileKey!],
this.ProcessAsync,
this.CancellationTokenSource?.Token ?? default),
[this.GetActionKey(DeserializeActionType.Multiple,
DeserializeActionType.Single)] = ()
=> this.ProcessZipActionAsync
(this._zipConfiguration,
this._configuration!.Paths[this.ConfigurationFileKey!],
this.ProcessAsync, this.CancellationTokenSource?.Token ?? default),
[this.GetActionKey(DeserializeActionType.Single,
DeserializeActionType.Multiple)] = ()
=> this.ProcessZipActionAsync
(this._zipConfiguration.Paths[this.ConfigurationZipFileKey!],
this._configuration!, this.ProcessAsync,
this.CancellationTokenSource?.Token ?? default),
[this.GetActionKey(DeserializeActionType.Multiple,
DeserializeActionType.Multiple)] = ()
=> this.ProcessZipActionAsync
(this._zipConfiguration, this.ProcessAsync,
this.CancellationTokenSource?.Token ?? default),
};
Public Overrides Async Function ProcessAsync() As Task
Await Me.ExecuteActionDelegateAsync(Me.GetActionKey(
Me.ZipFileAction, Me.FileAction)).ConfigureAwait(False)
End Function
Private Shadows Function GetActionKey(zipFileType As DeserializeActionType, _
fileType As DeserializeActionType) As String
Return $"{zipFileType}{MyBase.GetActionKey(fileType)}"
End Function
Protected Overrides Async Function ExecuteActionDelegateAsync(key As String) As Task
Dim ActionDelegates = ActionDelegatesFactory()
If Not ActionDelegates.ContainsKey(key) Then
Dim exception = New KeyNotFoundException(
$"The zip '{Me.ZipFileAction}' or file '{Me.FileAction}' Action(s) not found!")
Me._logger.Emit(LogLevel.Error, "Invalid Action!", exception)
Throw exception
End If
Await ActionDelegates(key)().ConfigureAwait(False)
End Function
Protected Overrides Function ActionDelegatesFactory() _
As Dictionary(Of String, Func(Of Task))
Return New Dictionary(Of String, Func(Of Task)) From {
{Me.GetActionKey(DeserializeActionType.Single, _
DeserializeActionType.Single),
AddressOf Me.SingleSingleShimAsync},
{Me.GetActionKey(DeserializeActionType.Single, _
DeserializeActionType.Multiple),
AddressOf Me.SingleMultipleShimAsync},
{Me.GetActionKey(DeserializeActionType.Multiple, _
DeserializeActionType.Single),
AddressOf Me.MultipleSingleShimAsync},
{Me.GetActionKey(DeserializeActionType.Multiple, _
DeserializeActionType.Multiple),
AddressOf Me.MultipleMultipleShimAsync}
}
End Function
#Region "ActionDelegatesFactory Shims"
Private Async Function SingleSingleShimAsync() As Task
Await Me.ProcessZipActionAsync(
Me._zipConfiguration.Paths(Me.ConfigurationZipFileKey),
Me._configuration.Paths(Me.ConfigurationFileKey),
AddressOf Me.ProcessAsync,
If(Me.CancellationTokenSource Is Nothing, Nothing, _
Me.CancellationTokenSource.Token)).
ConfigureAwait(False)
End Function
Private Async Function MultipleSingleShimAsync() As Task
Await Me.ProcessZipActionAsync(
Me._zipConfiguration,
Me._configuration.Paths(Me.ConfigurationFileKey),
AddressOf Me.ProcessAsync,
If(Me.CancellationTokenSource Is Nothing, Nothing, _
Me.CancellationTokenSource.Token)).
ConfigureAwait(False)
End Function
Private Async Function SingleMultipleShimAsync() As Task
Await Me.ProcessZipActionAsync(
Me._zipConfiguration.Paths(Me.ConfigurationZipFileKey),
Me._configuration,
AddressOf Me.ProcessAsync,
If(Me.CancellationTokenSource Is Nothing, Nothing, _
Me.CancellationTokenSource.Token)).
ConfigureAwait(False)
End Function
Private Async Function MultipleMultipleShimAsync() As Task
Await Me.ProcessZipActionAsync(
Me._zipConfiguration,
AddressOf Me.ProcessAsync,
If(Me.CancellationTokenSource Is Nothing, Nothing, _
Me.CancellationTokenSource.Token)).
ConfigureAwait(False)
End Function
#End Region
NOTE: VB & C# handle delegates differently. For VB, shim methods were required to allow parameters to be passed to the method calls.
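In C#, a lambda closes over the arguments it needs, so a method that takes parameters can still be stored as a parameterless Func<Task>. A named parameterless shim, as used in the VB version, produces an equivalent delegate. The sketch below compares the two and uses a composite key that mirrors the zip base class's idea of prefixing the zip action to the file action. ProcessZipAsync, the local SingleSingleShimAsync, ActionKey and the string keys are hypothetical illustrations, not the library's API.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-ins for the zip/file configuration held by the base class.
string zipPath = "contacts.zip";
string filePath = "contacts.json";
CancellationToken token = CancellationToken.None;
List<string> log = new();

// Hypothetical worker: it needs arguments, so it cannot be stored as Func<Task> directly.
Task ProcessZipAsync(string zip, string file, CancellationToken ct)
{
    ct.ThrowIfCancellationRequested();
    log.Add($"{zip}/{file}");
    return Task.CompletedTask;
}

// Named parameterless "shim": reads its arguments from the surrounding state.
Task SingleSingleShimAsync() => ProcessZipAsync(zipPath, filePath, token);

// Composite key: zip action followed by file action, e.g. "SingleSingle".
string ActionKey(string zipAction, string fileAction) => $"{zipAction}{fileAction}";

Dictionary<string, Func<Task>> byLambda = new()
{
    // The lambda captures zipPath, filePath and token inline.
    [ActionKey("Single", "Single")] = () => ProcessZipAsync(zipPath, filePath, token),
};
Dictionary<string, Func<Task>> byShim = new()
{
    // Method-group conversion of the shim yields an equivalent Func<Task>.
    [ActionKey("Single", "Single")] = SingleSingleShimAsync,
};

await byLambda["SingleSingle"]();
await byShim["SingleSingle"]();
Console.WriteLine(string.Join(", ", log)); // contacts.zip/contacts.json, contacts.zip/contacts.json
```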
Part 4: Unit Tests
Unit tests are implemented for the file system and zipped file implementations only, and cover both the Simple JSON Collection Object (Contacts) and Complex JSON Objects (Ebay CategoryAspect). The unit tests cover:
- Standard call to completion
- Standard call with simulated Cancellation
- Invalid configuration - Key not found & File not found
- Single & Multiple file handling
There are also unit tests for:
- the custom Utf8JsonAsyncStreamReader class
- the FilePathHelper class, including its use with FileConfiguration
All testing is done using dependency injection.
The following extension method was designed to implement the simulated cancellation:
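using System.Threading;
using System.Threading.Tasks;
public static class TaskExtensions
{
// Optionally schedules cancellation after 'delay' ms, then blocks (busy-waits)
// until the task completes or cancellation is requested.
public static Task TaskAwaiter(this ValueTask valueTask,
CancellationTokenSource? cancellationTokenSource = default, int delay = 2000)
{
// delay = 0 (or no token source) means no simulated cancellation
if (delay > 0)
cancellationTokenSource?.CancelAfter(delay);
Task task = valueTask.AsTask();
// Deliberate spin: keeps the test synchronous at the cost of one busy CPU core
while (!task.GetAwaiter().IsCompleted &&
cancellationTokenSource?.IsCancellationRequested != true)
{
}
return task;
}
}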
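Imports System.Runtime.CompilerServices
Imports System.Threading
Imports System.Threading.Tasks
Public Module TaskExtensions
<Extension>
Public Function TaskAwaiter(task As Task, _
Optional cancellationTokenSource As CancellationTokenSource = Nothing, _
Optional delay As Integer = 2000) As Task
' delay = 0 (or no token source) means no simulated cancellation
If delay > 0 Then cancellationTokenSource?.CancelAfter(delay)
' Deliberate spin until the task completes or cancellation is requested
While Not task.GetAwaiter().IsCompleted AndAlso
((cancellationTokenSource Is Nothing) _
OrElse cancellationTokenSource.IsCancellationRequested <> True)
End While
Return task
End Function
End Module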
If delay = 0 is passed, or no CancellationTokenSource is supplied, the simulated cancellation is not triggered.
Here is how it is used:
private const int SimulatedDelay = 5;
[Fact]
void Live_File_Single_Cancellation()
{
CancellationTokenSource cts = new();
Task task = this.Execute(this._liveDeserializer, TestFileKey,
DeserializeActionType.Single, false, cts);
if (cts.Token.IsCancellationRequested)
this._logger.Emit(LogLevel.Warning, "Cancellation was requested");
task.IsCompleted.Should().BeFalse();
cts.IsCancellationRequested.Should().BeTrue();
this._liveDeserializer.CancellationTokenSource!
.IsCancellationRequested.Should().BeTrue();
this._liveDeserializer.CancellationTokenSource!.Should().Be(cts);
}
private Task Execute
(
IContactFileStreamDeserializer deserializer,
string fileKey,
DeserializeActionType fileAction,
bool failIfFileNotFound,
CancellationTokenSource? cts = default,
int delay = SimulatedDelay
)
{
deserializer.FileId = fileKey;
deserializer.FileAction = fileAction;
deserializer.FailIfFileNotFound = failIfFileNotFound;
deserializer.CancellationTokenSource = cts;
return deserializer.ProcessAsync().TaskAwaiter(cts, delay);
}
Private Const SimulatedDelay As Integer = 5
<Fact>
Sub Live_File_Single_Cancellation()
Dim cts = New CancellationTokenSource()
Dim task As Task = Me.Execute(Me._liveDeserializer, _
TestFileKey, DeserializeActionType.Single, False, cts)
If cts.Token.IsCancellationRequested Then
Me._logger.Emit(LogLevel.Warning, "Cancellation was requested")
End If
task.IsCompleted.Should().BeFalse()
cts.IsCancellationRequested.Should().BeTrue()
Me._liveDeserializer.CancellationTokenSource.
IsCancellationRequested.Should().BeTrue()
Me._liveDeserializer.CancellationTokenSource.Should().Be(cts)
End Sub
Function Execute _
(
deserializer As IContactFileStreamDeserializer,
fileKey As String,
fileAction As DeserializeActionType,
failIfFileNotFound As Boolean,
Optional cts As CancellationTokenSource = Nothing,
Optional delay As Integer = SimulatedDelay
) As Task
deserializer.FileId = fileKey
deserializer.FileAction = fileAction
deserializer.FailIfFileNotFound = failIfFileNotFound
deserializer.CancellationTokenSource = cts
Return deserializer.ProcessAsync().TaskAwaiter(cts, delay)
End Function
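The TaskAwaiter helper spins on IsCompleted, which keeps the tests synchronous but occupies a CPU core. A possible alternative, sketched here under the assumption that the code under test observes its cancellation token, is to await Task.WhenAny, which completes when the task finishes (successfully, faulted, or cancelled) without rethrowing. SimulatedDeserializeAsync is a hypothetical stand-in for ProcessAsync, not part of the library.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for a streaming deserializer: it checks the token between "objects".
async Task<int> SimulatedDeserializeAsync(int objectCount, CancellationToken token)
{
    int processed = 0;
    for (int i = 0; i < objectCount; i++)
    {
        // Task.Delay throws OperationCanceledException once the token is cancelled.
        await Task.Delay(10, token);
        processed++;
    }
    return processed;
}

using CancellationTokenSource cts = new();
cts.CancelAfter(50); // simulated cancellation, as TaskAwaiter does with CancelAfter(delay)
Task<int> work = SimulatedDeserializeAsync(1_000, cts.Token);

// Waits without spinning: WhenAny completes when 'work' does and never rethrows its exception.
await Task.WhenAny(work);
Console.WriteLine(work.Status); // Canceled
```

Note the different assertion this enables: because the sketch waits for the task to finish, it checks that the task ended in the Canceled state, rather than that it was still running when cancellation was requested (as the article's tests do).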
Part 5: Benchmarking
Benchmarking is implemented for both C# and VB, for the file system only, and covers both the Simple JSON Collection Object (Contacts) and Complex JSON Objects (Ebay CategoryAspect). Both the default file/stream methods and the custom library stream methods are benchmarked:
- <Contact/Ebay>_<NewtonSoft/SystemText>_Default: loading the whole file into a string and deserializing
- <Contact/Ebay>_<NewtonSoft/SystemText>_DefaultStream: loading the whole file into a stream and deserializing
- Contact_SystemText_DefaultEnumerableStream: a Contacts-only, System.Text.Json-only test using DeserializeAsyncEnumerable to stream and deserialize one JSON object at a time
- <Contact/Ebay>_<NewtonSoft/SystemText>_Streaming: library streaming, deserializing one JSON object at a time
- <Contact/Ebay>_<NewtonSoft/SystemText>_StreamingBatch10: library streaming, deserializing batches of 10 JSON objects at a time
- <Contact/Ebay>_<NewtonSoft/SystemText>_StreamingChunk64K: library streaming, deserializing one JSON object at a time with a 64KB buffer
- <Contact/Ebay>_<NewtonSoft/SystemText>_StreamingBatch10BufferSize64K: library streaming, deserializing batches of 10 JSON objects at a time with a 64KB buffer
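For reference, the built-in API behind Contact_SystemText_DefaultEnumerableStream is JsonSerializer.DeserializeAsyncEnumerable. The following is a minimal sketch, not the benchmark's actual code. It uses a tiny made-up JSON array in a MemoryStream and deserializes to JsonElement to stay self-contained, whereas real code would use a typed Contact class. It also shows how to change JsonSerializerOptions.DefaultBufferSize.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;

// Tiny in-memory array standing in for the large contacts file (made-up data).
byte[] json = Encoding.UTF8.GetBytes(
    "[{\"FirstName\":\"Ada\",\"Age\":36},{\"FirstName\":\"Alan\",\"Age\":41}]");
await using MemoryStream stream = new(json);

// DefaultBufferSize is 16KB unless changed; 64KB mirrors the "64K" benchmark variants.
JsonSerializerOptions options = new() { DefaultBufferSize = 64 * 1024 };

List<string> names = new();
// Array elements are yielded as they are read from the stream, so the whole
// array is never materialized as one collection.
await foreach (JsonElement contact in
    JsonSerializer.DeserializeAsyncEnumerable<JsonElement>(stream, options))
{
    names.Add(contact.GetProperty("FirstName").GetString()!);
}
Console.WriteLine(string.Join(", ", names)); // Ada, Alan
```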
Test Data
Contacts: 500,000 records / 297,675KB
Ebay Category Aspects: 750 records / 68,118KB
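A quick back-of-the-envelope calculation from the test data shows how different the average record sizes are. This assumes the listed sizes are the file sizes in KB and uses a simple average across records:

```latex
\[
\frac{297{,}675\ \text{KB}}{500{,}000\ \text{contacts}} \approx 0.6\ \text{KB per contact}
\qquad
\frac{68{,}118\ \text{KB}}{750\ \text{aspects}} \approx 90.8\ \text{KB per category aspect}
\]
```

On average, a single Ebay Category Aspect object is larger than the 64KB buffer used by the 64K benchmark variants. This may be one reason that buffer size and read-ahead affect the two data sets differently.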
Test Machine Configuration
BenchmarkDotNet=v0.13.2, OS=Windows 11 (10.0.22621.675)
AMD Ryzen 7 3700X, 1 CPU, 16 logical and 8 physical cores
.NET SDK=7.0.100-rc.2.22477.23
[Host] : .NET 6.0.10 (6.0.1022.47605), X64 RyuJIT AVX2
DefaultJob : .NET 6.0.10 (6.0.1022.47605), X64 RyuJIT AVX2
M.2 SSD
C# Benchmarking Results
Contacts (Ratio is relative to the Contact_NewtonSoft_Streaming baseline):
| Method | Mean | Error | StdDev | Ratio | Rank |
|------------------------------------------------- |--------:|---------:|---------:|------:|-----:|
| Contact_SystemText_StreamingBatch10BufferSize64K | 2.342 s | 0.0059 s | 0.0055 s | 0.51 | 1 |
| Contact_SystemText_StreamingChunk64K | 2.415 s | 0.0183 s | 0.0171 s | 0.53 | 2 |
| Contact_SystemText_Streaming | 2.472 s | 0.0176 s | 0.0156 s | 0.54 | 3 |
| Contact_SystemText_DefaultEnumerableStream | 2.480 s | 0.0056 s | 0.0052 s | 0.54 | 3 |
| Contact_SystemText_StreamingBatch10 | 2.536 s | 0.0049 s | 0.0044 s | 0.56 | 4 |
| Contact_SystemText_Default | 4.002 s | 0.0483 s | 0.0452 s | 0.88 | 5 |
| Contact_NewtonSoft_StreamingChunk64K | 4.451 s | 0.0165 s | 0.0129 s | 0.98 | 6 |
| Contact_NewtonSoft_StreamingBatch10BufferSize64K | 4.484 s | 0.0130 s | 0.0122 s | 0.98 | 6 |
| Contact_NewtonSoft_Streaming | 4.556 s | 0.0132 s | 0.0117 s | 1.00 | 7 |
| Contact_NewtonSoft_StreamingBatch10 | 4.636 s | 0.0908 s | 0.0892 s | 1.02 | 7 |
| Contact_NewtonSoft_DefaultStream | 4.729 s | 0.0194 s | 0.0181 s | 1.04 | 8 |
| Contact_NewtonSoft_Default | 6.268 s | 0.0385 s | 0.0341 s | 1.38 | 9 |

Ebay Category Aspects (Ratio is relative to the Ebay_NewtonSoft_Streaming baseline):
| Method | Mean | Error | StdDev | Ratio | Rank |
|---------------------------------------------- |-----------:|---------:|---------:|------:|-----:|
| Ebay_SystemText_DefaultStream | 729.2 ms | 4.71 ms | 4.18 ms | 0.63 | 1 |
| Ebay_SystemText_Default | 970.3 ms | 8.58 ms | 8.02 ms | 0.83 | 2 |
| Ebay_NewtonSoft_StreamingChunk64K | 1,091.8 ms | 4.40 ms | 3.44 ms | 0.94 | 3 |
| Ebay_NewtonSoft_StreamingBatch10BufferSize64K | 1,094.4 ms | 7.32 ms | 6.11 ms | 0.94 | 3 |
| Ebay_NewtonSoft_StreamingBatch10 | 1,122.8 ms | 8.79 ms | 7.79 ms | 0.96 | 4 |
| Ebay_NewtonSoft_Streaming | 1,164.6 ms | 9.93 ms | 9.29 ms | 1.00 | 5 |
| Ebay_NewtonSoft_DefaultStream | 1,248.4 ms | 16.03 ms | 14.99 ms | 1.07 | 6 |
| Ebay_SystemText_StreamingChunk64K | 1,453.1 ms | 4.81 ms | 4.50 ms | 1.25 | 7 |
| Ebay_SystemText_Streaming | 1,534.9 ms | 5.19 ms | 4.86 ms | 1.32 | 8 |
| Ebay_NewtonSoft_Default | 1,536.8 ms | 18.24 ms | 17.06 ms | 1.32 | 8 |
| Ebay_SystemText_StreamingBatch10BufferSize64K | 1,562.3 ms | 5.77 ms | 5.40 ms | 1.34 | 9 |
| Ebay_SystemText_StreamingBatch10 | 1,642.4 ms | 5.60 ms | 5.24 ms | 1.41 | 10 |
VB Benchmarking Results
Contacts (Ratio is relative to the Contact_NewtonSoft_Streaming baseline):
| Method | Mean | Error | StdDev | Ratio | Rank |
|------------------------------------------------- |--------:|---------:|---------:|------:|-----:|
| Contact_SystemText_StreamingChunk64K | 2.379 s | 0.0103 s | 0.0097 s | 0.51 | 1 |
| Contact_SystemText_StreamingBatch10BufferSize64K | 2.382 s | 0.0079 s | 0.0070 s | 0.51 | 1 |
| Contact_SystemText_DefaultEnumerableStream | 2.501 s | 0.0065 s | 0.0061 s | 0.54 | 2 |
| Contact_SystemText_Streaming | 2.657 s | 0.0060 s | 0.0057 s | 0.57 | 3 |
| Contact_SystemText_StreamingBatch10 | 2.687 s | 0.0122 s | 0.0114 s | 0.58 | 3 |
| Contact_SystemText_Default | 4.120 s | 0.0422 s | 0.0395 s | 0.88 | 4 |
| Contact_NewtonSoft_StreamingBatch10BufferSize64K | 4.509 s | 0.0251 s | 0.0235 s | 0.97 | 5 |
| Contact_NewtonSoft_StreamingBatch10 | 4.588 s | 0.0321 s | 0.0300 s | 0.99 | 6 |
| Contact_NewtonSoft_StreamingChunk64K | 4.613 s | 0.0309 s | 0.0289 s | 0.99 | 6 |
| Contact_NewtonSoft_Streaming | 4.655 s | 0.0171 s | 0.0160 s | 1.00 | 6 |
| Contact_NewtonSoft_DefaultStream | 5.492 s | 0.0571 s | 0.0534 s | 1.18 | 7 |
| Contact_NewtonSoft_Default | 6.318 s | 0.0654 s | 0.0612 s | 1.36 | 8 |

Ebay Category Aspects (Ratio is relative to the Ebay_NewtonSoft_Streaming baseline):
| Method | Mean | Error | StdDev | Ratio | Rank |
|---------------------------------------------- |-----------:|---------:|---------:|------:|-----:|
| Ebay_SystemText_DefaultStream | 732.3 ms | 6.43 ms | 5.70 ms | 0.67 | 1 |
| Ebay_SystemText_Default | 957.1 ms | 4.78 ms | 4.23 ms | 0.88 | 2 |
| Ebay_NewtonSoft_StreamingBatch10BufferSize64K | 1,064.6 ms | 11.54 ms | 10.80 ms | 0.97 | 3 |
| Ebay_NewtonSoft_StreamingChunk64K | 1,069.2 ms | 6.06 ms | 5.67 ms | 0.98 | 3 |
| Ebay_NewtonSoft_Streaming | 1,092.3 ms | 5.87 ms | 5.50 ms | 1.00 | 4 |
| Ebay_NewtonSoft_StreamingBatch10 | 1,096.1 ms | 3.42 ms | 3.03 ms | 1.00 | 4 |
| Ebay_NewtonSoft_DefaultStream | 1,220.9 ms | 9.47 ms | 8.86 ms | 1.12 | 5 |
| Ebay_SystemText_StreamingChunk64K | 1,489.3 ms | 4.36 ms | 3.86 ms | 1.36 | 6 |
| Ebay_NewtonSoft_Default | 1,499.3 ms | 13.32 ms | 12.46 ms | 1.37 | 6 |
| Ebay_SystemText_StreamingBatch10BufferSize64K | 1,514.3 ms | 3.99 ms | 3.33 ms | 1.39 | 6 |
| Ebay_SystemText_Streaming | 1,579.1 ms | 4.95 ms | 4.39 ms | 1.45 | 7 |
| Ebay_SystemText_StreamingBatch10 | 1,598.5 ms | 3.85 ms | 3.60 ms | 1.46 | 8 |
Comments
- VB and C# performance is very similar. Most methods are within about 8% of each other, and the largest gap is Contact_NewtonSoft_DefaultStream (4.729 s in C# versus 5.492 s in VB).
- The Ebay_SystemText_Streaming... methods are roughly 30% to 46% slower than the corresponding Ebay_NewtonSoft_Streaming... methods. This is due to the need to read ahead to find the end of each object before deserializing it. They are still roughly on par with (within about 7% of) Newtonsoft loading the whole file into a string and deserializing (Ebay_NewtonSoft_Default).
- For the Simple JSON Collection Object (Contacts), streaming via the library with a 64KB buffer size is faster than DeserializeAsyncEnumerable with its default buffer size (16KB).
- For Complex JSON Objects, the performance of the custom Utf8JsonAsyncStreamReader for System.Text.Json is acceptable but slower than loading or streaming the whole file into memory with System.Text.Json. However, where it matters, the key benefit of the custom Utf8JsonAsyncStreamReader is its minimal memory footprint versus the extravagant memory requirements of System.Text.Json when the whole file is loaded.
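The read-ahead cost mentioned above can be illustrated with System.Text.Json's own Utf8JsonReader. Before an object can be deserialized from a partially filled buffer, the reader has to walk forward to the object's matching end token. This is a hedged sketch of that idea only, not the article's Utf8JsonAsyncStreamReader. FindEndOfFirstValue is a hypothetical helper.

```csharp
using System;
using System.Text;
using System.Text.Json;

// Returns the byte length of the first complete JSON value in 'buffer',
// or -1 when the buffer ends first and more data must be read before deserializing.
long FindEndOfFirstValue(ReadOnlySpan<byte> buffer)
{
    // isFinalBlock: false tells the reader that more data may follow.
    var reader = new Utf8JsonReader(buffer, isFinalBlock: false, state: default);
    if (!reader.Read())
        return -1; // not even the first token is available yet
    // TrySkip reads ahead to the matching end token; it returns false (and leaves
    // the reader unchanged) if the object or array is incomplete in this buffer.
    if (!reader.TrySkip())
        return -1;
    return reader.BytesConsumed;
}

byte[] complete = Encoding.UTF8.GetBytes("{\"Id\":1,\"Tags\":[\"a\",\"b\"]}");
byte[] partial = Encoding.UTF8.GetBytes("{\"Id\":1,\"Tags\":[\"a\"");

Console.WriteLine(FindEndOfFirstValue(complete));
Console.WriteLine(FindEndOfFirstValue(partial));
```

When -1 is returned, a stream reader has to keep the unconsumed bytes, read more data into a larger or refilled buffer, and scan again. This scanning is extra work that Newtonsoft's JsonTextReader-based approach does not need before deserializing.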
Summary
Working with large data streams is not as hard as it looks. Sample implementations are provided, for both C# and VB, for working with raw JSON data and zipped JSON data via the file system and web API.
The Newtonsoft.Json and System.Text.Json APIs for working with file system and web API streams are similar. However, the performance boost of the System.Text.Json APIs makes migrating to .NET 6.0+ well worthwhile.
While System.Text.Json lacks a stream-based reader equivalent to Newtonsoft.Json's JsonTextReader for walking a stream one object at a time, it is possible to implement our own performant custom Utf8JsonAsyncStreamReader.
History
- v1.0 - 1st November, 2022 - Initial release (DotNet 6.0)
- v1.01 - 17th November, 2022 - Updated & added downloads for Dot Net 7.0
- v1.02 - 18th November, 2022 - Updated Configuration Properties list from text code block to a table