RxJs

What is RxJs? I tend to think of it as a library that allows you to treat asynchronous data (ajax stuff, mouse clicks, what have you) as an time-delayed array, one that you can do all your fancy functional stuff with. Stuff like map, filter, and reduce.

Now, Reactive programming is kind of a big subject, and there's no way I could cover it all in one post, but I can get you started and point you to some resources.

Let's start with a simple button example.

Here's the pug template for a button:

button(id='click-me', type='button') Click Me!  

And here's the js that'll do something with it:

Rx.Observable.fromEvent(  
    document.querySelectorAll('#click-me'),
    'click')
        .subscribe(e => console.log(`You clicked the "${e.target.innerHTML}" button!!`))

Rx.Observable.fromEvent takes events like button clicks or keyboard input and turns it into a stream of values, basically an array of events. Since it's array you can do all your favorite operations on it like map, filter, reduce, and so on.

The subscribe function takes a function that will actually do something with that stream. Here we're just printing to the console.

Ok, let's do something a little more complicated.

Here's a row of buttons:

    table
        tr
            each letter in ['x', 'y', 'z']
                button(class='letters', type='button')=letter

each is just your basic for-loop in the pug templating language. This code will produce a simple set of 3 buttons with the values 'x', 'y', and 'z' respectively. They'll all have the class letters which we can use to create a stream of button presses.

Here's the code that does something with these buttons as well as keyboard presses:

const letterKeys =  
    Rx.Observable.fromEvent(document, 'keypress')
        .map(R.prop('key'))
        .filter(R.pipe(R.match(/^[xyz]$/), R.length))

Rx.Observable.fromEvent(  
    document.querySelectorAll('.letters'),
    'click')
        .map(R.path(['target', 'innerHTML']))
        .merge(letterKeys)
        .scan(R.concat, '')
        .subscribe(s => document.querySelector('#letter-screen').innerHTML = s)

Here we've got two streams. letterKeys is a stream of key presses. Since a stream is just like an array of values that we get over time, we can map over them just like any array.

map(R.prop('key')) uses ramda to grab the value of the key press.

.filter(R.pipe(R.match(/^[xyz]$/), R.length)) filters out all key presses except 'x', 'y', and 'z'.

Next, we get a stream of values from clicking our three xyz buttons and use map to grab their values, creating a stream of characters exactly like letterKeys. Then we can use merge to merge these two streams together.

scan is exactly like an array's reduce method. It takes an initial value, then uses a function to combine each element of a list to combine them into another value, the accumulator. Here we just use R.concat to concat the stream of characters into a single string.

Every time a new character comes down the stream scan will spit out it's current accumulator value.

The initial value for our scan is the empty string. Then say we press 'x'. The new accumulator value will be 'x'. If we press 'x' again the accumulator will be 'xx', then 'xxy' if we press y and so on.

The subscribe callback will use this stream to print the current accumulator value to the page by modifying #letter-screen's inner html:

div(id='letter-screen')  

Here's a more complicated example, a calculator:

    div(id='calc-screen') 0
    table
        each nums in [[7, 8, 9, '+'], [4, 5, 6, '-'], [1, 2, 3, '*'], [0, '.', 'c', '/']]
            tr
                each num in nums
                    td
                        button(class='numpad', type='button')=num
    button(class='numpad', type='button') =
    button(class='numpad', type='button') +/-

The above code spits out a standard calculator layout.

Here's part of the JavaScript that does something with it:

const calculatorKeys =  
    Rx.Observable.fromEvent(document, 'keypress')
        .map(R.prop('key'))
        .filter(R.pipe(R.match(/^[\*\/\+-\dcp=]$/), R.length))

Rx.Observable.fromEvent(  
    document.querySelectorAll('.numpad'),
    'click')
        .map(R.path(['target', 'innerHTML']))
        .map(button => button === '+/-' ? 'p' : button)
        .merge(calculatorKeys)
        .scan(calculator, calculatorSeed)
        .map(R.prop('value'))
        .subscribe(n => document.querySelector('#calc-screen').innerHTML = n)

This is pretty much just like the xyz buttons example. We merge streams of button clicks and key presses into a string of characters that will be used by scan to do our application logic.

The only difference between this example and the xyz example is complexity. It really has the same basic structure: get a stream of button clicks and key presses, turn it into a single stream of characters, and send it to scan and a reducer function. map(R.prop('value')) just grabs the value we want to show on the calculator screen, rather than the entire application state.

