Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles / Languages / F#

F#27 : Asynchronous Workflows

5.00/5 (1 vote)
22 May 2014CPOL7 min read 18.7K  
Last time we looked at reactive programming, and this time we will look at a very cool feature of F# which is called asynchronous workflows. It is fair to say that the new Async-Await syntax bears more than a passing resemblance to F# async workflows.

Last time we looked at reactive programming, and this time we will look at a very cool feature of F# which is called asynchronous workflows. It is fair to say that the new Async-Await syntax bears more than a passing resemblance to F# async workflows.

Async workflows offer a easy way to write asynchronous code in F# that perform some background task, and have a lot of helper functions to manage them using the Async class.

The Async Class

The Async class has a whole load of functions that allow you to write async code. Here is a table which shows the functions that you are free to use. We will be looking at some of these

image

And here are an example or 2 of how you might use some of these functions

Async.AwaitEvent

In this small example, we create a Timer, and then using the Async class’s Async.AwaitEvent(..), which will return a Async<T>. This essentially created a thread pool thread to do the work, and then we carry on, and do some more stuff, and finally we use Async.RunSynchronously to await the original Async<T> created by the Async.AwaitEvent(..) call.

Here is the code:

open System
open System.IO
open System
open System.Linq
open System.Collections.Generic
open Microsoft.FSharp.Control

[<EntryPoint>]
let main argv =  

let waitForTimerEvent(timeout) =

    // create a timer and associated async event
    let timer = new System.Timers.Timer(timeout)

    //async computation that waits of a particualt CLI Event to occur
    let timerEvent = Async.AwaitEvent (timer.Elapsed) |> Async.Ignore

    printfn "Time now: %O" DateTime.Now.TimeOfDay
    timer.Start()

    printfn "do other stuff"

    //blocks waiting for the timer tick, this is pretty
    //much the same as await someTask in async/await
    Async.RunSynchronously timerEvent

    printfn "Time Is Done at time: %O" DateTime.Now.TimeOfDay

    Console.ReadLine() |> ignore

waitForTimerEvent 5000.0

//return 0 for main method
0

The output may help to clarify this a bit:

image

Async.AwaitIAsyncResult

Here is another example (from MSDN) that shows how the Async class also offers function for creating Async<T> values from the common APM (BeginSomeMethod()/EndSomeMethod(IAsyncResult…)). It can be seen that this example uses the Async class to both write and read to a text file asynchronously. It does introduce a bit of extra syntax by way of the “Let!” which I will be getting to in just a minute

open System
open System.IO
open System
open System.Linq
open System.Collections.Generic
open Microsoft.FSharp.Control

[<EntryPoint>]
let main argv =

    let streamWriter1 = File.CreateText("test1.txt")
    let count = 10000000
    let buffer = Array.init count (fun index -> byte (index % 256))

    printfn "Writing to file test1.txt."
    let asyncResult = streamWriter1.BaseStream.BeginWrite(buffer, 0, count, null, null)

    // Read a file, but use AwaitIAsyncResult to wait for the write operation
    // to be completed before reading.
    let readFile filename asyncResult count =
    async {
        //write the file but do not continue until we have done so
        let! returnValue = Async.AwaitIAsyncResult(asyncResult)
        //wait for the file to be written before we read it, again this is like
        //await File.WriteAllTextAync(…) in async/await land
        printfn "Reading from file test1.txt."
        // Close the file.
        streamWriter1.Close()
        // Now open the same file for reading.
        let streamReader1 = File.OpenText(filename)
        let! newBuffer = streamReader1.BaseStream.AsyncRead(count)
        return newBuffer
    }

    let bufferResult = readFile "test1.txt" asyncResult count
        |> Async.RunSynchronously

Console.ReadLine() |> ignore

//return 0 for main method
0

Which when run will give the following results:

image

A Rather Nice UI Example

