
Fun with Rx

9 Mar 2012
A brief look into the DevLabs Reactive Framework.

Introduction

This article is an exploratory look into the Microsoft DevLabs project called Reactive Framework (Rx for short). I feel kind of weird saying this, but the demo solution that comes with this article will provide you with nothing re-usable at all; what it should hopefully do is give you an understanding of how Rx works.

Prerequisites

As the Reactive Framework is still effectively a DevLabs project, and not entirely in the mainstream .NET Framework (yet), you are still required to download and install an MSI, which you can find at the following URL: http://msdn.microsoft.com/en-us/devlabs/ee794896.aspx. The demo app relies on the "Rx for .NET 4.0" option being downloaded and installed.

The other thing you will need is Visual Studio 2010 and .NET 4.0.

Rx

Rx is a library for composing asynchronous and event-based programs using observable collections.

The "A" in "AJAX" stands for asynchronous, and indeed modern Web-based and Cloud-based applications are fundamentally asynchronous. In fact, Silverlight bans all blocking networking and threading operations. Asynchronous programming is by no means restricted to Web and Cloud scenarios, however. Traditional desktop applications also have to maintain responsiveness in the face of long latency IO operations and other expensive background tasks.

Another common attribute of interactive applications, whether Web/Cloud or client-based, is that they are event-driven. The user interacts with the application via a GUI that receives event streams asynchronously from the mouse, keyboard, and other inputs.

Rx is a superset of the standard LINQ sequence operators that expose asynchronous and event-based computations as push-based, observable collections via the new .NET 4.0 interfaces IObservable<T> and IObserver<T>. These are the mathematical dual of the familiar IEnumerable<T> and IEnumerator<T> interfaces for pull-based, enumerable collections in the .NET Framework.

The IEnumerable<T> and IEnumerator<T> interfaces allow developers to create reusable abstractions to consume and transform values from a wide range of concrete enumerable collections such as arrays, lists, database tables, and XML documents. Similarly, Rx allows programmers to glue together complex event processing and asynchronous computations using LINQ queries over observable collections such as .NET events and APM-based computations, PFx concurrent Task<T>, the Windows 7 Sensor and Location APIs, SQL StreamInsight temporal event streams, F# first-class events, and async workflows.

- Reactive Framework home page, as of 07/09/10.

I will just add a few words here to clarify how I would explain Rx to people who are familiar with .NET programming in general.

In most of the .NET programming you have done already, you have more than likely hooked up events and run some code in an event handler delegate whenever they fire. That works fine, but the event itself is not something you can easily pass around, filter, or combine with other events. With Rx, the stream of events becomes an object in its own right: when something happens, it is pushed at a subscriber as soon as it occurs via the new IObservable<T> and IObserver<T> interfaces. Granted, that by itself doesn't seem that revolutionary, but there are a few bits and pieces that I think really do make Rx shine. It is these things that I hope to show you during the course of discussing the demo apps shown below.
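To make the push idea a little more concrete, here is a minimal sketch that uses nothing but the IObservable<T> and IObserver<T> interfaces from the System namespace. It is not part of the demo solution, and the names RecordingObserver, CountingSource, and PushVersusPull are made up purely for illustration. The same three numbers are consumed once by pulling them out of an array with foreach, and once by having a source push them at an observer.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical observer that simply records everything pushed at it.
public sealed class RecordingObserver : IObserver<int>
{
    private readonly List<string> log = new List<string>();
    public List<string> Log { get { return log; } }

    public void OnNext(int value) { log.Add("next:" + value); }
    public void OnError(Exception error) { log.Add("error:" + error.Message); }
    public void OnCompleted() { log.Add("completed"); }
}

// Hypothetical source that pushes 1, 2, 3 at its subscriber and then completes.
public sealed class CountingSource : IObservable<int>
{
    public IDisposable Subscribe(IObserver<int> observer)
    {
        for (int i = 1; i <= 3; i++)
        {
            observer.OnNext(i);
        }
        observer.OnCompleted();
        return new NothingToDispose();
    }

    // The source finishes inside Subscribe, so there is nothing left to unhook.
    private sealed class NothingToDispose : IDisposable
    {
        public void Dispose() { }
    }
}

public static class PushVersusPull
{
    // Pull: the consumer drives, asking the sequence for one value at a time.
    public static List<string> Pull()
    {
        var log = new List<string>();
        foreach (int i in new[] { 1, 2, 3 })
        {
            log.Add("next:" + i);
        }
        log.Add("completed");
        return log;
    }

    // Push: the producer drives, calling back into the observer.
    public static List<string> Push()
    {
        var observer = new RecordingObserver();
        new CountingSource().Subscribe(observer);
        return observer.Log;
    }
}
```

Both PushVersusPull.Pull() and PushVersusPull.Push() end up with the same log; the only difference is who is in control of the loop.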

Discussing the Demo Apps

I have included four small demo projects within the overall VS2010 solution link at the top of this article. In the following sub sections, I will explain the demo apps that can be found in the article's download demo solution. I should point out, these demo apps are all fairly similar in nature, as I feel the most important aspect of working with Rx will be subscribing to IObservable<T>, so that is why the demo apps all have a similar angle. That is not to say this is all that Rx does; no way, it does loads more, this is just my opinion of what I thought would be useful to look into and share with the lot.

At the start of each demo section below, I will give you a brief description of what the demo app does, then show you a screenshot, and then explain the code within the demo project.

RxDraw (.NET 4.0)

Demo Description

This demo illustrates how to use Subject<T>, which is both an IObserver<T> and an IObservable<T>. There is an RxPanel UserControl that contains a Subject<T>, which is subscribed to in MainWindow.xaml.cs. The RxPanel UserControl listens to mouse movements, and when it deems it has a full polyline, it alerts its subscribers. It also illustrates that when you subscribe to something, you get an IDisposable, which is pretty cool: you can use it in a using statement, and it should be easier to manage than an event handler delegate which you must keep track of.

It can be seen that there are two panels which are hosted in a single WPF Window. The Window is called MainWindow.xaml, and the top panel is actually a UserControl called RxPanel.

The RxPanel UserControl uses an Rx object called Subject<T>, a very handy class that is both an IObserver<T> and an IObservable<T>, which makes it very convenient to work with. The basic idea is that whenever we get a full Polyline, we use the Subject<T> contained within the RxPanel UserControl to signal to its subscribers that something happened (a reaction I suppose).
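If it helps to see what "both an IObserver<T> and an IObservable<T>" means in practice, the following is a rough, single-threaded sketch of the idea. It is not the real Rx Subject<T>; SimpleSubject<T>, ActionObserver<T>, and SubjectDemo are hypothetical names I have made up, and the real class does a good deal more (thread safety and termination handling, for instance).

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for Rx's Subject<T>, built only on the two interfaces.
public sealed class SimpleSubject<T> : IObservable<T>, IObserver<T>
{
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();

    // IObservable<T> side: register an observer and hand back a handle that removes it.
    public IDisposable Subscribe(IObserver<T> observer)
    {
        observers.Add(observer);
        return new Unsubscriber(() => observers.Remove(observer));
    }

    // IObserver<T> side: whatever is pushed in is forwarded to every subscriber.
    public void OnNext(T value)
    {
        foreach (var observer in observers.ToArray()) observer.OnNext(value);
    }

    public void OnError(Exception error)
    {
        foreach (var observer in observers.ToArray()) observer.OnError(error);
    }

    public void OnCompleted()
    {
        foreach (var observer in observers.ToArray()) observer.OnCompleted();
    }

    private sealed class Unsubscriber : IDisposable
    {
        private Action remove;
        public Unsubscriber(Action remove) { this.remove = remove; }

        public void Dispose()
        {
            // Only unhook once, even if Dispose is called repeatedly.
            if (remove != null) { remove(); remove = null; }
        }
    }
}

// Hypothetical observer that forwards OnNext to a delegate and ignores the rest.
public sealed class ActionObserver<T> : IObserver<T>
{
    private readonly Action<T> onNext;
    public ActionObserver(Action<T> onNext) { this.onNext = onNext; }

    public void OnNext(T value) { onNext(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class SubjectDemo
{
    public static List<string> Run()
    {
        var received = new List<string>();
        var subject = new SimpleSubject<string>();

        // The subscription lives exactly as long as the using block.
        using (subject.Subscribe(new ActionObserver<string>(received.Add)))
        {
            subject.OnNext("line 1");
            subject.OnNext("line 2");
        }

        subject.OnNext("line 3"); // nobody is subscribed any more
        return received;
    }
}
```

SubjectDemo.Run() returns only "line 1" and "line 2", because the subscription has already been disposed by the time "line 3" is pushed.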

Here is the full code for RxPanel; the main things to note are the two Rx members and what happens in the MouseUp event handler, where the Subject<T> is told to notify its subscribers that something happened.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Windows;
using System.Windows.Controls;
using System.Windows.Data;
using System.Windows.Documents;
using System.Windows.Input;
using System.Windows.Media;
using System.Windows.Media.Imaging;
using System.Windows.Navigation;
using System.Windows.Shapes;

namespace RxDraw
{
    /// <summary>
    /// Interaction logic for RxPanel.xaml
    /// </summary>
    public partial class RxPanel : UserControl
    {

        private Point previousPoint = new Point();
        private Polyline currentLine = new Polyline();
        
        //Reactive fields
        private readonly Subject<Polyline> mouseMovesInPanel = 
                         new Subject<Polyline>();
        public IObservable<Polyline> MouseMoves { 
                   get { return mouseMovesInPanel.AsObservable(); } }


        public RxPanel()
        {
            InitializeComponent();
        }


        private void Canvas_MouseDown(object sender, MouseButtonEventArgs e)
        {
            if (e.LeftButton == MouseButtonState.Pressed)
            {
                currentLine = new Polyline
                {
                    Stroke = Brushes.Magenta,
                    StrokeThickness = 4
                };
                canv.Children.Add(currentLine);
                currentLine.Points.Add(e.GetPosition(canv));
            }
        }

        private void Canvas_MouseMove(object sender, MouseEventArgs e)
        {
            if (e.LeftButton == MouseButtonState.Pressed)
                currentLine.Points.Add(e.GetPosition(canv));
        }

        private void Canvas_MouseUp(object sender, MouseButtonEventArgs e)
        {
            mouseMovesInPanel.OnNext(currentLine);
        }
    }
}

So how does the actual subscription work then? Well, that is pretty easy. Shown below are the most relevant parts of the MainWindow.xaml.cs code, which show how the subscribing works.

private void MainWindow_Loaded(object sender, RoutedEventArgs e)
{
    rxPanelSubcription = rxPanel.MouseMoves.Subscribe(SubscriptionMethod);
}

private void SubscriptionMethod(Polyline rxPanelPolyline)
{
    canvAutoDrawn.Children.Add(CreateLineFromOther(rxPanelPolyline));
}

private Polyline CreateLineFromOther(Polyline existingLine)
{
    Polyline polyline = new Polyline();
    polyline.StrokeThickness = 2;
    polyline.Stroke = Brushes.Black;
    foreach (Point point in existingLine.Points)
    {
        polyline.Points.Add(new Point(point.X, point.Y));
    }
    return polyline;
}

So far it all looks like pretty standard event stuff, right? Well, one of the problems with standard .NET events is that it is sometimes difficult to work out when to unhook EventHandler delegates. With Rx, what you get when you subscribe to an IObservable<T> is an IDisposable, which I would argue has a much easier life cycle to deal with than an EventHandler: just call Dispose() on the subscription and you are done. Heck, you can even wrap it in a using statement, how nice is that? That I do like. The other pretty beautiful thing about an IObservable<T> is that it is a full-fledged object, which means you can pass it around between methods anywhere in your code. OK, this could be done with EventHandler delegates, but it gets a lot messier.

RxSimpleFromEvent (.NET 4.0)

Demo Description

This demo illustrates how we can use Rx to effectively use LINQ to Events, and react when the subscribed event fires. It also shows how we can use the new System.Disposables namespace, and how we can scope an IDisposable subscription around the entire Windows Forms message pump.

This is a pretty simple demo that uses Rx to listen to MouseMove events on a form, and then updates a Label control on that form from within the subscription. Here is a screenshot of it in action, nothing fancy:

There is not much to speak of at all for the actual Form; here is the entire code for the Form (not including the designer generated code):

using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Windows.Forms;

namespace RxSimpleFromEvent
{
    public partial class Form1 : Form
    {
        public Form1()
        {
            InitializeComponent();
        }

        public string RxUpdateableLabelText
        {
            set
            {
                label3.Text = value;
            }
        }
    }
}

The real work for this demo actually happens in the standard WinForms project's Program.cs (C# Windows Forms Project) class. Here is the full code for the Program class:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Windows.Forms;

namespace RxSimpleFromEvent
{
    static class Program
    {
        /// <summary>
        /// The main entry point for the application.
        /// </summary>
        [STAThread]
        static void Main()
        {
            Application.EnableVisualStyles();
            Application.SetCompatibleTextRenderingDefault(false);

            Form1 frm = new Form1();

            IObservable<IEvent<MouseEventArgs>> moves =
                Observable.FromEvent<MouseEventArgs>(frm, "MouseMove")
                    .Where(x => x.Sender.GetType().Equals(typeof(Form1)));

            //subscribe, which gives us an IDisposable, that we can use in a using
            //statement, which will wrap the entire Application message pump, so 
            //will only Dispose when the applications main form (Form1 in this case)
            //is closed
            using (moves.Subscribe(

                //OnNext (Normal operation)
                evt => 
                { 
                    frm.RxUpdateableLabelText = evt.EventArgs.Location.ToString(); 
                })) 
            { 
                Application.Run(frm);

            }// the subscription is disposed (unsubscribed) at this point, as it's an IDisposable, neato


        }
    }
}

There are a couple of interesting things to note in this code section, such as:

  • See how we can effectively use standard LINQ syntax even though we are actually reacting to events. OK, in this case, I am simply observing events that originate from Form1, so it's not that fancy, but you get the idea. We are using LINQ syntax against events.
  • Since subscribing to an IObservable<T> (IObservable<IEvent<MouseEventArgs>> in this case) gives us an IDisposable, we can wrap a using statement around the entire application message pump. Again, I know this is a weird example, but you could use the same trick with a somewhat smaller using scope in your own code to automatically unhook from something that you know is going to be short lived.
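For anyone wondering what turning an event into an IObservable<T> and then filtering it might involve under the covers, here is a stripped-down sketch of the two points above. It is an assumption-laden illustration rather than how Rx actually implements Observable.FromEvent or Where: Pointer, PointerMovedEventArgs, EventSketch, Filter, and the three Delegate* helpers are all names invented for this example, and a plain class stands in for the Form.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical event source standing in for a Form raising MouseMove.
public sealed class PointerMovedEventArgs : EventArgs
{
    public PointerMovedEventArgs(int x, int y) { X = x; Y = y; }
    public int X { get; private set; }
    public int Y { get; private set; }
}

public sealed class Pointer
{
    public event EventHandler<PointerMovedEventArgs> Moved;

    public void MoveTo(int x, int y)
    {
        var handler = Moved;
        if (handler != null) handler(this, new PointerMovedEventArgs(x, y));
    }
}

// Small delegate-backed helpers so the sketch stays short.
public sealed class DelegateDisposable : IDisposable
{
    private Action dispose;
    public DelegateDisposable(Action dispose) { this.dispose = dispose; }
    public void Dispose() { if (dispose != null) { dispose(); dispose = null; } }
}

public sealed class DelegateObserver<T> : IObserver<T>
{
    private readonly Action<T> onNext;
    private readonly Action<Exception> onError;
    private readonly Action onCompleted;

    public DelegateObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        this.onNext = onNext; this.onError = onError; this.onCompleted = onCompleted;
    }

    public void OnNext(T value) { onNext(value); }
    public void OnError(Exception error) { onError(error); }
    public void OnCompleted() { onCompleted(); }
}

public sealed class DelegateObservable<T> : IObservable<T>
{
    private readonly Func<IObserver<T>, IDisposable> subscribe;
    public DelegateObservable(Func<IObserver<T>, IDisposable> subscribe) { this.subscribe = subscribe; }
    public IDisposable Subscribe(IObserver<T> observer) { return subscribe(observer); }
}

public static class EventSketch
{
    // Subscribing attaches a handler; disposing the subscription detaches it again.
    public static IObservable<TArgs> FromEvent<TArgs>(
        Action<EventHandler<TArgs>> add, Action<EventHandler<TArgs>> remove)
        where TArgs : EventArgs
    {
        return new DelegateObservable<TArgs>(observer =>
        {
            EventHandler<TArgs> handler = (sender, args) => observer.OnNext(args);
            add(handler);
            return new DelegateDisposable(() => remove(handler));
        });
    }

    // Hypothetical counterpart of Where: only forwards values that match the predicate.
    public static IObservable<T> Filter<T>(this IObservable<T> source, Func<T, bool> predicate)
    {
        return new DelegateObservable<T>(observer => source.Subscribe(
            new DelegateObserver<T>(
                value => { if (predicate(value)) observer.OnNext(value); },
                observer.OnError,
                observer.OnCompleted)));
    }

    public static List<int> Run()
    {
        var pointer = new Pointer();
        var seen = new List<int>();

        IObservable<PointerMovedEventArgs> moves =
            FromEvent<PointerMovedEventArgs>(h => pointer.Moved += h, h => pointer.Moved -= h)
                .Filter(e => e.X >= 10);

        using (moves.Subscribe(new DelegateObserver<PointerMovedEventArgs>(
            e => seen.Add(e.X), ex => { }, () => { })))
        {
            pointer.MoveTo(5, 0);   // filtered out
            pointer.MoveTo(10, 0);  // passes the filter
            pointer.MoveTo(42, 0);  // passes the filter
        }

        pointer.MoveTo(99, 0);      // handler was detached when the using block ended
        return seen;
    }
}
```

EventSketch.Run() returns 10 and 42: the first move is filtered out, and the last one happens after the using block has disposed the subscription and removed the handler.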

RxAsyncWebCalls (.NET 4.0)

Demo Description

Another area where Rx claims to make our lives easier is in the area of asynchronous calls, where we would use the classic Begin/End asynchronous method calls in the past.

This demo illustrates how to use both conventional async calls and Rx to carry out asynchronous operations. In this example, it calls a small demo WCF service called "DemoWCFService" which contains a single method called "GetData(int value)", which should work fine if it is passed a number between 1 and 100 and throw a SOAP fault for anything else. This small demo is enough to show you the asynchronous pattern that Rx can use, and how to handle completion and faults using the IObserver<T>.OnCompleted() and IObserver<T>.OnError() actions.

Here is what the UI looks like, where it can be seen that there are four buttons:

  1. Calling the WCF service using conventional async calls, passing a valid number to GetData(int value) which should work fine
  2. Calling the WCF service using conventional async calls, passing an invalid number to GetData(int value) which should throw a SOAP fault
  3. Calling the WCF service using Rx, passing a valid number to GetData(int value) which should work fine
  4. Calling the WCF service using Rx, passing an invalid number to GetData(int value) which should throw a SOAP fault

Service Contract/Service

For completeness, here are the WCF service contracts:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.Serialization;
using System.ServiceModel;
using System.ServiceModel.Web;
using System.Text;

namespace DemoWCFService
{
    [ServiceContract]
    public interface IService1
    {
        [OperationContract]
        [FaultContract(typeof(GenericFault))]
        List<Person> GetData(int value);
    }

    [DataContract]
    public class Person
    {
        public Person(int age, string firstName, string lastName)
        {
            this.Age = age;
            this.FirstName = firstName;
            this.LastName = lastName;
        }

        [DataMember]
        public int Age { get; set; }

        [DataMember]
        public string FirstName { get; set; }

        [DataMember]
        public string LastName { get; set; }
    }


    [DataContract]
    public class GenericFault
    {
        public GenericFault(string errorMessage)
        {
            this.ErrorMessage = errorMessage;
        }

        [DataMember]
        public string ErrorMessage { get; set; }
    }
}

And the service implementation:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.Serialization;
using System.ServiceModel;
using System.ServiceModel.Web;
using System.Text;

namespace DemoWCFService
{
    public class Service1 : IService1
    {
        public List<Person> GetData(int value)
        {
            if (value > 100 || value < 1)
            {
                string errString = "Dummy error : Service1 can only return 1-100 items";
                GenericFault fault = new GenericFault(errString);
                throw new FaultException<GenericFault>(fault, new FaultReason(errString));
            }
            else
            {
                List<Person> people = new List<Person>();
                for (int i = 0; i < value; i++)
                {
                    people.Add(new Person(i, "person " + i.ToString() + "_FName",
                        "person " + i.ToString() + "_LName"));
                }
                return people;
            }
        }
    }
}

So as you can see, we have a bog standard WCF service that simply returns a List<Person>, provided the GetData(int value) method is called with a number between 1 and 100; otherwise, a SOAP fault is thrown.

As I previously stated, I include both a good and bad call to the demo WCF service for both conventional async calls and Rx. The only thing that actually changes is the number that is sent to the WCF service's GetData(int value) method, so for brevity, I will only show an example of each of the conventional async calls and Rx based options. You can examine the full code in the attached demo code.

Conventional Calls

When we call a WCF service, or any web service for that matter, in an async manner, we follow a pretty well established pattern that is tried and tested. Call the BeginXXX method, get an IAsyncResult object which can be passed to the EndXXX method, at which point you will get the results. For the demo app, this looks like this:

using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Windows.Forms;

//our demo WCF reference
using RxAsyncWebCalls.DummyWCFService;
using System.Threading;
using System.ServiceModel;

namespace RxAsyncWebCalls
{
    public partial class Form1 : Form
    {
        private SynchronizationContext context;

        public Form1()
        {
            InitializeComponent();
            this.Load += Form1_Load;
        }

        void Form1_Load(object sender, EventArgs e)
        {
            //set up the SynchronizationContext
            context = SynchronizationContext.Current;
            if (context == null)
            {
                context = new SynchronizationContext();
            }
        }

        /// <summary>
        /// Shows calling a WCF service asynchronously using IAsyncResult
        /// Should be successful
        /// </summary>
        private void BtnStandardWCFCall_Click(object sender, EventArgs e)
        {
            Service1Client client = new Service1Client();
            try
            {
                pnlWait.Visible = true;
                Application.DoEvents();

                //conventional calling of WCF Async method
                client.BeginGetData(10,
                    (iar) =>
                    {
                        try
                        {
                            List<Person> people = client.EndGetData(iar);
                            context.Send(new SendOrPostCallback(delegate(object state)
                            {
                                dgvPeople.DataSource = (List<Person>)state;

                            }), people);
                        }
                        catch (FaultException<GenericFault> fex)
                        {
                            MessageBox.Show("Error\r\n" + fex.Detail.ErrorMessage);
                        }

                    }, null);

            }
            catch (Exception ex)
            {
                MessageBox.Show("Ooops Something wrong\r\n" + ex.Message + "\r\n\r\n" +
                    ex.StackTrace);
            }
            finally
            {
                if (client != null)
                {
                    client.Close();
                    client.Abort();
                    pnlWait.Visible = false;
                    Application.DoEvents();
                }
            }
        }
    }
}

Apart from the async call, all we are doing is handling a specific type of SOAP fault (WCF FaultException<T>) that the WCF Service throws and making sure the Form's controls are updated using the correct UI thread by using the standard Windows Forms SynchronizationContext. Apart from that, it's all pretty standard async code.

Rx Async Calls

Now here is what the Rx group have to say about conventional async calls:

The BeginXXX method is the method that starts a Web Service call. Besides taking all of the parameters to be passed to the web method, it also takes an AsyncCallback delegate. This delegate gets invoked when the service's response has been received. This code is quite clumsy and the data aspect of the asynchronous call is not immediately apparent. Furthermore, composition with other asynchronous data sources (such as our TextBox) becomes quite hard. It's also not clear how one can cancel outstanding requests such that the callback procedure is guaranteed not to be called anymore. Dealing with error cases becomes hard too.

- Taken from Rx HOL .NET.pdf included in the Reactive Framework installation.

So with that said, I will now show you the Rx code, which is as follows:

using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Windows.Forms;

//our demo WCF reference
using RxAsyncWebCalls.DummyWCFService;
using System.Threading;
using System.ServiceModel;

namespace RxAsyncWebCalls
{
    public partial class Form1 : Form
    {
        private SynchronizationContext context;

        public Form1()
        {
            InitializeComponent();
            this.Load += Form1_Load;
        }

        void Form1_Load(object sender, EventArgs e)
        {
            //set up the SynchronizationContext
            context = SynchronizationContext.Current;
            if (context == null)
            {
                context = new SynchronizationContext();
            }
        }

        /// <summary>
        /// Shows calling a WCF service asynchronously using RX
        /// Should be Success
        /// </summary>
        private void BtnAsyncUsingRx_Click(object sender, EventArgs e)
        {
            Service1Client client = new Service1Client();
            try
            {
                pnlWait.Visible = true;
                Application.DoEvents();

                Func<int,IObservable<List<Person>>> peopleObs = 
                    Observable.FromAsyncPattern<int, List<Person>>(
                        client.BeginGetData, client.EndGetData);

                //call WCF service, with something that should work ok
                IObservable<List<Person>> res = peopleObs(10);

                var subscription = res.Subscribe(
                    people =>
                    {
                        context.Send(new SendOrPostCallback(delegate(object state)
                        {
                            dgvPeople.DataSource = (List<Person>)state;

                        }), people);
                    },
                    //OnError (Error operation) -- should NOT happen
                    fault =>
                    {
                        FaultException<GenericFault> actualWCFFault = 
                            (FaultException<GenericFault>)fault;
                        MessageBox.Show("Error\r\n" + actualWCFFault.Detail.ErrorMessage);
                    },
                    //OnCompleted - Should happen, as no Fault happened
                    () =>
                    {
                        MessageBox.Show("Completed Ok");
                    });
            }
            catch (Exception ex)
            {
                MessageBox.Show("Ooops Something wrong\r\n" + ex.Message + "\r\n\r\n" +
                    ex.StackTrace);
            }
            finally
            {
                if (client != null)
                {
                    client.Close();
                    client.Abort();
                    pnlWait.Visible = false;
                    Application.DoEvents();
                }
            }
        }
    }
}

There are a couple of interesting things to note there, such as:

  • We can create a Func<T> delegate that returns an IObservable<T> in a single line that covers both the BeginXXX and EndXXX methods by using Observable.FromAsyncPattern. This is actually pretty nice, as we can simply call the delegate we get from Observable.FromAsyncPattern as if it were a regular method. That said, we still need to subscribe to the result (an IObservable<T>) of calling this delegate.
  • Since we are using Rx's async capabilities, we can make use of the OnError() and OnCompleted() actions that can be supplied when you subscribe to an IObservable<T> (OnCompleted() is only called when the operation finishes without an error). We can see the OnCompleted() working in the original demo screenshot, but what about the OnError() action code? The screenshot below shows that, which is obviously the result of the WCF Service throwing a SOAP fault (FaultException<T>).
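To give a feel for how a BeginXXX/EndXXX pair can be folded into an IObservable<T> with OnNext, OnError, and OnCompleted, here is a simplified sketch. It is an assumption on my part and not how Observable.FromAsyncPattern is really implemented; for one thing, this version only starts the call when somebody subscribes. FakePeopleService, FromBeginEnd, LogObserver, and Call are hypothetical names, the fake service uses a Task (which implements IAsyncResult, and needs .NET 4.5 or later for Task.Run) in place of a WCF proxy, and an ArgumentOutOfRangeException stands in for the SOAP fault.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical service exposing a Begin/End pair; the Task doubles as the IAsyncResult.
public sealed class FakePeopleService
{
    public IAsyncResult BeginGetData(int value, AsyncCallback callback, object state)
    {
        // 'state' is ignored in this sketch.
        Task<List<string>> task = Task.Run(() =>
        {
            if (value < 1 || value > 100)
                throw new ArgumentOutOfRangeException("value");
            return Enumerable.Range(0, value).Select(i => "person " + i).ToList();
        });
        task.ContinueWith(t => callback(t));
        return task;
    }

    public List<string> EndGetData(IAsyncResult result)
    {
        // Returns the result, or rethrows the original exception if the call faulted.
        return ((Task<List<string>>)result).GetAwaiter().GetResult();
    }
}

public static class AsyncPatternSketch
{
    // Hypothetical, simplified cousin of Observable.FromAsyncPattern.
    public static Func<TArg, IObservable<TResult>> FromBeginEnd<TArg, TResult>(
        Func<TArg, AsyncCallback, object, IAsyncResult> begin,
        Func<IAsyncResult, TResult> end)
    {
        return arg => new BeginEndObservable<TArg, TResult>(begin, end, arg);
    }

    private sealed class BeginEndObservable<TArg, TResult> : IObservable<TResult>
    {
        private readonly Func<TArg, AsyncCallback, object, IAsyncResult> begin;
        private readonly Func<IAsyncResult, TResult> end;
        private readonly TArg arg;

        public BeginEndObservable(Func<TArg, AsyncCallback, object, IAsyncResult> begin,
            Func<IAsyncResult, TResult> end, TArg arg)
        {
            this.begin = begin; this.end = end; this.arg = arg;
        }

        public IDisposable Subscribe(IObserver<TResult> observer)
        {
            begin(arg, iar =>
            {
                TResult result;
                try { result = end(iar); }
                catch (Exception ex) { observer.OnError(ex); return; } // fault: no OnCompleted
                observer.OnNext(result);
                observer.OnCompleted();
            }, null);
            return new NoOp(); // cancellation is out of scope for this sketch
        }
    }

    private sealed class NoOp : IDisposable { public void Dispose() { } }

    // Records what happened and signals once the call has finished either way.
    private sealed class LogObserver : IObserver<List<string>>
    {
        private readonly List<string> log;
        private readonly ManualResetEventSlim done;
        public LogObserver(List<string> log, ManualResetEventSlim done) { this.log = log; this.done = done; }

        public void OnNext(List<string> people) { log.Add("next:" + people.Count); }
        public void OnError(Exception error) { log.Add("error:" + error.GetType().Name); done.Set(); }
        public void OnCompleted() { log.Add("completed"); done.Set(); }
    }

    public static List<string> Call(int value)
    {
        var service = new FakePeopleService();
        var log = new List<string>();
        var done = new ManualResetEventSlim(false);

        Func<int, IObservable<List<string>>> getData =
            FromBeginEnd<int, List<string>>(service.BeginGetData, service.EndGetData);

        getData(value).Subscribe(new LogObserver(log, done));
        done.Wait(TimeSpan.FromSeconds(5)); // block only so the sketch can return the log
        return log;
    }
}
```

AsyncPatternSketch.Call(10) logs "next:10" followed by "completed", while AsyncPatternSketch.Call(0) logs a single "error:ArgumentOutOfRangeException" entry and never reaches OnCompleted, which mirrors the good and bad calls in the demo.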

But is it actually easier to read/understand and work with? I happen to think not, but you can decide; playing with the latest tech is all the rage after all. I for one will still do it the old way, I think it's much more concise. Not all new tech is good tech, and I think people get carried away far too easily (myself included, but lately I am going back to my roots, and leaving the new tech to mature a bit more, just call me renaissance man).

MefRx (.NET 3.5)

Demo Description

When I started this article, I wanted it to have all .NET 4.0 projects, but it looks like some of the Rx stuff has been incorporated into .NET 4 and some has not, and since a lot of the stuff is in the System namespace, it is not just a question of fully qualifying types. So this is a strange project in that it references the Rx DLLs (as do the other projects), but it does not target .NET 4, but targets .NET 3.5.

This demo illustrates how we can use Rx to effectively use LINQ to Events, but this time, it also shows how you can use MEF to [Export] IObservable<EventResult> from an Rx object, which will be added to a MEF CompositionContainer and can be imported as any normal MEF [Import] would be. As the [Import]s are lazy, the interested party can get the [Import] using standard MEF Lazy loading techniques (normally Func delegates).

I admit this is an utterly useless bit of code the way it stands, but it could be useful if you had some sort of plotting libraries and you wanted to do something based on exporting the plot events or something like that. It's just fun to see if these things are possible, right? You never know when you might need MEF and Rx together.

So what does the demo do? Well, it pretty much just updates a Form Label whenever a MouseDown event occurs, but it uses MEF to Export and Import the MouseDown events.

Here is the obligatory screenshot:

This one needs a bit more explaining, and we will start with the MEF bits.

Start with a simple helper interface:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace MefRX
{
    public interface IExportableName
    {
        string ExportableName { get; }
    }
}

Then, we have a special MEF Export attribute:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.ComponentModel.Composition;

namespace MefRX
{
    [MetadataAttribute]
    [AttributeUsage(AttributeTargets.Method, AllowMultiple = false)]
    public class ObservableExportAttribute : ExportAttribute
    {
        public ObservableExportAttribute() : 
            base(typeof(Func<object, IObservable<EventArgsWrapper>>)) { }
        public string ExportableName { get; set; }
    }
}

Then we have a simple singleton MEF CompositionContainer hosting class, where we simply host the Lazy Exports and create the actual CompositionContainer using the current Assembly. See how we are using MEF's Lazy support where we can use the Resolve() method to get the Export later when we really need it?

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.ComponentModel.Composition.Hosting;
using System.ComponentModel.Composition;

namespace MefRX
{
    public sealed class SimpleComposer
    {
        #region Data

        static readonly SimpleComposer instance = new SimpleComposer();
 
        #endregion

        #region Ctor
        static SimpleComposer()
        { 
        }

        private SimpleComposer()
        {
        }
        #endregion

        #region Public Methods
        /// <summary>
        /// Resolve an exported function
        /// </summary>
        /// <param name="name"></param>
        /// <returns></returns>
        public static Func<object, IObservable<EventArgsWrapper>> Resolve(string name)
        {
            var rxSenders = Instance.RxEventExporters;

            if (rxSenders == null) throw new InvalidOperationException(
                "Part composer not initialized");
            try
            {
                return Instance.RxEventExporters.FirstOrDefault(
                    s => s.Metadata.ExportableName == name).Value;
            }
            catch (Exception ex)
            {
                return null;
            }
        }


        public void Compose()
        {
            var catalog = new AggregateCatalog();
            catalog.Catalogs.Add(new AssemblyCatalog(
                System.Reflection.Assembly.GetExecutingAssembly()));
            var container = new CompositionContainer(catalog);
            container.ComposeParts(Instance);

        }
        #endregion

        #region Public Properties

        [ImportMany]
        public Lazy<Func<object, IObservable<EventArgsWrapper>>, 
            IExportableName>[] RxEventExporters { get; set; }


        public static SimpleComposer Instance
        {
            get
            {
                return instance;
            }
        }
        #endregion
    }
}

The next thing to do is the Rx part, so the first thing we do is create the Export that MEF will find and put into the CompositionContainer, which is done as follows:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.ComponentModel.Composition;
using System.Windows.Forms;

namespace MefRX
{
    [Export]
    public class ExportedTriggers
    {
        [ObservableExport(ExportableName = "MouseDown")]
        public IObservable<EventArgsWrapper> MouseDownExport(object obj)
        {
            var control = obj as Form1;
            var trigger = from kd in control.GetMouseDown()
                          select new EventArgsWrapper() 
                            { 
                                Sender = kd.Sender, 
                                EventArgs = kd.EventArgs, 
                            };
            return trigger;
        }
    }

    public static class FormExtensions
    {
        public static IObservable<IEvent<MouseEventArgs>> GetMouseDown(this Form frm)
        {
            var allMouseDowns = Observable.FromEvent<MouseEventHandler, MouseEventArgs>
                (h => new MouseEventHandler(h),
                    h => frm.MouseDown += h, //subscription handler
                    h => frm.MouseDown -= h  //unsubscribe handler
                 );

            
            return allMouseDowns;
        }
    }
}

Where there is a little helper class called EventArgsWrapper, which looks like this:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace MefRX
{
    public class EventArgsWrapper
    {
        public object Sender { get; set; }
        public object EventArgs { get; set; }
    }
}

So now, all we need to do is do some Importing using MEF. How is this done? Well, it is as shown below:

using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Windows.Forms;
using System.ComponentModel.Composition;
using System.Threading;

namespace MefRX
{
    public partial class Form1 : Form
    {
        private IDisposable obsDisposable;

        public Form1()
        {
            InitializeComponent();
            this.FormClosing += Form1_FormClosing;
        }

        private void Form1_FormClosing(object sender, FormClosingEventArgs e)
        {
            obsDisposable.Dispose();
        }
       

        private void Form1_Load(object sender, EventArgs e)
        {
            IObservable<EventArgsWrapper> obs = 
                SimpleComposer.Resolve("MouseDown").Invoke(this);
            //subscribe on WinForms SynchronizationContext
            obsDisposable = obs.SubscribeOnWindowsForms(this).Subscribe(x =>
                {
                    MouseEventArgs o = (MouseEventArgs)x.EventArgs;
                    label1.Text = string.Format("x:{0}-y:{1}", o.X, o.Y);
                });
        }
    }
}

See how we use the SimpleComposer.Resolve() method, which queries the Lazy Exports, looking for the correct Export using the metadata that we applied to the exporting method with the ObservableExportAttribute that I showed earlier. Another interesting thing to note is that we are subscribing on the Windows Forms SynchronizationContext (SubscribeOnWindowsForms). There is also an equivalent for WPF which uses the Dispatcher.

Oh, you can also see that I keep a reference to the subscription, and call Dispose() on it in the form's FormClosing event.

One last thing to note is that for all of this to work, we must create the MEF CompositionContainer, which we do in the Program class as follows:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Windows.Forms;

namespace MefRX
{
    static class Program
    {
        /// <summary>
        /// The main entry point for the application.
        /// </summary>
        [STAThread]
        static void Main()
        {
            Application.EnableVisualStyles();
            Application.SetCompatibleTextRenderingDefault(false);
            SimpleComposer.Instance.Compose();
            Application.Run(new Form1());
        }
    }
}

Now, credit where credit is due, I have based a lot of this particular MEF/Rx example code on a larger project that CodeProject user Anoop Madhusudanan did for his most tremendously excellent article: WPF Extensibility Hacks or WEX.

Great job Anoop, very very slick indeed. Hat off to you.

Further Reading

This article and the demo solution have concentrated on IObservable<T>.Subscribe, as that is one area that I think is majorly important, and one of the main reasons why one would use Rx in the first place. There is a whole bunch of other stuff that Rx is capable of doing, such as functional programming techniques like fold and zip, generating data sequences, and much much more.

For example, here is a screenshot of a portion of the extension methods available within Rx for IObservable<T>; as you can see, there is loads more there.
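If fold and zip are new to you, it may be easier to try them on ordinary collections first. The sketch below uses the standard LINQ to Objects operators Aggregate (a fold) and Zip on IEnumerable<T>; Rx offers operators with the same names over IObservable<T>. The FoldAndZip class and its method names are made up for illustration and are not part of the demo solution.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class FoldAndZip
{
    // Fold: collapse a sequence into a single value, starting from a seed of 0.
    public static int SumOfAges(IEnumerable<int> ages)
    {
        return ages.Aggregate(0, (total, age) => total + age);
    }

    // Zip: combine two sequences element by element until the shorter one runs out.
    public static List<string> Pair(IEnumerable<string> names, IEnumerable<int> ages)
    {
        return names.Zip(ages, (name, age) => name + " is " + age).ToList();
    }
}
```

For instance, FoldAndZip.SumOfAges(new[] { 1, 2, 3 }) gives 6, and FoldAndZip.Pair(new[] { "Ann", "Bob" }, new[] { 30, 41 }) gives "Ann is 30" and "Bob is 41".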

I would recommend that you do some more reading around this using the following links:

One reader also alerted me to this excellent tool called "RxSandBox" which you can get right here, which provides an excellent little UI to try Rx operations in; it really is pretty cool, and looks like this:

That's All For Now

That is all I wanted to say in this article. As always, I would just like to say that if you enjoyed this article and found it interesting, please take the time to give it a vote and perhaps leave a comment.

Thanks.

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.
