Short description of missing functionality:
Would like an operator that can repeat an observable based on a given predicate.
Short code example of how you would like to use the API:
var packetsToSend = Data(count: 100)
sendPacketsToDeviceCompletable(retrieveAndRemovePackets(size: 20))
.repeatUntil { packetsToSend.isEmpty }
.subscribe()
The reason why I need this functionality:
I need to send a package of unknown size to clients in chunks, in order.
Code I have right now:
There isn't one right now... I'm still figuring things out.
You could possibly do
Observable<Int>.interval(...)
.flatMap { doSomeOtherActionConstantly() }
.takeUntil(otherStream)
@freak4pc, thanks for the quick reply.
That's one way. Can I check if:
Observable.repeatElement(retrieveAndRemovePackets(size: 20))
.takeWhile { !$0.isEmpty }
.flatMapCompletable{ sendToDeviceCompletable($0) }
.ignoreElements()
Will this execute the completables:
1) In order? (I understand that there are some issues with interleaving, which is why RxJava has the SwitchMap operator)
2) Sequentially? (Meaning each one starts only after the previous completable has completed)
3) Waiting for all the completables to complete before completing itself?
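For the three questions above, a minimal sketch of one way to get in-order, one-at-a-time execution with existing operators is `concatMap`, which subscribes to the next inner sequence only after the previous one completes. The `sendToDeviceCompletable` body, the `sentSizes` log, and the up-front chunking below are hypothetical stand-ins for illustration, not code from this thread.

```swift
import Foundation
import RxSwift

// Hypothetical stand-in: records the packet size instead of talking to a device.
var sentSizes: [Int] = []
func sendToDeviceCompletable(_ packet: Data) -> Completable {
    return Completable.create { observer in
        sentSizes.append(packet.count)
        observer(.completed)
        return Disposables.create()
    }
}

// Split the payload into 20-byte packets up front (assumption: the whole
// payload is known before sending starts).
let payload = Data(count: 90)
let packetSize = 20
let packets: [Data] = stride(from: 0, to: payload.count, by: packetSize).map { start in
    payload.subdata(in: start..<min(start + packetSize, payload.count))
}

// concatMap keeps the source order and runs one inner Completable at a time;
// the resulting Completable completes only after the last packet is sent.
let sendAll: Completable = Observable.from(packets)
    .concatMap { sendToDeviceCompletable($0) }
    .asCompletable()

var finished = false
_ = sendAll.subscribe(onCompleted: { finished = true })
```

By contrast, `flatMap` would subscribe to every inner sequence as soon as its element arrives, so asynchronous sends could overlap.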
The best way to learn is to try :)
The PRs aren't a good place for "How do I do X" :]
Your operator suggestion is in place since repeatUntil exists in RxJava, but TBH I've never seen a request for it. I guess it depends on community demand, etc. (and of course on Krunoslav's decision here)
@freak4pc,
Thanks for the heads up.
Yep, I'm testing it out soon. I believe I came across one request for it a year or so ago, I'll link it if I come across it again.
I also have a big need for this. As far as I can tell, there is no built-in way to continuously resubscribe to an observable chain in RxSwift. RxJS has a repeat operator that resubscribes to the observable chain a specified number of times. The repeatElement operator in RxSwift, however, seems to repeat without waiting for the previous iteration to complete before starting the next one. This is a problem because it results in a memory leak.
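As a rough sketch of resubscribing only after the previous iteration completes, existing operators can be combined: `concat` waits for the source to complete, and `Observable.deferred` postpones the decision to repeat until that moment. The helper name `repeating(_:until:)` and the `remaining` counter are hypothetical and not part of RxSwift.

```swift
import RxSwift

// Hypothetical helper: resubscribes to `source` each time it completes,
// until `isDone` returns true.
func repeating<Element>(_ source: Observable<Element>,
                        until isDone: @escaping () -> Bool) -> Observable<Element> {
    // Built lazily: the predicate is evaluated only after `source` completes.
    let next = Observable<Element>.deferred {
        isDone() ? Observable<Element>.empty() : repeating(source, until: isDone)
    }
    return source.concat(next)
}

// Example source: each subscription consumes one unit of work and emits what is left.
var remaining = 3
let oneRound = Observable<Int>.deferred {
    remaining -= 1
    return Observable.just(remaining)
}

var seen: [Int] = []
_ = repeating(oneRound, until: { remaining == 0 })
    .subscribe(onNext: { seen.append($0) })
```

Each repetition nests one more `concat`, so this is probably best suited to a modest number of iterations.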
This might be of help https://gist.github.com/dtartaglia/ec1032375498eb95aa260239b289d263
Something like: emitWhile(retrieveAndRemovePackets(size: 20), pred: { _ in retrieveAndRemovePackets(size: 20) }, producer: sendPacketsToDeviceCompletable)
The above assumes that sendPacketsToDeviceCompletable returns a Single. If it doesn't then you will need to adjust.
+1
repeatUntil / repeatWhile would be super useful
How I'm currently doing it, and it's working:
````
// Collect one Completable per step, then run them one after another.
var completables = [Completable]()
for _ in 0...end {
    completables.append(
        Completable.complete()
    )
}
return Completable.concat(completables)
````
Wrote a working extension:
````
extension Observable {
    func repeatUntil(_ block: @escaping () throws -> Bool) -> Observable {
        return Observable.deferred {
            Observable.create { observer in
                var subscription: Disposable? = nil
                let disposable = Disposables.create { subscription?.dispose() }
                func subscribe() {
                    subscription = self.subscribe(
                        onNext: {
                            observer.on(.next($0))
                        }, onError: {
                            observer.on(.error($0))
                        }, onCompleted: {
                            do {
                                if try !block() {
                                    subscribe()
                                } else {
                                    observer.on(.completed)
                                }
                            } catch {
                                observer.onError(error)
                            }
                        }, onDisposed: nil)
                }
                subscribe()
                return disposable
            }
        }
    }
}
extension PrimitiveSequence where Trait == CompletableTrait, Element == Never {
    func repeatUntil(_ block: @escaping () throws -> Bool) -> Completable {
        return Completable.deferred {
            Completable.create { observer in
                var subscription: Disposable? = nil
                let disposable = Disposables.create { subscription?.dispose() }
                func subscribe() {
                    subscription = self.subscribe(
                        onCompleted: {
                            do {
                                if try !block() {
                                    subscribe()
                                } else {
                                    observer(.completed)
                                }
                            } catch {
                                observer(.error(error))
                            }
                        }, onError: {
                            observer(.error($0))
                        })
                }
                subscribe()
                return disposable
            }
        }
    }
}
extension PrimitiveSequence where Trait == SingleTrait {
    func repeatUntil(_ block: @escaping () -> Bool) -> Single<Element> {
        return Single.deferred {
            self.asObservable()
                .repeatUntil(block)
                // take(1) keeps asSingle() from failing on more than one element.
                .take(1)
                .asSingle()
        }
    }
}
````
This has been here for a long time, but I don't think we should include this extension. I think these kinds of operations should be easily possible with RxFeedback or some higher level APIs.
This doesn't look like something low level to me. We've merged retryWhen because it was hard to do customizable retries, but I'm unsure whether even that was a good decision or whether we should pull it out.