Hello!
I am currently using Rx 4 to implement a multipart file uploading feature for my organization. I was hoping to ensure that our work would be fully compatible with Rx 5 when it is ready for use in production. I noticed in the Migration doc that one operator we are using, the pausable operator, won't be implemented in this version.
We're using the pausable operator to pause an Observable stream temporarily if the user loses internet connection and then allow them to resume the upload stream when they're ready.
I'm interested in the reasoning for the removal of this operator. Should we switch to use something else to get the same functionality?
It's not been "removed" so much as it hasn't been added. This is mostly because all you need to replicate this functionality is a switchMap and Observable.never:
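const source = getSomeSourceObservable();
const pauser = new Subject();
// All the magic is here: each pauser value switches the inner stream
const pausable = pauser.switchMap(paused => paused ? Observable.never() : source);
// The method is subscribe(); Observable has no subscription() method
pausable.subscribe(x => console.log(x));
// Emit true to pause; emit false to start or resume
pauser.next(true);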
Thanks for the help @blesh. That's exactly what I was looking for!
@blesh What about pausableBuffered? I'm still noob enough to not totally see a replacement.
Hey,
@blesh I stumbled upon a case where pausable would be helpful and which the example above doesn't cover. In that example, the subscriber just stops receiving events from the source. Consider a source that is an IntervalObserver that, for example, polls a web service on every tick and then publishes the results. With the code above, the source would not publish results, but the observer would still poll the web service. If I understand the original pausable correctly, pausing would prevent the IntervalObserver itself from generating ticks, thus preventing the polling and not only the publishing.
In either case (even if the original implementation behaves like your example) I think it would be a valuable feature to actually be able to pause an observer.
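Whether the polling keeps running probably depends on how the source is built. switchMap unsubscribes from the previous inner observable whenever pauser emits, so a cold source (one that starts its own producer per subscription, like an interval) should stop producing while paused, whereas a hot source that is driven elsewhere keeps going. The sketch below models only the cold case in plain JavaScript, without RxJS. makeColdPoller, pausable, and the manual tick() call that stands in for the timer are hypothetical helpers for illustration, not library APIs.

```javascript
// Hypothetical stand-in for a cold polling source: a producer exists only
// while a subscription is active.
function makeColdPoller(log) {
  const producers = new Set();
  return {
    subscribe(onValue) {
      const producer = (value) => {
        log.push('poll ' + value); // the "web service call"
        onValue(value);
      };
      producers.add(producer);
      // Teardown: removing the producer is what stops the polling.
      return () => producers.delete(producer);
    },
    // Stands in for the timer firing once.
    tick(value) {
      producers.forEach((producer) => producer(value));
    },
  };
}

// Switch-style gate: every pauser value tears down the current inner
// subscription before (optionally) creating a new one.
function pausable(source, onValue) {
  let teardown = null;
  return function setPaused(paused) {
    if (teardown) {
      teardown();
      teardown = null;
    }
    if (!paused) {
      teardown = source.subscribe(onValue);
    }
  };
}

const log = [];
const received = [];
const poller = makeColdPoller(log);
const setPaused = pausable(poller, (value) => received.push(value));

setPaused(false);
poller.tick(1); // polled and delivered
setPaused(true);
poller.tick(2); // no active producer, so nothing is polled
setPaused(false);
poller.tick(3); // polled and delivered again

console.log(log);      // [ 'poll 1', 'poll 3' ]
console.log(received); // [ 1, 3 ]
```

If the source is hot instead (for example a Subject fed by a subscription created elsewhere), the gate only stops delivery and the upstream work continues, which is the situation described above.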
@tom-mayer yes, we should add this to our roadmap (PRs welcome, of course!). @blesh should this be tagged as blocking the 5-oh release?
@trxcllnt adding new operators does not incur a major revision change, so no, I wouldn't block on this.
@blesh @young-steveo
I'm also pretty noob but I've taken a stab implementing the
pausableBuffered observable, and it seems to work.
Check out the plunkr: https://plnkr.co/zwLlDmwQ5nL2jN3L31jk/
We need a pauser and a source. In the plunker these are an observable listening
to clicks on the document and an interval. We also need a buffer that we'll
subscribe to the buffered output of the source.
var pauser = new Rx.Subject();
var source = new Rx.Subject();
var buffer = new Rx.Subject();
getObservableSomehow.subscribe(source);
N.B.: We don't want to use the original source observable directly because
when we do, the emissions from our pausableBuffered get reset to the beginning
of the sequence each time we toggle the pauser.
We want to buffer the emissions from source, and close and emit this buffer
whenever pauser emits (i.e. is toggled). We don't want to emit arrays, but
rather the individual items, so we make sure to mergeAll() before we subscribe
buffer. (UPDATE: I had previously used .concatMap(Rx.Observable.from) to do this until I realized that mergeAll() was the new flatten() 😅 )
source
.buffer(pauser)
.mergeAll()
.subscribe(buffer);
Finally, we use a switchMap() on pauser to emit items either from source or
buffer.
var pausableBuffered = pauser.switchMap(function(paused) {
return paused ? buffer : source;
});
pausableBuffered.subscribe(function(x) {
console.log('Next: ' + x.toString());
});
bufferToggle()
Instead of using the buffer() method presented above, you can also use
bufferToggle() like this:
.bufferToggle(pauser, function (toggle) {
return toggle ? Rx.Observable.empty() : pauser;
})
Using bufferToggle() may be more performant. I'm not sure because I haven't
inspected the implementation, but if it works like I think it does, then using
it would skip buffering emissions while unpaused. It is more code, however. If
the pausableBuffered is going to be running unpaused most of the time, then
the performance gain will be negligible and the buffer() syntax is probably
preferred.
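To make the intended behaviour concrete, here is a minimal sketch of the "buffer while paused, flush on resume" semantics in plain JavaScript, without RxJS. It is not the plunker's implementation and not the RxJS 4 operator. makePausableBuffer and its next/setPaused methods are hypothetical names used only for illustration.

```javascript
// Hypothetical helper: passes values straight through while running and
// queues them while paused; resuming flushes the queue in arrival order.
function makePausableBuffer(onValue) {
  let paused = false;
  const queue = [];
  return {
    next(value) {
      if (paused) {
        queue.push(value);
      } else {
        onValue(value);
      }
    },
    setPaused(nextPaused) {
      paused = nextPaused;
      // On resume, deliver everything that arrived during the pause.
      while (!paused && queue.length > 0) {
        onValue(queue.shift());
      }
    },
  };
}

const out = [];
const gate = makePausableBuffer((value) => out.push(value));

gate.next(1);          // delivered immediately
gate.setPaused(true);
gate.next(2);          // queued
gate.next(3);          // queued
gate.setPaused(false); // flushes 2, then 3
gate.next(4);          // delivered immediately

console.log(out); // [ 1, 2, 3, 4 ]
```

Nothing is dropped and the order is preserved, which is the property any buffer()- or bufferToggle()-based version would need to keep as well.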
@tom-mayer
Couldn't you just use @blesh's implementation above to send the intervals to Observable.empty() before you do the polling? Of course it still could be a good idea to make Observables pausable.
Hi, I am using "rxjs": "5.0.0-beta.6", and pausable.subscription(x => console.log(x)); is not working for me, but pausable.subscribe works fine.
One more question: when I pause and then continue, the value restarts. Is there some way to keep the value?
My code looks like this:
ngOnInit() {
this.pauser = new Subject();
this.source = this.getSomeSourceObservable();
const pausable = this.pauser.switchMap(paused => paused ? Observable.never() : this.source);
pausable.subscribe(t => this.tickerFunc(t));
this.pauser.next(false);
}
tickerFunc(tick){
this.timer = moment().startOf('day').seconds(tick).format('mm:ss');
}
pause() {
this.pauser.next(true);
}
resume() {
this.pauser.next(false);
}
If I understand what's going on correctly, this is by design since the source is an observable, instead of a subject. Each time your pauser is resumed, it grabs a new subscription on the observable. Because the observable is unicast, each subscriber gets its own separate replay of the items from the observable. In order to maintain the state between resume() calls you should subscribe a subject to the source and listen to that. Something like this:
ngOnInit() {
this.pauser = new Subject();
this.source = new Subject();
this.getSomeSourceObservable().subscribe(this.source);
const pausable = this.pauser.switchMap(paused => paused ? Observable.never() : this.source);
pausable.subscribe(t => this.tickerFunc(t));
this.pauser.next(false);
}
tickerFunc(tick){
this.timer = moment().startOf('day').seconds(tick).format('mm:ss');
}
pause() {
this.pauser.next(true);
}
resume() {
this.pauser.next(false);
}
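The difference between the two setups can be shown without RxJS. The sketch below is plain JavaScript with hypothetical helpers (coldCounter, hotCounter, and manual tick() calls in place of a timer). It only illustrates why resubscribing to a cold source starts over while a shared source carries on. Note that in the shared case, values produced during the pause are simply missed, not buffered.

```javascript
// Cold: every subscription creates its own counter starting at 0.
function coldCounter() {
  return {
    subscribe(onValue) {
      let i = 0;
      let active = true;
      return {
        tick() { if (active) onValue(i++); },
        unsubscribe() { active = false; },
      };
    },
  };
}

// Hot: a single shared counter; subscribers only listen in.
function hotCounter() {
  let i = 0;
  const listeners = new Set();
  return {
    tick() {
      const value = i++;
      listeners.forEach((listener) => listener(value));
    },
    subscribe(onValue) {
      listeners.add(onValue);
      return { unsubscribe() { listeners.delete(onValue); } };
    },
  };
}

// Cold source: "resume" is a brand-new subscription, so counting restarts.
const coldSeen = [];
const cold = coldCounter();
let coldSub = cold.subscribe((value) => coldSeen.push(value));
coldSub.tick();
coldSub.tick();
coldSub.unsubscribe(); // pause
coldSub = cold.subscribe((value) => coldSeen.push(value)); // resume
coldSub.tick();
console.log(coldSeen); // [ 0, 1, 0 ]

// Hot source: the counter keeps running while nobody is listening.
const hotSeen = [];
const hot = hotCounter();
const listener = (value) => hotSeen.push(value);
let hotSub = hot.subscribe(listener);
hot.tick();
hot.tick();
hotSub.unsubscribe(); // pause
hot.tick();           // value 2 is produced but not delivered
hotSub = hot.subscribe(listener); // resume
hot.tick();
console.log(hotSeen); // [ 0, 1, 3 ]
```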
Hm, I think I understand. I changed my code as you suggested:
```javascript
ngOnInit() {
this.pauser = new Subject();
this.source = new Subject();
this.getSomeSourceObservable().subscribe(this.source);
const pausable = this.pauser.switchMap(paused =>
paused ? Observable.never()
: this.source);
pausable.subscribe(t => this.tickerFunc(t));
this.pauser.next(false);
}
tickerFunc(tick){
this.timer = moment().startOf('day').seconds(tick).format('mm:ss');
}
pause() {
this.pauser.next(true);
}
resume() {
this.pauser.next(false);
}
getSomeSourceObservable() {
return Observable
.timer(1000,1000)
.map(i => this.start - i)
.take(this.start + 1)
}
```
Now when I pause and resume, the counter does not restart, but the values are buffered, and when I start again the counter has not actually paused. So no luck.
I changed my Observable to
```javascript
getSomeSourceObservable() {
return Observable
.timer(1000,1000)
.map((i) => { console.log('paused:', this.paused, 'i', i); return this.paused ? this.start : this.start - i; })
.take(this.start + 1)
}
``````
but nothing changed :( I don't think this works.
I am trying to make a reactive countdown.
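One possible way to get a countdown that holds its value while paused is to let the timer keep ticking and fold ticks and pause/resume signals into a single state, the kind of reducer that could be handed to scan over a merged stream. The sketch below is plain JavaScript and feeds the events from an array instead of observables. The step function and the event shapes are assumptions made for illustration.

```javascript
// One step of the countdown: a tick only decrements while not paused,
// and never goes below zero.
function step(state, event) {
  switch (event.type) {
    case 'pause':
      return { ...state, paused: true };
    case 'resume':
      return { ...state, paused: false };
    case 'tick':
      return state.paused || state.remaining === 0
        ? state
        : { ...state, remaining: state.remaining - 1 };
    default:
      return state;
  }
}

const tick = { type: 'tick' };
const pause = { type: 'pause' };
const resume = { type: 'resume' };

// Two ticks, then a pause spanning two ticks, then resume and one more tick.
const events = [tick, tick, pause, tick, tick, resume, tick];

let state = { remaining: 5, paused: false };
const history = [];
for (const event of events) {
  state = step(state, event);
  history.push(state.remaining);
}

console.log(history); // [ 4, 3, 3, 3, 3, 3, 2 ]
```

Because the remaining time lives in the accumulated state and not in the timer's index, pausing neither restarts the countdown nor lets it advance in the background.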
@kidchenko Did you ever manage to get this working?
Is there any plan on implementing this? pausable already had slight overhead due to having to push to a subject, but it's certainly better than implementing from scratch each time.
I needed a pausableBuffered operator and have assembled the mixin below. This conceptually matches the Rx PausableBuffer pattern, but is not yet a drop-in replacement for the RxJS 4 pausableBuffered because it does not implement pause() and resume() on the returned observable.
Demo running at https://jsfiddle.net/ofn7qzmv/3/
Rx.Observable.prototype.pausableBuffered = function pausableBuffered(pauser) {
const source = this
const output = new Rx.Subject();
const sourceComplete = Rx.Observable.create(function(observer) {
observer.next(false)
source.subscribe({
complete: _ => {
observer.next(true)
observer.complete()
}
})
})
const initializedPauser = Rx.Observable.merge(
Rx.Observable.of(false),
pauser,
sourceComplete
)
const endPauseSignal = initializedPauser
.map(x => !!x)
.distinctUntilChanged()
const passthrough = initializedPauser
.switchMap(paused => paused ? Rx.Observable.never() : source)
const bufferableEvents = initializedPauser
.switchMap(paused => paused ? source : Rx.Observable.never())
const buffered = new Rx.Subject()
bufferableEvents
.buffer(endPauseSignal)
.subscribe(arr => arr.forEach(val => buffered.next(val)))
const bufferedOutput = Rx.Observable.merge(
passthrough,
buffered
)
// Because Rx.Observable.never doesn't complete, the resulting merge
// will not either
sourceComplete
.switchMap(complete => complete ? Rx.Observable.empty() : bufferedOutput)
.subscribe(output)
return output
}
Hey, I've got a slight problem with this approach.
I implemented a spaceships game in Angular based on code from the book Reactive Programming with RxJS by Sergi Mansilla.
Now I want to improve the game by adding a pause menu, so I added it as follows:
const game$ = Observable.combineLatest(
this.starsService.starStream$(),
this.heroService.Spaceship$(this.canvasRef.nativeElement),
this.enemiesService.Enemies$(),
this.heroService.HeroShots$(this.canvasRef.nativeElement),
Score, (stars, spaceship, enemies, heroShots, score) => ({
stars, spaceship, enemies, heroShots, score
}))
.sampleTime(GameOptions.SPEED)
.takeWhile(actors => this.helpersService.gameOver(actors.spaceship, actors.enemies) === false);
this.pauser.switchMap(paused => paused ? Observable.empty() : game$)
.subscribe(render, error, complete);
this.pauser.next(false);
}
starStream$() {
return Observable.range(1, GameOptions.STAR_NUMBER)
.map(() => ({
x: parseInt((Math.random() * GameOptions.CANVAS_WIDTH).toString(), 10),
y: parseInt((Math.random() * GameOptions.CANVAS_HEIGHT).toString(), 10),
size: Math.random() * 3 + 1
}))
.toArray()
.flatMap(stars => Observable.interval(GameOptions.SPEED)
.map(() => {
console.log(GameOptions.STAR_NUMBER);
stars.forEach(star => {
if (star.y >= GameOptions.CANVAS_HEIGHT) {
star.y = 0;
}
star.y += star.size;
});
return stars;
})
);
}
It works fine for the spaceships and shots, but it doubles the number of stars on each pause/start cycle.
I then used publishReplay(1) and connect on game$, which solves the stars problem but now doubles the spaceships and shots.
Can anyone point me in the right direction?