Credit where credit is due, I do a lot of UI work, and I have to say that the MSDN example on how to use some of the Async class functions in this area is first rate. This small example shows how to use the following Async class functions

  • Async.SwitchToThreadPool()
  • Async.SwitchToContext()
  • Async.StartImmediate(…)

Here is the relevant code:

open System.Windows.Forms

[<EntryPoint>]
let main argv =

    let bufferData = Array.zeroCreate<byte> 100000000

    let async1 (button : Button) =
    async {
        button.Text <- "Busy"
        button.Enabled <- false
        do! Async.Sleep 5000
        let context = System.Threading.SynchronizationContext.Current
        do! Async.SwitchToThreadPool()
        use outputFile = System.IO.File.Create("longoutput.dat")
        do! outputFile.AsyncWrite(bufferData)
        do! Async.SwitchToContext(context)
        button.Text <- "Start"
        button.Enabled <- true
        MessageBox.Show("Done") |> ignore
    }

    let form = new Form(Text = "Test Form")
    let button = new Button(Text = "Start")
    form.Controls.Add(button)
    button.Click.Add(fun args -> Async.StartImmediate(async1 button))
    Application.Run(form)
    //return 0 for main method
    0 

So what exactly is going on here? Well as any winforms/WPF/Silverlight or Window8 developer knows, you must update UI controls on the thread that created them, so this code deals with that, by way of a SychnorizationContext which is the standard winforms way of posting delegates/actions to the correct thread. Lets walk through it

  1. Button is clicked on UI thread, which starts workflow using Async.StartImmediate which is in current thread (the UI thread)
  2. Async workflow starts, and sets some form control values, which is ok as it is still the UI thread at this point
  3. We then store the windows forms SynchronizationContext to allow us to marshall calls back to the UI thread later
  4. We then switch to a threadpool thread using Async.SwitchToThreadPool()
  5. We do some async work where we write to a file
  6. We then switch back to the UI thread using the SynchronizationContext we stored earlier, and use the Async.SwitchToContext() function
  7. We then set some form control values, which is ok at this point as we are now back on the UI thread thanks to the switch to the previously stored SynchronizationContext

Anyway here is the results of this code, After we click the button and trigger the async workflow

image

And when the async workflow has completed

image

This example did introduce a bit too much syntax, but we are just about to look into this so I hope you can forgive me that slight indulgence.

Starting Async Workflows Manually

As well as the inbuilt functions of the Async class, you can also manually create your own async workflows. Async workflows generally follow this syntax when created manually.

async { expression }

Using this syntax, the expression is some computation that will run asynchronously. This means it will not block the current computation/thread. The type of the expression is Async<’a>. There are many different ways of creating asynchronous code in F#, and you may use any of the types/functions which are available . The Async class is the best place to start.

F# async workflows allow synchronous and asynchronous operations to be used within the same workflow. They also come with their own set of syntax that can be used in the construction of workflows. Using thee following keywords (be careful though they are quite similar to the non async versions that you have already seen), you are able to create complete workflows, which may/may not contain a mixture of async code and synchronous code.

let!

Allows you to effectively wait for async results. It has the effect that the right hand side of the Let! must return before the rest of the async workflow continues

use!

The object is disposed of at the close of the current scope.

do!

The same as its counter part “do”, but is intended to be used inside async workflows

Here is a small manually created async workflow, that does nothing more than sleep inside of it

open System

[<EntryPoint>]
let main argv =

    let sleepAsync(timeout) =
    //this is the manual async workflow
    let sleeper = async {
        printfn "Before sleep %O" (DateTime.Now.ToLongTimeString())
        do! Async.Sleep timeout
        printfn "After sleep %O" (DateTime.Now.ToLongTimeString())
    }

    //wait on the sleeper (where sleeper is Async<T>)
    Async.RunSynchronously sleeper

    printfn "Async worflow completed"

    //call the function that contains the async workflow
    sleepAsync(5000)a

    Console.ReadLine() |> ignore

    //return 0 for main method
    0 

