Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles / All-Topics

Fiddling with RxJs Streams

5.00/5 (2 votes)
11 Jul 2016CPOL12 min read 10K  
Egon: Don’t cross the streams. Peter: Why? Egon: It would be bad. – Ghostbusters (1984) In this post I’ll touch on the emergence of JavaScript reactive streams. We’ll look at a cool online interactive tool that is useful for checking behavior of stream operators. I’ll finish by showing s

Egon: Don’t cross the streams.
Peter: Why?
Egon: It would be bad.

– Ghostbusters (1984)

In this post I’ll touch on the emergence of JavaScript reactive streams. We’ll look at a cool online interactive tool that is useful for checking behavior of stream operators. I’ll finish by showing six RxJs JSFiddle examples that you can run and modify.

Background

Rich applications, such as modern SPAs, deal in state changes due to events. We’re accustomed to having each interested event listener self-register a callback with an event emitter. That’s the observer pattern. This pattern is dynamic and more workable than hard-coding each event emitter to invoke foreign callbacks.

Nowadays a complex application may have a morass of cooperating event handlers that have intermittent life cycles. This is hard to reason about. It can lead to memory leaks from dangling listeners, intermittent navigation-instigated rogue behaviors, or a triangle-of-doom handler tangle in source code.

Promises mitigate many problems by flattening callback source. They are a uniform result container. I’ve written about promises. A settled promise eventually contains a future data result or an error value. That promise is immutable. It’s one-shot: valid for only a single event.

We often deal with successive sequences of events. Think of mouse position movement; it produces a stream of cooperating events that we may need to handle. Imagine implementing a drag-drop for example.

Reactive Stream

A reactive stream elevates the observer pattern to a uniform framework. It’s been called “The observer pattern done right.” It provides a pushed stream of items that we can transform, merge with other streams, or reduce to a single value. Functional programming techniques are in-play. Among them are reliance on first-class functions (passed as data), high-order functions (parameter functions; return value function), and lambdas (anonymous functions).

Event stream processing resembles the iterator pattern familiar in data streams. Events are divas, however. They occur unpredictably. Thus, event streams use a push model. Iterators use a pull model. Three nutshell concepts are:

  • Reactive (adjective – medieval Latin): responds to a stimulus in a particular manner.
  • Stream (noun – middle English): a steady succession (as of items or events).
  • Push versus pull – event streams versus data streams.

The following diagram shows a reactive stream pattern. An Observable wraps an item that is an event source. An Observable.subscribe triggers an intermittent stream of discrete items. It is the stream’s listener. An Observable can wrap almost any kind of data, including promises, or nothingness.

A set of well-known operators can tap into its output stream. Those are higher-order functions that often take a parameter lambda, object, or a primitive. Each operator takes a value from the stream, returning a value onto the stream. Together, they resemble a monadic chain process. An operator’s parameter function could internally start a new stream or combine two streams.

ReactiveJS Stream Patterns

ReactiveX

It’s difficult to roll your own reactive stream framework. There are several packages available across platforms and languages. I focused on ReactiveX. See http://reactivex.io/intro.html. The introduction explains it nicely. It’s an umbrella project that has a JavaScript binding named “Reactive Extensions,” or RxJs.

I used the RxJs framework in examples that follow. Refer to https://github.com/Reactive-Extensions/RxJS/blob/master/readme.md.
A side-benefit of ReactiveX technology is that it cleans up event listeners. Programmers that use traditional event listeners must be ever-mindful of disengaging unneeded listeners, or these will be memory leaks.

The reactive mode of programming could be difficult to ease into. The next section of this article shows an interactive reference tool that help to visualize various operator functions. Afterward, I’ll show six JsFiddle snippets that cover disparate use cases.

Marble Diagrams

Applying operators for transforming, filtering, and merging streams is a distinct way of reasoning about an event solution. For reference, try this API reference: https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/observable.md.

Some of us can learn in a visually interactive manner. Look at the interactive visualization tool at http://rxmarbles.com/.

Want to see what really happens when you operate on streams? For example, a marble diagram of the merge operator allows you to drag event marbles of two input streams to instantly see the output stream. Read on …

Combining

Here’s the merge combining operator. The two topmost input streams apply merge operator to produce the bottom output stream.

ReactiveX merge combining operator

Below, I’ve repositioned the “40” and the “1” event marbles of the second input stream.

See the stable ordering of the bottom output stream afterward? Apparently, Egon, of Ghostbusters, never tried crossing reactive streams.

ReactiveX merge combining operator

Transforming

The well-known transformational map operator occurs in other functional libraries that process streams, collections, or plain old arrays. Recall each or forEach from lodash and underscore.

