There's a bit of annoying impedance mismatch between working with subscriptions in GraphQL.js and working with subscriptions in Relay.
While GraphQL.js internally uses async iterables for subscriptions, Relay instead uses observables to handle everything.
The semantics of the two are similar but not identical, and we end up with different-looking code on the receiver side versus the publisher side.
Additionally, to the extent that there's a difference, observables are somewhat more powerful: analogous to the current value-or-promise handling in resolve, they can respond synchronously, while async iterables can't.
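To make the synchronous-delivery point concrete, here is a minimal sketch. `syncObservable` is a hypothetical stand-in written for this example, not Relay's observable implementation, and `asyncValues` is likewise just an illustration.

```javascript
// Hypothetical minimal observable: subscribe() calls the observer right away.
function syncObservable(values) {
  return {
    subscribe(observer) {
      for (const value of values) observer.next(value);
      observer.complete();
      return { unsubscribe() {} };
    },
  };
}

// The async iterable counterpart: every value arrives through a promise.
async function* asyncValues(values) {
  yield* values;
}

const log = [];

syncObservable([1, 2]).subscribe({
  next: value => log.push(`observable ${value}`),
  complete: () => log.push('observable done'),
});

const consumed = (async () => {
  for await (const value of asyncValues([1, 2])) log.push(`iterable ${value}`);
})();

log.push('end of synchronous code');

consumed.then(() => console.log(log));
```

In this sketch the observable's values are logged before the line that marks the end of the synchronous code, while the async iterable's values only show up afterwards, even though both sources already had their values in hand.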
I'm not sure if it's too late now, but should we consider supporting observables for subscriptions?
I don't think we should. GraphQL.js subscriptions use async iterables because they're an impending native syntactic JavaScript feature, while Observable seems unlikely to become native. Support for native syntax is going to be important for the long-term sustainability of the feature.
Observables are probably a better fit for client-side observable programming where synchronous responses to observable events are critical (and that is exactly why Relay uses them), but async iterables are a better fit for natively asynchronous servers like a GraphQL subscription service.
For cases where the two need to come together, it would be preferable to have simple functions which convert from one to the other.
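As a rough sketch of one direction of such a conversion, the function below exposes an async iterable through a `subscribe()` method. The name `asyncIterableToObservable` and the plain-object observable shape are assumptions made for this example; neither GraphQL.js nor Relay is known to ship this helper.

```javascript
// Hypothetical helper: wrap an async iterable in an observable-like object.
function asyncIterableToObservable(iterable) {
  return {
    subscribe(observer) {
      const iterator = iterable[Symbol.asyncIterator]();
      let closed = false;

      // Pump values from the iterator into the observer until either side stops.
      (async () => {
        try {
          while (!closed) {
            const { value, done } = await iterator.next();
            if (closed) return;
            if (done) {
              closed = true;
              if (observer.complete) observer.complete();
              return;
            }
            observer.next(value);
          }
        } catch (err) {
          // Errors from the source (or thrown by observer.next) end the stream.
          if (!closed) {
            closed = true;
            if (observer.error) observer.error(err);
          }
        }
      })();

      return {
        unsubscribe() {
          if (closed) return;
          closed = true;
          // Ask the source to release its resources; ignore late rejections.
          if (typeof iterator.return === 'function') {
            Promise.resolve(iterator.return()).catch(() => {});
          }
        },
      };
    },
  };
}
```

Note that `unsubscribe()` here calls `iterator.return()` immediately. Whether the source actually cleans up at that moment depends on how the source iterator is implemented.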
Thanks!
@leebyron @robzhu @Urigo @davidyaha @jedwards1211
cc @jquense
At this point, we've done quite a bit of extra work with subscriptions, playing around with async iterables. While async iterables have a number of nice syntax advantages, we've discovered them to be quite painful to use and full of pitfalls in this scenario.
Consider the simple case of adapting the messages on a Redis topic to an async iterable, then cleaning up the subscription when killing the iterable. This is actually surprisingly complicated to implement in a generically correct way!
First, we need to keep track of an internal queue _and_ promise to keep track of messages received. Second, we need to do our setup and teardown in very specific ways, because iterable.return() won't hit the finally block if it's called before the first message is received on the iterable (https://github.com/apollographql/graphql-subscriptions/issues/143).
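The `return()` pitfall can be reproduced in a few lines. This is a minimal sketch with a hypothetical `subscribeToTopic` function; the `log.push` calls stand in for subscribing to and unsubscribing from a real topic.

```javascript
const log = [];

function subscribeToTopic() {
  log.push('subscribed'); // eager setup, done outside the generator body

  async function* messages() {
    try {
      yield 'message';
    } finally {
      log.push('unsubscribed'); // teardown that is meant to always run
    }
  }

  return messages();
}

const iterator = subscribeToTopic();

// Close the iterator before next() has ever been called. The generator body
// has not started, so there is no try block to unwind and finally never runs.
const closing = iterator.return();

closing.then(() => console.log(log));
```

Because the generator is closed while still in its initial suspended state, the teardown in `finally` is skipped and the log never gains an `'unsubscribed'` entry.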
In the end, an initial correct implementation looks like this: https://github.com/4Catalyzer/graphql-subscription-server/blob/v0.6.0/src/AsyncUtils.js#L27-L78. This was quite unintuitive to write, and there are a number of fiddly, seemingly-pointless pieces of code that, if removed, could lead to incorrect behavior.
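For a sense of the moving parts, here is a simplified sketch of a hand-written adapter. It is not the linked implementation: `messagesFrom` is a hypothetical name, and a Node.js `EventEmitter` stands in for the Redis client.

```javascript
const { EventEmitter } = require('events');

// Hypothetical adapter: turn events from an emitter into an async iterator.
function messagesFrom(emitter, eventName = 'message') {
  const queue = [];   // messages that arrived before anyone asked for them
  const waiting = []; // resolvers for next() calls made before a message arrived
  let done = false;

  const onMessage = message => {
    if (waiting.length > 0) waiting.shift()({ value: message, done: false });
    else queue.push(message);
  };
  emitter.on(eventName, onMessage); // eager setup

  const close = () => {
    if (done) return;
    done = true;
    emitter.removeListener(eventName, onMessage);
    // Settle every pending next() so that no consumer is left hanging.
    while (waiting.length > 0) waiting.shift()({ value: undefined, done: true });
  };

  return {
    next() {
      if (queue.length > 0) {
        return Promise.resolve({ value: queue.shift(), done: false });
      }
      if (done) return Promise.resolve({ value: undefined, done: true });
      return new Promise(resolve => waiting.push(resolve));
    },
    return() {
      close();
      return Promise.resolve({ value: undefined, done: true });
    },
    throw(err) {
      close();
      return Promise.reject(err);
    },
    [Symbol.asyncIterator]() {
      return this;
    },
  };
}
```

Because `return()` is an ordinary method here rather than a generator's `finally` block, the listener is removed even when the iterator is closed before the first `next()` call. Dropping either the queue or the list of waiting resolvers would lose messages or leave consumers hanging.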
It gets worse, though, because in the simple Redis case, a more correct way to model the subscription type is actually Promise<AsyncIterable>, as making (and receiving the acknowledgement for making) the initial subscription is itself asynchronous. However, because of the noted issue above with iterable cleanup, we need to write yet more unintuitive, fiddly code to get everything to work: https://github.com/4Catalyzer/graphql-subscription-server/pull/18#discussion_r199313379.
The upshot is that, as specified, iterable.return() on an async iterable has semantics that are so deeply unhelpful for this use case that it's very likely that most implementations of async iterables here are just wrong/buggy.
While there may not be a language-level specification for observables, they're at least not full of traps for the unwary that could lead to incorrect code. The observable-equivalent version of the code above would be significantly simpler, easier-to-understand, and less bug-prone.
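A rough sketch of what the observable-style version might look like, again with a Node.js `EventEmitter` standing in for Redis. `observeMessages` and the plain-object observable shape are assumptions for this example, not the API of any particular observable library.

```javascript
const { EventEmitter } = require('events');

// Hypothetical observable-style adapter for the same event source.
function observeMessages(emitter, eventName = 'message') {
  return {
    subscribe(observer) {
      const onMessage = message => observer.next(message);
      emitter.on(eventName, onMessage); // setup happens on subscribe

      return {
        // Teardown is an explicit function, so it runs whenever it is called.
        unsubscribe() {
          emitter.removeListener(eventName, onMessage);
        },
      };
    },
  };
}

const emitter = new EventEmitter();
const received = [];
const subscription = observeMessages(emitter).subscribe({
  next: message => received.push(message),
});

emitter.emit('message', 'first');
subscription.unsubscribe();
emitter.emit('message', 'ignored after unsubscribe');
```

There is no queue and no pending promise to manage in this version, because each message is pushed to the observer as it arrives.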
To summarize, the core issues are:
The current situation might be okay if the expectation is that the vast majority of users will use some library that wraps up interacting with Redis or whatever other pub/sub system is applicable.
However, anybody writing this code themselves is very, very likely to run into subtle bugs, and _in fact right now_ the relatively popular graphql-subscriptions library _has_ these sorts of bugs: https://github.com/apollographql/graphql-subscriptions/issues/143.
If this sort of "reference-ish" implementation hits these pitfalls, then it's very likely that almost anybody else trying to use this API will hit the same pitfalls, or worse.
To me, that constitutes a prima facie case for not using an API that's so hard to get right.
@taion on the other hand, implementing transforms like map and combinations of async iterables using for await statements provides automatic finalization, whereas it would be more work to finalize with observables...
I'm also not exactly convinced that people wouldn't make the same mistake as https://github.com/apollographql/graphql-subscriptions/issues/143 if using observables.
@jedwards1211 Not exactly – with observables, because you control the cleanup explicitly, the error condition where the teardown function never gets called is much more difficult to write. As long as .unsubscribe() actually unsubscribes, there's nothing to worry about. By contrast, the try-catch pattern with async generators is much more error-prone.
Definitely you have to jump through some hoops to get events from an EventEmitter or redis into an async iterable. I agree that the risks are higher for async iterables there. But the risks are sometimes lower for transforming async iterables than for transforming observables:
export default async function * filter<T>(iterable: AsyncIterable<T>, predicate: (T) => any): AsyncIterable<T> {
  for await (let value of iterable) {
    if (await predicate(value)) yield value
  }
  // no try/catch or explicit cleanup required.
}
Compare that to how zen-observable implements a filter function. There are a lot more places a developer could make a mistake:
filter(fn) {
  if (typeof fn !== 'function')
    throw new TypeError(fn + ' is not a function');
  let C = getSpecies(this);
  return new C(observer => this.subscribe({
    next(value) {
      try { if (!fn(value)) return; }
      // *** explicit cleanup ***:
      catch (e) { return observer.error(e) }
      observer.next(value);
    },
    // *** explicit cleanup ***:
    error(e) { observer.error(e) },
    complete() { observer.complete() },
  }));
}
The error and complete functions are essentially boilerplate that a dev could forget to implement, as well as the try/catch block in next(value).
@taion I still think Lee Byron made the right call here. The main problem with observables is that there are a dozen different libraries implementing them, so how do you choose one?
There's nothing that stops people from making and using a carefully-written observableToAsyncIterable function if they think it will make whatever they're doing more foolproof.
Coming back to this... this is even more aggressively broken than I thought, given https://github.com/tc39/proposal-async-iteration/issues/126.
It's simply almost impossible to safely use async generators in cases where we need cleanup. Consider https://babeljs.io/repl/#?babili=false&browsers=last%202%20versions&build=&builtIns=usage&spec=false&loose=false&code_lz=MYewdgzgLgBADgJxAWwJYQKYwLwzBgdxgAUk1MAKCgShwD4YBvAX2oG4AoDgQwgE8wwGADMAroKipwAKhEgQNJhxgwoCPkpUruBbqliIU6DJy0w-qDABsAJjACMpmMxGow3K1Y2NlW0JBArDAA6KxAAcwoAchtwDCj2X2YOZJ5-QRFxYElwGAAjbgRFHxV_aBEcOQVElWFg_AAPKBonMsCQsMiohAwoUQQwN3CEpzqevoGW3zag0Ijo8f78GwB-FZGUrgKi9iA&debug=true&forceAllTransforms=false&shippedProposals=false&circleciRepo=&evaluate=true&fileSize=false&sourceType=module&lineWrap=false&presets=stage-0%2Cstage-1&prettier=false&targets=&version=6.26.0&envVersion=1.6.2.
The "cleanup" for the generator simply won't get called until the promise resolves! Without cancellable promises, you can't interrupt a waiting async generator and make it clean up.
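A minimal sketch of that behavior, using a hypothetical `waitForMessage` generator and a manually controlled promise in place of a real message source:

```javascript
const log = [];

// A promise we control by hand; it stands in for "the next message".
let release;
const pending = new Promise(resolve => {
  release = resolve;
});

async function* waitForMessage() {
  try {
    yield await pending;
  } finally {
    log.push('cleanup');
  }
}

const iterator = waitForMessage();
iterator.next();   // the generator is now parked on `await pending`
iterator.return(); // queued behind the pending next(); finally has not run

// Only calling release(...) lets the generator move on and reach its finally
// block. If no message ever arrives, the cleanup never happens.
```

The `return()` request sits in the generator's queue until the awaited promise settles, so the teardown is delayed for as long as the source stays quiet.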
It seems nearly impossible at this point to elegantly write these without risking resource leaks all over the place.
Ouch, yeah, that's a very good point, and something that absolutely has to be fixed. I didn't realize that; I thought f.return() or f.throw() would trigger the finally block. Looks like I was totally wrong about no explicit cleanup being required in generators.
@taion to be clear, after reading through that thread, I realized this is a problem with async generators, not arbitrary async iterables, i.e. ones where you define the next(), return() and throw() methods yourself. With arbitrary async iterables, the call to your return() or throw() will not be blocked if next() returns a hanging promise (provided that the upstream code calls return() without waiting on next(), as in your example).
So one takeaway lesson is no one should be using async generators for cases like this for the time being.
So now I'm investigating how async iterables and observables compare for a value mapping transformation. I think this really illustrates how very similar the two are:
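function mapAsyncIterable(iterable, transform) {
  return {[Symbol.asyncIterator]() {
    const iterator = iterable[Symbol.asyncIterator]()
    return {
      // transform only the yielded value, not the whole {value, done} result
      next: async () => {
        const {value, done} = await iterator.next()
        return done ? {value, done} : {value: await transform(value), done}
      },
      throw: err => iterator.throw(err),
      return: () => iterator.return(),
    }
  }}
}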
function mapObservable(observable, transform) {
  return new Observable(observer => {
    const subscription = observable.subscribe({
      next: async value => observer.next(await transform(value)),
      error: err => observer.error(err),
      complete: () => observer.complete(),
    })
    return () => subscription.unsubscribe()
  })
}
The issue is that this essentially becomes a minefield.
If any unsuspecting user accidentally passes the async iterable through an async generator, then cleanup may become arbitrarily delayed.
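The following sketch illustrates that hazard. `hangingSource` and `passThrough` are hypothetical names; the source is a hand-written async iterable whose `next()` never settles, so only the path taken by `return()` differs between the two cases.

```javascript
// Hand-written source: next() never settles, return() records the teardown.
function hangingSource(state) {
  return {
    [Symbol.asyncIterator]() {
      return this;
    },
    next() {
      return new Promise(() => {});
    },
    return() {
      state.closed = true;
      return Promise.resolve({ value: undefined, done: true });
    },
  };
}

// An innocent-looking pass-through written as an async generator.
async function* passThrough(iterable) {
  for await (const value of iterable) yield value;
}

// Used directly, return() reaches the source right away.
const directState = { closed: false };
const direct = hangingSource(directState);
direct.next();
direct.return(); // directState.closed is now true

// Routed through the generator, return() waits behind the pending next().
const wrappedState = { closed: false };
const wrapped = passThrough(hangingSource(wrappedState));
wrapped.next();
wrapped.return(); // wrappedState.closed stays false until a message arrives
```

The source's own `return()` is correct in both cases. The delay comes entirely from the async generator in the middle, which is what makes the problem easy to introduce by accident.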
Yeah that's very true.
await Promise.race([observableProposal, cancelationProposal]) 😄