https://github.com/ReactiveX/rxjs/issues/1980 was closed so opening a new one.
Currently from() is typed to accept any object with a subscribe method. But at runtime it actually throws if the object does not have a Symbol.observable property. The types or implementation should be corrected.
Personally, I prefer using Subscribable, because Symbol.observable is not even standard and it is a global property. There can be nasty bugs if polyfills are loaded in the wrong order: e.g. Rx is loaded first, then something imports symbol-observable, which mutates the global Symbol.observable and makes Rx's symbol checks fail, either because Rx already set it to "@@observable" or because the Observable was created before the polyfill ran. I've already lost hours debugging real bugs caused by this.
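To make the load-order problem concrete, here is a minimal sketch. It does not use RxJS or symbol-observable: `fakeGlobal` and `resolveObservableKey` are hypothetical stand-ins for the global Symbol namespace and for a library that resolves its interop key once at load time, assuming it falls back to the string "@@observable" when no symbol exists yet.

```typescript
// Stand-in for the global `Symbol` namespace, so the sketch does not mutate real globals.
const fakeGlobal: { observable?: symbol } = {};

// Hypothetical library behaviour (an assumption for illustration): resolve the
// interop key, falling back to the string "@@observable" when no symbol is defined.
function resolveObservableKey(): symbol | string {
  return fakeGlobal.observable ?? "@@observable";
}

// 1. The library loads first and caches the string fallback.
const keyCachedByLibrary = resolveObservableKey();

// 2. A polyfill loads later and defines the symbol.
fakeGlobal.observable = Symbol("observable");

// 3. A foreign object created afterwards registers itself under the new symbol.
const keyUsedByForeignObject = resolveObservableKey();
const foreign: Record<string | symbol, unknown> = {
  subscribe: () => ({ unsubscribe() {} }),
};
foreign[keyUsedByForeignObject] = () => foreign;

// The library looks up its stale key and fails to recognize the object,
// even though a plain `subscribe` check would have accepted it.
const recognizedByLibrary = typeof foreign[keyCachedByLibrary] === "function";
const hasSubscribe = typeof foreign.subscribe === "function";
```

In this sketch `recognizedByLibrary` ends up false while `hasSubscribe` is true, which is the mismatch described above.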
Hi there!
I ran into the same problem and was surprised to see a runtime error: TypeError: You provided an invalid object where a stream was expected. You can provide an Observable, Promise, Array, or Iterable.
RxJS typings definitely support the Subscribable<T> interface: ObservableInput implies SubscribableOrPromise, which implies Subscribable.
I looked at the source code to understand why my entity implementing Subscribable<T> was rejected. This condition doesn't consider a "fromSubscribable" case, which is technically similar to fromObservable -> subscribeToObservable. Here we just check for the presence of a .subscribe method, which is all we need for a fromSubscribable -> subscribeToSubscribable analogue.
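A rough sketch of what such a fromSubscribable -> subscribeToSubscribable analogue could look like follows. It is self-contained and does not reflect the actual RxJS source: the interfaces are simplified local stand-ins (observer-only subscribe), and `isSubscribable` and `subscribeToSubscribable` are hypothetical names.

```typescript
// Minimal local stand-ins for the RxJS types; not imported from the library.
interface Unsubscribable {
  unsubscribe(): void;
}
interface Observer<T> {
  next(value: T): void;
  error(err: unknown): void;
  complete(): void;
}
interface Subscribable<T> {
  subscribe(observer: Observer<T>): Unsubscribable;
}

// Hypothetical guard: accepts any object that exposes a `subscribe` function.
function isSubscribable<T>(input: unknown): input is Subscribable<T> {
  return (
    typeof input === "object" &&
    input !== null &&
    typeof (input as { subscribe?: unknown }).subscribe === "function"
  );
}

// Hypothetical analogue of subscribeToObservable: no Symbol.observable lookup,
// just forward the observer to the input's own `subscribe`.
function subscribeToSubscribable<T>(input: unknown, observer: Observer<T>): Unsubscribable {
  if (!isSubscribable<T>(input)) {
    throw new TypeError("You provided an invalid object where a stream was expected.");
  }
  return input.subscribe(observer);
}

// A plain object with only `subscribe`, and no Symbol.observable property.
const plainSource: Subscribable<number> = {
  subscribe(observer) {
    observer.next(1);
    observer.next(2);
    observer.complete();
    return { unsubscribe() {} };
  },
};

const received: number[] = [];
let completed = false;
subscribeToSubscribable<number>(plainSource, {
  next: (value) => { received.push(value); },
  error: () => {},
  complete: () => { completed = true; },
});
```

The duck-typed check keeps the runtime behaviour in line with what the type signature already promises, at the cost of trusting that `subscribe` means what the caller expects.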
Maybe I or someone else could offer a solution via a pull request? This feature would be very useful to me in some cases, as I could replace my subscription helper with just from. I don't really want to mess with Symbol.observable :) IMO implementing such an interface is a bit bulky and has to be done on both the observable and the observing sides.
Thank you.
@bloadvenro go ahead :)
@felixfbecker
I edited my post to add some arguments about the Symbol.observable interface. I'd like to hear the community's thoughts on this topic. The addition looks harmless and fits naturally into the from utility.
To reiterate, I agree with you and would encourage you to open a PR
I'm facing the same issue when trying to deserialize a Subscribable by calling from(subscribable). The serializer in use is capable of handling functions (see https://www.electronjs.org/docs/api/context-bridge), so according to the Subscribable interface this should work. It turns out, however, that the Subscribable interface doesn't reflect the actual logic, which checks for the Symbol.observable property at runtime. So right now I have to explicitly wrap the plain subscribe object in an Observable with code like this:
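import { Observable, Subscribable } from "rxjs";

// NoExtraProperties is a helper type that is not shown in this snippet.

// The prop is intentionally named "subscribeLike" rather than "subscribe",
// so you can't accidentally call "from(subscribeLike)" the way you can with a
// regular "Subscribable" and hit the runtime error caused by the "Subscribable" inconsistency.
export type SubscribeLike<T> = NoExtraProperties<{
    subscribeLike: Subscribable<T>["subscribe"]
}>;

// Wraps the plain subscribe function in a real Observable, forwarding all three signals.
export function subscribableLikeToObservable<T>(
    input: SubscribeLike<T>,
): Observable<T> {
    return new Observable<T>((subscriber) => {
        // Returning the inner subscription lets unsubscribing the Observable tear it down.
        return input.subscribeLike(
            (value) => subscriber.next(value),
            (error) => subscriber.error(error),
            () => subscriber.complete(),
        );
    });
}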
But I would love for from(subscribable) to work on a plain {subscribe: () => {...}} object that doesn't define a Symbol.observable property. Alternatively (the worse option), the Subscribable interface could declare Symbol.observable, so that TypeScript won't allow you to call from(subscribable) if subscribable is a plain {subscribe: () => {...}}-like object without the Symbol.observable property set.
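The second option, tightening the type instead of relaxing the runtime check, might look roughly like the sketch below. Everything here is illustrative: `observableKey` is a local stand-in for Symbol.observable (which TypeScript's standard library does not declare), the interfaces are simplified, and `InteropSubscribable` and `fromInterop` are hypothetical names rather than RxJS APIs.

```typescript
// Local stand-in for Symbol.observable; a `const` symbol gets a `unique symbol` type.
const observableKey = Symbol("observable");

interface Unsubscribable {
  unsubscribe(): void;
}
interface Subscribable<T> {
  subscribe(next: (value: T) => void): Unsubscribable;
}

// Hypothetical stricter input type: the interop key is part of the contract,
// so the compiler rejects plain `{ subscribe }` objects.
interface InteropSubscribable<T> extends Subscribable<T> {
  [observableKey](): Subscribable<T>;
}

// Hypothetical `from`-like entry point that relies on the interop key at runtime.
function fromInterop<T>(input: InteropSubscribable<T>): Subscribable<T> {
  return input[observableKey]();
}

const plain: Subscribable<number> = {
  subscribe(next) {
    next(42);
    return { unsubscribe() {} };
  },
};

// fromInterop(plain);
// ^ would be a compile-time error here: property [observableKey] is missing.

const interop: InteropSubscribable<number> = {
  ...plain,
  [observableKey]: () => plain,
};

const values: number[] = [];
fromInterop(interop).subscribe((value) => { values.push(value); });
```

With this shape the types and the runtime agree, but every producer has to carry the extra key, which is the ergonomic cost raised earlier in the thread.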
@cartant I saw you commenting on https://github.com/ReactiveX/rxjs/issues/5066#issuecomment-586541767 and would love to discuss this more. Are you arguing in your comment towards improving the ergonomics of Subscribable, or to remove it?
In my experience the Symbol ergonomics are even worse, because there is no standard global Symbol.observable, you can get into nasty version conflicts, and symbols cannot be easily cloned over IPC/workers (while a Subscribable can be very easily proxied in a generic way with e.g. comlink).
@felixfbecker see https://github.com/cartant/rxjs-interop/blob/master/README.md and the blog post linked therein.
Thanks for the link, but that doesn't seem to help with the case of IPC/workers. Things would be a lot easier if instead of checking for Symbol.observable Rx just checked if typeof obj.subscribe === 'function' (as the type signatures imply, but is not the case). It's possible of course that subscribe is a different method but that would be caught by TypeScript (and it hasn't really been a problem for Promises).
@felixfbecker ATM, Symbol.observable is the interop point, and the patch function in rxjs-interop can be used to ensure that interop works without having to mess around with polyfilling Symbol.observable. You should be able to do something like this:
import { patch } from "rxjs-interop";
const foreignSource: Observable<Whatever> = /* some foreign source */;
foreignSource[Symbol.observable] = () => foreignSource;
patch(foreignSource);
// foreignSource will now be seen as an observable input by RxJS regardless of
// whether or not Symbol.observable was polyfilled by the application.
Whether or not the API is relaxed regarding inputs having to implement Symbol.observable or just implement subscribe, the library will still have to support Symbol.observable shenanigans. And, TBH, the interop business is unlikely to be a priority.
Would the core team accept a PR to bring the implementation in line with the types, i.e. not throwing on Subscribables?
It really depends upon the other issue upon which you commented: the subscribe signature. If the three-arg, non-observer signature is deprecated, this turns into an even bigger mess, IMO. If the deprecation is rescinded - which would be my choice - I guess I would be okay with it. I'd have to give it more thought.
I think the real resolution to this, short term, is to remove "Subscribable" from ObservableInput. The reason for this is we might end up moving toward using AbortSignal in the subscribe signature. I'd hate to start supporting something externally and have to pivot away from it soon after.
Long term: I think we should support subscribe(observer)... Just unwilling to do it right now.