async and await are, I think, the best thing to happen to JavaScript,
though I’d still very much like something like sweet.js so that I could have
invented them myself when I needed them most. However, I mostly see them
described in terms of promises. Here, I show how you can implement CSP-style
channels with them without using promises.
A basic channel
A basic channel can be thought of as a port that supports a post(value)
method, with the value being delivered to whichever end points are consuming
values from the channel. What we do is to model this “whichever end points”
as the await points within async functions.
The code below should be fairly self-explanatory if you understand that
the semantics of an async function are as though it returned a spawned
generator.
function channel() {
    let queue = [],     // We'll post a queue of values to the channel.
        callbacks = []; // .. which will be consumed by being pushed
                        // to callbacks.
    // Check if we have to push values to callbacks.
    function pump() {
        while (queue.length > 0 && callbacks.length > 0) {
            setImmediate(callbacks.shift(), queue.shift());
        }
    }
    return {
        post: function (val) {
            queue.push(val);
            pump();
        },
        then: function (onSuccess, onFailure) {
            // onSuccess and onFailure are continuations
            // passed to us in `await` situations.
            callbacks.push(onSuccess);
            pump();
        }
    };
}
Here is a simple demo usage of channel which takes N values from the given channel, prints them out, then prints ‘done’.
async function take(chan, N) {
    for (let i = 0; i < N; ++i) {
        console.log(i, await chan);
    }
    console.log('done');
}
A sample (sanitized) transcript of a session follows -
> let chan = channel();
> take(chan, 5);
> chan.post('zero');
0 zero
> chan.post('one');
1 one
> chan.post('two');
2 two
> chan.post('three');
3 three
> chan.post('four');
4 four
done
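To make the “spawned generator” reading a little more concrete, here is a rough sketch in which a generator is driven by hand. The names spawn, ticker and takeGen are made up for this sketch, and it is a simplification: the real await goes through the promise resolution machinery (it calls then from a microtask and ignores repeated callbacks), whereas this helper calls then directly. It also assumes Node, since it uses setImmediate like the channel code.

```javascript
// Hypothetical helper: runs a generator, treating every yielded
// thenable roughly the way `await` would treat it.
function spawn(genFn, ...args) {
    return new Promise((resolve, reject) => {
        const gen = genFn(...args);
        function step(method, arg) {
            let result;
            try {
                result = gen[method](arg); // resume via next() or throw()
            } catch (e) {
                reject(e);
                return;
            }
            if (result.done) {
                resolve(result.value);
                return;
            }
            // Only the `then` method of the yielded object is used.
            result.value.then(
                (val) => step('next', val),
                (err) => step('throw', err)
            );
        }
        step('next', undefined);
    });
}

// A tiny thenable (not a Promise) that hands each waiter the next integer.
function ticker() {
    let n = 0;
    return {
        then: function (onSuccess) {
            setImmediate(onSuccess, n++);
        }
    };
}

// Generator counterpart of `take`: `yield` plays the role of `await`.
function* takeGen(source, N) {
    const seen = [];
    for (let i = 0; i < N; ++i) {
        seen.push(yield source);
    }
    return seen;
}

const spawned = spawn(takeGen, ticker(), 3);
spawned.then((seen) => console.log(seen)); // the three values, in order
```

The point of the sketch is that nothing in step cares whether the yielded object is a Promise; a then method is all it needs, and that is the property the channel relies on.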
Posting errors to a channel
With the previous simple approach, we’re guaranteed that both chan.post(val)
and await chan won’t throw an error. If we want a producer to be able to
error out consumers so that consumer code can stay on the happy path within
a try-catch block, we have to modify the channel definition a bit.
function channelWithErrorProp() {
    let queue = [],     // We'll post a queue of values to the channel.
        callbacks = []; // .. which will be consumed by being pushed
                        // to callbacks or errors.
    // Check if we have to push values to callbacks.
    function pump() {
        while (queue.length > 0 && callbacks.length > 0) {
            let val = queue.shift();
            // We check for Error values posted to the channel and
            // send it to all the consumers when it occurs.
            if (val instanceof Error) {
                while (callbacks.length > 0) {
                    setImmediate(callbacks.shift().onFailure, val);
                }
                // Leave the error in the queue so that the channel
                // always remains in the state of error once it
                // occurs.
                queue.unshift(val);
                return;
            }
            setImmediate(callbacks.shift().onSuccess, val);
        }
    }
    return {
        post: function (val) {
            queue.push(val);
            pump();
        },
        then: function (onSuccess, onFailure) {
            // onSuccess and onFailure are continuations
            // passed to us in `await` situations.
            callbacks.push({
                onSuccess: onSuccess,
                onFailure: onFailure
            });
            pump();
        }
    };
}
The same simple demo, but now with a try-catch.
async function take(chan, N) {
    try {
        for (let i = 0; i < N; ++i) {
            console.log(i, await chan);
        }
        console.log('done');
    } catch (e) {
        console.log('Whoopsie daisies!');
        console.error(e);
    }
}
Here is an error flow -
> let chan = channelWithErrorProp();
> take(chan, 3);
> chan.post('zero');
0 zero
> chan.post(new Error('meow'));
Whoopsie daisies!
Error: meow
at repl:1:11
at Script.runInThisContext (vm.js:65:33)
...
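The comment about leaving the error in the queue can be checked with a small script. The sketch below repeats channelWithErrorProp so that it runs on its own; takeOne, log and finished are names invented for the sketch. It suggests that a consumer arriving after the error still sees the same error.

```javascript
// Copy of channelWithErrorProp from above, repeated so the sketch is runnable.
function channelWithErrorProp() {
    let queue = [], callbacks = [];
    function pump() {
        while (queue.length > 0 && callbacks.length > 0) {
            let val = queue.shift();
            if (val instanceof Error) {
                while (callbacks.length > 0) {
                    setImmediate(callbacks.shift().onFailure, val);
                }
                queue.unshift(val); // keep the channel in the error state
                return;
            }
            setImmediate(callbacks.shift().onSuccess, val);
        }
    }
    return {
        post: function (val) {
            queue.push(val);
            pump();
        },
        then: function (onSuccess, onFailure) {
            callbacks.push({ onSuccess: onSuccess, onFailure: onFailure });
            pump();
        }
    };
}

// Hypothetical consumer that records what happened instead of printing.
async function takeOne(chan, label, log) {
    try {
        log.push(label + ' got ' + (await chan));
    } catch (e) {
        log.push(label + ' failed: ' + e.message);
    }
}

const log = [];
const chan = channelWithErrorProp();
const finished = (async () => {
    const first = takeOne(chan, 'first', log); // started before the error is posted
    chan.post(new Error('meow'));
    await first;
    await takeOne(chan, 'late', log);          // started after the error was delivered
    return log;
})();

// Both consumers end up in their catch blocks with the same error.
finished.then((entries) => console.log(entries));
```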
Back pressure
The above channel implementations aren’t complete, as they have unbounded
buffer accumulation and no back pressure support. They just serve to
illustrate a usage of async/await that I don’t see commonly talked about.
We can model back pressure by making chan.post(val) itself be usable with an
await, with the result of the await being the value posted to the channel.
With this approach, we have a few choices for error propagation. Below, we
choose to propagate an error raised by a producer to all producers and
consumers except the producer injecting the error.
function channelWithBackPressure() {
    let queue = [],      // We'll post a queue of values to the channel.
        callbacks = [],  // .. which will be consumed by being pushed
                         // to callbacks or errors.
        callfronts = []; // These get called when `post` is used with `await`.
    // We use a "back channel" to hook into the producers waiting for
    // their channel postings to go through.
    let backChan = {
        then: function (onSuccess, onFailure) {
            // We'll get these continuations from the `await chan.post(val)`.
            callfronts.push({
                onSuccess: onSuccess,
                onFailure: onFailure
            });
            // Compared to the previous implementation, this one moves
            // the `pump()` call from the `post()` method to here, so that
            // the maximum length of the queue is the maximum number of
            // producers waiting for their data to go through.
            pump();
        }
    };
    // Check if we have to push values to callbacks.
    function pump() {
        while (queue.length > 0 && callbacks.length > 0 && callfronts.length > 0) {
            let val = queue.shift();
            if (val instanceof Error) {
                // Send the error to all the waiters.
                while (callbacks.length > 0) {
                    setImmediate(callbacks.shift().onFailure, val);
                }
                // Release all the producers when one of them posts
                // an error. We succeed the first one that posted an error,
                // and fail all the others with the same error, so that
                // they know they can't post to the channel again.
                setImmediate(callfronts.shift().onSuccess, val);
                while (callfronts.length > 0) {
                    setImmediate(callfronts.shift().onFailure, val);
                }
                // Leave the error in the queue so that the channel
                // always remains in the state of error once it
                // occurs.
                queue.unshift(val);
                return;
            }
            setImmediate(callbacks.shift().onSuccess, val);
            setImmediate(callfronts.shift().onSuccess, val);
        }
    }
    let chan = {
        post: function (val) {
            queue.push(val);
            // Returning backChan here without a `pump()` ensures that
            // we capture the continuation at the `post` before launching forward.
            return backChan;
        },
        then: function (onSuccess, onFailure) {
            // onSuccess and onFailure are continuations
            // passed to us in `await` situations.
            callbacks.push({
                onSuccess: onSuccess,
                onFailure: onFailure
            });
            pump();
        }
    };
    return chan;
}
With the above implementation of a channel with back pressure, you need to have
the discipline to call chan.post(val) only with an await, as in
await chan.post(val). If you don’t, then nothing will get pumped down the
channel. This is because the pump only moves a value when a consumer and an
awaiting producer are both present: if consumers are already waiting on the
channel and you call chan.post(val) without awaiting, the value just sits in
the queue and won’t get pumped to the consumers. Vice versa, if the producers
act first, there will be no consumers to pump to, but until there are
consumers, the producers won’t get to continue.
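Here is a rough sketch of what a forgotten await looks like in practice. It repeats a condensed copy of channelWithBackPressure so that it runs on its own; consumeOne, received and demo are names invented for the sketch, and the 10 ms pause is an arbitrary choice.

```javascript
// Condensed copy of channelWithBackPressure from above.
function channelWithBackPressure() {
    let queue = [], callbacks = [], callfronts = [];
    function pump() {
        while (queue.length > 0 && callbacks.length > 0 && callfronts.length > 0) {
            let val = queue.shift();
            if (val instanceof Error) {
                while (callbacks.length > 0) {
                    setImmediate(callbacks.shift().onFailure, val);
                }
                setImmediate(callfronts.shift().onSuccess, val);
                while (callfronts.length > 0) {
                    setImmediate(callfronts.shift().onFailure, val);
                }
                queue.unshift(val);
                return;
            }
            setImmediate(callbacks.shift().onSuccess, val);
            setImmediate(callfronts.shift().onSuccess, val);
        }
    }
    let backChan = {
        then: function (onSuccess, onFailure) {
            callfronts.push({ onSuccess: onSuccess, onFailure: onFailure });
            pump();
        }
    };
    return {
        post: function (val) {
            queue.push(val);
            return backChan;
        },
        then: function (onSuccess, onFailure) {
            callbacks.push({ onSuccess: onSuccess, onFailure: onFailure });
            pump();
        }
    };
}

const chan = channelWithBackPressure();
const received = [];

async function consumeOne() {
    received.push(await chan);
}

const demo = (async () => {
    const consumer = consumeOne();   // a consumer starts waiting
    chan.post('first');              // no await: queued, but nothing pumps it
    await new Promise((resolve) => setTimeout(resolve, 10));
    const before = received.slice(); // still empty at this point
    // An awaited post registers a producer continuation, which finally pumps.
    // It is the head of the queue, 'first', that reaches the consumer and
    // that releases this producer; 'second' stays behind in the queue.
    const released = await chan.post('second');
    await consumer;
    return { before: before, released: released, after: received.slice() };
})();

demo.then((result) => console.log(result));
```

So a post without await is not only delayed. Once a later awaited post arrives, values and waiting producers are paired out of step, which is one more reason to keep every post behind an await.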
Here is another silly example -
let chan = channelWithBackPressure();
async function ping() {
    for (let i = 0; i < 5; ++i) {
        console.log('pinging', i);
        await chan.post({ping:i});
    }
    console.log('done pushing');
    await chan.post(new Error('done'));
}
async function pong() {
    try {
        while (true) {
            console.log(await chan);
        }
    } catch (e) {
        console.log('done');
    }
}
ping();
pong();
The above will produce something like -
pinging 0
{ ping: 0 }
pinging 1
{ ping: 1 }
pinging 2
{ ping: 2 }
pinging 3
{ ping: 3 }
pinging 4
{ ping: 4 }
done pushing
done
The order of the ping() and pong() calls doesn’t matter.
Other uses of this technique
The core of the technique is that we only need to return an object
with a then(onSuccess, onFailure) method. A real Promise is not
required. Below is a simple example of a fixed delay.
function delay(ms) {
    return {
        then: function (onSuccess) {
            setTimeout(onSuccess, ms, 'tick');
        }
    };
}
let oneSec = delay(1000);
async function countDown(n) {
    for (let i = n; i > 0; --i) {
        console.log(i);
        await oneSec;
    }
    console.log('TADA!');
}
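The same trick appears to work for one-shot events. The sketch below is only an illustration: nextEvent and collect are invented names, and it assumes Node’s events module. Each await registers a fresh once listener, so events emitted while nobody is awaiting are simply dropped.

```javascript
const { EventEmitter } = require('events');

// Hypothetical helper: a thenable that completes on the next `name` event.
function nextEvent(emitter, name) {
    return {
        then: function (onSuccess) {
            // The first argument passed to emit() becomes the awaited value.
            emitter.once(name, onSuccess);
        }
    };
}

const emitter = new EventEmitter();
const nextData = nextEvent(emitter, 'data');

async function collect(n) {
    const seen = [];
    for (let i = 0; i < n; ++i) {
        seen.push(await nextData); // reusable, like `oneSec` above
    }
    return seen;
}

const collected = collect(2);

// Emit from separate turns of the event loop, so that each `await`
// has had a chance to register its listener first.
setImmediate(() => {
    emitter.emit('data', 'a');
    setImmediate(() => emitter.emit('data', 'b'));
});

collected.then((seen) => console.log(seen));
```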
Have fun!
Disclaimer: The above is a concept sketch and is not extensively tested.