Which produces the following results when run

image

You may also nest workflows, which means that you may manually create or trigger async workflows from within workflows (kind of like inception, if you have seen it). Anyway here is an example of that, where we create a child workflow using our previous timer example, and then have a parent workflow use this child workflow.

open System

[<EntryPoint>]
let main argv =

    //Child workflow
    let timerWorkflow = async {
        let timer = new System.Timers.Timer(5000.0)
        let timerEvent = Async.AwaitEvent (timer.Elapsed) |> Async.Ignore
        timer.Start()

        //wait for the timer Async<T>
        Async.RunSynchronously timerEvent
    }

    //Parent workflow
    let parentWorkflow = async{

        printfn "Starting parent at : %O" (DateTime.Now.ToLongTimeString())
        let! childTimerWorkflow = Async.StartChild timerWorkflow

        printfn "parentWorkflow is about to wait for a bit at : %O" (DateTime.Now.ToLongTimeString())
        do! Async.Sleep 2000
        printfn "parentWorkflow continues to do stuff at : %O" (DateTime.Now.ToLongTimeString())

        // use let! to wait for the childTimerWorkflow
        let! result = childTimerWorkflow

        printfn "parentWorkflow completed at : %O" (DateTime.Now.ToLongTimeString())
    }

    // run the parentWorkflow
    Async.RunSynchronously parentWorkflow

    Console.ReadLine() |> ignore

    //return 0 for main method
    0 

Which when run gives the following result

image

Cancellation Of Workflows

You may also use the standard TPL type of cancellation to cancel an async workfow. That is one where you use a CancellationTokenSource to provide a CancellationToken to the async workflow, which you may then use to cancel the work flow with.

Here is some code that demonstrates this technique.

open System
open System.Threading

[<EntryPoint>]
let main argv =

    //Child workflow
    let timerWorkflow = async {

    for i in 0..9999 do
        printfn "Start is %d" i
        do! Async.Sleep(1000)
        printfn "Done is %d" i

    }


    let cts = new CancellationTokenSource()

    // start the timerWorkflow but pass in the CancellationToken
    Async.Start (timerWorkflow,cts.Token)

    //wait 1 second then cancel the token
    Thread.Sleep(2000)

    printfn "Cancelling" |> ignore
    cts.Cancel()

    printfn "Cancelled" |> ignore

    Console.ReadLine() |> ignore

    //return 0 for main method
    0 

Which when run gives the follow results

image

The eagle eyed amongst you will notice that I did not have to do anything specific with the CancellationToken inside the workflow itself, in F# this is all just handled for you. If you compare that to the C# equivalent (Ok we could use Parrallel.For(..) but for this demo I did not) you will see there is a lot more C# code

C#
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication5
{
    class Program
    {
        static void Main(string[] args)
        {

            var tcs = new CancellationTokenSource();
            var token = tcs.Token;

            var task = Task.Factory.StartNew(() =>
            {
                token.ThrowIfCancellationRequested();
                for (int i = 0; i < 9999; i++)
                {

                    Console.WriteLine(string.Format("Start is : {0}", i));
                    Thread.Sleep(1000);
                    Console.WriteLine(string.Format("Done is : {0}", i));
                    if (token.IsCancellationRequested)
                        token.ThrowIfCancellationRequested();

                }
            }, token);

            tcs.Cancel();

            try
            {
                task.Wait();
            }
            catch (AggregateException aex)
            {
                foreach (var ex in aex.InnerExceptions)
                {
                    Console.WriteLine(aex.Message + " " + ex.Message);
                }
            }

            Console.ReadKey();
        }
    }
}

Waiting For Cancelled Async WorkFlow

