Getting Started with Asynchronous Streams

10 Dec 2021 | CPOL | 9 min read
This article explores how asynchronous streams can improve efficiency and reduce response times in applications.

Asynchronous Streams

Asynchronous streams were introduced in C# 8. They provide the ability to asynchronously wait, within an enumeration, for each item from a data stream to become available. This is particularly advantageous when consuming the sort of sporadic data streams that emanate from web sockets. An asynchronous stream is a method that returns IAsyncEnumerable<T>. The enumerable has an async enumerator that asynchronously waits for the next item to become available. Here is an example of an async enumerable:

C#
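// Requires: using System.Runtime.CompilerServices; (EnumeratorCancellation)
//           using Newtonsoft.Json;                  (JsonConvert)
public async IAsyncEnumerable<string> GetLines(int count,
    [EnumeratorCancellation] CancellationToken token = default)
{
    bool isContinue = true;
    int itemCount = 0;
    while (isContinue)
    {
        // Asynchronously wait for the next page of items
        string jsonResponse = await GetWebpageAsync(token);
        string[] items = JsonConvert.DeserializeObject<string[]>(jsonResponse);
        foreach (var item in items)
        {
            // These items are already in memory, so no waiting is needed
            yield return item;
            itemCount++;
            if (itemCount == count)
            {
                isContinue = false;
                break;
            }
        }
    }
}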

The enumerator uses ValueTasks to reduce memory allocation and improve efficiency. A ValueTask is a structure, so a result that is already available can be returned without allocating a Task on the heap. The object that backs a ValueTask can also be reused. In the example, when the enumerator has to wait for GetWebpageAsync, the ValueTask<bool> returned by MoveNextAsync is backed by an object that completes later. But, while the synchronous foreach loop is yielding items, the ValueTask does not need to reference anything else as the data item is already available. There’s an excellent post here about ValueTasks. Some caution is needed when handling ValueTasks as they are not a direct replacement for Tasks. The safest option is to simply extract their payload by awaiting them once and to leave it at that.

The method is used like this:

C#
await foreach (string line in GetLines(42))
{
    Console.WriteLine(line);
}
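
As a rough illustration of what await foreach does on the caller's behalf, the sketch below drives an enumerator by hand. GetDemoLinesAsync is a hypothetical stand-in for GetLines that needs no web request, and the code is written as a top-level program (C# 9 or later). It is not the compiler's exact expansion, only an approximation that shows each ValueTask<bool> from MoveNextAsync being awaited once.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for GetLines: pauses before each item to imitate
// data that arrives sporadically.
static async IAsyncEnumerable<string> GetDemoLinesAsync(int count)
{
    for (int i = 1; i <= count; i++)
    {
        await Task.Delay(10); // simulated wait for the next item
        yield return $"line {i}";
    }
}

var received = new List<string>();
IAsyncEnumerator<string> enumerator = GetDemoLinesAsync(3).GetAsyncEnumerator();
try
{
    // MoveNextAsync returns a ValueTask<bool>; each one is awaited exactly once
    while (await enumerator.MoveNextAsync())
    {
        received.Add(enumerator.Current);
    }
}
finally
{
    // The enumerator is always disposed, even if the loop body throws
    await enumerator.DisposeAsync();
}
Console.WriteLine(string.Join(", ", received)); // line 1, line 2, line 3
```

In everyday code, await foreach is the better choice; the manual form is shown only to make the role of the ValueTask visible.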

Cancellation

Cancellation can be implemented by passing a CancellationToken into the method directly. But it is also possible to supply the token at the point of enumeration, by calling WithCancellation on the enumerable, without having to reference the parent method.

C#
IAsyncEnumerable<string> lines = GetLines(42);
await foreach (var line in lines.WithCancellation(cancellationToken))
{
    // ..
}

The CancellationToken parameter of the GetLines method has to be decorated with the [EnumeratorCancellation] attribute in order to enable this functionality. Cancellation is not handled by default. The method needs to monitor the status of the token and take some action, if needed. An easy way to handle this is to call ThrowIfCancellationRequested() on the token inside the loop.
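
The sketch below puts those pieces together in a small top-level program (C# 9 or later). CountForeverAsync is a hypothetical endless stream invented for the illustration; the token handed to WithCancellation arrives in its decorated parameter, and the iterator checks it on every pass.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical endless stream; only cancellation stops it.
static async IAsyncEnumerable<int> CountForeverAsync(
    [EnumeratorCancellation] CancellationToken token = default)
{
    int i = 0;
    while (true)
    {
        token.ThrowIfCancellationRequested(); // check the token on every pass
        await Task.Delay(5, token);           // Task.Delay also observes the token
        yield return i++;
    }
}

using var cts = new CancellationTokenSource();
int lastSeen = -1;
bool wasCancelled = false;
try
{
    // The token supplied here reaches the 'token' parameter above
    await foreach (int n in CountForeverAsync().WithCancellation(cts.Token))
    {
        lastSeen = n;
        if (n == 4) cts.Cancel(); // request cancellation from the consumer side
    }
}
catch (OperationCanceledException)
{
    wasCancelled = true;
}
Console.WriteLine($"Cancelled: {wasCancelled}, last item: {lastSeen}");
```

The enumeration ends with an OperationCanceledException, so the consumer decides whether to swallow it, as here, or let it propagate.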

Using LINQ

The LINQ extension methods for IAsyncEnumerables are available in the System.Linq.Async NuGet package. All the familiar extension methods are present. Queries that return an enumerable return an IAsyncEnumerable. Methods that return a single value return a ValueTask<T> and have the suffix Async; AnyAsync and CountAsync are a couple of examples. It is possible to pass an asynchronous lambda expression to some methods so that it can be awaited. These extensions have Await appended to their names, as in WhereAwait. All these methods have been added to the System.Linq namespace.

C#
IAsyncEnumerable<string> lines = GetLines(42);

IAsyncEnumerable<string> query = lines.Where(l => l.Substring(1, 1) == "1");
var verySlowQuery = lines.WhereAwait(async l => { await Task.Delay(500); return true; });
int count = await lines.CountAsync(); //CountAsync returns ValueTask<int>

Consuming Asynchronous Streams

One of the main reasons for using asynchronous streams is to prevent the User Interface (UI) message loop from blocking. If the loop blocks, controls will not be updated and the interface will become unresponsive. So it makes sense to process each item produced by the enumerable asynchronously. In the examples that follow, the item is a line of text that needs to be processed before it can be displayed in the UI. There are two parts to this: the first is to process the line and the second is to display the result. The first part can be run on its own thread but the display has to be updated on the UI thread. The processing work is simulated by calling the following nonsensical method.

C#
private string WorkersFunction (string line)
{
    string vowels = "aeiou";
    Thread.Sleep(Constants.WorkerThreadSleep);//simulate a busy thread
    //remove all the vowels to show something has been done
    return string.Concat(line.Where(c => !vowels.Contains(c)));
}

An Example that Uses Worker Threads

The idea here is to use several worker threads to process the lines. The workers are started by calling Task.Factory.StartNew to run the WorkersFunction. The task returned by StartNew has a continuation task attached to it that will update the UI after the WorkersFunction has completed. As there will be several worker threads active at the same time, it’s necessary to have an exclusive task scheduler that allows only one task at a time to write to the UI. This is achieved by using the ExclusiveScheduler member of a ConcurrentExclusiveSchedulerPair.

C#
private readonly TaskScheduler uiTaskScheduler;
public AsyncStreamTest()
{
    //call the constructor from the UI thread
    var taskSchedulerPair = new ConcurrentExclusiveSchedulerPair(
        TaskScheduler.FromCurrentSynchronizationContext());
    uiTaskScheduler = taskSchedulerPair.ExclusiveScheduler;
}

Another requirement is to control the number of active worker threads to reduce the strain on the thread pool, memory and processor. This is achieved by using a SemaphoreSlim. A SemaphoreSlim keeps a record of the number of successful calls to enter the semaphore less the number of calls to leave the semaphore. If the difference is equal to the number set in the semaphore’s constructor, no further call to enter is accepted until a call to leave is received. Calls to leave are made by calling SemaphoreSlim.Release(). A call to enter is only successful when the task returned from SemaphoreSlim.WaitAsync() completes. It’s important to make sure that a call to release is always made when a worker thread completes and the best way to ensure this is to call SemaphoreSlim.Release() within a finally block.
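
A minimal, self-contained sketch of that throttling pattern is shown below as a top-level program (C# 9 or later). The Work function, the counters and the limit of three workers are all hypothetical and exist only for the illustration; the sample application's own WorkersFunction overload that takes the semaphore is not reproduced here.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

const int maxWorkers = 3;
var semaphore = new SemaphoreSlim(maxWorkers);
int active = 0;      // workers currently running
int peakActive = 0;  // highest number seen running at once
var gate = new object();

// Hypothetical worker: does some 'work', then always releases the semaphore.
string Work(string line)
{
    try
    {
        lock (gate) { active++; peakActive = Math.Max(peakActive, active); }
        Thread.Sleep(20); // simulate a busy thread
        return line.ToUpperInvariant();
    }
    finally
    {
        lock (gate) { active--; }
        semaphore.Release(); // runs even if the work throws
    }
}

var tasks = new List<Task<string>>();
for (int i = 0; i < 10; i++)
{
    string line = $"line {i}";
    await semaphore.WaitAsync(); // completes only when a slot is free
    tasks.Add(Task.Run(() => Work(line)));
}
string[] results = await Task.WhenAll(tasks);
Console.WriteLine($"Processed {results.Length} lines, peak workers: {peakActive}");
```

Because a slot is taken before each worker starts and is handed back in the finally block, the peak number of active workers cannot exceed the limit passed to the semaphore's constructor.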

C#
public async Task UsingWorkerThreadsAsync(int workersCount, CancellationToken token = default)
{
    var lines = GetLines(Constants.LinesRequired);
    var semaphoreSlim = new SemaphoreSlim(workersCount);
    List<Task> tasks = new();
    try
    {
        await foreach (var line in lines.WithCancellation(token))
        {
            await semaphoreSlim.WaitAsync(token);
            var task = Task.Factory.StartNew(
                    () => WorkersFunction(line, semaphoreSlim),
                    token,
                    TaskCreationOptions.None,
                    TaskScheduler.Default)
                .ContinueWith(
                    (t) => RaiseItemsUpdatedEvent(new ItemsUpdatedEventArgs(t.Result)),
                    token,
                    TaskContinuationOptions.AttachedToParent,
                    uiTaskScheduler);
            tasks.Add(task);
        }
    }
    catch (OperationCanceledException)
    {
        //cancellation simply ends the enumeration
    }
    await Task.WhenAll(tasks);
}

The functionality required here is for the parent task to complete only after the continuation task has completed, and the code requests this by setting the TaskContinuationOptions.AttachedToParent flag. The task that is added to the list is the one returned by ContinueWith, so awaiting Task.WhenAll(tasks) also waits for each UI update and observes the state it completed in. The reason that Task.Factory.StartNew() is used in preference to Task.Run() is that Task.Run sets the DenyChildAttach option and that results in an AttachedToParent request being denied.

Here’s a screen shot from the sample application showing the output from this method:

Image 1

The test lines have their index number attached and it’s clear that they are not completing in the same order that they started in. To ensure that the original order is maintained, it’s necessary to call a method that implements some sort of first in first out queue.

An Example Using a Channel

A System.Threading.Channels.Channel is an effective way of allowing two different threads to exchange data. Stephen Toub has written an excellent introduction to the class that is well worth reading. The Channel class is, essentially, a managed first in first out queue. It’s intended to be used asynchronously, as its main APIs are asynchronous, and that makes it an excellent replacement for the popular BlockingCollection<T>. In this example, there is a single Channel.Writer that takes the tasks created by repeated calls to Task.Run and writes them to the buffer. There is also a single Channel.Reader instance that reads the tasks from the buffer, awaits each one in turn and then updates the UI thread.

C#
public async Task ChannelExample(int workersCount, CancellationToken token = default)
{
    var lines = GetLines(Constants.LinesRequired);
    var semaphoreSlim = new SemaphoreSlim(workersCount);
    var channel = Channel.CreateBounded<Task<string>>(
        new BoundedChannelOptions(Constants.BufferSize) { SingleWriter = true });

    //start the reader but do not await it yet
    var readerTask = ReadFromChannelAsync(channel, token);
    try
    {
        await foreach (var line in lines.WithCancellation(token))
        {
            //Cancelling the semaphore directly can be problematical
            await semaphoreSlim.WaitAsync(CancellationToken.None);
            var workerTask = Task.Run(() => WorkersFunction(line, semaphoreSlim));
            await channel.Writer.WriteAsync(workerTask, token);
        }
    }
    catch (OperationCanceledException) { }
    channel.Writer.Complete();
    await readerTask;
}

As in the previous example, the method uses a SemaphoreSlim to throttle the number of worker tasks. It creates an instance of Channel and sets the size of its data buffer. Then the following reader task is started but it is not awaited at this stage.

C#
private async Task ReadFromChannelAsync(Channel<Task<string>> channel,
                                        CancellationToken token = default)
{
    while (await channel.Reader.WaitToReadAsync(token))
    {
        var readTask = await channel.Reader.ReadAsync(token);
        var result = await readTask;
        RaiseItemsUpdatedEvent(new ItemsUpdatedEventArgs(result));
    }
}

The channel.Reader.WaitToReadAsync method returns true when data is available to be read and it returns false when the channel has been closed by the channel writer. As the method is asynchronous, it can be called from the UI thread and, as there is only one reader, it can update the UI thread without the need for synchronization. When the enumeration ends, Channel.Writer.Complete is called which, in turn, causes the readerTask to complete.
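
The reason this arrangement preserves the original order can be seen in the following self-contained sketch, written as a top-level program (C# 9 or later). The delays, the buffer size of four and the names DrainAsync and output are hypothetical, and the reader uses ReadAllAsync rather than the WaitToReadAsync loop shown above. The tasks are queued in the order they are created and the reader awaits each one in turn, so results come out in queue order even when later tasks finish first.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateBounded<Task<int>>(
    new BoundedChannelOptions(4) { SingleWriter = true, SingleReader = true });
var output = new List<int>();

// Reader: takes the tasks in the order they were written and awaits each one.
async Task DrainAsync()
{
    await foreach (Task<int> pending in channel.Reader.ReadAllAsync())
    {
        output.Add(await pending);
    }
}

Task readerTask = DrainAsync(); // started, but not awaited yet

for (int i = 0; i < 8; i++)
{
    int item = i;
    // Earlier items are given longer delays, so they tend to finish later
    Task<int> work = Task.Run(async () =>
    {
        await Task.Delay((8 - item) * 5);
        return item;
    });
    await channel.Writer.WriteAsync(work); // waits while the buffer is full
}
channel.Writer.Complete(); // lets the reader's loop end
await readerTask;
Console.WriteLine(string.Join(", ", output)); // 0, 1, 2, 3, 4, 5, 6, 7
```

The trade-off is that a slow item at the head of the queue holds back completed items behind it, which is the price of ordered output.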

An Example Using DataFlow Blocks

DataFlow blocks are classes found in the TPL DataFlow library. They are linked together to form data processing pipelines. There are many different types but, in this example, only two blocks are used: a TransformBlock that operates in a similar way to a Channel, as described previously, and an ActionBlock that simply writes the output from the TransformBlock to the UI. There is very little user code that needs to be written; it’s just a case of setting the various options and ‘wiring’ the two blocks together. The method has been annotated as practically every line requires some explanation, which is probably why the DataFlow library is not as popular as it deserves to be.

C#
public async Task UsingDataFlowAsync(int workersCount, CancellationToken token = default)
{
    var lines = GetLines(Constants.LinesRequired);
    //set the TransformBlock options
    var options = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = workersCount, //number of active worker threads
        SingleProducerConstrained = true,      //this saves having to gate the input
        BoundedCapacity = Constants.BufferCapacity,
        CancellationToken = token
    };
    //The TransformBlock takes a string as its input. It passes it to the WorkersFunction
    //and outputs the value returned from that function.
    var transformBlock = new TransformBlock<string, string>(
        (message) => WorkersFunction(message), options);

    //The ActionBlock takes the output string from the TransformBlock and
    //raises the ItemsUpdatedEvent on the UI thread, passing the output string to the
    //EventArgs of that event
    var uiUpdaterBlock = new ActionBlock<string>(
        msg => RaiseItemsUpdatedEvent(new ItemsUpdatedEventArgs(msg)),
        new ExecutionDataflowBlockOptions
        {
            TaskScheduler = uiTaskScheduler,
            CancellationToken = token
        });

    //Setting the DataflowLinkOptions PropagateCompletion flag means that,
    //if the TransformBlock receives a completion request, that request will be passed
    //on to the ActionBlock
    var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
    //Couple the TransformBlock to the ActionBlock
    transformBlock.LinkTo(uiUpdaterBlock, linkOptions);

    await foreach (var line in lines.WithCancellation(token))
    {
        //Send the line to the TransformBlock and wait for it to be accepted
        _ = await transformBlock.SendAsync(line);
    }
    //Complete the TransformBlock and wait for the ActionBlock to complete
    transformBlock.Complete();
    await uiUpdaterBlock.Completion;
}

DataFlow pipelines have a structured and robust way of closing down or cancelling. All input is refused and the data buffers are then flushed in sequence along the pipeline. The TransformBlock partitions the data into groups of a size equal to the number of worker threads. When a partition has completed, it is output as a batch of data. This technique is efficient but it results in a delay before the first item appears in the UI.

Benchmark Results for the Three Methods

The following tests were run using the superlative BenchmarkDotNet NuGet package. It runs tests in isolation in a console application with no message loop, so the calls from the test methods to update the UI are, by necessity, illusory. It’s necessary to capture the default synchronization context rather than the current context when running the tests. If the current context is captured, the test times will be infinite.

The mean times shown are just for comparison purposes between the three tests. They are not indicative of the time taken for a single call to a method. They are only achieved by running approximately 10 warm-up iterations followed by a further 100 timed iterations and by taking the average of the timed runs after removing any outliers. The tests were run with 10,000 lines of text, a page size of 3000 and a maximum of 35 active worker threads.

Method       Mean     StdDev    Rank  Gen 0      Gen 1      Allocated
DataFlow     4.510 s  0.0015 s  1     1000.0000  -          5 MB
WorkerTasks  4.645 s  0.0326 s  2     2000.0000  1000.0000  8 MB
Channel      4.666 s  0.0251 s  2     1000.0000  -          6 MB

In the results table above, the figures in the Gen 0 and Gen 1 columns relate to garbage collector activity. They are the number of collections per 1000 calls to the method under test. The DataFlow and Channel tests give rise to one lightweight Generation 0 collection. But the WorkerTasks test causes two Generation 0 collections to run and, in addition, it triggers a call to the more intensive Generation 1 collection. The method has an unordered output and the largest footprint. It seems to suffer from the need to maintain and rummage through a list of 10,000 tasks to check if any of them are still ‘hot’.

The test that uses DataFlow blocks performs well and has a relatively small memory footprint. But it has the longest delay before the first data item is sent to the UI. The Channel method performs and scales well. It is probably the best choice in terms of its efficiency and ability to quickly update the UI.

Conclusion

I hope that the observations and examples presented here are of assistance in understanding some of the many asynchronous data management patterns that have recently been added to the C# language and the .NET frameworks.

History

  • 10th December, 2021: Initial version

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)