RxJS: pipe does not work in v6 as it did in v5

Created on 17 Mar 2018 · 6 Comments · Source: ReactiveX/rxjs

RxJS version:
6.0.0-beta.0

Code to reproduce:

import { from } from 'rxjs';
import { concat } from 'rxjs';
import { delay } from 'rxjs/operators';

let $from1 = from([1, 2]);
$from1
    .pipe(
        delay(2000),
        concat(from([3, 4])),
        delay(2000)
    ).subscribe(v => console.log(`$from1-delay-concat: ${v}`));

Expected behavior:
It should run without errors, as in v5.5.6.

Actual behavior:

/rxjs/node_modules/rxjs/internal/util/pipe.js:22
        return fns.reduce(function (prev, fn) { return fn(prev); }, input);
                                                       ^

TypeError: fn is not a function
    at /rxjs/node_modules/rxjs/internal/util/pipe.js:22:56
    at Array.reduce (<anonymous>)
    at piped (/rxjs/node_modules/rxjs/internal/util/pipe.js:22:20)
    at Observable.pipe (rxjs/node_modules/rxjs/internal/Observable.js:252:48)
    at Object.<anonymous> (rxjs/lib/index.js:18:25)
    at Module._compile (module.js:657:14)
    at Object.Module._extensions..js (module.js:671:10)
    at Module.load (module.js:573:32)
    at tryModuleLoad (module.js:513:12)
    at Function.Module._load (module.js:505:3)

Additional information:
I can't investigate the problem myself because I think the source maps are broken. I could not set a breakpoint in, e.g., rxjs/src/internal/util/pipe.ts: the browser jumps to a blank page. (This still happens after removing the .js.map file and the source map reference in the related file.)

All 6 comments

I think this is my mistake! Am I right that it is not possible to pass an Observable to pipe, and that pipe accepts only operators in v6?

It looks like concat is no longer an operator in v6. It's an Observable, right?

Could you clarify the new behavior? Why is it different in v6? Thank you.

I always suggest that everyone read this: https://github.com/ReactiveX/rxjs/blob/master/doc/pipeable-operators.md (at least until rxjs-docs is out)

To do what you want to do I would build my own operator, like so:

let $from1 = from([1, 2]);
$from1
    .pipe(
        delay(2000),
        (yourSourceObservable) => concat(yourSourceObservable, from([3, 4])),
        delay(2000)
    ).subscribe(v => console.log(`$from1-delay-concat: ${v}`));
// $from1-delay-concat: 1
// $from1-delay-concat: 2
// $from1-delay-concat: 3
// $from1-delay-concat: 4

To pull it out into your own operator:

const myGloriousOperator = () => (source) => concat(source, from([3,4]));
let $from1 = from([1, 2]);
$from1
    .pipe(
        delay(2000),
        myGloriousOperator(),
        delay(2000)
    ).subscribe(v => console.log(`$from1-delay-concat: ${v}`));

I made a StackBlitz here: https://stackblitz.com/edit/rxjs-issue-3447?file=index.ts

Looking at the previous source of concat, I can see the operator was intended to be used together with another observable, such as:

Observable1.concat(Observable2)

But a pipeable operator is meant to be a function that takes in an observable and returns another observable.

So maybe we should make a concat operator that works like concat(from([3, 4])) and applies concat(source, from([3, 4])) under the covers.
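
To make the difference between the two forms concrete, here is a minimal sketch that uses a toy stand-in for an observable instead of RxJS itself. Every name in it (Source, Operator, fromArray, staticConcat, pipeableConcat, collect) is hypothetical and chosen only for illustration; none of this is the library's implementation.

```typescript
// Toy stand-in for an observable: a function that pushes values to a callback.
// This is NOT the RxJS Observable type, only an illustration.
type Source<T> = (emit: (value: T) => void) => void;

// A pipeable operator in this toy model: takes a source, returns a new source.
type Operator<T, R> = (source: Source<T>) => Source<R>;

// Hypothetical analogue of `from([...])`: emits each array item in order.
const fromArray = <T>(items: T[]): Source<T> => (emit) =>
  items.forEach((item) => emit(item));

// Static form: every source is passed as an argument, like concat(a, b).
const staticConcat = <T>(...sources: Source<T>[]): Source<T> => (emit) =>
  sources.forEach((source) => source(emit));

// Pipeable form: the extra sources are captured in a closure, and the
// returned function receives the upstream source later.
const pipeableConcat = <T>(...others: Source<T>[]): Operator<T, T> => (source) =>
  staticConcat(source, ...others);

// Helper that runs a source and gathers everything it emits.
const collect = <T>(source: Source<T>): T[] => {
  const out: T[] = [];
  source((value) => out.push(value));
  return out;
};

const operator = pipeableConcat(fromArray([3, 4])); // configure first
const result = collect(operator(fromArray([1, 2]))); // apply to the source later
console.log(result); // [1, 2, 3, 4]
```

The point of the sketch is the shape of pipeableConcat: calling it returns a function, and only that returned function is something a pipe could call with the upstream source.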

Reading that (https://github.com/ReactiveX/rxjs/blob/master/doc/pipeable-operators.md) helped me a lot! Thanks.

Now I know why this isn't working anymore: pipe in v6 accepts only functions as arguments. It's solved quite elegantly (I believe it's called closure/currying): one function returns another function, and that one returns an observable. The closure/currying pattern was not clear to me at first glance.

... and because I am not so familiar with functional programming, I could not interpret the error message immediately.

/rxjs/node_modules/rxjs/internal/util/pipe.js:22
        return fns.reduce(function (prev, fn) { return fn(prev); }, input);
                                                       ^

TypeError: fn is not a function
    at /rxjs/node_modules/rxjs/internal/util/pipe.js:22:56
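
For anyone else puzzled by that message, here is a rough sketch of what the reduce in the trace is doing. It is a hypothetical re-creation based only on the line shown in the stack trace, not the actual RxJS source, and pipeSketch, addOne, double, and notAnOperator are made-up names. It uses plain numbers instead of observables to keep it short.

```typescript
// Each step is a function of one argument; the output of one feeds the next.
type UnaryFn = (input: unknown) => unknown;

// Hypothetical sketch of the reduce from the trace: call every function in
// order, starting from `input`.
const pipeSketch = (...fns: UnaryFn[]) => (input: unknown): unknown =>
  fns.reduce((prev, fn) => fn(prev), input);

const addOne: UnaryFn = (n) => (n as number) + 1;
const double: UnaryFn = (n) => (n as number) * 2;

// All arguments are functions, so the chain runs: (3 + 1) * 2.
const ok = pipeSketch(addOne, double)(3);
console.log(ok); // 8

// An object with a subscribe method stands in for an Observable that was
// passed where an operator function was expected. The cast only silences the
// type checker; at runtime the value is still not callable.
const notAnOperator = { subscribe() {} } as unknown as UnaryFn;

let message = '';
try {
  pipeSketch(addOne, notAnOperator)(3);
} catch (err) {
  // The reduce tries to call the object as a function and throws a TypeError.
  message = (err as Error).message;
}
console.log(message);
```

In the original code, concat(from([3, 4])) imported from 'rxjs' returns an Observable, so it plays the role of notAnOperator here.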

@claudiordgz Thank you so much, now everything is clear to me!

I think everything is cleared up, so this issue can be closed :)

This thread has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs.
