RxSwift: RepeatUntil feature request

Created on 25 Oct 2018 · 10 Comments · Source: ReactiveX/RxSwift

Short description of missing functionality:

Would like an operator that can repeat an observable based on a given predicate.

Short code example of how you would like to use the API:
var packetsToSend = Data(count: 100)
sendPacketsToDeviceCompletable(retreiveAndRemovePackets(size: 20))
    .repeatUntil { packetsToSend.isEmpty() }
    .subscribe()

The reason why I need this functionality:

Send a package of unknown size to clients in chunks, in order.

Code I have right now:

There isn't one right now... I'm still figuring things out.

All 10 comments

You could possibly do

Observable<Int>.interval(...)
   .flatMap { doSomeOtherActionConstantly() }
   .takeUntil(otherStream)

@freak4pc, thanks for the quick reply.

That's one way. Can I check whether this would work:
Observable.repeatElement(retrieveAndRemovePackets(size: 20))
    .takeWhile { !$0.isEmpty() }
    .flatMapCompletable { sendToDeviceCompletable($0) }
    .ignoreElements()

Will this execute the completables:
1) In order? (I understand that there are some issues with interleaving, which is why RxJava has the SwitchMap operator.)
2) Sequentially? (Meaning each one starts only after the previous completable has completed.)
3) And wait for all the completables to complete before completing itself?
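One way to check questions 2 and 3 without a device is to drive the inner sequences by hand. The following is a minimal sketch, not a definitive answer: it assumes `concatMap` in place of `flatMapCompletable` (which, as far as I know, is an RxJava operator rather than an RxSwift one), and `gates` and `send(_:)` are hypothetical stand-ins for the real `sendToDeviceCompletable`.

```swift
import RxSwift

// Hypothetical gates: each one stands in for a send that finishes when we say so.
let gates = [PublishSubject<Never>(), PublishSubject<Never>()]
var started: [Int] = []
var finished = false

// Hypothetical stand-in for sendToDeviceCompletable(_:).
// Records when the inner sequence is actually subscribed to.
func send(_ index: Int) -> Observable<Never> {
    return gates[index].do(onSubscribe: { started.append(index) })
}

_ = Observable.from([0, 1])
    .concatMap { send($0) }          // one inner subscription at a time, in order
    .subscribe(onCompleted: { finished = true })

// Snapshot taken while the first send is still running.
let startedBeforeFirstCompletes = started

gates[0].onCompleted()               // first send done, second may start
let finishedBeforeSecondCompletes = finished

gates[1].onCompleted()               // last send done, outer sequence may complete
```

The snapshots make the ordering visible: the second inner sequence should not be subscribed to until the first completes, and the outer sequence should complete only after the last one does.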

The best way to learn is to try :)
The PRs aren't a good place for "How do I do X" :]

Your operator suggestion is in place since repeatUntil exists in RxJava, but TBH I've never seen a request for it. I guess it depends on community demand, etc. (and of course on Krunoslav's decision here)

@freak4pc,

Thanks for the heads up.
Yep, I'm testing it out soon. I believe I came across one request for it a year or so ago, I'll link it if I come across it again.

I also have a strong need for this. As far as I can tell, there is no way to continuously resubscribe to an observable chain in RxSwift. RxJS has an operator called repeat that resubscribes to the observable chain a specified number of times. The repeatElement operator in RxSwift, however, seems to repeat the observable chain without waiting for the previous iteration to complete before starting the next one. This is a problem because it results in a memory leak.
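For what it's worth, resubscribing until a condition holds can be approximated with operators that already ship with RxSwift. This is a minimal sketch under stated assumptions: `repeatUntil(_:done:)` is a hypothetical free function, not an RxSwift API. It relies on `concat` subscribing to its second sequence only after the first completes, and on `deferred` postponing the predicate check until that moment.

```swift
import RxSwift

// Hypothetical helper, not part of RxSwift.
// Runs `source` to completion, then checks `done`;
// resubscribes to `source` for as long as `done` returns false.
func repeatUntil<T>(_ source: Observable<T>, done: @escaping () -> Bool) -> Observable<T> {
    return source.concat(Observable<T>.deferred {
        done() ? Observable<T>.empty() : repeatUntil(source, done: done)
    })
}

// Usage: a cold source that counts how many times it has been subscribed to.
var runs = 0
let source = Observable<Int>.deferred {
    runs += 1
    return Observable.just(runs)
}

var seen: [Int] = []
_ = repeatUntil(source, done: { runs >= 3 })
    .subscribe(onNext: { seen.append($0) })
```

Each repetition nests one more `concat`, so a source that completes synchronously a very large number of times may grow the call stack; treat this as an illustration rather than a drop-in operator.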

This might be of help https://gist.github.com/dtartaglia/ec1032375498eb95aa260239b289d263

something like: emitWhile(retreiveAndRemovePackets(size: 20), pred: { _ in retreiveAndRemovePackets(size: 20) }, producer: sendPacketsToDeviceCompletable)

The above assumes that sendPacketsToDeviceCompletable returns a Single. If it doesn't then you will need to adjust.

+1

repeatUntil / repeatWhile would be super useful

This is how I'm currently doing it, and it's working:

````
// Build one Completable per iteration, then run them one after another.
var completables = [Completable]()
for _ in 0...end {
    completables.append(
        Completable.empty()
    )
}

return Completable.concat(completables)
````
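Applied to the original packet example, the same concat approach might look like the sketch below. It assumes the payload size is known up front, and `send(_:)` is a hypothetical stand-in for `sendPacketsToDeviceCompletable` that only records the size of each chunk.

```swift
import Foundation
import RxSwift

let payload = Data(count: 100)
let chunkSize = 20
var sent: [Int] = []

// Hypothetical stand-in for sendPacketsToDeviceCompletable(_:).
func send(_ chunk: Data) -> Completable {
    return Completable.create { observer in
        sent.append(chunk.count)     // pretend the chunk went out
        observer(.completed)
        return Disposables.create()
    }
}

// Split the payload into consecutive chunks of at most `chunkSize` bytes.
let chunks: [Data] = stride(from: 0, to: payload.count, by: chunkSize).map { start in
    payload.subdata(in: start..<min(start + chunkSize, payload.count))
}

// concat subscribes to each Completable only after the previous one completes.
_ = Completable.concat(chunks.map(send)).subscribe()
```

This does not cover the case where the total size is unknown ahead of time, which is what the requested operator is for.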

Wrote a working extension:

````
extension Observable {
    // Resubscribes to self each time it completes, until `block` returns true.
    func repeatUntil(_ block: @escaping () throws -> Bool) -> Observable {
        return Observable.deferred {
            Observable.create { observer in
                var subscription: Disposable? = nil
                let disposable = Disposables.create { subscription?.dispose() }
                func subscribe() {
                    subscription = self.subscribe(
                        onNext: {
                            observer.on(.next($0))
                        }, onError: {
                            observer.on(.error($0))
                        }, onCompleted: {
                            do {
                                if try !block() {
                                    subscribe()
                                } else {
                                    observer.on(.completed)
                                }
                            } catch {
                                observer.onError(error)
                            }
                        }, onDisposed: nil)
                }
                subscribe()
                return disposable
            }
        }
    }
}

extension PrimitiveSequence where Trait == CompletableTrait, Element == Never {
    func repeatUntil(_ block: @escaping () throws -> Bool) -> Completable {
        return Completable.deferred {
            Completable.create { observer in
                var subscription: Disposable? = nil
                let disposable = Disposables.create { subscription?.dispose() }
                func subscribe() {
                    subscription = self.subscribe(
                        onCompleted: {
                            do {
                                if try !block() {
                                    subscribe()
                                } else {
                                    observer(.completed)
                                }
                            } catch {
                                observer(.error(error))
                            }
                        }, onError: {
                            observer(.error($0))
                        })
                }
                subscribe()
                return disposable
            }
        }
    }
}

extension PrimitiveSequence where Trait == SingleTrait {
    func repeatUntil(_ block: @escaping () -> Bool) -> Single<Element> {
        return Single.deferred {
            self.asObservable()
                .repeatUntil(block)
                .take(1)
                .asSingle()
        }
    }
}
````

This has been here for a long time, but I don't think we should include this extension. I think these kinds of operations should be easily possible with RxFeedback or some higher-level APIs.

This doesn't look like something low-level to me. We merged retryWhen because it was hard to do customizable retries, but I'm unsure whether even that was a good decision, or whether we should pull it out.
