takeUntil and takeWhile do not include the last value.
Please see this post, there are links in the comments to other users having similar problems.
https://stackoverflow.com/questions/51767204/rxjs-interval-with-takeuntil-to-publish-last-value
Describe the solution you'd like
Need an option to get the last value from takeUntil and takeWhile.
Describe alternatives you've considered
A workaround is in the Stack Overflow post, but it is not clean.
TLDR;
My best suggestion so far would be a new operator:
endWhen(predicate) which would be very 'googlable' and explicit in what it does
I would really value this too. I very often need something 'sticky' where I basically want the stream to end when a certain value is reached. When you know the last value you can do the following:
public showBall$ = this.showBall$.pipe(takeWhile(x => x == false), endWith(true));
So here we keep checking if we should show the ball, until it is true and then we complete with an explicit value of true so the ball always shows.
Currently the takeUntil(X) operator takes an observable. There is no overload with a predicate. Therefore it's pretty unambiguous (to me at least) that you won't get the 'last' value emitted from the source stream when you use this. This definitely shouldn't change - take until X happens (something outside our stream) is quite clear.
So what you're really asking for here is takeUntil(predicate) which emits the final value. However there's a problem with this. If we added that we'd have takeUntil(predicate) that did emit the final (non predicate matching) value, and takeUntil(obs) that didn't.
So it would need to be named differently:
takeUntil(predicate, includeLastValue) seems clumsy
takeUntilInclusive(predicate) also seems clumsy
takeWhile(x => x != FOO), endWith(FOO) is something I'd like to get away from
My best suggestion so far would be a new operator:
endWhen(predicate) which would be very 'googlable' and explicit in what it does
Given that I have people inside of Google making a custom operator to suit this need, I think it's time to discuss implementing it.
Add a second boolean argument, inclusive, to takeWhile. When it is true, the output also includes the first value for which the predicate returns false.
source$.pipe(takeWhile(predicate, true)); // inclusive of last value
Until then, there are super-gross workarounds that use current operators like:
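import { of } from 'rxjs';
import { concatMap, map, takeWhile } from 'rxjs/operators';

// Wrap each value; when the predicate fails, emit the value followed by a kill marker.
const takeWhileInclusive = (predicate) => (source) =>
  source.pipe(
    concatMap((value, i) => predicate(value, i) ? of({ value }) : of({ value }, { kill: true })),
    takeWhile(({ kill }) => !kill),
    map(({ value }) => value),
  );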
// usage
source$.pipe(takeWhileInclusive(predicate));
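To make the exclusive versus inclusive behaviour concrete, here is a minimal sketch in plain TypeScript that models the semantics on an array rather than on an Observable. The helper name takeWhileModel is hypothetical and is not part of RxJS; it only illustrates which values each variant would keep.

```typescript
// Hypothetical helper (not an RxJS API): models takeWhile semantics on an array.
// With inclusive = false the first failing value is dropped;
// with inclusive = true it is kept as the final value.
function takeWhileModel<T>(
  values: T[],
  predicate: (value: T, index: number) => boolean,
  inclusive: boolean = false
): T[] {
  const result: T[] = [];
  let index = 0;
  for (const value of values) {
    if (predicate(value, index++)) {
      result.push(value);
      continue;
    }
    // Predicate failed: optionally keep this value, then stop.
    if (inclusive) {
      result.push(value);
    }
    break;
  }
  return result;
}

const source = [1, 2, 3, 4, 5];
const exclusiveResult = takeWhileModel(source, x => x < 3);       // [1, 2]
const inclusiveResult = takeWhileModel(source, x => x < 3, true); // [1, 2, 3]
```

If the predicate never fails, both variants return the whole input, so the inclusive flag only matters at the point where the stream would be cut off.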
Hi @benlesh - what's your thought on endWhen(predicate) as a new operator for this?
takeWhile with a boolean is 'acceptable' but really will confuse people. It doesn't even make sense in English. Imagine telling someone to keep taking a piece of fruit until they get an apple (oh, unless I have my hand in the air, in which case please also take the first apple you see but no more). UGH!
I've seen many cases where people ask 'how do I end a stream' and I've googled it myself. If I googled for that and endWhen came up I'd instantly know it was what I was looking for. The take family always ends the stream right so I don't see why endWhen couldn't be used?
Of course there are complications and it's not always that simple - hence why I'm asking you ;-)
Should have a PR soon.
@simeyla I'm not sure endWhen vs takeWhile will be any more or less confusing for people. Our API surface area is already enormous.
From core team meeting:
takeWhile(fn, inclusive?) sounds good
takeUntil. Looks ideal to me,
Great and thanks guys!
I suggest naming it completeWhen rather than endWhen.
Code for it:
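import { Observable } from 'rxjs';

// Forwards every source value. The predicate returns whether to continue;
// the first value for which it returns false is still emitted, then the
// output completes.
export function completeWhen<T>(predicate: (value: T) => boolean) {
  return (source: Observable<T>) =>
    new Observable<T>(observer => {
      return source.subscribe({
        next(x) {
          let shouldContinue: boolean;
          try {
            shouldContinue = predicate(x);
          }
          catch (err) {
            // A throwing predicate is reported as an error on the output.
            observer.error(err);
            return;
          }
          observer.next(x);
          if (!shouldContinue) {
            observer.complete();
          }
        },
        error(err) { observer.error(err); },
        complete() { observer.complete(); }
      });
    });
}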
In our product we have a server that sends a response in several messages. The final message has an is_complete flag. We use a custom completeWhen RxJS operator to accumulate the received messages and return them as a single piece.
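The use case above can be sketched without RxJS to show the intended behaviour: collect messages until one carries the completion flag, keep that final message, then stop. The ServerMessage shape, its payload field, and the collectUntilComplete helper are assumptions for illustration; only the is_complete flag comes from the description.

```typescript
// Assumed message shape; only `is_complete` is taken from the description above.
interface ServerMessage {
  payload: string;
  is_complete: boolean;
}

// Hypothetical helper: returns a `push` function that accumulates messages
// and calls `onDone` once, with all messages including the final one.
function collectUntilComplete(
  onDone: (messages: ServerMessage[]) => void
): (message: ServerMessage) => void {
  const buffer: ServerMessage[] = [];
  let done = false;
  return (message: ServerMessage) => {
    if (done) {
      return; // Ignore anything that arrives after completion.
    }
    buffer.push(message); // The final message is kept (inclusive).
    if (message.is_complete) {
      done = true;
      onDone(buffer.slice());
    }
  };
}

let combined = "";
const push = collectUntilComplete(messages => {
  combined = messages.map(m => m.payload).join("");
});
push({ payload: "Hel", is_complete: false });
push({ payload: "lo", is_complete: true });
push({ payload: "!", is_complete: false }); // arrives after completion, ignored
```

In RxJS terms, the same effect could presumably be achieved with the proposed takeWhile(m => !m.is_complete, true) followed by toArray(), which is why the inclusive variant matters for this scenario.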
Closed by #4115.