Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles
(untagged)

Using Reactive Extensions - the basics

0.00/5 (No votes)
29 Feb 2016 1  
How to connect hot observables to Rx

Introduction

When you start programming, one of the first things you start to come across are events, the stuff that is actually needed for humans to interact with your program. In this article, I will go over the most common ways to interact with a user interface with Reactive Extensions (Rx). This article will focus on so-called hot observables, i.a. listening to events that are already running on the UI thread using Rx. I should just quickly specify what a hot and cold observables really are; hot ones will sendoff values even if no-one is listening to them yet. Cold observables are only active once you subscribe to them, like connecting to a database. You could watch this video to a more in-depth explanation.

Sadly Rx documentation and tutorials are a bit shady, it is not easy to pick out the best solutions or best practices by just searching as there have, in many cases, been numerous updates of the rx-main framework since some of the blogs were written. There are quite a number of ways of doing one thing, so finding the solution that is right in a given instance is quite the challenge. That is one of the reasons that I decided to write this article, it is just what I have started to learn using Rx, and it caused me some frustration moments that I wish I didn't have. This is like the tutorial I would have liked to read.

If you want to create an application from scratch using Rx you will have download the Rx library from Microsoft, it's called Rx-main in NuGet. I would also advise you to get the package Rx-xaml, also released by Microsoft, as it contains a number of useful functions that you'll need later although I haven't used any code from it in this article. 

Button click event

I will use Rx to filter inputs from the Mouse and Keyboard to show you how you can hook them up to an observable stream. I Will start by placing just 2 controls in a standard WPF window:

  • Button
  • TextBox

Except for Windows Load, I will not hook any events directly in XAML, I will do everything in code behind. I start the tutorial by hooking up the most common event in an application, the button click event:

var buttonClick = Observable.FromEventPattern<RoutedEventHandler, RoutedEventArgs>(
                             h => btnMainWindow.Click += h,
                             h => btnMainWindow.Click -= h);

There is actually several things going on in the code above. The first thing you might notice is that it actually have to methods, one that adds the handler to the Click event, and another that removes the handler for the click event. This is one of the key things with Rx, that the stream will actually know how to unsubscribe from the event itself, which means that you can unhook the event without knowing the eventhandler. I'll get back to this later.

As you also probably can tell it is creating a stream of Observable events by using a function called FromEventPattern, and that it has a sender and EventArgs as the standard click event. I was actually a bit lazy writing the code using the var keyword, what is actually returned to the buttonClick is this:

            IObservable<EventPattern<RoutedEventArgs>> buttonClick = 
                                       Observable.FromEventPattern<RoutedEventHandler, RoutedEventArgs>(
                                       h => btnMainWindow.Click += h,
                                       h => btnMainWindow.Click -= h);

The EventPattern is just a holder that allows you to get the Sender and EventArgs from it like so:

var Sender = buttonClick.Select(evt => evt.Sender);
var EventArgs = buttonClick.Select(evt => evt.EventArgs);

Actually, the FromEventPattern is very versatile, with the proper modifications it could allow you to listen to property changed or hook a property descriptor to a property.

I can now listen to the event with a function call to the Subscribe function inside the buttonClick:

IDisposable DisposableButton = buttonClick
      .Subscribe(evt =>
      {
          var sender = evt.Sender;
          var eventArgs = evt.EventArgs;

          txtMainWindow.Text = "";
      }
  );

There is one thing that is very important in the above code, and that is that the subscribe function actually returns an IDisposable. This actually allows you to unsubscribe, i.a. remove the handler, just by calling the Dispose method:

DisposableButton.Dispose();

However, in the normal case you might have several subscriptions that you would like to terminate on Windows close or some other event. For these situations there exists something called CompositeDisposable, that I have declared in the Window:

public partial class MainWindow : Window
{

    CompositeDisposable DisposableSubscriptions = new CompositeDisposable();