Here's what calculatorSeed looks like:

const calculatorSeed = {  
    prevValue: '0',
    value: '0',
    operation: second,
    newNumber: true
}

It's just the calculator state. I use calculator to save the previous number in prevValue and the current number in value. operation saves the current operation: plus, minus, divide, or whatever. newNumber is just a flag that keeps track of whether we're starting to type a new number or not.

second is a do-nothing operation. It takes prevValue and value and returns value. I use it for the equals button, since that shouldn't do any actual calculations.

Here's the code for calculator, our reducer:

const calculator = (acc, button) => {  
    switch(button) {
        case '+':
            return operateLoadNextOperator(acc, R.add)
        case '-':
            return operateLoadNextOperator(acc, R.subtract)
        case '*':
            return operateLoadNextOperator(acc, R.multiply)
        case '/':
            return operateLoadNextOperator(acc, R.divide)
        case '=':
            return operateLoadNextOperator(acc, second)
        case 'c':
            return calculatorSeed
        case 'p':
            return {
                ...acc,
                value: toggleNegative(acc.value)
            }
        default:
            return {
                ...acc,
                newNumber: false,
                prevValue: acc.newNumber ? acc.value : acc.prevValue,
                value: updateValue(button, acc)
            }
    }
}

operateLoadNextOperator performs the current calculation, puts the result in value, the previous number in prevValue, adds the next operator (whatever operator was just pressed) and sets newNumber to true.

The default case handles the actual input of numbers.

It's not super important that you understand all this. I included this example for two reasons:

First, I want to demonstrate that using scan on a stream is a simple and powerful way to write functional application logic, no matter how complicated your app is.

Second, I spent a lot of time making this example (more time that I thought it would need to) and I'd be damned if I wasn't going to show off at least a little of that work! :P

Ok, let's finish this post with an ajax example. Here's the pug template:

    p(id='story-screen')
    button(class='story', type='button') <<
    button(class='story', type='button') <
    button(class='story', type='button') <>
    button(class='story', type='button') >
    button(class='story', type='button') >>

Here's the code that handles the stream of button presses:

Rx.Observable.fromEvent(  
    document.querySelectorAll('.story'),
    'click')
        .map(R.path(['target', 'innerText']))
        .scan(buttonFunc, min)
        .startWith(min)
        .map(pageNumber => `/story/${pageNumber}`)
        .flatMap(url => Rx.Observable.fromPromise(fetch(url)))
        .flatMap(response => response.json())
        .map(R.prop('text'))
        .subscribe(text => document.querySelector('#story-screen').innerHTML = text)

What's going on here? Well, I've got an api call, /story/${pageNumber} that returns a page from a short story. There are 6 pages in total, so I've set min to 1 and max to 6.

I want to use these buttons: <<, <, <>, >, >> to go to the first page, the previous page, the first page, the next page, and the last page respectively.

I use the buttonFunc reducer to turn the stream of button presses into a stream of numbers, each between 1 and 6 inclusive.

I use startwith to kickstart that stream of numbers with the number 1 so we start at the beginning of the story.

Then I use map to turn the stream of numbers into a stream of urls: /story/1, /story/2, etc.

Next, if I just did a simple map of fetching the url using the fetch api I'd get a stream of promises. That's no good so I create a stream from that promise using Rx.Observable.fromPromise. Problem is, then I'd get a stream of streams, which is a bit like an array of arrays.

If I had some similar code with arrays (an array with arrays) like this:

console.log(  
    R.map(x => [x, x], [1, 2, 3]))
// prints [ [ 1, 1 ], [ 2, 2 ], [ 3, 3 ] ]

And what I really wanted as a single flatted array, I could do this:

console.log(  
    R.chain(x => [x, x], [1, 2, 3]))
// prints [ 1, 1, 2, 2, 3, 3 ]

Here chain in ramda is basically the same as flatMap in RxJs. Both are un-boxers. They unbox stuff like arrays.

What's neat is that flatMap doesn't work with just streams. Check out the next call:

.flatMap(response => response.json())

Here, flatMap has no problem unboxing promises as well as streams. We don't even really need to create a stream from the promise:

        .flatMap(url => fetch(url))
        .flatMap(response => response.json())

Finally our little story program grabs the page text and displays it.

Like I said, there's a whole lot more to learn here but I hope this gets you started. To learn more, you can head over to the RxJs documentation, have a look at this tutorial, or this egghead video series.

Looking for a software developer?