RxJS version:
6.0.0-beta.0
Code to reproduce:
import { from } from 'rxjs';
import { concat } from 'rxjs';
import { delay } from 'rxjs/operators';
let $from1 = from([1, 2]);
$from1
.pipe(
delay(2000),
concat(from([3, 4])),
delay(2000)
).subscribe(v => console.log(`$from1-delay-concat: ${v}`));
Expected behavior:
it should run without errors as in v5.5.6
Actual behavior:
/rxjs/node_modules/rxjs/internal/util/pipe.js:22
return fns.reduce(function (prev, fn) { return fn(prev); }, input);
^
TypeError: fn is not a function
at /rxjs/node_modules/rxjs/internal/util/pipe.js:22:56
at Array.reduce (<anonymous>)
at piped (/rxjs/node_modules/rxjs/internal/util/pipe.js:22:20)
at Observable.pipe (rxjs/node_modules/rxjs/internal/Observable.js:252:48)
at Object.<anonymous> (rxjs/lib/index.js:18:25)
at Module._compile (module.js:657:14)
at Object.Module._extensions..js (module.js:671:10)
at Module.load (module.js:573:32)
at tryModuleLoad (module.js:513:12)
at Function.Module._load (module.js:505:3)
Additional information:
Its not possible to investigate the issue/problem by my self because i guess the sourcemaps are broken! Could not set a breakpoint e.g. in rxjs/src/internal/util/pipe.ts. The browser jumps to a blank page (also not working after removing the .js.map and the location to the sourcemap in the related file)
I think that its my mistake! Do think im right when i say: to pass an Observable to a pipe is not possible? a pipe accepts only operators in v6?
Its look like that concat is no more a operator in v6! Its an Observable, right?
Could you clarify that new behavior? Why is that different in v6 generally? Thank you
I always suggest everyone to read here: https://github.com/ReactiveX/rxjs/blob/master/doc/pipeable-operators.md (at least until rxjs-docs is out)
To do what you want to do I would build my own operator, like so:
let $from1 = from([1, 2]);
$from1
.pipe(
delay(2000),
(yourSourceObservable) => concat(yourSourceObservable, from([3, 4])),
delay(2000)
).subscribe(v => console.log(`$from1-delay-concat: ${v}`));
// $from1-delay-concat: 1
// $from1-delay-concat: 2
// $from1-delay-concat: 3
// $from1-delay-concat: 4
To pull it as your own operator
const myGloriousOperator = () => (source) => concat(source, from([3,4]));
let $from1 = from([1, 2]);
$from1
.pipe(
delay(2000),
myGloriousOperator(),
delay(2000)
).subscribe(v => console.log(`$from1-delay-concat: ${v}`));
I made a stackblitz on here: https://stackblitz.com/edit/rxjs-issue-3447?file=index.ts
Looking at the previous source of concat, I can see the operator was intended to be used together with another observable, such as:
Observable1.concat(Observable2)
But the pipeable operators are meant to be a function that takes in an observable and returns another observable.
So maybe we should make a concat operator that works like so concat(from([3, 4]) and would apply concat(source, from([3,4])) on it under the covers
Having read that (https://github.com/ReactiveX/rxjs/blob/master/doc/pipeable-operators.md)
helped me a lot! Thanks for that.
Now i know why this isn't working anymore! The pipe operator in v6 accepts only function(s) as argument(s). It's solved totally elegantly (I believe, it's called closure/currying), one function provides another function and the other provides an observable. The closure/currying pattern was not clear at the first glance.
... and because i am not so familiar with functional programming, i could not interpret the error message immediately.
/rxjs/node_modules/rxjs/internal/util/pipe.js:22
return fns.reduce(function (prev, fn) { return fn(prev); }, input);
^
TypeError: fn is not a function
at /rxjs/node_modules/rxjs/internal/util/pipe.js:22:56
@claudiordgz Thank you so much, now everything is so clear for me!
I think everything is cleared so that this issue can be closed :)
This thread has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs.
Most helpful comment
I always suggest everyone to read here: https://github.com/ReactiveX/rxjs/blob/master/doc/pipeable-operators.md (at least until rxjs-docs is out)
To do what you want to do I would build my own operator, like so:
To pull it as your own operator
I made a stackblitz on here: https://stackblitz.com/edit/rxjs-issue-3447?file=index.ts
Looking at the previous source of
concat, I can see the operator was intended to be used together with another observable, such as:Observable1.concat(Observable2)But the
pipeable operatorsare meant to be a function that takes in anobservableand returns anotherobservable.So maybe we should make a concat operator that works like so
concat(from([3, 4])and would applyconcat(source, from([3,4]))on it under the covers