Rxjava: subscribe to hot connectableObservable downstream.

Created on 20 Jul 2017  路  6Comments  路  Source: ReactiveX/RxJava

Im looking for info on a reactive implementation I've been working which records audio to a flowable buffer, makes a bunch of networking calls and accesses the buffer downstream to replay and stream audio to a server. My problem is similar to the following RxJava/issues/2931. (I think)

My use case is as follows:

  • Record audio into Flowable
  • Make two network requests
  • On third network request, read data from Flowable into a request body and continue streaming until user clicks stop.

My implementation is as follows:

public void executeRequest() {
     startRecording()
               .publish(stream -> Flowable.defer(() -> getEventFlowable(stream)))
               .subscribe(response -> Log.d("resylt", "accept: " + response));
 }

private Publisher<AudioStreamResponse> getEventFlowable(Flowable<ByteString> stream) {
    return api.createConversation()
            .flatMap(this::createAudioQuery)
            .flatMap(x -> streamAudio(x, stream));
 }

private Single<AudioStreamResponse> streamAudio(AudioQueryResponse 
response,Flowable<ByteString> stream) {
    return api.streamAudioQuery(response.getHref(), new StreamRequestBody(stream));
 }

public Flowable<ByteString> startRecording() { 
    return Flowable.create(this::readFromMicrophoneloop, BackpressureStrategy.BUFFER)
            .subscribeOn(Schedulers.computation())
            .doOnTerminate(record::stop)
            .replay()
            .refCount();
 }

My method does seem to work and using the publish method I am able to access the Flowable downstream but does what i'm doing break the observable contract because I'm not passing the Flowable through each flatmap?

This approach was not mentioned in the the related issue above so I'm wondering if my approach is not the right way of dealing with this problem.

2.x Question

All 6 comments

Beyond a couple of unnecessary calls, your approach looks okay. I don't think you need .replay().refCount() since you have only one end consumer per flow. Flowable.defer(() -> is also unnecessary as the function the publish calls is executed for each individual subscriber to it.

Thank you for the quick response! It looks like replay/refcount and defer were indeed unnecessary :). I wonder if as a followup you might be able to shed some light on testing difficulty I'm having with this implementation. My StreamRequestBody(stream) uses a blockingForEach to stream bytes to through a Retrofit RequestBody from the Flowable.create.

public class StreamRequestBody extends RequestBody {
    @Override
    public void writeTo(BufferedSink sink) throws IOException {
          stream.blockingForEach(byteString -> sink.write(byteString.toByteArray()));
    }
}

Im also using TestRule to set all schedulers to Schedulers.trampoline(). (This cold be causing the problem.)

In practice the publish(function()) kicks off the recorder loop, emits byte[]'s through emitter.onNext() downstream so when the StreamRequestBody's blockingForEach(...) method is eventually called the publish 'stream' replays all events, continues streaming and eventually closes when emitter.onComplete() is called.

In my test Im mocking the network responses but the method calls appear out of order. The test pauses on the blockingForEach method. From debugging it looks like the network calls happen before the FlowableOnSubscribe.subscribe() from the startRecording() method and therefore the publish 'stream' never gets an onComplete call. Ive tried using defer and concatMap to call FlowableOnSubscribe.subscribe() first but they do not seem to work.

Do you know if there's something obvious I'm not doing here? Im fairly new to rxJava so im sure there are a few things im not picking up on.. Thanks!

I can't tell without the actual unit test, which should be executable on its own on a desktop Java setup.

I can illustrate the flow by way of a short contrived example. given the following classes:

public class StreamBody {
    private final Flowable<ByteString> recorder;
    public StreamBody(Flowable recorder) {
        this.recorder = recorder;
    }

    public ByteString writeTo() {
        recorder.blockingForEach(x -> Log.d("StreamBody", "writeTo: "));
        return ByteString.EMPTY;
    }
} 

public class Recorder {
    private void read(Emitter<ByteString> emitter) {
        emitter.onNext(ByteString.EMPTY);
        emitter.onComplete();
    }

    public Flowable<ByteString> startRecording() {
        return Flowable.create(this::read, BackpressureStrategy.BUFFER);
    }
}

public class Client {
    Recorder recorder;
    public Client() {
        recorder = new Recorder();
    }

    public void execute() {
        recorder.startRecording()
            .publish(this::getFlowable)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(byteString -> Log.d("Client", "accept: "));
    }

    private Flowable<ByteString> getFlowable(Flowable<ByteString> byteStringFlowable) {
         return Flowable.fromCallable(() -> new StreamBody(byteStringFlowable).writeTo());
    }
}

When I run the execute method on the client I would expect that Recorder.startRecording() flowable would be subscribed to first and would issue onNext() and OnComplete() before getFlowable method is executed within the publish anonymous inner class but it happens the other way around, causing the flow to pause on the recorder.blockingForEach method.

I'm sure this is expected behaviour and I practice I can mitigate the issue by subscribing to getFlowable on a different thread but do you know if there's any way around this, to reverse the order such that the getFlowable isn't executed until the recorder flowable has been subscribed to and begins emitting items?

The problem is the blockingForEach. If you have such thing in a composed flow, you have broken the flow of non-blocking events and are subject to hangs or unexpected execution orderings.

Looks like this question has been answered. If you have further input on the issue, don't hesitate to reopen this issue or post a new one.

Was this page helpful?
0 / 5 - 0 ratings