ReactiveCocoa: how to provide backpressure (lazy SignalProducer that fetches more data as the data is consumed)

Created on 20 Apr 2016 · 6 comments · Source: ReactiveCocoa/ReactiveCocoa

I am trying to figure out how to create a SignalProducer that repeatedly fetches the next chunk of data once the stream delivered so far has been consumed.
I have an implementation, but the backpressure mechanism breaks when observeOn is called on the producer: the producer keeps fetching new data even though the client has not yet consumed the events already delivered.

Any idea how I could achieve what I want? Is that actually possible?

Thanks!

```swift
public func streamMessages(from startOffset: Offset = Offset(value: 0),
                           toExclusive endOffsetOpt: Offset? = .None,
                           includeTransient: Bool = true) -> SignalProducer<Message, NoError> {

    func streamMessagesChunk(from: Offset) -> SignalProducer<Message, NoError> {
        func waitForNewMessageAvailable(from: Offset) -> SignalProducer<Offset?, NoError> {
            return self.lastOffsetIncludingTransient(includeTransient).producer
                .filter { offsetOpt in offsetOpt.map { offset in offset >= from } ?? false }
                .take(1)
        }

        let streamMessagesProducer = self.fetchMessages(10, from: from, includeTransientMessages: includeTransient)
            .flatMap(.Concat) { messages in SignalProducer<Message, NoError>(values: messages) }

        return waitForNewMessageAvailable(from)
            .then(streamMessagesProducer)
    }

    func streamNextBatch(from: Offset, observer: Observer<Message, NoError>, observerDisposable: CompositeDisposable) {
        func hasReachedEndOffset(currentOffset: Offset) -> Bool {
            return endOffsetOpt.map { endOffset in endOffset == currentOffset } ?? false
        }

        print("StreamNextBatch \(from)")

        streamMessagesChunk(from).startWithSignal { signal, signalDisposable in
            var lastOffset: Offset = from
            let disposableHandle = observerDisposable.addDisposable(signalDisposable)

            signal.observe { event in
                switch event {
                case let .Failed(error):
                    observer.sendFailed(error)
                case .Interrupted:
                    observer.sendInterrupted()
                case .Completed:
                    // This chunk is exhausted: detach its disposable and
                    // recursively start streaming the next chunk.
                    disposableHandle.remove()
                    streamNextBatch(lastOffset.next, observer: observer, observerDisposable: observerDisposable)
                case .Next(let message):
                    if hasReachedEndOffset(message.offset) {
                        disposableHandle.remove()
                        observer.sendCompleted()
                    } else {
                        lastOffset = message.offset
                        observer.sendNext(message)
                    }
                }
            }
        }
    }

    return SignalProducer<Message, NoError> { observer, observerDisposable in
        streamNextBatch(startOffset, observer: observer, observerDisposable: observerDisposable)
    }
}
```

```swift
func testShouldStreamMessagesWaitingForFutureMessages() {
    let expectation = self.expectationWithDescription("Test")
    let messages = (0...50000).map { value in self.createKafkaData(UInt64(value)) }
    let nextMessages = (50001...65000).map { value in self.createKafkaData(UInt64(value)) }

    try! self.sut.publishMessages(messages, persist: false).get()

    let messageCountFuture = self.sut
        .streamMessages(from: Offset(value: 45), toExclusive: Offset(value: 60000), includeTransient: true)
        .observeOn(QueueScheduler())
        .map { m in print("sleeping at \(m.data)"); sleep(1); return 1 }
        .reduce(0, +)
        .toFuture()

    messageCountFuture.onSuccess { count in
        expect(count) == 15
        expectation.fulfill()
    }

    try! self.sut.publishMessages(nextMessages, persist: false).get()

    self.waitForExpectationsWithTimeout(30, handler: nil)
}

func createKafkaData(number: UInt64) -> String {
    return "message \(number)"
}
```

All 6 comments

_Sorry for the long delay in a response. We've been working through a lot of old issues as some contributors have dropped off._

Sadly, AFAIK there's not currently a way to provide backpressure across schedulers. 😞

Do you know if any other reactive frameworks provide a way to do this? I definitely think it's an interesting concept.

As far as I know, the frameworks in the Rx family generally do not support backpressure, but one particular implementation of reactive streams (akka-streams) does support it: http://www.smartjava.org/content/visualizing-back-pressure-and-reactive-streams-akka-streams-statsd-grafana-and-influxdb

The idea is that data flows downstream and demand flows upstream, so the recipient is always in control of the maximal incoming data rate.
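The pull model can be illustrated in a few lines of plain Swift (an illustrative sketch of the concept, not ReactiveCocoa or akka-streams API; `PullSource`, `fetch`, and `request` are made-up names):

```swift
// Sketch of "demand flows upstream": the consumer asks for n items,
// and the source produces at most that many, so the consumer always
// bounds the incoming data rate.
final class PullSource<T> {
    private let fetch: Int -> [T]   // upstream fetch, e.g. one chunk of messages
    init(fetch: Int -> [T]) { self.fetch = fetch }

    /// Deliver at most `demand` items to `sink`; nothing is produced
    /// until the consumer explicitly asks for it.
    func request(demand: Int, sink: T -> Void) {
        for value in fetch(demand) { sink(value) }
    }
}

// Usage: the consumer pulls the next chunk only after it has
// finished processing the previous one.
let source = PullSource<Int>(fetch: { n in Array(count: n, repeatedValue: 0) })
source.request(10) { value in /* process, then request more */ }
```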

RxSwift does not seem to have backpressure either, but it looks like its defer operator would be enough to address the same need: http://reactivex.io/documentation/operators/defer.html
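For what it's worth, since RAC's SignalProducer is already cold, a defer-style operator can be sketched on top of the RAC 4 API by constructing the wrapped producer inside another producer's start closure (a sketch, not an existing RAC operator; `deferred` is an illustrative name):

```swift
// Sketch: Rx-style `defer` on top of RAC 4. The inner producer is
// built only when a subscriber starts the outer one, once per
// subscription, not when `deferred` itself is called.
func deferred<T, E: ErrorType>(makeProducer: () -> SignalProducer<T, E>) -> SignalProducer<T, E> {
    return SignalProducer { observer, disposable in
        makeProducer().startWithSignal { signal, innerDisposable in
            // Tie the inner producer's lifetime to the outer subscription.
            disposable.addDisposable(innerDisposable)
            signal.observe(observer)
        }
    }
}
```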

How hard would it be to have such a defer operator in ReactiveCocoa?

Not hard. I did something similar in Rex, although it sounds like you're asking for a generalized version: instead of a time interval for the delay, you would want a trigger signal/producer parameter that defers the subscription.
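Such a trigger-based deferral could be sketched with existing RAC 4 operators (`deferUntil` is an illustrative name; `take`, `promoteErrors`, and `then` are existing RAC 4 operators):

```swift
// Sketch: start `producer` only once `trigger` has emitted a value.
// `then` starts its argument only after the receiver completes, so a
// one-shot trigger effectively defers the subscription, acting as the
// upstream "demand" signal.
func deferUntil<T, E: ErrorType>(trigger: SignalProducer<(), NoError>,
                                 producer: SignalProducer<T, E>) -> SignalProducer<T, E> {
    return trigger
        .take(1)
        .promoteErrors(E.self)   // align error types so `then` composes
        .then(producer)
}
```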

@neilpa we would need the subscription to be deferred until the consumer has finished consuming the stream. This works well with ReactiveCocoa (example above) until you start subscribing on a different scheduler.

I'm going to close this since your question has been answered. If you'd like to make a proposal for something, feel free to open a new issue or PR!