Additionally, there is the JS Array prototype’s forEach, or the Immutable.js map function. All, including our RxJs map operator, pass a collection or stream to a caller-supplied function, creating a new output sequence.

Here’s the map marble diagram. For each member of the input, its function parameter returns ten times its numeric input parameter. In the end, it transforms the input stream, producing a new stream consisting of each input multiplied by ten. It preserves ordering.

ReactiveX merge marble diagram

Below, I moved the “2” event marble to the extreme left to show that ordering remains.

ReactiveX merge marble diagram

Filtering

We’ll see a takeUntil filtering operator in a later drag-drop example. Look at that operator’s marble diagram. The second input stream conditionally filters the first stream’s members into the output stream. The second JS stream’s earliest false “0” item halts output. Note the vertical bar on the bottom output that shows the end of the output stream.

ReactiveX Filtering

Below, I’ve moved the rightmost middle stream “0” item leftward to just after the “2” item of the upper stream. The takeUntil stops replicating the upper stream there, now producing a result stream containing only “1” and “2”. Again RxJs preserved the ordering.

ReactiveX Filtering

JSFiddle Examples

Each example depends only upon the RxJs library and JQuery. I dropped back to ES5 syntax in the online JSFiddle code because Safari doesn’t support arrow functions. The following examples do use ES6 syntax. See https://babeljs.io/docs/learn-es2015/ for succinct examples of ES6 and ES2015.

Example 1: A Stream of Nothing

Recall that an Observable represents a push-based collection. In RxJS, an Observable can front any number of legal JavaScript entity or entities. Let’s lean on the boundary. How about an empty Observable?

Yes, but what use is a stream of nothing? Well, it’s an edge case. Consider a dynamic application that might need to handle a source of any number of indeterminate dynamic items, including an undefined JS value. We can handle that.

Reactive8

Reactive9

Refer to JSFiddle https://jsfiddle.net/mauget/3u4po86u/. It produces an Observable by mumbling an Rx.Observable.empty() incantation. Nothing happens until the Observable suffers a subscribe function call. An RxJs subscribe can take three optional functional parameters:

  • onNext
  • onError
  • onCompletion

The onError and onCompletion calls are mutually exclusive. The framework invokes one or the other, not both. For effect I used a window.setTimeout to delay invoking the subscribe until three seconds pass. When the timer pops, I call subscribe. The stream immediately awakens to push the “empty” item. The onCompletion logs a message and updates the DOM with the message.

const
  delay = 3, // seconds
  source = Rx.Observable.empty(), // Dormant until subscription occurs
  msg1 = `Will subscribe to empty source in ${delay} seconds ...
`; 

$('body').append(msg1);  // HTML output
console.log(msg1);       // deb log output  -

window.setTimeout( () => {
  const msg2 = `Subscribing ... here comes the data ...`;
  $('body').append(`
${msg2}`);
  console.log(msg2);

  // Kick the stream
  source.subscribe(
    v => console.log(`onNext: ${v}`),  // arg1 fn
    v => console.log(`onError: ${v}`), // arg2 fn
    v => {                           // arg3 fn
      const msg3 = `yes ... received the expected "${v}" from the stream.`;
      $('body').append(`
		${msg3}`);
      console.log(msg3);
    }
  );
}, delay * 1000 );

Example 2: Time Interval

Example 1 had a standard window.setTimer that provided a timed interval as a demo prop.

You object: “Isn’t a timer expiration an event? I thought RxJs is about observing events!” Yup. In that hello-world kind of snippet, I didn’t want to confuse a trivial stream example by involving a second stream.

Now you’re ready, Grasshopper! Let’s observe timer expirations via RxJs. An Observable stream of timer expirations will generate a dynamically increasing trip counter of values. Refer to the JSFiddle at https://jsfiddle.net/mauget/t03fcr8y/. It generated the before and after panels here.

Reactive91

Reactive92

Look at the code below. Notice the RxJs interval, skip, and take operators that sequentially process the stream input items into an output stream? Those govern the pace, content, and extent of the product stream.

The interval operator is the heart of the code. See https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/interval.md. This is a reworked example from RxJs samples. At each delayed invocation the interval call injects a numeric value from a monotonically-increasing zero-origin set.

The skip(0) operator omits the zeroth item. The take(10) limits processing to ten output members.

The subscribe call starts the stream. Its onNext functional parameter calculates a current odometer value from each emitted stream item. The onError and onCompletion functional parameters log their respective calls.

// distance = rate * time
const R = 60; // mph 
let t = 0; // We'll step up at intervals

const source = Rx.Observable
  .interval(500) // 1/2 second interval
  .skip(1) // Skip zeroth
  .take(10); // Take next 10

