Last time we looked at reactive programming, and this time we will look at a very cool feature of F# called asynchronous workflows. It is fair to say that the newer async/await syntax in C# bears more than a passing resemblance to F# async workflows.
Async workflows offer an easy way to write asynchronous code in F# that performs some background task, and the Async class provides a lot of helper functions to manage them.
The Async Class
The Async class has a whole load of functions that allow you to write async code. Here is a table which shows the functions that you are free to use. We will be looking at some of these.
And here is an example or two of how you might use some of these functions:
Async.AwaitEvent
In this small example, we create a Timer and pass its Elapsed event to Async.AwaitEvent(..), which returns an Async<T>. Nothing runs at this point: the Async<T> only describes a computation that, once started, subscribes to the event and completes the first time it fires. We then carry on and do some more stuff, and finally we use Async.RunSynchronously to run the Async<T> created by the Async.AwaitEvent(..) call and block until the timer event arrives.
Here is the code:
open System

[<EntryPoint>]
let main argv =
    let waitForTimerEvent (timeout : float) =
        let timer = new System.Timers.Timer(timeout)
        // Describes "wait for the first Elapsed event"; nothing runs until it is started
        let timerEvent = Async.AwaitEvent (timer.Elapsed) |> Async.Ignore
        printfn "Time now: %O" DateTime.Now.TimeOfDay
        timer.Start()
        printfn "do other stuff"
        // Subscribes to the event and blocks until it fires
        Async.RunSynchronously timerEvent
        printfn "Time Is Done at time: %O" DateTime.Now.TimeOfDay
        Console.ReadLine() |> ignore
    waitForTimerEvent 5000.0
    0
The output may help to clarify this a bit:
Async.AwaitIAsyncResult
Here is another example (from MSDN) that shows how the Async class also offers functions for creating Async<T> values from the common APM (BeginSomeMethod()/EndSomeMethod(IAsyncResult…)) pattern. This example uses the Async class to both write to and read from a text file asynchronously. It does introduce a bit of extra syntax by way of “let!”, which I will be getting to in just a minute.
open System
open System.IO

[<EntryPoint>]
let main argv =
    let streamWriter1 = File.CreateText("test1.txt")
    let count = 10000000
    let buffer = Array.init count (fun index -> byte (index % 256))
    printfn "Writing to file test1.txt."
    // APM-style call: BeginWrite returns an IAsyncResult straight away
    let asyncResult = streamWriter1.BaseStream.BeginWrite(buffer, 0, count, null, null)
    let readFile filename asyncResult count =
        async {
            // Wait for the pending write to finish before reading the file back
            let! returnValue = Async.AwaitIAsyncResult(asyncResult)
            printfn "Reading from file test1.txt."
            streamWriter1.Close()
            // "use" closes the reader when the workflow leaves this scope
            use streamReader1 = File.OpenText(filename)
            let! newBuffer = streamReader1.BaseStream.AsyncRead(count)
            return newBuffer
        }
    let bufferResult =
        readFile "test1.txt" asyncResult count
        |> Async.RunSynchronously
    Console.ReadLine() |> ignore
    0
Which when run will give the following results:
A Rather Nice UI Example
Credit where credit is due: I do a lot of UI work, and I have to say that the MSDN example on how to use some of the Async class functions in this area is first rate. This small example shows how to use the following Async class functions:
Async.SwitchToThreadPool()
Async.SwitchToContext()
Async.StartImmediate(…)
Here is the relevant code:
open System.Windows.Forms

[<EntryPoint>]
let main argv =
    let bufferData = Array.zeroCreate<byte> 100000000
    let async1 (button : Button) =
        async {
            button.Text <- "Busy"
            button.Enabled <- false
            do! Async.Sleep 5000
            let context = System.Threading.SynchronizationContext.Current
            do! Async.SwitchToThreadPool()
            use outputFile = System.IO.File.Create("longoutput.dat")
            do! outputFile.AsyncWrite(bufferData)
            do! Async.SwitchToContext(context)
            button.Text <- "Start"
            button.Enabled <- true
            MessageBox.Show("Done") |> ignore
        }
    let form = new Form(Text = "Test Form")
    let button = new Button(Text = "Start")
    form.Controls.Add(button)
    button.Click.Add(fun args -> Async.StartImmediate(async1 button))
    Application.Run(form)
    0
So what exactly is going on here? Well, as any WinForms/WPF/Silverlight or Windows 8 developer knows, you must update UI controls on the thread that created them, so this code deals with that by way of a SynchronizationContext, which is the standard WinForms way of posting delegates/actions to the correct thread. Let's walk through it:
- The button is clicked on the UI thread, which starts the workflow using Async.StartImmediate, which runs in the current thread (the UI thread)
- The async workflow starts and sets some form control values, which is OK as it is still on the UI thread at this point
- We then store the Windows Forms SynchronizationContext to allow us to marshal calls back to the UI thread later
- We then switch to a thread pool thread using Async.SwitchToThreadPool()
- We do some async work where we write to a file
- We then switch back to the UI thread using the SynchronizationContext we stored earlier, via the Async.SwitchToContext() function
- We then set some form control values, which is OK at this point as we are now back on the UI thread thanks to the switch to the previously stored SynchronizationContext
Anyway, here are the results of this code. After we click the button and trigger the async workflow:
And when the async workflow has completed:
This example did introduce a bit too much syntax, but we are just about to look into this so I hope you can forgive me that slight indulgence.
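The thread switch described in the walkthrough can also be observed without any UI. The following is a minimal console sketch, written as an F# script; the names hop, firstStageThreadId, secondStageOnPool and finished are made up for illustration. It only demonstrates Async.StartImmediate and Async.SwitchToThreadPool(). Async.SwitchToContext is left out because a console program typically has no SynchronizationContext to switch back to.

```fsharp
open System.Threading

// Records where each stage of the workflow ran (illustrative names).
let callerThreadId = Thread.CurrentThread.ManagedThreadId
let firstStageThreadId = ref (-1)
let secondStageOnPool = ref false
let finished = new ManualResetEventSlim(false)

let hop = async {
    // StartImmediate runs this first stage on the calling thread.
    firstStageThreadId.Value <- Thread.CurrentThread.ManagedThreadId
    do! Async.SwitchToThreadPool()
    // After the switch, the rest of the workflow runs on a thread pool thread.
    secondStageOnPool.Value <- Thread.CurrentThread.IsThreadPoolThread
    finished.Set()
}

Async.StartImmediate hop
finished.Wait()
printfn "first stage on caller thread: %b" (firstStageThreadId.Value = callerThreadId)
printfn "second stage on thread pool: %b" secondStageOnPool.Value
```

In the UI example the same hop is what keeps the file write off the UI thread, and the stored SynchronizationContext is what brings the workflow back afterwards.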
Starting Async Workflows Manually
As well as the inbuilt functions of the Async class, you can also manually create your own async workflows. Async workflows generally follow this syntax when created manually.
async { expression }
Using this syntax, the expression is some computation that will run asynchronously once the workflow is started, which means it need not block the current computation/thread. The type of the whole async { ... } expression is Async<'a>, where 'a is the type of value the computation returns. There are many different ways of creating asynchronous code in F#, and you may use any of the types/functions which are available. The Async class is the best place to start.
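One consequence worth seeing in code is that writing async { ... } only builds a value of type Async<'a>; the body does not execute until the workflow is started, and it executes again each time it is started. The sketch below, written as an F# script with illustrative names (runs, work), counts how often the body runs.

```fsharp
// Counts how many times the workflow body has executed.
let runs = ref 0

// Building the workflow does not execute its body.
let work : Async<int> = async {
    runs.Value <- runs.Value + 1
    return 21 * 2
}

let runsBeforeStart = runs.Value          // the body has not run yet
let first = Async.RunSynchronously work   // the body runs now
let second = Async.RunSynchronously work  // and runs again on the second start

printfn "before=%d first=%d second=%d total runs=%d" runsBeforeStart first second runs.Value
```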
F# async workflows allow synchronous and asynchronous operations to be used within the same workflow. They also come with their own set of syntax that can be used in the construction of workflows. Using the following keywords (be careful though, they are quite similar to the non-async versions that you have already seen), you are able to create complete workflows, which may or may not contain a mixture of async code and synchronous code.
let!
Allows you to effectively wait for async results. The right-hand side of the let! must produce its result before the rest of the async workflow continues, and that result is bound to the name on the left.
use!
Like let!, but the bound object is disposed of at the close of the current scope.
do!
The same as its counterpart “do”, but intended to be used inside async workflows: it runs an Async<unit> and waits for it to finish before continuing.
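To make the three keywords concrete, here is a small sketch that uses all of them in one workflow. It is written as an F# script, and openResource, fetchNumber and the disposed flag are hypothetical helpers invented for this illustration rather than anything from the Async class.

```fsharp
open System

// Set to true when the fake resource is disposed.
let disposed = ref false

// Hypothetical async resource: an Async that yields an IDisposable.
let openResource : Async<IDisposable> = async {
    return { new IDisposable with
                 member x.Dispose() = disposed.Value <- true }
}

// Hypothetical async computation that yields a number.
let fetchNumber : Async<int> = async { return 20 }

let workflow = async {
    use! _resource = openResource   // bind the result; dispose it when this scope ends
    let! n = fetchNumber            // bind the result of an Async<int>
    do! Async.Sleep 10              // run an Async<unit> and wait for it
    return n + 1, disposed.Value    // the resource has not been disposed yet here
}

let value, disposedInside = Async.RunSynchronously workflow
printfn "value=%d disposed inside=%b disposed after=%b" value disposedInside disposed.Value
```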
Here is a small manually created async workflow, that does nothing more than sleep inside of it
open System

[<EntryPoint>]
let main argv =
    let sleepAsync (timeout : int) =
        let sleeper = async {
            printfn "Before sleep %O" (DateTime.Now.ToLongTimeString())
            do! Async.Sleep timeout
            printfn "After sleep %O" (DateTime.Now.ToLongTimeString())
        }
        Async.RunSynchronously sleeper
        printfn "Async workflow completed"
    sleepAsync(5000)
    Console.ReadLine() |> ignore
    0
Which produces the following results when run
You may also nest workflows, which means that you may manually create or trigger async workflows from within workflows (kind of like Inception, if you have seen it). Anyway, here is an example of that, where we create a child workflow using our previous timer example, and then have a parent workflow use this child workflow.
open System

[<EntryPoint>]
let main argv =
    let timerWorkflow = async {
        let timer = new System.Timers.Timer(5000.0)
        let timerEvent = Async.AwaitEvent (timer.Elapsed) |> Async.Ignore
        timer.Start()
        // Wait asynchronously; Async.RunSynchronously here would block a thread
        do! timerEvent
    }
    let parentWorkflow = async {
        printfn "Starting parent at : %O" (DateTime.Now.ToLongTimeString())
        // Starts the child straight away and hands back an Async to await it later
        let! childTimerWorkflow = Async.StartChild timerWorkflow
        printfn "parentWorkflow is about to wait for a bit at : %O" (DateTime.Now.ToLongTimeString())
        do! Async.Sleep 2000
        printfn "parentWorkflow continues to do stuff at : %O" (DateTime.Now.ToLongTimeString())
        // Wait for the child to finish
        do! childTimerWorkflow
        printfn "parentWorkflow completed at : %O" (DateTime.Now.ToLongTimeString())
    }
    Async.RunSynchronously parentWorkflow
    Console.ReadLine() |> ignore
    0
Which when run gives the following result
Cancellation Of Workflows
You may also use the standard TPL style of cancellation to cancel an async workflow. That is, you use a CancellationTokenSource to provide a CancellationToken to the async workflow, which you may then use to cancel the workflow.
Here is some code that demonstrates this technique.
open System
open System.Threading

[<EntryPoint>]
let main argv =
    let timerWorkflow = async {
        for i in 0..9999 do
            printfn "Start is %d" i
            do! Async.Sleep(1000)
            printfn "Done is %d" i
    }
    let cts = new CancellationTokenSource()
    Async.Start (timerWorkflow, cts.Token)
    Thread.Sleep(2000)
    printfn "Cancelling"
    cts.Cancel()
    printfn "Cancelled"
    Console.ReadLine() |> ignore
    0
Which when run gives the following results
The eagle-eyed amongst you will notice that I did not have to do anything specific with the CancellationToken inside the workflow itself; in F# this is all just handled for you, because the workflow checks the token at its async binding points (such as the do! Async.Sleep above). If you compare that to the C# equivalent (OK, we could use Parallel.For(..), but for this demo I did not), you will see there is a lot more C# code:
using System;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication5
{
    class Program
    {
        static void Main(string[] args)
        {
            var tcs = new CancellationTokenSource();
            var token = tcs.Token;
            var task = Task.Factory.StartNew(() =>
            {
                token.ThrowIfCancellationRequested();
                for (int i = 0; i < 9999; i++)
                {
                    Console.WriteLine(string.Format("Start is : {0}", i));
                    Thread.Sleep(1000);
                    Console.WriteLine(string.Format("Done is : {0}", i));
                    // Unlike the F# workflow, the token must be checked by hand
                    token.ThrowIfCancellationRequested();
                }
            }, token);

            // Let the task run for a while first, mirroring the F# example
            Thread.Sleep(2000);
            tcs.Cancel();
            try
            {
                task.Wait();
            }
            catch (AggregateException aex)
            {
                foreach (var ex in aex.InnerExceptions)
                {
                    Console.WriteLine(aex.Message + " " + ex.Message);
                }
            }
            Console.ReadKey();
        }
    }
}
Waiting For Cancelled Async Workflow
One thing that struck me as very odd is that if I was using C# I would still be able to know about a cancelled Task in TPL land, by way of a continuation that only runs on the Cancelled state, or I could use a Wait with a try-catch which caught AggregateException (much like the sample shown above), and using async/await it is even easier.
Anyway, my point being that in C# I would have no problems waiting on a cancellable task, as there are many ways, but I could not seem to find a way to do that in F#. So I did some googling, and lo and behold, Tomas Petricek has written some code that allows you to do that, which you can find at this url: http://www.fssnip.net/d4. I got to this link from this Stack Overflow discussion: http://stackoverflow.com/questions/11609089/waiting-on-the-cancellation-of-an-asynchronous-workflow
His code is complex and probably not best suited to a beginner's guide, but it is worth looking at if that is what you are after.
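For simpler cases, one option that ships with the Async class is Async.StartWithContinuations, which takes separate callbacks for success, failure and cancellation. The sketch below is not Tomas Petricek's snippet and makes no attempt to reproduce it; it is a minimal illustration, written as an F# script, of being told when a workflow has been cancelled. The names slowWork, outcome, finish and finished are made up for the example.

```fsharp
open System
open System.Threading

// Signalled by whichever continuation ends up being called.
let finished = new ManualResetEventSlim(false)
let outcome = ref "pending"

// Illustrative helper: record how the workflow ended, then signal.
let finish (label : string) =
    outcome.Value <- label
    finished.Set()

// A long-running workflow that is cancellable at each do!.
let slowWork = async {
    for _ in 0 .. 9999 do
        do! Async.Sleep 100
}

let cts = new CancellationTokenSource()

Async.StartWithContinuations(
    slowWork,
    (fun () -> finish "completed"),
    (fun (ex : exn) -> finish ("failed: " + ex.Message)),
    (fun (_ : OperationCanceledException) -> finish "cancelled"),
    cts.Token)

Thread.Sleep 300
cts.Cancel()

// Wait (with a generous timeout) until one of the continuations has run.
let signalled = finished.Wait(TimeSpan.FromSeconds 5.0)
printfn "signalled=%b outcome=%s" signalled outcome.Value
```

The trade-off is that the callbacks run outside the workflow, so the caller has to do its own signalling (here a ManualResetEventSlim) if it wants to block until cancellation has been observed.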
Serial Workflows
You can of course run async workflows in series, where you just wait for one workflow to complete using “let!” before starting the next. Here is a small example:
open System
open System.Threading

[<EntryPoint>]
let main argv =
    let looper = async {
        for i in 0..3 do
            printfn "Start is %d" i
            do! Async.Sleep(500)
            printfn "Done is %d" i
    }
    let run2WorkflowsInSeries = async {
        let! loop1 = looper
        printfn "Done loop1"
        let! loop2 = looper
        printfn "Done loop2"
    }
    Async.RunSynchronously run2WorkflowsInSeries
    printfn "All done"
    Console.ReadLine() |> ignore
    0
Which when run gives the following output:
Parallel Workflows
It is also entirely possible to run async workflows using a parallel type arrangement. If I was using TPL in C#, this would be the equivalent of Task.WhenAll(..) / Task.WaitAll(..). I have shamelessly stolen the following code snippet from Tomas Petricek's blog, which you can find right here. I urge you to read this blog, it is very, very interesting: http://tomasp.net/blog/csharp-fsharp-async-intro.aspx/
Here is the code, which demonstrates how to run async workflows in parallel
open System
open System.Threading
open System.Text.RegularExpressions
open System.Net
open System.IO

[<EntryPoint>]
let main argv =
    let regTitle = new Regex(@"\<title\>([^\<]+)\</title\>")
    let downloadPage(url:string) = async {
        let request = HttpWebRequest.Create(url)
        use! response = request.AsyncGetResponse()
        use stream = response.GetResponseStream()
        let temp = new MemoryStream()
        let buffer = Array.zeroCreate 4096
        let rec download() = async {
            let! count = stream.AsyncRead(buffer, 0, buffer.Length)
            do! temp.AsyncWrite(buffer, 0, count)
            if count > 0 then return! download() }
        do! download()
        temp.Seek(0L, SeekOrigin.Begin) |> ignore
        let html = (new StreamReader(temp)).ReadToEnd()
        return regTitle.Match(html).Groups.[1].Value, html.Length }
    let comparePages = async {
        let! results =
            [| "http://www.google.com";
               "http://www.bing.com";
               "http://www.yahoo.com" |]
            |> Array.map downloadPage
            |> Async.Parallel
        for title, length in results do
            Console.WriteLine("{0} (length {1})", title, length)
    }
    do comparePages |> Async.RunSynchronously
    Console.ReadLine() |> ignore
    0
Which when run produces the following results:
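Separately from the download example, which needs network access, the same Async.Parallel pattern can be tried offline. The sketch below is written as an F# script and uses a made-up square function with Async.Sleep standing in for a slow download. It also shows that Async.Parallel hands back the results in the same order as the input workflows.

```fsharp
// Stand-in for a slow download: waits a little, then returns a result.
let square (n : int) = async {
    do! Async.Sleep 200
    return n * n
}

let squares =
    [| 1; 2; 3 |]
    |> Array.map square        // three Async<int> values, none started yet
    |> Async.Parallel          // one Async<int[]> that runs them all
    |> Async.RunSynchronously  // start it and wait for every result

printfn "%A" squares
```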
Further Reading : The F# Asynchronous Programming Model
Read more from the F# language creator and main researcher, Don Syme, and Tomas Petricek. I urge you all to go off and read the PDF at this link: http://research.microsoft.com/apps/pubs/default.aspx?id=147194