Akka-http: Substream Source cannot be materialized more than once when reading from s3

Created on 4 Feb 2018 · 18 Comments · Source: akka/akka-http

There's a simple repro project for this at https://github.com/easel/akka-http-examples/blob/master/src/test/scala/io/github/easel/RecursiveAkkaHttpSpec.scala

Net-net, reading a series of files, such as the SEC EDGAR files in the test, fails after the first file when trying to parse individual lines. The same code works fine for a single file and for a sequence of other random URLs.

Looking at the headers, the only consistent difference I'm seeing offhand is that the files that fail don't have a Content-Length header.

Happy to help however I can, but some pointers on where to start digging would be really helpful.

Labels: 0 - new, client-new-pool


Also, this issue is specific to the new pool implementation. It seems to work just fine with the pool set to legacy. And, when it fails, the logs include the following.

[WARN] [02/04/2018 00:00:44.845] [test-akka.actor.default-dispatcher-14] [test/Pool(shared->https://www.sec.gov:443)] [3 (WaitingForResponseEntitySubscription)] Response entity was not subscribed after 1 second. Make sure to read the response entity body or call `discardBytes()` on it. GET /Archives/edgar/full-index/2017/QTR4/master.idx Empty -> 200 OK Chunked
[WARN] [02/04/2018 00:00:44.864] [test-akka.actor.default-dispatcher-12] [test/Pool(shared->https://www.sec.gov:443)] [2 (WaitingForResponseEntitySubscription)] Response entity was not subscribed after 1 second. Make sure to read the response entity body or call `discardBytes()` on it. GET /Archives/edgar/full-index/2017/QTR3/master.idx Empty -> 200 OK Chunked
[WARN] [02/04/2018 00:00:45.003] [test-akka.actor.default-dispatcher-21] [test/Pool(shared->https://www.sec.gov:443)] [1 (WaitingForResponseEntitySubscription)] Response entity was not subscribed after 1 second. Make sure to read the response entity body or call `discardBytes()` on it. GET /Archives/edgar/full-index/2016/QTR4/master.idx Empty -> 200 OK Chunked

Have you tried decreasing the chunk size? It worked for me.

@rabzu
Could you please clarify which Akka settings you used to decrease the chunk size?
Most grateful,
Christopher

Thanks for the report and a working reproducer, @easel. There's certainly something going on there.

What happens is that you start multiple requests at the same time but only process the response bodies after a while. In the meantime, the response entity subscription timeout kicks in as can be seen in the warning.

To make it work, either prevent more than one request from running concurrently (which is surprisingly hard to do; I tried this):

val streamingRequestFlow: Flow[HttpRequest, ByteString, NotUsed] =
    Flow[HttpRequest]
      .mapAsync(1)(request ⇒ Http().singleRequest(request))
      .flatMapConcat { response ⇒
        response.entity.dataBytes
      }

but even this will run at least one other request concurrently because of internal buffering.

Or, you can just increase the response subscription timeout with

akka.http.host-connection-pool.response-entity-subscription-timeout = 100.seconds

The error message you see is directly caused by the "Response entity was not subscribed after 1 second" timeout, which consumes the response data when the entity wasn't subscribed in time. When your code then "comes too late" to subscribe, it gets the ugly error message. It would be nice if we could improve the error message, but that might be hard to pull off.
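For reference, a minimal sketch of applying that timeout setting programmatically instead of in application.conf. The object name, the helper and the system name "example" are made up for illustration, and "100 seconds" is simply the value from the comment above written in the usual HOCON duration form. It assumes an akka-http version that ships the new pool implementation.

```scala
import akka.actor.ActorSystem
import com.typesafe.config.{Config, ConfigFactory}

object TimeoutConfig {
  // Hypothetical helper: overrides only the subscription timeout and falls
  // back to the regular configuration (application.conf, reference.conf).
  val config: Config = ConfigFactory
    .parseString(
      "akka.http.host-connection-pool.response-entity-subscription-timeout = 100 seconds"
    )
    .withFallback(ConfigFactory.load())

  // The system name "example" is an assumption, not taken from the thread.
  def newSystem(): ActorSystem = ActorSystem("example", config)
}
```

A larger value only gives slow consumers more time; pool connections stay occupied while responses wait to be subscribed.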

Here's a solution that runs all requests strictly sequentially so you wouldn't have to change the subscription timeout:

  val streamingRequestFlow: Flow[HttpRequest, ByteString, NotUsed] =
    Flow[HttpRequest]
      .flatMapConcat(req ⇒
        Source.fromFuture(Http().singleRequest(req))
          .flatMapConcat { response ⇒
            response.entity.dataBytes
          }
      )
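The difference between the two flows can be illustrated without any HTTP at all. The sketch below is a simulation under stated assumptions: `request` and `body` are hypothetical stand-ins for `Http().singleRequest` and `response.entity.dataBytes`, the `InFlight` counter is invented for the demonstration, and the code assumes Akka 2.6 or later (an implicit ActorSystem supplies the materializer, and `Source.future` replaces `Source.fromFuture`). It counts how many simulated requests have been started while their bodies have not yet been fully read.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Future
import scala.concurrent.duration._

object PrefetchDemo {
  // Tracks requests that have started but whose body is not fully read yet.
  final class InFlight {
    private val current = new AtomicInteger(0)
    private val peak = new AtomicInteger(0)
    def started(): Unit = {
      val n = current.incrementAndGet()
      peak.updateAndGet(p => math.max(p, n))
      ()
    }
    def finished(): Unit = { current.decrementAndGet(); () }
    def max: Int = peak.get()
  }

  // Stand-in for Http().singleRequest: the "response" is just the request id.
  def request(id: Int, t: InFlight): Future[Int] = {
    t.started()
    Future.successful(id)
  }

  // Stand-in for response.entity.dataBytes: three chunks after a short delay.
  def body(id: Int, t: InFlight): Source[String, NotUsed] =
    Source(1 to 3).initialDelay(100.millis).map { chunk =>
      if (chunk == 3) t.finished() // last chunk read: request is done
      s"$id-$chunk"
    }

  // First approach: mapAsync(1) followed by flatMapConcat.
  def viaMapAsync(t: InFlight)(implicit system: ActorSystem): Future[Seq[String]] =
    Source(1 to 3).mapAsync(1)(request(_, t)).flatMapConcat(body(_, t)).runWith(Sink.seq)

  // Second approach: the request itself lives inside the flatMapConcat.
  def nested(t: InFlight)(implicit system: ActorSystem): Future[Seq[String]] =
    Source(1 to 3)
      .flatMapConcat(id => Source.future(request(id, t)).flatMapConcat(body(_, t)))
      .runWith(Sink.seq)
}
```

Under these assumptions, `viaMapAsync` should report a peak of 2, because `mapAsync` pulls and starts the next request as soon as it has handed the previous response downstream, while `nested` should report a peak of 1.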

Thanks for digging into this @jrudolph, very helpful! Let me try to fold your suggestions back into the repro and production code and see where that lands us.

Do you think there's a deterministic way we can set the subscription timeout to ensure we won't hit it?

Perhaps answering my own question here... we should be able to set the subscription timeout to infinite or effectively infinite safely. There really shouldn't be some runtime scenario that causes us not to consume it and cause a leak, so we don't really need the timeout. Is that correct, or is there some system level issue where perhaps the system might fail in such a way as to cause us not to consume the body?

Given that, from a general usability standpoint, might it be a good idea to increase the default value for that timeout to something more like 1 minute instead of 1 second?

Coming back to this. I'm unsure if the subscription timeout causes a leak if the entity is not discarded. And what is the correct way to discard if a timeout happens? Should I try to recover in the HTTP stream?

@easel

> Perhaps answering my own question here... we should be able to set the subscription timeout to infinite or effectively infinite safely. There really shouldn't be some runtime scenario that causes us not to consume it and cause a leak, so we don't really need the timeout. Is that correct, or is there some system level issue where perhaps the system might fail in such a way as to cause us not to consume the body?

I would be careful with it because it's somewhat easy to forget to subscribe to an entity. The timeout is a safeguard that prevents the shared pool from being stalled by consumers that do not subscribe to entities.

> Given that, from a general usability standpoint, might it be a good idea to increase the default value for that timeout to something more like 1 minute instead of 1 second?

The solution would be to consume the entities in a timely manner in all cases. Increasing the timeout just means that your pool will be sitting around idly, waiting for your code to consume responses on the shared connections.

@trautonen

> Coming back to this. I'm unsure if the subscription timeout causes a leak if the entity is not discarded. And what is the correct way to discard if a timeout happens? Should I try to recover in the HTTP stream?

No, maybe the error message is unclear? It means that the pool has noticed that the user code hasn't consumed the response entity (which might mean that part of the entity hasn't yet been streamed out from the server side at all) and that this response is blocking a persistent HTTP connection in the pool from doing actual work. What the pool does in this case is abort the response and close the connection (to be reopened for new requests). If the user code then subscribes to the response too late, it will get an error message.
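To make the "read the entity or call discardBytes()" advice from the warning concrete, here is a minimal sketch. The object and the `bodyIfOk` helper are hypothetical, and the code assumes akka-http 10.2 or later on Akka 2.6 or later, where an implicit ActorSystem is enough to materialize streams. Every response is either folded into memory or explicitly discarded as soon as it arrives, so the pool never waits for a subscription.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, StatusCodes}
import akka.util.ByteString
import scala.concurrent.Future

object ConsumeOrDiscard {
  // Hypothetical helper: returns the body of a 200 OK response and discards
  // the entity of any other response so the pool connection is released.
  def bodyIfOk(request: HttpRequest)(implicit system: ActorSystem): Future[Option[ByteString]] = {
    import system.dispatcher
    Http().singleRequest(request).flatMap { response =>
      if (response.status == StatusCodes.OK)
        // Subscribes immediately and collects the whole body in memory.
        response.entity.dataBytes.runFold(ByteString.empty)(_ ++ _).map(Option(_))
      else {
        // Subscribes immediately and drains the entity without keeping it.
        response.discardEntityBytes()
        Future.successful(Option.empty[ByteString])
      }
    }
  }
}
```

Collecting the body in memory is only reasonable for responses of bounded size; for large downloads the entity would have to be streamed to its destination inside the same future chain instead.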

@jrudolph, where I've netted out is that it is simply not best practice to mix Akka HTTP into a larger processing stream. Instead, you need a boundary around the Akka HTTP parts of the stream that ensures they always consume their response before allowing the outer processing stream to proceed.

At first glance this is a little counter-intuitive, as one might think it would be great to just connect everything together and let back pressure work its magic. It's also counter-intuitive in that you'll have to create a lot of short-lived flows, which "feels" like it might be heavy and suboptimal. I don't have numbers to support either side of that argument, aside from the test cases at https://github.com/easel/akka-http-examples/blob/master/src/test/scala/io/github/easel/RecursiveAkkaHttpSpec.scala which show that it's necessary.
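One way to read the "boundary" idea above is sketched here. This is not necessarily what the repro project or the later documentation PR does: the names `BoundedDownload`, `download` and `downloadFlow` are hypothetical, and the code assumes akka-http 10.2 or later on Akka 2.6 or later. Each request and the complete read of its entity are wrapped in a single Future, so the outer stream only ever sees fully consumed bodies.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpRequest
import akka.stream.scaladsl.Flow
import akka.util.ByteString
import scala.concurrent.Future

object BoundedDownload {
  // The boundary: the response entity is subscribed and read to the end
  // inside the same Future that performs the request.
  def download(request: HttpRequest)(implicit system: ActorSystem): Future[ByteString] = {
    import system.dispatcher
    Http().singleRequest(request).flatMap { response =>
      response.entity.dataBytes.runFold(ByteString.empty)(_ ++ _)
    }
  }

  // The outer stream receives complete bodies, never unsubscribed entities.
  def downloadFlow(parallelism: Int)(implicit system: ActorSystem): Flow[HttpRequest, ByteString, NotUsed] =
    Flow[HttpRequest].mapAsync(parallelism)(request => download(request))
}
```

The trade-off is that each body is held in memory before it moves downstream, so backpressure works per response rather than per chunk.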

My guess is there's a documentation improvement to be had here, but to be honest I'm not sure where. If somebody has an idea about that, I'd be willing to take a crack at writing it.

I went ahead and put together some docs and examples in #2270. I went with the mapAsync() route since, IIRC, that benchmarked out the fastest for me, but I'm happy to adjust it to whatever the Akka team thinks is the best practice. Also, I don't really know the Akka Java API, so there may be some issues there as well.

Feedback welcome.

Hi, same problem here after updating akka-http from 10.0.13 to 10.1.5, due to the new client pool.

In my case, I'm using the flow-based superPool variant. The flow returned by superPool is processed with Akka Streams. In fact, the whole process is designed to work in a streaming fashion, consuming the responses (or using discardEntityBytes). However, with the new client pool I'm getting the warning "Response entity was not subscribed after 1 second (...)" and the exception "cannot be materialized more than once". It works fine with the pool set to legacy or with an increased response-entity-subscription-timeout (but if there is no deterministic way to set this timeout...).

I'm wondering if the examples should include other use cases like superPool or cachedHostConnectionPool.

Let me show some code with the problem:

          Source(1 to 1000) // just for illustration purpose only
            .map { r =>
              HttpRequest(
                method = HttpMethods.GET,
                uri = "https://kafka.apache.org/documentation/"
              ) -> r
            }
            .via(Http().superPool[Int]())
            .mapAsync(1) {
              case (Success(response), _) =>
                response.entity.dataBytes
                  .runFold("") {
                    case (string, bs) => string + bs.compact.utf8String
                  } // imagine a json response or something like that
              /*.map {
                 // response processing here, for example, send to kafka...
               }*/
              case (Failure(_), _) =>
                Future.successful(Done)
            }
            .runWith(Sink.ignore)

What is the solution in that case?

Hi @glammers1, I tried to address that in https://github.com/akka/akka-http/pull/2270; take a look and see what you think. Fundamentally, you cannot put a stream of requests and an Akka HTTP "download one resource" step into the same stream without risking timeouts.

Hi @easel, I had already seen the PR. However, its examples use the singleRequest approach, and I still have doubts about how to fix the test cases in your repo that fail with the timeout and use superPool, as I do in my comment above. Will you update the repo with the fixed tests?

I am a little confused about #2270, so I will keep referring to its example. What distinguishes what can be done in the parse method from what can be done in processorFlow? Could the "parse" action be part of processorFlow, or vice versa (that is, could parts of processorFlow be done in the "parse" step)? Could such changes cause the timeout again?

I've done more tests on this issue and would like to add them here.

First of all, the WARN message:

[WARN] [11/15/2018 10:45:15.733] [test-akka.actor.default-dispatcher-21] [test/Pool(shared->https://kafka.apache.org:443)] [0 (WaitingForResponseEntitySubscription)] Response entity was not subscribed after 1 second. Make sure to read the response entity body or call `discardBytes()` on it. GET /documentation/ Empty -> 200 OK Chunked
  • singleRequest using the first @jrudolph approach. Warn message appears and the stream "is stopped".
    Source(1 to 1000) // for illustration purposes only
      .map { r =>
        HttpRequest(
          uri = "https://kafka.apache.org/documentation/"
        )
      }
      .mapAsync(1)(r => Http().singleRequest(r))
      .flatMapConcat { response =>
        response.entity.dataBytes
      }
      .runFold("") {
        case (string, bs) => string + bs.compact.utf8String
      }
  • singleRequest using the second jrudolph approach. Everything seems to be working correctly in all the executions I've done (I am not quite sure).
    Source(1 to 1000) // for illustration purposes only
      .map { r =>
        HttpRequest(
          uri = "https://kafka.apache.org/documentation/"
        )
      }
      .flatMapConcat(r =>
        Source
          .fromFuture(Http().singleRequest(r))
          .flatMapConcat { response =>
            response.entity.dataBytes
          })
      .runFold("") {
        case (string, bs) => string + bs.compact.utf8String
      }
  • In the other cases (superPool and cachedHostConnectionPool, whose code is very similar to the superPool code), the warning appears and the stream "is stopped".

In #57 @jrudolph posted:

> The recommendation is to use Http.singleRequest and to make sure to read the entity data. This usually fixes almost all of the problems

And @ymeymann said:

> We already make sure that entities are consumed, but that does not solve the problem
>
> we backed down to using singleRequest in the sites that were problematic.

jrudolph responds:

> What did you use before? singleRequest is using the same pool as cachedHostConnectionPool or superPool.

At this point, I don't know how to decide which to use without changing the subscription timeout. I am concerned that the singleRequest approach apparently only works without raising the timeout when all requests run strictly sequentially (otherwise, "this will run at least one other request concurrently because of internal buffering").

Additionally, I'm wondering whether all these problems depend on the type of the response entity (HttpEntity.Default, HttpEntity.Chunked...), so that one implementation could work correctly for an endpoint with a known non-zero length but fail for an endpoint that uses Transfer-Encoding: chunked.
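To check which kind of entity a given endpoint actually returns, a small diagnostic along these lines could be used. The object and the `describe` helper are hypothetical; the entity subtypes are the ones defined on akka-http's `HttpEntity`. It does not establish whether the entity type affects the timeout; it only makes the type visible for comparison.

```scala
import akka.http.scaladsl.model.{HttpEntity, ResponseEntity}

object EntityKind {
  // Hypothetical diagnostic: names the transfer style of a response entity.
  def describe(entity: ResponseEntity): String = entity match {
    case _: HttpEntity.Strict         => "strict (fully in memory)"
    case _: HttpEntity.Default        => "default (streamed, known Content-Length)"
    case _: HttpEntity.Chunked        => "chunked (Transfer-Encoding: chunked)"
    case _: HttpEntity.CloseDelimited => "close-delimited (ends when the connection closes)"
    case other                        => other.getClass.getName
  }
}
```

Calling `EntityKind.describe(response.entity)` before consuming the body, for both a working and a failing URL, would show whether the two cases differ in entity type.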

@glammers1, your first example isn't consuming the stream in a separate Akka HTTP request stream, which is why it doesn't work, as per my documentation update in the PR. My hunch is that your superPool implementation has the same problem.

Added some more text to the docs indicating what can happen if you don't properly consume the stream, per discussion in #1836. This should help search engines pick up on the docs when people search for Response entity was not subscribed after 1 second.