    public MainWindow()
    {
        InitializeComponent();

   ...

I can now add the subscription to the CompositeDisposable instead:

DisposableSubscriptions.Add(buttonClick
      .Subscribe(evt =>
      {
          txtMainWindow.Text = "";
      }
  ));

This allows me to close all subscriptions by a single call, and that's pretty neat:

private void Window_Closing(object sender, System.ComponentModel.CancelEventArgs e)
{
    DisposableSubscriptions.Dispose();
}

At a first glance, it might not seem like much is gained by using Rx, you basically just get the same subroutine that allows you to do stuff when the button is clicked, and it has the same abilities that the normal button click void has allowing it to get the sender and event arguments to do some stuff with them. The real power of Rx first comes to fruition when you want to do something async, a weak subscription or some queries based on the user input.

There is also the thing that the events are a stream now, and they are what's called first class events. This is simply a fancy word that tells you that you can pass the entire stream off to different threads, and subscribe and unsubscribe to them there. 

Reacting to multiple Keyboard inputs

Most of the applications are designed to take in keyboard inputs in order to do some advanced edition. Normal features of TextBoxes for instance where you press Ctrl + C to copy, Ctrl + X to cut and so forth. My approach is basically a small refinement and generalisation from the stuff in this Stackoverflow question.

We start off by getting the PreviewKeyDown and PreviewKeyUp events, the reason for using the Preview events is to basically always get the Keystrokes that happens in the applications, as they can't be stopped by event bubbling or tunneling.

var KeyDown = Observable.FromEventPattern<KeyEventArgs>(this, "PreviewKeyDown");
var KeyUp = Observable.FromEventPattern<KeyEventArgs>(this, "PreviewKeyUp");

The first key that I want to grab is the left control key is pressed, this is pretty straightforward, except for one little detail. By default, if you hold down a key, windows will send multiple keystrokes as long as it is pressed, but this can easily be prevented by checking that the IsRepeat is false:

var KeysAfterCtrlHold = from FirstKeyPress in KeyDown
                                              .Where(e=> e.EventArgs.IsRepeat == false)
                                              .Where(e=> e.EventArgs.Key == Key.LeftCtrl)

This first bit of code is very standard Rx stuff, the real beauty of the code happens when the user hits more keystrokes, while the left control key is pressed:

var KeysAfterCtrlHold = from FirstKeyPress in KeyDown
                                              .Where(e=> e.EventArgs.IsRepeat == false)
                                              .Where(e=> e.EventArgs.Key == Key.LeftCtrl)
                        from RestOfKeyDown in KeyDown
                                              .TakeUntil(KeyUp.Where(e => e.EventArgs.Key == Key.LeftCtrl))
                                              .Where(e => e.EventArgs.Key != Key.LeftCtrl)
                                              .Select(evt=>evt.EventArgs.Key)
                        select RestOfKeyDown;

inside the RestOfKeyDown I want all of the keystrokes as long as the left control key is down, but I don't want the event from the left control key. As an afterthought, I only want the actual key down event, and I want to send just the Key as the argument to the subscriber done by using .Select(evt=>evt.EventArgs.Key). I couldn't do this before the filtering as I needed the EventArgs to check for the IsRepeat property. The last select statement is basically telling me that I only want to send the keys after the left control is down, and not the control down event.

So far it looks as I have toiled long and hard and not gotten very much out of it in return. The "key" thing to notice here, however, is that the code is completely general. It will send all the keys while the left control is pressed, so I can now react to multiple different key input sequences by just a little filtering. The obvious choice is to use either a Buffer or a Window to get the sequence you are interested in.

If I write a simple subscription to the event, and I want to react to the combination Ctrl + K + C strokes, like Visual Studio does as a hotkey to commenting out selected code, I will choose to call a buffer by the size of 2. (The Buffer is a moving window, where you can specify the number it should store, and the number of steps required to update the buffer, as a call Buffer(3,1) will store 3 consecutive values, and update them when 1 new value comes in). I can now write some simple code to get the keystrokes. You should make a note of the return values from the Buffer code is aIList<T> from the Observable<T> stream, and I will make use of this fact in just a moment:

IDisposable DisposableCtrlBuffer = KeysAfterCtrlHold.Buffer(2,1).Subscribe(evt =>
{
    var key1 = evt[0];
    var key2 = evt[1];

    txtMainWindow.Text += Environment.NewLine + "Key " + key1.ToString() + " and key " + key2.ToString() + " is pressed.";

    if (key1 == Key.K && key2 == Key.C)
    {
        txtMainWindow.Focus();
        txtMainWindow.SelectAll();
        this.Focus();
    }

    if (key1 == Key.K && key2 == Key.U)
    {
        txtMainWindow.Focus();
        txtMainWindow.Select(0, 0);
        this.Focus();
    }
});

This is all nice and well, but It has actually several problems with it. First off, it will store the Buffer in memory, even after the Ctrl key is released. It will also just combine the latest two keys pressed, and this could be problematic if I choose to listen to a 3 keystroke input as well in a different subscription. So I actually want to only get the first 2 keystrokes after the Ctrl key is pressed, and I want to reset the Buffer once the Ctrl key is released.

This is actually not so hard as you might think, I will just have to rewrite the buffer code in a slightly different manner. Instead of specifying how many numbers of keys the buffer should contain and/or the number of skipped values is could take to update it, you could instead use some of the overloads described in the documentation

On that is interesting in many cases is the overload that is on the form Buffer(Start_A_New_Buffer, Close_The_Buffer), and could be used like this:

.Buffer(
    // Start a new buffer when left Ctrl is pressed
    KeyDown.Where(e => e.EventArgs.Key == Key.LeftCtrl)
           .Where(e=> e.EventArgs.IsRepeat == false),
    // Close the buffer if left Ctrl is released
     _ => KeyUp.Where(e => e.EventArgs.Key == Key.LeftCtrl));

A little warning to you, it will include the start LeftCtrl in the buffer sent. This isn't bad, but it will not send the values until the Close_The_Buffer happen, in this case when the LeftCrl is released.

Another overload is to specify only when a Buffer should close, be aware that it will send the current buffer as soon as the close condition appear.

Buffer( () => Buffer_close_condition)

This is the one I'd like to use in the application, but I have a problem. I want the buffer to close once it has 2 values but I also want it to be flushed if you release the LeftCtrl key. I can achieve this by using the Observable.Merge function: 

var FlushableKeyBuffer = KeysAfterCtrlHold
   .Buffer(() => Observable.Merge(KeysAfterCtrlHold.Buffer(2), CloseBufferOnCtrlUp.Buffer(1)))
   .Where(evt=>evt.Count==2);

It basically just combining two separate streams into one, so in the code above it will propagate the keystrokes after the Ctrl is pressed, and switch source if the Ctrl key up event occurs. The Buffer(1,1) call is just to have the same return from both functions inside the Merge function. But I am not interested in propagating the Ctrl key up, so I want to stop it from being sent to the subscriber.

This is actually pretty interesting, as I was talking about how keystrokes are considered hot events, I have now created a filter that I could describe as changing from hot to cold as the code will only stream events when the Ctrl key is pressed and not before. 

The updated subscription code now looks like this:

IDisposable DisposableCtrlBuffer = FlushableKeyBuffer.Subscribe(evt =>
{
    var key1 = evt[0];
    var key2 = evt[1];

    txtMainWindow.Text += Environment.NewLine + "Key " + key1.ToString() + " and key " + key2.ToString() + " is pressed.";

    if (key1 == Key.K && key2 == Key.C)
    {
        txtMainWindow.Focus();
        txtMainWindow.SelectAll();
        this.Focus();
    }

    if (key1 == Key.K && key2 == Key.U)
    {
        txtMainWindow.Focus();
        txtMainWindow.Select(0, 0);
        this.Focus();
    }
});

This is very nice and all, but there is a much cleaner way of implementing KeysAfterCtrlHold, using Observable.Join. It also shows how the Join operator actually work, so without further ado:

var KeysAfterCtrlHold2 = Observable.Join(
    // Start the Left Control is down window
    KeyDown.Where(evt => evt.EventArgs.Key == Key.LeftCtrl && evt.EventArgs.IsRepeat == false),
    // Start listnening for key down events that isnt left control
    KeyDown.Where(evt => evt.EventArgs.Key != Key.LeftCtrl && evt.EventArgs.IsRepeat == false),
    // Close the Left Control window
    s => KeyUp.Where(evt => evt.EventArgs.Key == Key.LeftCtrl && evt.EventArgs.IsRepeat == false),
    // Make the key down event a point event
    c => Observable.Empty<Key>(),
    // The return key (s is always Left Control)
    (s, c) => {return c.EventArgs.Key;}
    );

The functionality is the exact same, but the readability is much better. I can now explain what Join actually does now, as it has two windows that it checks the concurrency of. This is just a fancy word that says that the Left Ctrl button is pressed down at the same time as some other key is pressed. The first two method calls tells you when the two respective windows should open, the third and forth tells you when they should be closed. I have chosen to close the other key down as soon as possible, as I only need this to function as a point event. As you would already notice, if one of the windows is closed, the Join won't return any value. 

There is another problem with the code, that it will send all the values that are held back from the KeysAfterCtrlHold. The reason is that the first Join will send all keystrokes while the Ctrl key is down. The buffer will start to send the values if it either hits 2 values or you release the Ctrl button. The buffer will start to produce a new buffer as soon as one of the closing arguments happen, but this time, it won't re-release when it hits 2 new values, instead it now waits on the second event to occur inside the Observable.Merge, the left Ctrl key up event. This means that if you hold down the left Ctrl and pressed the key A 10 times, it will first, as normal, send the 2 A's as expected. But if you now release the left Ctrl button, the Buffer will now release all the records it had stored, so you'll get 8 A's. The solution is to stop sending the FlushableBuffer when the Ctrl key is not pressed, I just created another Observable.Join, that would pass the buffer if, and only if, the Ctrl key was pressed:

var BufferedKeysAfterCtrlHold = Observable.Join
                                (
                                // Start the Left Control is down window
                                KeyDown
                                .Where(evt => evt.EventArgs.Key == Key.LeftCtrl)
                                .Where(evt=>  evt.EventArgs.IsRepeat == false),
                                // Start listnening for key down events that isnt left control
                                FlushableKeyBuffer,
                                // Close the Left Control window
                                s => KeyUp.Where(evt => evt.EventArgs.Key == Key.LeftCtrl),
                                // Make the key down event a point event
                                c => Observable.Empty<Key>(),
                                // The return key (s is always Left Control)
                                (s, c) => { return c; }
                                );

For reusability, I combined all the steps into a function call:

private IObservable<System.Collections.Generic.IList<Key>> Keystrokes(IObservable<EventPattern<KeyEventArgs>> KeyDown, IObservable<EventPattern<KeyEventArgs>> KeyUp, Key WhileThisKeyIsDown, int NumberOfKeysPushed)
{
    var GetKeysAfter = Observable.Join(
                                             // Start the Left Control is down window
                                             KeyDown.Where(evt => evt.EventArgs.Key == WhileThisKeyIsDown && evt.EventArgs.IsRepeat == false),
                                             // Start listnening for key down events that isnt left control
                                             KeyDown.Where(evt => evt.EventArgs.Key != WhileThisKeyIsDown && evt.EventArgs.IsRepeat == false),
                                             // Close the Left Control window
                                             s => KeyUp.Where(evt => evt.EventArgs.Key == WhileThisKeyIsDown),
                                             // Make the key down event a point event
                                             c => Observable.Empty<Key>(),
                                             // The return key (s is always Left Control)
                                             (s, c) => { return c.EventArgs.Key; }
                                             );

    var CloseTheBufferOnKeyUp = KeyUp.Where(e => e.EventArgs.Key == WhileThisKeyIsDown)
                                   .Select(x => x.EventArgs.Key);

    var FlushableKeyBuffer = GetKeysAfter
                             .Buffer(() => Observable.Merge(GetKeysAfter.Buffer(NumberOfKeysPushed, 1), CloseTheBufferOnKeyUp.Buffer(1, 1)))
                             .Where(evt => evt.Count == NumberOfKeysPushed);

    var FilteredBufferedKeys = Observable.Join(
                                             // Start the Left Control is down window
                                             KeyDown.Where(evt => evt.EventArgs.Key == WhileThisKeyIsDown && evt.EventArgs.IsRepeat == false),
                                             // Start listnening for key down events that isnt left control
                                             FlushableKeyBuffer,
                                             // Close the Left Control window
                                             s => KeyUp.Where(evt => evt.EventArgs.Key == WhileThisKeyIsDown),
                                             // Make the key down event a point event
                                             c => Observable.Empty<Key>(),
                                             // The return key (s is always Left Control)
                                             (s, c) => { return c; }
                                             );

    return FilteredBufferedKeys;
}

Combining Mouse inputs

This code is largely based on a challenge posted in a Channel 9 video on Observable.Join, and I have to say, that the Join operator was a hard one to get for me, and I didn't fully understood it until I saw the video called Programming Streams of Coincidence with Join and GroupJoin for Rx on channel 9. With the explanation as well as examples posted by other programmers, I finally got it. 

What I basically wanted to do was to combine MouseDown and MouseUp, and post the resulting duration of the click, so I started off by getting the event stream:

var MouseDown = Observable.FromEventPattern<MouseButtonEventArgs>(this, "MouseDown");
var MouseUp = Observable.FromEventPattern<MouseButtonEventArgs>(this, "MouseUp");

The full code is rather short, I just grab the MouseDown and MouseUp, filter it on the input where the left button is pushed. The Select is perhaps a bit more interesting as the_ => means take whatever the arguments passed from the mouse event and return the DateTime now:

var LeftMouseButtonPressedDuration = Observable.Join(
                 MouseDown
                  .Where(evt => evt.EventArgs.ChangedButton == MouseButton.Left)
                  .Select(_ => {return DateTime.Now;}),
                  MouseUp
                   .Where(evt => evt.EventArgs.ChangedButton == MouseButton.Left)
                   .Select(_ =>
                    {
                        DateTime now = DateTime.Now;
                        return now;
                    }
                    ),
                  CloseMouseDownWindow => MouseUp,
                  MouseUpEvent => Observable.Empty<DateTime>(),
                  (TimeDown, TimeUp) =>
                  {
                      TimeSpan MousePressedDuration = TimeUp - TimeDown;
                      return "Total duartion is: " + MousePressedDuration.ToString(@"ss\:fff");
                  }
                );

The interesting bits in the Join operator happens at the CloseMouseDownWindow event telling the Join when the window should end. You can actually set this to  TimeDown => Observable.Never<DateTime>(), this will now store all the MouseDown events that happen into an Observable<T> that will never close.  The TimeUp will tell when to close the window, and by setting it to => Observable.Empty it will stop the up window as soon as the event happen.


This was also a point that confused me, that the s and c will return singular value at the time, but can be of the stream type of Observable<T>. So when I get the result in (s, c) => ... they are of type <T>
 

All that was left was to post the result to the textbox sitting on the WPF Window:

DisposableSubscriptions.Add(LeftMouseButtonPressedDuration.Subscribe(evt =>
{
    txtMainWindow.Text += Environment.NewLine + evt.ToString();
})
);

 

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.

A list of licenses authors might use can be found here