This article explores how to improve efficiency and reduce response times in applications by deploying asynchronous streams.
Asynchronous Streams
Asynchronous streams were introduced in C# 8. Within an enumeration, they let the caller wait asynchronously for each item from a data stream to become available. This is particularly useful when consuming the sort of sporadic data streams that come from web sockets. An asynchronous stream is returned as an IAsyncEnumerable<T>. The enumerable provides an asynchronous enumerator, an IAsyncEnumerator<T>, that waits asynchronously for the next item to become available. Here is an example of an async enumerable:
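using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using Newtonsoft.Json;

public async IAsyncEnumerable<string> GetLines(int count,
    [EnumeratorCancellation] CancellationToken token = default)
{
    int itemCount = 0;
    while (itemCount < count)
    {
        // Stop promptly if the caller has requested cancellation.
        token.ThrowIfCancellationRequested();
        // GetWebpageAsync (not shown) fetches the next page of JSON from the source.
        string jsonResponse = await GetWebpageAsync(token);
        string[] items = JsonConvert.DeserializeObject<string[]>(jsonResponse);
        foreach (var item in items)
        {
            yield return item;
            itemCount++;
            if (itemCount == count)
            {
                yield break; // the requested number of lines has been produced
            }
        }
    }
}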
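A caller normally consumes GetLines with await foreach. The compiler expands that loop, roughly, into calls to GetAsyncEnumerator, MoveNextAsync and DisposeAsync. The sketch below writes that expansion out by hand. It rests on one assumption: FakeGetLines is a hypothetical stand-in for GetLines. It simulates each page with Task.Delay instead of calling GetWebpageAsync and deserializing JSON, so the example runs on its own.

```csharp
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class ManualEnumeration
{
    // Hypothetical stand-in for GetLines: each "page" of three lines arrives after a delay.
    public static async IAsyncEnumerable<string> FakeGetLines(int count,
        [EnumeratorCancellation] CancellationToken token = default)
    {
        int itemCount = 0;
        int page = 0;
        while (itemCount < count)
        {
            await Task.Delay(10, token); // simulates waiting for a web page
            for (int i = 0; i < 3 && itemCount < count; i++)
            {
                yield return $"page {page} line {i}";
                itemCount++;
            }
            page++;
        }
    }

    // Hand-written equivalent of: await foreach (var line in FakeGetLines(count)) { ... }
    public static async Task<List<string>> ReadAllAsync(int count)
    {
        var result = new List<string>();
        IAsyncEnumerator<string> enumerator = FakeGetLines(count).GetAsyncEnumerator();
        try
        {
            // MoveNextAsync returns a ValueTask<bool>; each one is awaited exactly once.
            while (await enumerator.MoveNextAsync())
            {
                result.Add(enumerator.Current);
            }
        }
        finally
        {
            await enumerator.DisposeAsync();
        }
        return result;
    }
}
```

ReadAllAsync(5) returns the three lines of page 0 followed by the first two lines of page 1.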
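Asynchronous streams use ValueTasks to reduce memory allocation and improve efficiency. Each call to the enumerator's MoveNextAsync method returns a ValueTask<bool>. A ValueTask is a structure, so it can live on the stack instead of being allocated on the heap, and the object that backs it can be reused. In the example, the enumerator sometimes has to wait for GetWebpageAsync. In that case, MoveNextAsync returns a ValueTask<bool> backed by the compiler-generated enumerator object itself, which is reused for every item instead of a new Task being allocated. But within the synchronous foreach loop the next item is already available. So MoveNextAsync completes synchronously and returns a ValueTask<bool> that simply wraps the result, with no reference to any Task. There’s an excellent post about ValueTasks that is well worth reading. Some caution is needed when handling ValueTasks, as they are not a direct replacement for Tasks. A ValueTask should not be awaited more than once, awaited concurrently, or have its Result read before it has completed. The safest option is to simply extract its payload by awaiting it once and to leave it at that. The method is used like this: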
await foreach (string line in GetLines(42))
{
    Console.WriteLine(line);
}
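The following minimal sketch puts the "await it once" advice about ValueTasks into practice. GetValueAsync and its cache are hypothetical and exist only for illustration. On a cache hit, the method returns a ValueTask<int> that wraps the result directly, so no Task is allocated. On a miss, it falls back to a real Task<int>. Sometimes a result genuinely needs to be awaited more than once. In that case, the sketch converts the ValueTask once with AsTask() and awaits the resulting Task instead.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ValueTaskRules
{
    // Hypothetical cache used to show the synchronous, allocation-free path.
    private static readonly Dictionary<int, int> Cache = new() { [1] = 10 };

    public static ValueTask<int> GetValueAsync(int key)
    {
        // Cache hit: wrap the value directly; no Task object is allocated.
        if (Cache.TryGetValue(key, out int cached))
        {
            return new ValueTask<int>(cached);
        }
        // Cache miss: wrap a Task that does real asynchronous work.
        return new ValueTask<int>(LoadAsync(key));
    }

    private static async Task<int> LoadAsync(int key)
    {
        await Task.Delay(10); // simulated I/O
        int value = key * 10;
        Cache[key] = value;
        return value;
    }

    public static async Task<int> SumAsync()
    {
        // Safe: each ValueTask is awaited exactly once.
        int a = await GetValueAsync(1);
        int b = await GetValueAsync(2);

        // This result is needed twice, so convert it to a Task first.
        Task<int> task = GetValueAsync(3).AsTask();
        int c1 = await task;
        int c2 = await task; // awaiting a Task repeatedly is allowed
        return a + b + c1 + c2;
    }
}
```

SumAsync returns 10 + 20 + 30 + 30 = 90. The key point is that only the Task produced by AsTask() is awaited twice.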
Cancellation
Cancellation can be implemented by passing a CancellationToken into the method directly. You can also cancel the enumeration without passing the token to the method that produces the stream, by calling the WithCancellation extension method on the enumerable:
IAsyncEnumerable<string> lines = GetLines(42);
await foreach (var line in lines.WithCancellation(cancellationToken))
{
    Console.WriteLine(line);
}
For this to work, the CancellationToken parameter of the GetLines method has to be decorated with the [EnumeratorCancellation] attribute. The compiler then passes the token given to WithCancellation into that parameter. Cancellation is not handled automatically. The method needs to monitor the status of the token and take some action, if needed. An easy way to handle this is to call token.ThrowIfCancellationRequested(). The method should also pass the token on to any asynchronous calls it awaits, such as GetWebpageAsync(token).
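The following self-contained sketch shows the attribute and WithCancellation working together. Numbers is a hypothetical, never-ending stream used only for illustration. The token reaches it solely through WithCancellation. The iterator observes the token in two ways: by calling ThrowIfCancellationRequested and by passing it to Task.Delay.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationDemo
{
    // Without [EnumeratorCancellation], the token supplied through
    // WithCancellation would not be passed into this parameter.
    public static async IAsyncEnumerable<int> Numbers(
        [EnumeratorCancellation] CancellationToken token = default)
    {
        for (int i = 0; ; i++)
        {
            token.ThrowIfCancellationRequested();
            await Task.Delay(10, token); // also observes the token while waiting
            yield return i;
        }
    }

    public static async Task<(int Received, bool Cancelled)> RunAsync()
    {
        using var cts = new CancellationTokenSource();
        int received = 0;
        try
        {
            // The token is supplied here, not passed to Numbers() directly.
            await foreach (int n in Numbers().WithCancellation(cts.Token))
            {
                received++;
                if (received == 3)
                {
                    cts.Cancel(); // the next MoveNextAsync call throws
                }
            }
        }
        catch (OperationCanceledException)
        {
            return (received, true);
        }
        return (received, false);
    }
}
```

RunAsync receives three items. The following MoveNextAsync call then resumes the iterator, which throws OperationCanceledException.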
Using LINQ
The LINQ extension methods for IAsyncEnumerable<T> are available in the System.Linq.Async NuGet package. All the familiar extension methods are present. Queries that return a sequence return an IAsyncEnumerable<T>. Methods that return a single value return a ValueTask<T> and have the suffix Async; AnyAsync and CountAsync are a couple of examples. Some methods accept an asynchronous lambda expression so that it can be awaited. These extensions have Await appended to their names, as in WhereAwait. All these methods have been added to the System.Linq namespace.
IAsyncEnumerable<string> lines = GetLines(42);
// Queries are lazy: nothing is fetched until a result is enumerated.
IAsyncEnumerable<string> query = lines.Where(l => l.Length > 1 && l.Substring(1, 1) == "1");
// WhereAwait takes an async predicate returning ValueTask<bool>; here each item is delayed by 500 ms.
var verySlowQuery = lines.WhereAwait(async l => { await Task.Delay(500); return true; });
// CountAsync enumerates a fresh run of GetLines and returns a ValueTask<int>.
int count = await lines.CountAsync();
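The following hand-written sketch shows what these operators do, using a lazy filter and a terminal count. WhereSketch, CountSketchAsync and Range are hypothetical names. This is not how System.Linq.Async implements Where, WhereAwait or CountAsync. It is only a minimal version of the same idea that needs no NuGet package. As with the package's operators, the filter does no work until the result is enumerated.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncLinqSketch
{
    // Lazy filter with an async predicate, similar in spirit to WhereAwait.
    public static async IAsyncEnumerable<T> WhereSketch<T>(
        this IAsyncEnumerable<T> source,
        Func<T, ValueTask<bool>> predicate,
        [EnumeratorCancellation] CancellationToken token = default)
    {
        await foreach (T item in source.WithCancellation(token))
        {
            if (await predicate(item))
            {
                yield return item;
            }
        }
    }

    // Terminal operator returning a ValueTask<int>, similar in spirit to CountAsync.
    public static async ValueTask<int> CountSketchAsync<T>(
        this IAsyncEnumerable<T> source, CancellationToken token = default)
    {
        int count = 0;
        await foreach (T item in source.WithCancellation(token))
        {
            count++;
        }
        return count;
    }

    // Small asynchronous source used to exercise the operators.
    public static async IAsyncEnumerable<int> Range(int start, int count)
    {
        for (int i = start; i < start + count; i++)
        {
            await Task.Yield();
            yield return i;
        }
    }
}
```

For example, Range(1, 10).WhereSketch(async n => { await Task.Delay(1); return n % 2 == 0; }).CountSketchAsync() evaluates to 5.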
Consuming Asynchronous Streams
One of the main reasons for using asynchronous streams is to prevent the user interface (UI) message loop from blocking. If the loop blocks, controls will not be updated and the interface will become unresponsive. So it makes sense to process each item produced by the enumerable asynchronously. In the examples that follow, the item is a line of text that needs to be processed before it can be displayed in the UI. There are two parts to this: the first is to process the line and the second is to display the result. The processing can run on a thread-pool thread, but the display has to be updated on the UI thread. The processing work is simulated by calling the following nonsensical method. It blocks for a while and then strips the lowercase vowels from the line.
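private string WorkersFunction(string line)
{
    string vowels = "aeiou";
    // Simulate a slow, CPU-bound job by blocking the current thread.
    Thread.Sleep(Constants.WorkerThreadSleep);
    // Return the line with its lowercase vowels removed (requires System.Linq).
    return string.Concat(line.Where(c => !vowels.Contains(c)));
}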
An Example that Uses Worker Threads
The idea here is to use several worker threads to process the lines. The workers are started by calling Task.Factory.StartNew to run the WorkersFunction. The task returned by StartNew has a continuation attached to it, via ContinueWith, that updates the UI after the WorkersFunction has completed. Several worker threads will be active at the same time, so an exclusive task scheduler is used to ensure that only one continuation at a time writes to the UI. This is achieved by using the ExclusiveScheduler member of a ConcurrentExclusiveSchedulerPair that wraps the UI thread's scheduler.
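private readonly TaskScheduler uiTaskScheduler;

public AsyncStreamTest()
{
    // Must be constructed on the UI thread: FromCurrentSynchronizationContext
    // captures the UI SynchronizationContext and throws if there is none.
    var taskSchedulerPair = new ConcurrentExclusiveSchedulerPair(
        TaskScheduler.FromCurrentSynchronizationContext());
    // Runs at most one task at a time on the UI context.
    uiTaskScheduler = taskSchedulerPair.ExclusiveScheduler;
}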
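The exclusivity can be checked in isolation. The sketch below is an illustration built on stated assumptions rather than part of the sample application. A console program has no UI synchronization context, so it wraps TaskScheduler.Default instead of the UI scheduler. It then queues a batch of tasks to the ExclusiveScheduler and records the highest number that were ever running at once. ExclusiveSchedulerDemo and its counters are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ExclusiveSchedulerDemo
{
    public static async Task<int> MaxConcurrencyAsync(int taskCount)
    {
        // Assumption: no UI context in a console app, so wrap the default scheduler.
        var pair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default);
        TaskScheduler exclusive = pair.ExclusiveScheduler;

        var gate = new object();
        int active = 0;
        int maxActive = 0;
        var tasks = new List<Task>();
        for (int i = 0; i < taskCount; i++)
        {
            tasks.Add(Task.Factory.StartNew(() =>
            {
                int now = Interlocked.Increment(ref active);
                lock (gate) { maxActive = Math.Max(maxActive, now); } // record the peak
                Thread.Sleep(5); // simulated UI update
                Interlocked.Decrement(ref active);
            }, CancellationToken.None, TaskCreationOptions.None, exclusive));
        }
        await Task.WhenAll(tasks);
        pair.Complete(); // no more work will be queued to the pair
        return maxActive;
    }
}
```

MaxConcurrencyAsync(20) should report a peak of 1, even though the underlying default scheduler could run many tasks in parallel.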
Another requirement is to control the number of active worker threads, to reduce the strain on the thread pool, memory and processor. This is achieved by using a SemaphoreSlim. A SemaphoreSlim keeps a count of the number of successful calls to enter the semaphore less the number of calls to leave it. If the difference equals the number set in the semaphore's constructor, no further call to enter succeeds until a call to leave is received. Calls to leave are made by calling SemaphoreSlim.Release(). A call to enter is only successful when the task returned from SemaphoreSlim.WaitAsync() completes. A call to release must always be made when a worker thread completes. The best way to ensure this is to call SemaphoreSlim.Release() within a finally block.
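The following minimal, self-contained sketch shows the throttling pattern, with Thread.Sleep standing in for WorkersFunction. ThrottleDemo and its counters are hypothetical and exist only to make the limit observable. The essential parts are the WaitAsync call before each worker starts and the Release call in the finally block.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottleDemo
{
    public static async Task<(int MaxActive, int[] Results)> RunAsync(
        int itemCount, int workersCount)
    {
        var semaphore = new SemaphoreSlim(workersCount);
        var gate = new object();
        int active = 0, maxActive = 0;
        var tasks = new List<Task<int>>();

        for (int i = 0; i < itemCount; i++)
        {
            int item = i; // copy the loop variable for the closure
            // Wait asynchronously for a free slot before starting another worker.
            await semaphore.WaitAsync();
            tasks.Add(Task.Run(() =>
            {
                try
                {
                    lock (gate) { active++; maxActive = Math.Max(maxActive, active); }
                    Thread.Sleep(10); // simulated work
                    lock (gate) { active--; }
                    return item * item;
                }
                finally
                {
                    // Always free the slot, even if the work throws.
                    semaphore.Release();
                }
            }));
        }
        int[] results = await Task.WhenAll(tasks);
        return (maxActive, results);
    }
}
```

With RunAsync(10, 3), no more than three workers are ever active at once. Task.WhenAll returns the results in the order the tasks were created.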
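public async Task UsingWorkerThreadsAsync(int workersCount, CancellationToken token = default)
{
    var lines = GetLines(Constants.LinesRequired);
    var semaphoreSlim = new SemaphoreSlim(workersCount);
    List<Task> tasks = new();
    try
    {
        await foreach (var line in lines.WithCancellation(token))
        {
            // Wait for a free worker slot before starting another task.
            await semaphoreSlim.WaitAsync(token);
            var task = Task.Factory.StartNew(() =>
                {
                    try { return WorkersFunction(line); }
                    finally { semaphoreSlim.Release(); } // always free the slot
                },
                token,
                TaskCreationOptions.None,
                TaskScheduler.Default)
                // ContinueWith returns a new task representing the UI update.
                .ContinueWith(
                    t => RaiseItemsUpdatedEvent(new ItemsUpdatedEventArgs(t.Result)),
                    token, TaskContinuationOptions.None, uiTaskScheduler);
            tasks.Add(task);
        }
    }
    catch (OperationCanceledException)
    { }
    try
    {
        // Wait until every UI update that was scheduled has finished.
        await Task.WhenAll(tasks);
    }
    catch (OperationCanceledException)
    {
        // Workers or updates skipped because of cancellation end up here.
    }
}
The method must not complete until every UI update has completed. This is achieved because task holds the continuation returned by ContinueWith, not the worker task itself. Task.WhenAll therefore waits for each update to run, and it observes any exception thrown by a worker (rethrown through t.Result) or by the update. The TaskContinuationOptions.AttachedToParent flag would not help here. It attaches a task to whichever task is executing when the continuation is created. Here, ContinueWith is called from an async method rather than from inside a task's delegate, so there is normally no parent to attach to. Task.Run() does set the DenyChildAttach flag, but that only stops tasks created inside its own delegate from attaching to it. Task.Factory.StartNew() is used here because it lets the scheduler and creation options be stated explicitly. With TaskScheduler.Default and TaskCreationOptions.None, Task.Run() would behave the same way.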
Here’s a screenshot from the sample application showing the output from this method:
The test lines have their index numbers attached, and they clearly do not complete in the order they started in. To maintain the original order, some sort of first-in, first-out (FIFO) queue is needed.
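One way to get that behaviour is to queue the worker tasks themselves, in the order they were started, and then await them one at a time from the front of the queue. The following sketch does this with a Channel<Task<int>>, the class used in the next example. ChannelOrderDemo, its parameters and the random delays (with a fixed seed) are hypothetical. The delays are chosen so that later items often finish before earlier ones.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelOrderDemo
{
    public static async Task<List<int>> RunAsync(int itemCount, int workersCount)
    {
        var semaphore = new SemaphoreSlim(workersCount);
        var channel = Channel.CreateBounded<Task<int>>(
            new BoundedChannelOptions(4) { SingleWriter = true, SingleReader = true });
        var output = new List<int>();

        // Reader: awaits the tasks strictly in the order they were written (FIFO).
        Task reader = Task.Run(async () =>
        {
            while (await channel.Reader.WaitToReadAsync())
            {
                while (channel.Reader.TryRead(out var task))
                {
                    output.Add(await task);
                }
            }
        });

        var random = new Random(42);
        for (int i = 0; i < itemCount; i++)
        {
            int item = i;
            int delay = random.Next(1, 30); // later items may finish first
            await semaphore.WaitAsync();
            Task<int> worker = Task.Run(async () =>
            {
                try { await Task.Delay(delay); return item; }
                finally { semaphore.Release(); }
            });
            await channel.Writer.WriteAsync(worker); // queued in start order
        }
        channel.Writer.Complete(); // lets WaitToReadAsync return false once drained
        await reader;
        return output;
    }
}
```

The workers finish out of order, but the list that RunAsync returns is still in the original order (0, 1, 2, ...), because the reader awaits each task in the order it was written.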
An Example Using a Channel
A System.Threading.Channels.Channel<T> is an effective way of allowing two different threads to exchange data. Stephen Toub has written an excellent introduction to the class that is well worth reading. The Channel class is, essentially, a managed first-in, first-out queue. It’s intended to be used asynchronously, as its main APIs are asynchronous, and that makes it an excellent replacement for the popular BlockingCollection<T>. In this example, a single ChannelWriter, exposed by the channel's Writer property, takes the tasks created by repeated calls to Task.Run and writes them to the buffer. A single ChannelReader, exposed by the Reader property, reads the Tasks from the buffer, awaits each one in turn and then updates the UI.
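public async Task ChannelExample(int workersCount, CancellationToken token = default)
{
    var lines = GetLines(Constants.LinesRequired);
    var semaphoreSlim = new SemaphoreSlim(workersCount);
    // Bounded buffer: WriteAsync waits whenever BufferSize tasks are still unread.
    var channel = Channel.CreateBounded<Task<string>>(
        new BoundedChannelOptions(Constants.BufferSize) { SingleWriter = true, SingleReader = true });
    // Start the reader now, but don't await it until all lines have been written.
    var readerTask = ReadFromChannelAsync(channel, token);
    try
    {
        await foreach (var line in lines.WithCancellation(token))
        {
            await semaphoreSlim.WaitAsync(token);
            var workerTask = Task.Run(() =>
            {
                try { return WorkersFunction(line); }
                finally { semaphoreSlim.Release(); } // always free the slot
            });
            // Tasks are queued in the order in which the lines arrived.
            await channel.Writer.WriteAsync(workerTask, token);
        }
    }
    catch (OperationCanceledException) { }
    // Signal that no more tasks will be written; the reader then finishes.
    channel.Writer.Complete();
    try
    {
        await readerTask;
    }
    catch (OperationCanceledException) { }
}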
As in the previous example, the method uses a SemaphoreSlim to throttle the number of worker tasks. It creates a bounded Channel, which limits the size of its data buffer. It then starts the following reader task, but does not await it at this stage.
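private async Task ReadFromChannelAsync(Channel<Task<string>> channel,
    CancellationToken token = default)
{
    // WaitToReadAsync returns false once the writer has completed and the buffer is empty.
    while (await channel.Reader.WaitToReadAsync(token))
    {
        // Drain whatever is currently buffered, oldest task first.
        while (channel.Reader.TryRead(out var workerTask))
        {
            string result = await workerTask;
            RaiseItemsUpdatedEvent(new ItemsUpdatedEventArgs(result));
        }
    }
}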
The channel.Reader.WaitToReadAsync method returns true when data is available to be read. It returns false when the channel writer has marked the channel as complete and all the buffered data has been read. Each task is awaited in the order it was written, so the lines reach the UI in their original order. As the method is asynchronous, it can be called from the UI thread. Because there is only one reader, it can update the UI without the need for synchronization. When the enumeration ends, Channel.Writer.Complete is called which, in turn, causes the readerTask to complete.
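In .NET Core 3.0 and later, ChannelReader<T> also has a ReadAllAsync method that returns an IAsyncEnumerable<T>. This means the reader loop can itself be written as an await foreach over an asynchronous stream. The sketch below is a variation on ReadFromChannelAsync in that style. The onItem callback is a hypothetical stand-in for RaiseItemsUpdatedEvent, and DemoAsync exists only to exercise it.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ReadAllDemo
{
    // onItem is a hypothetical stand-in for RaiseItemsUpdatedEvent.
    public static async Task ReadFromChannelAsync(ChannelReader<Task<string>> reader,
        Action<string> onItem, CancellationToken token = default)
    {
        // The stream ends when the writer calls Complete() and the buffer is empty.
        await foreach (Task<string> workerTask in reader.ReadAllAsync(token))
        {
            onItem(await workerTask);
        }
    }

    public static async Task<List<string>> DemoAsync()
    {
        var channel = Channel.CreateUnbounded<Task<string>>();
        var received = new List<string>();
        Task readerTask = ReadFromChannelAsync(channel.Reader, received.Add);

        foreach (string word in new[] { "alpha", "beta", "gamma" })
        {
            await channel.Writer.WriteAsync(Task.FromResult(word));
        }
        channel.Writer.Complete();
        await readerTask;
        return received;
    }
}
```

The behaviour matches the WaitToReadAsync loop. The choice between them is mostly one of style, although TryRead in an inner loop avoids an await per item when several items are already buffered.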
An Example Using DataFlow Blocks
DataFlow blocks are classes found in the TPL DataFlow library (the System.Threading.Tasks.Dataflow package). They are linked together to form data processing pipelines. There are many different types but, in this example, only two blocks are used. A TransformBlock operates in a similar way to a Channel, as described previously, and an ActionBlock simply writes the output from the TransformBlock to the UI. Very little user code needs to be written: it’s just a case of setting the various options and ‘wiring’ the two blocks together. The method has been annotated because practically every line requires some explanation, which is probably why the DataFlow library is not as popular as it deserves to be.
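public async Task UsingDataFlowAsync(int workersCount, CancellationToken token = default)
{
    var lines = GetLines(Constants.LinesRequired);
    var options = new ExecutionDataflowBlockOptions
    {
        // Run up to workersCount calls to WorkersFunction in parallel.
        MaxDegreeOfParallelism = workersCount,
        // Only one producer (this method) posts to the block, which allows optimizations.
        SingleProducerConstrained = true,
        // Limit the number of buffered items; SendAsync waits while the block is full.
        BoundedCapacity = Constants.BufferCapacity,
        CancellationToken = token
    };
    // Processes lines on thread-pool threads; output order matches input order by default.
    var transformBlock = new TransformBlock<string, string>(
        message => WorkersFunction(message), options);
    // Raises the UI event on the exclusive UI scheduler, one item at a time.
    var uiUpdaterBlock = new ActionBlock<string>(
        msg => RaiseItemsUpdatedEvent(new ItemsUpdatedEventArgs(msg)),
        new ExecutionDataflowBlockOptions
        {
            TaskScheduler = uiTaskScheduler,
            CancellationToken = token
        });
    // Completing (or faulting) the transform block also completes the UI block.
    var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
    transformBlock.LinkTo(uiUpdaterBlock, linkOptions);
    await foreach (var line in lines.WithCancellation(token))
    {
        // Waits asynchronously for buffer space; a false result means the line was declined.
        _ = await transformBlock.SendAsync(line, token);
    }
    // Stop accepting input; items already buffered are still processed.
    transformBlock.Complete();
    await uiUpdaterBlock.Completion;
}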
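DataFlow pipelines have a structured and robust way of closing down. When Complete is called, all further input is declined and the items already buffered are processed. With PropagateCompletion set, completion then flows along the pipeline from block to block. Cancelling through the token, by contrast, stops the blocks and discards any items that are still buffered. MaxDegreeOfParallelism is set to the number of workers and output order is preserved, so items that start together finish at about the same time. As a result, the TransformBlock tends to release its output in batches roughly the size of the worker count. This technique is efficient, but in these tests it resulted in a delay before the first item appeared in the UI.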
Benchmark Results for the Three Methods
The following tests were run using the superlative BenchmarkDotNet NuGet package. It runs tests in isolation in a console application with no message loop, so the calls from the test methods to update the UI are, by necessity, illusory. When running the tests, it’s necessary to capture the default synchronization context rather than the current context. If the current context is captured, the tests never complete. The mean times shown are only for comparing the three tests with each other. They do not indicate the time taken for a single call to a method. They were obtained by running approximately 10 warm-up iterations followed by a further 100 timed iterations, then averaging the timed runs after removing any outliers. The tests were run with 10,000 lines of text, a page size of 3000 and a maximum of 35 active worker threads.
| Method      | Mean    | StdDev   | Rank | Gen 0     | Gen 1     | Allocated |
|-------------|---------|----------|------|-----------|-----------|-----------|
| DataFlow    | 4.510 s | 0.0015 s | 1    | 1000.0000 | -         | 5 MB      |
| WorkerTasks | 4.645 s | 0.0326 s | 2    | 2000.0000 | 1000.0000 | 8 MB      |
| Channel     | 4.666 s | 0.0251 s | 2    | 1000.0000 | -         | 6 MB      |
In the results table above, the figures in the Gen 0 and Gen 1 columns relate to garbage collector activity. They are the number of collections per 1000 calls to the method under test. The DataFlow and Channel tests give rise to one lightweight Generation 0 collection per 1000 calls. The WorkerTasks test causes two Generation 0 collections and, in addition, triggers one of the more intensive Generation 1 collections. This method also has an unordered output and the largest memory footprint. It seems to suffer from the need to maintain and rummage through a list of 10,000 tasks to check whether any of them are still ‘hot’. The test that uses DataFlow blocks performs well and has the smallest memory footprint, but it has the longest delay before the first data item is sent to the UI. The Channel method performs and scales well. It is probably the best choice in terms of its efficiency and its ability to update the UI quickly.
Conclusion
I hope that the observations and examples presented here help you understand some of the many asynchronous data management patterns that have recently been added to the C# language and to .NET.
History
- 10th December, 2021: Initial version