// Kick off ...
const subscription = source.subscribe(
  t => {
    console.log('hours: ' + t);
    $('#mph').html(R);
    $('#hours').html(t);
    $('#miles').html(R * t); // Do the math, display it
  },
  err => console.log('Error: ' + err),
  () => console.log('Completed'));

Example 3: Interactive Character / Word Counter

Let’s maintain a dynamic count of words and non-whitespace characters. They’re entered or pasted live into an HTML textarea element. Backspace or delete keys decrement the counts. I count whatever is held in the textarea at each event instant.

Reactive93

Refer to https://jsfiddle.net/mauget/o6rprdhx/. It’s another reworked RxJs sample. It defines an Observable of a DOM textarea keyup event. The Observable fronts a push-based collection consisting of a single object that holds the character count and word count of the textarea. The snippet generates the contents from a jQuery map function chained to an event selector. The map callback uses regular expressions to count current non-whitespace characters and words held within the textarea.

Nothing happens until the subscribe call on the Observable. The subscribe’s onNext function parameter populates the result, much like a traditional event callback. I omitted onError and onCompletion callback functions. A pushed Observable-to-subscribe event pulse occurs at each keyup. The onNext carries out an instant update of the two counts in the UI and console log.

// #A. DOM elements-of-interest from DOM query selectors
const
  $txtInput = $('#txtInput'),
  $charCount = $('#charCount'),
  $wordCount = $('#wordCount');

// #B. Observable of keyup triggered at textInput -- it's a monad
Rx.Observable.fromEvent($txtInput.get(0), 'keyup')
  .map(e => {
    return {
      cc: (e.target.value.match(/[^\s]/g) || []).length,
      wc: (e.target.value.match(/\b\S+\b/g) || []).length
    }
  })
  // #C. Kick off the stream
  .subscribe(v => {
    console.log(v);
    $charCount.html(` Chars: ${v.cc}`);
    $wordCount.html(` Words: ${v.wc}`);
  });

Example 4: Drag and Drop

Here is a simple drag-drop example. It uses cooperating Observables on the events: mousedown, mousemove, and mouseup. These collectively implement dragging and dropping a DOM item anywhere within a panel.

Reactive94

Reactive95

Refer to the JsFiddle at https://jsfiddle.net/mauget/9bhp0qu8/. Try dragging the fruit. This snippet, too, grew out of an RxJs example. It creates three kinds of mouse action observables. The code enables a user to reposition a draggable fruit image on a panel.

The subscribe to the mousedown Observable fruit starts the action. The event item on the fruit chains to a flatMap operator chained to a takeUntil mouseup operator. A flatMap is a common function encountered in functional programming. It’s a map function that flattens its input sequences into a single output sequence.

(function(dragable) {

  // Observe three major kinds of mouse events:
  // mousedown, mouseup on the dragable; mousemove over the document.
  const
    mousedown = Rx.Observable.fromEvent(dragable, 'mousedown'),
    mousemove = Rx.Observable.fromEvent(document, 'mousemove'),
    mouseup   = Rx.Observable.fromEvent(dragable, 'mouseup');

  // A. Detect mouse down; 
  // B. While down, listen for mouse move and mouse up;
  // C. When mouse up: stop listening for moves and mouse up.
  const mousedrag = mousedown.flatMap( md => {

    // Capture mouse down offset position
    const
      startX = md.offsetX,
      startY = md.offsetY;

    // B1. Track pos differentials using mousemove, until mouseup
    return mousemove.map( mm => {
      mm.preventDefault();

      return {
        left: mm.clientX - startX,
        top: mm.clientY - startY
      };
      // C. stop the drag when mousup
    }).takeUntil(mouseup);
  })

  // Subscribe to mousemove's mousedrag stream:
  // B2. Update draggable's position from mousedrag stream of events
  mousedrag.subscribe( pos => {
    dragable.style.top = pos.top + 'px';
    dragable.style.left = pos.left + 'px';
  });

})($('#dragable').get(0));

Example 5: AJAX Promise – A Set of Quantum-Random Numbers

Let’s display a set of profoundly random numbers requested from a REST service.

Reactive96

Refer to JSFiddle https://jsfiddle.net/mauget/b0jfmge6/. In this example, we access the random number REST service via AJAX. We can make an Observable for almost anything. We’ll observe a promise. An AJAX call can return a promise to provide asynchronous behavior that we need to drive a UI from a latent network response. The stream will push the promise.

To review, a promise is either pending or settled. If it settles to a success state, it forever returns the data it contains from that success. Thus you can call then on that promise whenever you desire, as often as you desire, but it will always return its settled success or reject value for its lifespan.

