F#26 : Reactive Programming

22 May 2014
In this post we will look at using a reactive programming paradigm within F#. Some of you may have used the Reactive Extensions (Rx). I am in fact a massive Rx fanboy and really enjoy what it can bring to a project; a UI project in particular benefits immensely from a more reactive approach.

There may of course be those of you who have never come across Rx at all, so let's take a very small detour and talk about the general idea of the observer pattern.

Observer Pattern

Let's say I have an order system that should print invoices and also send emails to a client when a new order is received. A naive implementation would lump all of this into a single class, but that is a poor separation of concerns, and we can do better. What we could do is have an order system object that receives orders and then calls into 2 other classes, one that produces an invoice and one that sends an email to the client.

This is OK, but the order system class would now have to take a strong dependency on the sub systems and call them when a new order arrives. Wouldn't it be better if the order system class could just accept a list of subscribers (say, implementing some IAcceptOrder interface)? Then any time a new order arrives, the order system class would simply loop through its list of subscribers and call their AcceptOrder method (from the hypothetical IAcceptOrder interface).

This, in essence, is the observer pattern. Of course, the pattern can also allow unsubscribing.
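To make the idea concrete, here is a minimal hand-rolled sketch of the pattern in F#. Everything in it apart from IAcceptOrder/AcceptOrder is a hypothetical name chosen for illustration (SimpleOrder, SimpleOrderSystem, InvoicePrinter, EmailSender); it is not code from Rx or from the F# libraries.

```fsharp
open System.Collections.Generic

// Hypothetical order type, for illustration only
type SimpleOrder = { Id : int; Price : decimal }

// The subscriber contract described in the text
type IAcceptOrder =
    abstract AcceptOrder : SimpleOrder -> unit

// The order system depends only on IAcceptOrder, not on the concrete sub systems
type SimpleOrderSystem() =
    let subscribers = List<IAcceptOrder>()

    member this.Subscribe(subscriber : IAcceptOrder) =
        subscribers.Add(subscriber)

    member this.Unsubscribe(subscriber : IAcceptOrder) =
        subscribers.Remove(subscriber) |> ignore

    // Loop through the subscribers and notify each one
    member this.NewOrder(order : SimpleOrder) =
        for subscriber in subscribers do
            subscriber.AcceptOrder(order)

// Two independent (hypothetical) subscribers
type InvoicePrinter() =
    interface IAcceptOrder with
        member this.AcceptOrder(order) =
            printfn "Printing invoice for order %d" order.Id

type EmailSender() =
    interface IAcceptOrder with
        member this.AcceptOrder(order) =
            printfn "Emailing client about order %d" order.Id

let demoSystem = SimpleOrderSystem()
let emailer = EmailSender() :> IAcceptOrder

demoSystem.Subscribe(InvoicePrinter())
demoSystem.Subscribe(emailer)
demoSystem.NewOrder({ Id = 1; Price = 99.0m })   // both subscribers are called

demoSystem.Unsubscribe(emailer)
demoSystem.NewOrder({ Id = 2; Price = 10.0m })   // only the invoice printer is called
```

The order system never mentions invoices or emails, so new subscribers can be added without touching it.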

Rx builds upon the general idea of the observer pattern and provides a great many standard .NET classes that aid in the construction of observable streams of data. You can also create subscriptions over events and asynchronous operations, and use the standard LINQ operators on them.

So that is what Rx gives you.

F# also comes with the Control.Observable and Control.Event modules, which contain many functions that can be used to create reactive code. The functions in these modules mirror some of the functionality found in the Rx classes (see http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable(v=vs.103).aspx).

This article is however about F#, so we will be keeping the discussion from here on out about the F# Observable modules.

Observable Module

The F# Observable module is the place to start for reactive programming in F#, and contains the following functions/types

[Image: list of the functions in the Observable module]

We will be seeing some examples of these in just a minute, but before we do it is worth going on another slight detour. So let's get that over with before we start to look at the Observable module in depth.

<SlightDetour>

There were several things that made Rx so powerful:

  • The ability to treat events as a source to create an IObservable<T>. This IObservable<T> could then have any of the standard LINQ operators applied to it, along with another specific set of IObservable<T> extension methods. This is a very powerful technique, which allows you to do things like only listen to an event when some condition is true, project the original event args into a completely new type, or merge 2 or more events into a single stream (this technique is particularly powerful when building UIs, I use this one a lot)
  • The decoupling of the event into a much more general-purpose interface (IObservable<T>), which means that users of the original event source don't need a strong reference to the original event source class (they could be in different DLLs). This is a much better / cleaner design: you just rely on a type (IObservable<T>) that lives in the base class library
  • The ability to subscribe, which returns an IDisposable that you can wrap in a using(..) or Dispose of when you are done with it, both of which stop the subscriber from receiving any further notifications
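As a rough sketch of the first point using the F# Observable module rather than Rx itself, the snippet below filters, projects and merges two event streams. The two Event<int> sources and their names (leftClicks, rightClicks) are hypothetical stand-ins for real UI events.

```fsharp
// Two hypothetical event sources standing in for UI events
let leftClicks = Event<int>()
let rightClicks = Event<int>()

// Collects whatever reaches the merged stream
let received = ResizeArray<string>()

// Event.Publish gives an IEvent<int>, which is also an IObservable<int>
let left =
    leftClicks.Publish
    |> Observable.filter (fun x -> x > 10)                // only listen when a condition is true
    |> Observable.map (fun x -> sprintf "left at %d" x)   // project into a completely new type

let right =
    rightClicks.Publish
    |> Observable.map (fun x -> sprintf "right at %d" x)

// Merge the two streams into a single stream and attach one handler
Observable.merge left right
|> Observable.add (fun msg -> received.Add(msg))

leftClicks.Trigger(5)     // filtered out, 5 is not greater than 10
leftClicks.Trigger(42)
rightClicks.Trigger(7)

received |> Seq.iter (printfn "%s")
```

Note that the handler only ever sees IObservable<string>; it has no reference to either of the original event sources.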

The F# team (Don Syme, I guess) has exposed some (though not all) of the Rx goodness in F#, so it is important to understand some of the reasons why IObservable<T> is so useful.

</SlightDetour>

OK, now that we have talked about why Observable is better than a plain old event, let's have a look at some examples using the F# Observable module. Here is a small Windows Forms example (yes, you can do forms in F# quite easily too) that demonstrates the following things:

  • How to create an IObservable<'a> from a standard event using Observable.filter, where the filter is just being used to give back an IObservable<'a>, in essence no filtering at all
  • How to use Observable.add to add a handler to an IObservable<'a>
  • How to create an actual filter from an IObservable<'a> using Observable.filter; this filter ensures that only MouseMove events that happen in the bottom 1/2 of the form notify listeners
  • How to create an IDisposable subscription using Observable.subscribe
  • How to cancel a subscription by simply calling Dispose on the subscription, which is an IDisposable. In this example this is done using a Task.Delay(4000), which means the subscriber will only work for 4 seconds and will not receive any notifications after that

open System
open System.Drawing
open System.Windows.Forms
open System.Threading.Tasks

[<EntryPoint>]
let main argv =

    // create a form
    let form = new Form(Text = "F# Windows Form",
                        Visible = true,
                        TopMost = true)

    let label1 = new Label()
    label1.Text <- "Label1"
    label1.Location <- new Point(10, 10)

    let txt1 = new TextBox()
    txt1.Width <- 160
    txt1.Location <- new Point(120, 10)

    let label2 = new Label()
    label2.Text <- "Label2"
    label2.Location <- new Point(10, 40)

    let txt2 = new TextBox()
    txt2.Width <- 160
    txt2.Location <- new Point(120, 40)

    form.Controls.Add(label1)
    form.Controls.Add(txt1)
    form.Controls.Add(label2)
    form.Controls.Add(txt2)

    // use Control.Observable reactive functions;
    // the always-true filter just turns the event into an IObservable
    form.MouseMove
    |> Observable.filter (fun evArgs -> true)
    |> Observable.add (fun evArgs ->
        txt1.Text <- String.Format("x: {0} y: {1}", evArgs.X, evArgs.Y))

    // shows how to subscribe, such that we get an IDisposable back for the subscription;
    // only mouse moves in the bottom half of the form get through
    let sub =
        form.MouseMove
        |> Observable.filter (fun evArgs -> evArgs.Y > form.Height / 2)
        |> Observable.subscribe (fun evArgs ->
            txt2.Text <- String.Format("x: {0} y: {1}", evArgs.X, evArgs.Y))

    // dispose of the subscription after 4 seconds
    Task.Delay(4000).ContinueWith(fun (t : Task) -> sub.Dispose()) |> ignore

    // run the windows form message pump
    Application.Run(form)

    // return 0 for main method
    0

When run, it looks like this:

[Image: screenshot of the running form]

I urge you to try this code out for yourself, as a screenshot cannot show that the TextBox next to Label2 stops updating after 4 seconds.
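If you would rather see the subscribe/Dispose behaviour without a form, here is a small console-style sketch. It uses a hypothetical Event<int> called ticks in place of MouseMove, and disposes immediately rather than after a delay.

```fsharp
open System

// Hypothetical event source standing in for form.MouseMove
let ticks = Event<int>()
let seen = ResizeArray<int>()

// Observable.subscribe returns an IDisposable that represents the subscription
let sub : IDisposable =
    ticks.Publish
    |> Observable.subscribe (fun n -> seen.Add(n))

ticks.Trigger(1)
ticks.Trigger(2)

sub.Dispose()        // cancel the subscription

ticks.Trigger(3)     // not delivered, the handler has been removed

printfn "%A" (List.ofSeq seen)   // prints [1; 2]
```

Observable.add, by contrast, returns unit, so a handler attached that way cannot be detached later.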

This is cool for sure, but the real power of Observable is that you can use it against your own events too. So let's wrap up this post by looking at an example where we create our own event source, and look at a few more of the Observable module functions.

Here is a small class that contains a single "NewOrderEvent", which uses a custom EventArgs derived class called "OrderArgs". "OrderArgs" has the following 2 properties:

  1. Price : decimal
  2. AuthorisationLevel : which uses a simple discriminated union type called "DiscountApprovalLevel" (its cases carry no data)

Here is the relevant code:

namespace ConsoleApplication1

module CustomTypes =

    open System

    type DiscountApprovalLevel = Standard | Manager | Ceo

    type Order = { Price : decimal; AuthorisationLevel : DiscountApprovalLevel }

    // custom event args carrying the order details
    type OrderArgs(price : decimal, authorisationLevel : DiscountApprovalLevel) =
        inherit System.EventArgs()

        member this.Price = price
        member this.AuthorisationLevel = authorisationLevel

    type OrderChangeDelegate = delegate of obj * OrderArgs -> unit

    type OrderSystem() =
        let newOrderEvent = new Event<OrderChangeDelegate, OrderArgs>()

        // raise NewOrderEvent for the given order
        member this.CreateOrder(order : Order) =
            newOrderEvent.Trigger(this, new OrderArgs(order.Price, order.AuthorisationLevel))

        [<CLIEvent>]
        member this.NewOrderEvent = newOrderEvent.Publish

So that is the code for the source of the custom event. How about the Observable code? Let's see that next:

open System
open ConsoleApplication1.CustomTypes

[<EntryPoint>]
let main argv =

    // Use the Observable module to only subscribe to specific events
    let orderSystem = new OrderSystem()

    let stdDiscountObservable, managerApprovalObservable =
        orderSystem.NewOrderEvent
        // Filter event to just Standard or Manager level authorisation orders
        |> Observable.filter (fun orderArgs ->
            match orderArgs.AuthorisationLevel with
            | Standard | Manager -> true
            | _ -> false)
        // Split the event into 'Standard' and other discount approval levels
        |> Observable.partition (fun orderArgs ->
            orderArgs.AuthorisationLevel = DiscountApprovalLevel.Standard)

    // Add an event handler to the stdDiscountObservable IObservable<OrderArgs> stream
    stdDiscountObservable.Add(fun args ->
        printfn "Price : %A, Level : %A" args.Price args.AuthorisationLevel)

    // Add an event handler to the managerApprovalObservable IObservable<OrderArgs> stream
    managerApprovalObservable.Add(fun args ->
        printfn "Price : %A, Level : %A" args.Price args.AuthorisationLevel)

    orderSystem.CreateOrder( { Price = 120.0m; AuthorisationLevel = Manager } )

    // this will not run any handler, as the filter excludes Ceo orders
    orderSystem.CreateOrder( { Price = 240.0m; AuthorisationLevel = Ceo } )

    orderSystem.CreateOrder( { Price = 10.0m; AuthorisationLevel = Standard } )
    orderSystem.CreateOrder( { Price = 20.0m; AuthorisationLevel = Standard } )
    orderSystem.CreateOrder( { Price = 50.0m; AuthorisationLevel = Standard } )

    // this will not run any handler, as the filter excludes Ceo orders
    orderSystem.CreateOrder( { Price = 240.0m; AuthorisationLevel = Ceo } )

    Console.ReadLine() |> ignore

    // return 0 for main method
    0

There are a couple of things to point out in this code:

  1. We use a proper Observable.filter this time, such that only OrderArgs with an AuthorisationLevel of "Standard" or "Manager" come through. This means any event whose OrderArgs has an AuthorisationLevel of "Ceo" is effectively ignored
  2. We use Observable.partition to split the source IObservable<OrderArgs> into 2 separate IObservable<OrderArgs>: the first contains the events for which the predicate returns true, the second contains the rest
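The same filter-then-partition shape can be tried in isolation. The sketch below swaps the order events for a hypothetical Event<int> called numbers, so that it is easy to see which values end up in which stream.

```fsharp
// Hypothetical event source carrying plain integers
let numbers = Event<int>()

let evens, odds =
    numbers.Publish
    |> Observable.filter (fun n -> n >= 0)          // negatives never reach either stream
    |> Observable.partition (fun n -> n % 2 = 0)    // (predicate true, predicate false)

let evenLog = ResizeArray<int>()
let oddLog = ResizeArray<int>()

evens |> Observable.add (fun n -> evenLog.Add(n))
odds |> Observable.add (fun n -> oddLog.Add(n))

// Raise a handful of events
[ -1; 1; 2; 3; 4 ] |> List.iter (fun n -> numbers.Trigger(n))

printfn "evens: %A" (List.ofSeq evenLog)   // prints evens: [2; 4]
printfn "odds: %A" (List.ofSeq oddLog)     // prints odds: [1; 3]
```

Each value goes to exactly one of the two streams, which is why the order example needs the filter first: without it, "Ceo" orders would land in the second stream alongside "Manager" orders.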

Here is what this code looks like when it is run:

[Image: console output of the order example]

Using More Of The Rx Extension Methods

As I previously stated, I am a massive Rx fanboy, so I was a little disappointed to see that the F# Observable module does not have the full set of extension methods that Rx has for IObservable<T>. However, it seems I am not alone here, and some people have put together a GitHub project which brings the standard Rx extension methods to F#. Here is a link: https://github.com/fsprojects/FSharp.Reactive

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)