One thing that struck me as very odd is that if I was using C# I would still be able to know about a cancelled Task in TPL land, by wait of a continuation that only runs on Cancelled State, or I could use a Wait with a Try-Catch which caught AggregateException (much like the sample shown above), and using async/await it is even easier.

Anyway my point being in C#, I would have no problems waiting on a cancellable task, there are many ways, but I could not seem to find a way to do that in F#. So I did some googling and low and behold Tomas Petricek has done some code that allows you to do that which you can find at this url: http://www.fssnip.net/d4, I got to this link from this Stack Overflow discussion : http://stackoverflow.com/questions/11609089/waiting-on-the-cancellation-of-an-asynchronous-workflow

His code is complex and probably not best suited to a beginners guide, but it is worth looking at if that is what you are after.

Serial Workflows

You can of course run async workflows in series, where you just wait for one workflow to complete using “let!”, here is a small example:

open System
open System.Threading

[<EntryPoint>]
let main argv =

    let looper = async {
        for i in 0..3 do
            printfn "Start is %d" i
            do! Async.Sleep(500)
            printfn "Done is %d" i
    }

    //runs 2 async workflows one after another
    let run2WorkflowsInSeries = async {
        let! loop1 = looper
        printfn "Done loop1"
        let! loop2 = looper
        printfn "Done loop2"
    }

    Async.RunSynchronously run2WorkflowsInSeries
    printfn "All done"

    Console.ReadLine() |> ignore

    //return 0 for main method
    0 

Which when runs gives the following output:

image

Parallel Workflows

It is also entirely possible to run async workflows using a parallel type arrangement. If I was using TPL in C#, this would be the equivalent of Task.WhenAll(..) / Task.WaitAll(..). I have shameless stolen the following code snippet from Tomas Petriceks blog, which you can find right here, I urge you to read this read this blog, it is very very interesting : http://tomasp.net/blog/csharp-fsharp-async-intro.aspx/

Here is the code, which demonstrates how to run async workflows in parallel

open System
open System.Threading
open System.Text.RegularExpressions
open System.Net
open System.IO

[<EntryPoint>]
let main argv =

    let regTitle = new Regex(@"\<title\>([^\<]+)\</title\>")

    /// Asynchronously downloads a web page and returns
    /// title of the page and size in bytes
    let downloadPage(url:string) = async {
        let request = HttpWebRequest.Create(url)
        // Asynchronously get response and dispose it when we're done
        use! response = request.AsyncGetResponse()
        use stream = response.GetResponseStream()
        let temp = new MemoryStream()
        let buffer = Array.zeroCreate 4096

        // Loop that downloads page into a buffer (could use 'while'
        // but recursion is more typical for functional language)
        let rec download() = async {
            let! count = stream.AsyncRead(buffer, 0, buffer.Length)
            do! temp.AsyncWrite(buffer, 0, count)
            if count > 0 then return! download() }

        // Start the download asynchronously and handle results
        do! download()
        temp.Seek(0L, SeekOrigin.Begin) |> ignore
        let html = (new StreamReader(temp)).ReadToEnd()
        return regTitle.Match(html).Groups.[1].Value, html.Length }

    // Downloads pages in parallel and prints all results
    let comparePages = async {
        //wait for all results, similar to Task.WhenAll
        let! results =
            [| "http://www.google.com";
                "http://www.bing.com";
                "http://www.yahoo.com" |]
            |> Array.map downloadPage
            |> Async.Parallel
        for title, length in results do
            Console.WriteLine("{0} (length {1})", title, length)
    }

    // Start the computation on current thread
    do comparePages |> Async.RunSynchronously

    Console.ReadLine() |> ignore

    //return 0 for main method
    0 

Which when run produces the following results:

image

Further Reading : The F# Asynchronous Programming Model

Read more from the F# language creator, and main researcher, Don Syme and Tomas Petricek, I urge you all to go off and read the PDF at this link : http://research.microsoft.com/apps/pubs/default.aspx?id=147194

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)