I'm trying to figure out how to create a SignalProducer that repeatedly fetches the next chunk of data once the current one has been fully consumed.
I have an implementation, but the backpressure mechanism breaks as soon as observeOn is called on the producer: the producer keeps fetching new data even though the client hasn't consumed the entire stream yet.
Any idea how I could achieve this? Is it actually possible?
Thanks!
```swift
public func streamMessages(from startOffset: Offset = Offset(value: 0),
                           toExclusive endOffsetOpt: Offset? = .None,
                           includeTransient: Bool = true) -> SignalProducer<Message, NoError> {
    // Fetches one chunk, waiting first until at least one message at or
    // beyond `from` is available.
    func streamMessagesChunk(from: Offset) -> SignalProducer<Message, NoError> {
        func waitForNewMessageAvailable(from: Offset) -> SignalProducer<Offset?, NoError> {
            return self.lastOffsetIncludingTransient(includeTransient).producer
                .filter { offsetOpt in offsetOpt.map { offset in offset >= from } ?? false }
                .take(1)
        }
        let streamMessagesProducer = self.fetchMessages(10, from: from, includeTransientMessages: includeTransient)
            .flatMap(.Concat) { messages in SignalProducer<Message, NoError>(values: messages) }
        return waitForNewMessageAvailable(from)
            .then(streamMessagesProducer)
    }

    // Recursive driver: when the current chunk completes, start the next one
    // from the offset after the last message forwarded downstream.
    func streamNextBatch(from: Offset, observer: Observer<Message, NoError>, observerDisposable: CompositeDisposable) {
        func hasReachedEndOffset(currentOffset: Offset) -> Bool {
            return endOffsetOpt.map { endOffset in endOffset == currentOffset } ?? false
        }
        print("StreamNextBatch \(from)")
        streamMessagesChunk(from).startWithSignal { signal, signalDisposable in
            var lastOffset: Offset = from
            let disposableHandle = observerDisposable.addDisposable(signalDisposable)
            signal.observe {
                switch $0 {
                case let .Failed(error):
                    observer.sendFailed(error)
                case .Interrupted:
                    observer.sendInterrupted()
                case .Completed:
                    disposableHandle.remove()
                    streamNextBatch(lastOffset.next, observer: observer, observerDisposable: observerDisposable)
                case .Next(let message):
                    if hasReachedEndOffset(message.offset) {
                        disposableHandle.remove()
                        observer.sendCompleted()
                    } else {
                        lastOffset = message.offset
                        observer.sendNext(message)
                    }
                }
            }
        }
    }

    return SignalProducer<Message, NoError> { observer, observerDisposable in
        streamNextBatch(startOffset, observer: observer, observerDisposable: observerDisposable)
    }
}
```
```swift
func testShouldStreamMessagesWaitingForFutureMessages() {
    let expectation = self.expectationWithDescription("Test")
    let messages = (0...50000).map { value in self.createKafkaData(UInt64(value)) }
    let nextMessages = (50001...65000).map { value in self.createKafkaData(UInt64(value)) }
    try! self.sut.publishMessages(messages, persist: false).get()
    let messageCountFuture = self.sut
        .streamMessages(from: Offset(value: 45), toExclusive: Offset(value: 60000), includeTransient: true)
        .observeOn(QueueScheduler())
        // Simulate a slow consumer: with working backpressure the producer
        // should not fetch chunks faster than this map can process them.
        .map { m in print("sleeping at \(m.data)"); sleep(1); return 1 }
        .reduce(0, +)
        .toFuture()
    messageCountFuture.onSuccess { count in
        expect(count) == 15
        expectation.fulfill()
    }
    try! self.sut.publishMessages(nextMessages, persist: false).get()
    self.waitForExpectationsWithTimeout(30, handler: nil)
}

func createKafkaData(number: UInt64) -> String {
    return "message \(number)"
}
```
_Sorry for the long delay in a response. We've been working through a lot of old issues as some contributors have dropped off._
Sadly, AFAIK there's not currently a way to provide backpressure across schedulers. 😞
Do you know if any other reactive frameworks provide a way to do this? I definitely think it's an interesting concept.
As far as I know, the frameworks in the Rx family do not generally support backpressure, but one particular streams implementation, akka-streams, does support it: http://www.smartjava.org/content/visualizing-back-pressure-and-reactive-streams-akka-streams-statsd-grafana-and-influxdb
The idea is that data flows downstream while demand flows upstream, so the recipient is always in control of the maximum incoming data rate.
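To make the "demand flows upstream" idea concrete in ReactiveCocoa terms, here is a minimal sketch assuming RAC 4 APIs (`SignalProducer(signal:)`, `flatMap(.Concat)`); `pullDriven` and `fetchChunk` are hypothetical names, not part of any library:

```swift
// Demand-driven pull: the consumer emits one () token per chunk it is
// ready to receive, and each token triggers exactly one fetch.
func pullDriven<T>(demand demand: Signal<(), NoError>,
                   fetchChunk: Int -> SignalProducer<T, NoError>) -> SignalProducer<T, NoError> {
    var nextIndex = 0
    return SignalProducer(signal: demand)
        .flatMap(.Concat) { _ -> SignalProducer<T, NoError> in
            // One demand token == one chunk; .Concat starts the inner
            // producers strictly one after another.
            let chunk = fetchChunk(nextIndex)
            nextIndex += 1
            return chunk
        }
}
```

The consumer primes the stream with a single demand token and sends the next one only after it has finished processing the current chunk, so at most one fetch is ever in flight, regardless of which scheduler the consumer observes on.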
RxSwift does not seem to have backpressure either, but it looks like its defer operator would be enough to address the same needs: http://reactivex.io/documentation/operators/defer.html
How hard would it be to have such a defer operator in ReactiveCocoa?
Not hard. I did something similar in Rex. Although, it sounds like you're asking for a generalized version: instead of a time interval for the delay, you would want a trigger signal/producer parameter to defer the subscription.
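For what it's worth, a rough sketch of that generalized version might look like this (the name `deferUntil` is invented, not an existing ReactiveCocoa operator; it assumes RAC 4's `take`, `promoteErrors`, and `then`):

```swift
// Defer starting `producer` until `trigger` sends its first value.
func deferUntil<T, E: ErrorType>(trigger trigger: SignalProducer<(), NoError>,
                                 producer: SignalProducer<T, E>) -> SignalProducer<T, E> {
    return trigger
        .take(1)                // wait for a single "go" event
        .promoteErrors(E.self)  // align error types so `then` composes
        .then(producer)         // `producer` is only started here
}
```

A consumer could then pass a trigger that fires once it has drained the previous chunk, which would give the "defer until consumed" behaviour this thread is after.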
@neilpa we would need the subscription to be deferred until the consumer has finished consuming the stream. This works well with ReactiveCocoa (example above) until you start subscribing on a different scheduler.
I'm going to close this since your question has been answered. If you'd like to make a proposal for something, feel free to open a new issue or PR!