Channels with async and await

Apr 6, 2018   #CSP  #Javascript 

async and await are, I think, the best thing to happen to JavaScript, though I’d still very much like something like sweet.js so that I could have invented them myself when I needed them most. However, I mostly see them described in terms of promises. Here, I show how you can implement CSP-style channels with them without using promises.

A basic channel

A basic channel can be thought of as a port that supports a post(value) method, with the value being delivered to whichever end points are consuming values from the channel. What we do is to model this “whichever end points” as the await points within async functions.

The code below should be fairly self-explanatory if you understand that the semantics of an async function are as though it returned a spawned generator.

function channel() {
    let queue = [],     // We'll post a queue of values to the channel.
        callbacks = []; // .. which will be consumed by being pushed
                        //    to callbacks.

    // Check if we have to push values to callbacks.
    function pump() {
        while (queue.length > 0 && callbacks.length > 0) {
            setImmediate(callbacks.shift(), queue.shift());
        }
    }

    return {
        post: function (val) {
            queue.push(val);
            pump();
        },
        then: function (onSuccess, onFailure) {
            // onSuccess and onFailure are continuations
            // passed to us in `await` situations.
            callbacks.push(onSuccess);
            pump();
        }
    };
}

Here is a simple demo usage of channel which takes N values from the given channel, prints them out, then prints ‘done’.

async function take(chan, N) {
    for (let i = 0; i < N; ++i) {
        console.log(i, await chan);
    }
    console.log('done');
}

A sample (sanitized) transcript of a session follows -

> let chan = channel();
> take(chan, 5);
> chan.post('zero');
0 zero
> chan.post('one');
1 one
> chan.post('two');
2 two
> chan.post('three');
3 three
> chan.post('four');
4 four
done
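
To make the "spawned generator" reading concrete, here is a rough sketch of what such a spawner could look like. The names spawn and once are made up for this illustration, and the sketch simplifies things: it ignores the promise a real async function returns, assumes every yielded value has a then method, and calls then directly where a real engine would schedule that call as a job. The point is only to show where a channel's then(onSuccess, onFailure) gets invoked.

```javascript
// Hypothetical helper: drives a generator roughly the way `await`
// drives an async function. Each yielded value is assumed to be a
// thenable, i.e. an object with a then(onSuccess, onFailure) method.
function spawn(genFn, ...args) {
    const gen = genFn(...args);

    function step(method, arg) {
        let result;
        try {
            // Resume the generator with gen.next(arg) or gen.throw(arg).
            result = gen[method](arg);
        } catch (e) {
            console.error('uncaught in spawned generator:', e);
            return;
        }
        if (result.done) {
            return;
        }
        // This is the point at which a channel's `then` would be called
        // with the two continuations.
        result.value.then(
            (val) => step('next', val),
            (err) => step('throw', err)
        );
    }

    step('next', undefined);
}

// A tiny thenable standing in for a channel that delivers one value.
function once(value) {
    return {
        then(onSuccess) {
            setImmediate(onSuccess, value);
        }
    };
}

const seen = [];
spawn(function* () {
    seen.push(yield once('a'));
    seen.push(yield once('b'));
    console.log(seen); // [ 'a', 'b' ]
});
```

Each yield here plays the role of an await, and the value sent back into the generator is whatever the thenable passes to onSuccess.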

Posting errors to a channel

With the previous simple approach, we’re guaranteed that both chan.post(val) and await chan won’t throw an error. If we want a producer to be able to error out consumers so that consumer code can stay on the happy path within a try-catch block, we have to modify the channel definition a bit.

function channelWithErrorProp() {
    let queue = [],     // We'll post a queue of values to the channel.
        callbacks = []; // .. which will be consumed by being pushed
                        //    to callbacks or errors.

    // Check if we have to push values to callbacks.
    function pump() {
        while (queue.length > 0 && callbacks.length > 0) {
            let val = queue.shift();
            // We check for Error values posted to the channel and
            // send it to all the consumers when it occurs.
            if (val instanceof Error) {
                while (callbacks.length > 0) {
                    setImmediate(callbacks.shift().onFailure, val);
                }

                // Leave the error in the queue so that the channel
                // always remains in the state of error once it
                // occurs.
                queue.unshift(val);
                return;
            }
            
            setImmediate(callbacks.shift().onSuccess, val);
        }
    }

    return {
        post: function (val) {
            queue.push(val);
            pump();
        },
        then: function (onSuccess, onFailure) {
            // onSuccess and onFailure are continuations
            // passed to us in `await` situations.
            callbacks.push({
                onSuccess: onSuccess,
                onFailure: onFailure
            });
            pump();
        }
    };
}

The same simple demo, but now with a try-catch.

async function take(chan, N) {
    try {
        for (let i = 0; i < N; ++i) {
            console.log(i, await chan);
        }
        console.log('done');
    } catch (e) {
        console.log('Whoopsie daisies!');
        console.error(e);
    }
}

Here is an error flow -

> let chan = channelWithErrorProp();
> take(chan, 3);
> chan.post('zero');
0 zero
> chan.post(new Error('meow'));
Whoopsie daisies!
Error: meow
    at repl:1:11
    at Script.runInThisContext (vm.js:65:33)
    ...
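
Because the error is put back at the head of the queue, every later await on the same channel should fail with it as well, and anything posted after the error is never delivered. The sketch below checks that, using a condensed restatement of the channel above so that it runs on its own. The demo function is made up for this illustration.

```javascript
// Condensed restatement of channelWithErrorProp from above.
function channelWithErrorProp() {
    const queue = [], callbacks = [];

    function pump() {
        while (queue.length > 0 && callbacks.length > 0) {
            const val = queue.shift();
            if (val instanceof Error) {
                while (callbacks.length > 0) {
                    setImmediate(callbacks.shift().onFailure, val);
                }
                queue.unshift(val); // stay in the error state
                return;
            }
            setImmediate(callbacks.shift().onSuccess, val);
        }
    }

    return {
        post(val) { queue.push(val); pump(); },
        then(onSuccess, onFailure) {
            callbacks.push({ onSuccess, onFailure });
            pump();
        }
    };
}

// Hypothetical demo: one value, then an error, then a value that
// sits behind the error and is never delivered.
async function demo() {
    const chan = channelWithErrorProp();
    const log = [];
    chan.post('zero');
    chan.post(new Error('meow'));
    chan.post('never delivered');

    log.push(await chan); // the value posted before the error
    for (let i = 0; i < 2; ++i) {
        try {
            await chan;
        } catch (e) {
            log.push(e.message); // the same error, every time
        }
    }
    return log;
}

demo().then(console.log); // [ 'zero', 'meow', 'meow' ]
```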

Back pressure

The above channel implementations aren’t complete, as they have unbounded buffer accumulation and no back pressure support. They just serve to illustrate a usage of async/await that I don’t see commonly talked about.

We can model back pressure by making chan.post(val) itself be usable with an await, with the result of the await being the value posted to the channel. With this approach, we have a few choices for error propagation. Below, we choose to propagate an error raised by a producer to all producers and consumers except the producer injecting the error.

function channelWithBackPressure() {
    let queue = [],      // We'll post a queue of values to the channel.
        callbacks = [],  // .. which will be consumed by being pushed
                         //    to callbacks or errors.
        callfronts = []; // These get called when `post` is used with `await`.

    // We use a "back channel" to hook into the producers waiting for
    // their channel postings to go through.
    let backChan = {
        then: function (onSuccess, onFailure) {
            // We'll get these continuations from the `await chan.post(val)`.
            callfronts.push({
                onSuccess: onSuccess,
                onFailure: onFailure
            });

            // Compared to the previous implementation, this one moves
            // the `pump()` call from the `post()` method to here, so that
            // the maximum length of the queue is the maximum number of
            // producers waiting for their data to go through.
            pump();
        }
    };

    // Check if we have to push values to callbacks.
    function pump() {
        while (queue.length > 0 && callbacks.length > 0 && callfronts.length > 0) {
            let val = queue.shift();
            if (val instanceof Error) {
                // Send the error to all the waiters.
                while (callbacks.length > 0) {
                    setImmediate(callbacks.shift().onFailure, val);
                }

                // Release all the producers when one of them posts
                // an error. We succeed the first one that posted an error,
                // and fail all the others with the same error, so that
                // they know they can't post to the channel again.
                setImmediate(callfronts.shift().onSuccess, val);
                while (callfronts.length > 0) {
                    setImmediate(callfronts.shift().onFailure, val);
                }
                
                // Leave the error in the queue so that the channel
                // always remains in the state of error once it
                // occurs.
                queue.unshift(val);
                return;
            }
            
            setImmediate(callbacks.shift().onSuccess, val);
            setImmediate(callfronts.shift().onSuccess, val);
        }
    }

    let chan = {
        post: function (val) {
            queue.push(val);
            // Returning backChan here without a `pump()` ensures that
            // we capture the continuation at the `post` before launching forward.
            return backChan;
        },
        then: function (onSuccess, onFailure) {
            // onSuccess and onFailure are continuations
            // passed to us in `await` situations.
            callbacks.push({
                onSuccess: onSuccess,
                onFailure: onFailure
            });
            pump();
        }
    };

    return chan;
}

With the above implementation of a channel with back pressure, you need the discipline to call chan.post(val) only with an await, as in await chan.post(val). If you don’t, nothing gets pushed down the channel: pump() only delivers a value when a producer continuation is waiting alongside a consumer, so values posted without an await just sit in the queue even if consumers are already waiting. Conversely, if the producers act first, there are no consumers to pump to, and the producers don’t get to continue until consumers arrive.
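
The effect of a post without an await can be seen in a small experiment. This is a minimal sketch using a stripped-down variant of the channel above with error handling left out. The names miniBackPressureChannel, delay and demo are made up for this illustration.

```javascript
// Hypothetical stripped-down back-pressure channel (no error handling).
function miniBackPressureChannel() {
    const queue = [], callbacks = [], callfronts = [];

    function pump() {
        while (queue.length > 0 && callbacks.length > 0 && callfronts.length > 0) {
            const val = queue.shift();
            setImmediate(callbacks.shift(), val);
            setImmediate(callfronts.shift(), val);
        }
    }

    const backChan = {
        then(onSuccess) { callfronts.push(onSuccess); pump(); }
    };

    return {
        post(val) { queue.push(val); return backChan; },
        then(onSuccess) { callbacks.push(onSuccess); pump(); }
    };
}

// A thenable delay, used only to let pending work settle.
function delay(ms) {
    return { then(onSuccess) { setTimeout(onSuccess, ms); } };
}

async function demo() {
    const chan = miniBackPressureChannel();
    const received = [];

    // A consumer that parks on `await chan`.
    (async () => { received.push(await chan); })();

    chan.post('first');              // no await: the value just sits in the queue
    await delay(20);
    const before = received.slice(); // nothing delivered yet

    await chan.post('second');       // an awaited post finally lets the pump run
    await delay(20);
    return { before, after: received.slice() };
}

demo().then(console.log); // { before: [], after: [ 'first' ] }
```

The consumer only receives 'first' once some producer awaits a post, and 'second' then stays queued behind it. Mixing awaited and un-awaited posts therefore leaves the queue and the waiting producers out of step.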

Here is another silly example -

let chan = channelWithBackPressure();

async function ping() {
    for (let i = 0; i < 5; ++i) {
        console.log('pinging', i);
        await chan.post({ping:i});
    }
    console.log('done pushing');
    await chan.post(new Error('done'));
}

async function pong() {
    try {
        while (true) {
            console.log(await chan);
        }
    } catch (e) {
        console.log('done');
    }
}

ping();
pong();

The above will produce something like -

pinging 0
{ ping: 0 }
pinging 1
{ ping: 1 }
pinging 2
{ ping: 2 }
pinging 3
{ ping: 3 }
pinging 4
{ ping: 4 }
done pushing
done

The order of the ping() and pong() calls doesn’t matter.

Other uses of this technique

The core of the technique is that we only need to return an object with a then(onSuccess, onFailure) method. A real Promise is not required. Below is a simple example of a fixed delay.

function delay(ms) {
    return {
        then: function (onSuccess) {
            setTimeout(onSuccess, ms, 'tick');
        }
    };
}

let oneSec = delay(1000);

async function countDown(n) {
    for (let i = n; i > 0; --i) {
        console.log(i);
        await oneSec;
    }
    console.log('TADA!');
}
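
The failure path works the same way: calling the second continuation makes the await throw. Below is a small sketch of a thenable that always fails after a delay. The names failAfter and demo are made up for this illustration.

```javascript
// Hypothetical thenable that fails after `ms` milliseconds by calling
// the second continuation instead of the first.
function failAfter(ms, message) {
    return {
        then(onSuccess, onFailure) {
            setTimeout(onFailure, ms, new Error(message));
        }
    };
}

async function demo() {
    try {
        await failAfter(10, 'too slow');
        return 'not reached';
    } catch (e) {
        return 'caught: ' + e.message;
    }
}

demo().then(console.log); // caught: too slow
```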

Have fun!

Disclaimer: The above is a concept sketch and is not extensively tested.