Our example observes a button click. The click Observable, onNext, displays a working banner. A chained flatMap operator calls getNumbers to create a second Observable from a promise. That promise is an immediate result of an AJAX call to the REST service. This is easier to understand by looking at the snippet.

The flatMap operator gates the settled then object payload of the promise Observable onto the button click stream as a flat sequence of numbers. A subscribe on the button click stream displays the random numbers.

I kick off activity by triggering a refresh button click at the bottom of the code. Notice what happens if you click the button repeatedly? You see the identical random number sequence repainted!

I claimed the sequence is random, so why does it repeat? Answer: because the promise stream already resolved that settled one-shot AJAX promise. There is no second network invocation.

const $button = $('#button'),
  $result = $('#result');

// AJAX fn: returns observable promise
const getNumbers = count => {
  return Rx.Observable.fromPromise(
    // Get promise of: {"type":"uint8","length":1,"data":[99],"success":true}
    $.ajax({
      url: `https://qrng.anu.edu.au/API/jsonI.php?length=${count}&type=uint8`
    })
    .promise());
};

// Observe button click, subscribe to result
Rx.Observable.fromEvent($button.get(0), 'click')
  .map(e => {
    $result.html('<span class="working">(working)</span>');
    return e.target.value;
  }).flatMap(getNumbers(300))
  .subscribe(d => {
    let sep = '';
    console.log(d);
    if (d.success) {
      $result.empty();
      d.data.forEach(v => {
        $result.append(sep + v);
        sep = ', ';
      });
    }
  });

$button.trigger('click');
Sidebar

As a physics major diverted to computer programming, I’m compelled to speak about our data.

The numbers originate from a vacuum chamber in Canberra, Australia. A vacuum that emits random numbers! The count of particles and photons is zero in a perfect dark classical vacuum. At the quantum level, however, a vacuum is weird. A lab at the Australian National University measures field variations from continually materializing and dematerializing virtual particle pairs. See https://en.wikipedia.org/wiki/Virtual_particle.

A REST service streams those values. Statistical tests show that the sequence is profoundly random.

Example 6: AJAX Promise – A Quilt of Quantum-Random Colors

My wife is a quilter. I couldn’t resist converting example 5 output into a random patch quilt.

Reactive97

Refer to JSFiddle https://jsfiddle.net/mauget/9c2ecLqs/. The code is largely that of example 5, aside from presentation logic in the subscribe call. The subscribe callback forces each sequence item into a number of the interval 0 … 255. It combines each group of three into one CSS RGB value. Finally, it emits each RGB value into a grid. Again, notice that the refresh button causes an identical quilt to render after the first rendering. That settled promise remains immutable and happy.

const valCount = 1020;
const $button = $('#button'),
      $result = $('#result');
      
// AJAX fn: returns observable promise
const getNumbers = count => {
  return Rx.Observable.fromPromise(
    // Get promise of: {"type":"uint8","length":1,"data":[99],"success":true}
    $.ajax({
      url: `https://qrng.anu.edu.au/API/jsonI.php?length=${count}&type=uint8`})
      	.promise());
};

// Listen for button clicks, do service request, subscribe to result
Rx.Observable.fromEvent($button.get(0), 'click')
  .map(e => {
  $result.html('<span class="working">(working)</span>');
  return e.target.value;
})
  .flatMap( getNumbers( valCount ) )
  .subscribe(data => {
  	console.log(data);
  	if (data.success) {
   	 $result.empty();
    	let cols = 0
    	const c = []; // rgb value

   	 data.data.forEach( v => {
      c.push( v % 256 );
      if (c.length > 2 ) {
        const rgb = `rgb(${c.pop()},${c.pop()},${c.pop()})`;
        $result.append(
`<span style="background-color:${rgb}">&nbsp;&nbsp;&nbsp;&nbsp;</span>`);

        if ( ++cols > 33 ) {
          $result.append(`
`);
          cols = 0;
        }
      }
    });
  }
});

$button.trigger('click');

Wrap-Up

Reactive streams are a unified way of dealing with asynchronous events. They’re a higher-level implementation of the observer pattern. They rely on functional programming. An Observable pushes events onto a stream. Operators modify it. A subscribe operator starts, handles, and ends stream processing.

This paradigm may require a shift of your thinking for programming in an asynchronous environment. Marble diagrams help to understand many functional operators used in ReactiveX.

We saw six operational RxJs snippets that support varying use cases. You could run and modify them on JSFiddle for familiarization. Reactive streaming libraries exist across platforms and operating systems. ReactiveX is fairly unified across the board, so concepts from this article could apply beyond JavaScript.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)