Akka-http: Graph completes normally without consuming full response body

Created on 5 Mar 2018 · 6 Comments · Source: akka/akka-http

Context

In our project, akka-http is used to download large JSON files and parse them in a streaming fashion.
Because the files can be large, we support resuming downloads on the server side with byte-range headers.

Our typical flow looks like:

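  request().map { response =>
    // On any upstream failure, issue a new request and continue with its body.
    // attempts = -1 means retry indefinitely.
    response.entity.withoutSizeLimit().dataBytes.recoverWithRetries(-1, {
      case _: Throwable =>
        Source.fromFuture(request()
          .map(_.entity.withoutSizeLimit().dataBytes))
          .flatMapConcat(identity)
    })
  }.flatMap { source =>
    // Parse the bytes as streaming JSON and discard the parsed elements.
    source.via(new JsonParsingStage[ByteString])
      .runWith(Sink.ignore)
  }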
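
The flow above does not show how a resumed request knows where to continue. A minimal sketch of the bookkeeping, assuming the server honors an open-ended `Range: bytes=N-` header; `ResumeOffset` and its methods are hypothetical names used only for illustration and are not part of akka-http:

```scala
// Hypothetical helper, not an akka-http API: counts the body bytes already
// handed downstream so that a resumed request can ask for the remainder.
final class ResumeOffset {
  private var received: Long = 0L

  // Call once for every chunk that was successfully passed downstream.
  def advance(chunkSize: Int): Unit = {
    require(chunkSize >= 0, "chunk size must be non-negative")
    received += chunkSize
  }

  // Total number of body bytes seen so far.
  def bytesReceived: Long = received

  // Value for an HTTP Range header requesting everything from the
  // current offset to the end of the resource.
  def rangeHeaderValue: String = s"bytes=$received-"
}
```

In a stream, `advance` could be called with each chunk's length before the chunk reaches the parser, and `rangeHeaderValue` read when the retry request is built.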

Recently we tried to process a 40 GB response and our processing flow started failing with an error: when onUpstreamFinish is called in our JSON parsing stage, the resulting JSON chunk is invalid.
We checked the data that was sent to the JSON parser and it was indeed incomplete. According to the server access logs, the client process never reached the end of the stream; it usually fails after 15-20 GB of downloaded data.
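
One way to turn this kind of silent truncation into an explicit failure is to compare the number of bytes received with the length the server announced. The sketch below is an assumption for illustration, not an akka-http API: `BodyCheck`, `BodyChecks` and `verify` are hypothetical names, and the check only applies when the expected length is known (for example, from a Content-Length header).

```scala
// Hypothetical result type for a body-length check.
sealed trait BodyCheck
case object Complete extends BodyCheck
final case class Truncated(missingBytes: Long) extends BodyCheck
final case class Overrun(extraBytes: Long) extends BodyCheck

object BodyChecks {
  // Compares the announced body length with the bytes actually received.
  def verify(expectedBytes: Long, receivedBytes: Long): BodyCheck =
    if (receivedBytes == expectedBytes) Complete
    else if (receivedBytes < expectedBytes) Truncated(expectedBytes - receivedBytes)
    else Overrun(receivedBytes - expectedBytes)
}
```

Running such a check when the upstream completes would let the stream fail (and therefore resume) instead of finishing normally with partial data.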

I enabled debug logging for the akka stream stages and the TLS actor:
akka-trace.log.zip
If it would be useful, I can provide a longer trace.

Findings

  1. The failure normally happens after a long run and several (2-3+) resumes. Resumes are usually triggered by entity stream truncation and by HTTP chunks that are too big.
  2. Somehow, after the TLS actor stops, the graph completes normally.
6864567A PUSH BatchingActorInputBoundary(forPort=akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda, fill=10/16, completed=false, canceled=false) -> akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda, SessionBytes([Session-3, TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256],ByteString(116, 105, 116, 105, 111, 110, 34, 58, 34, 51, 48, 48, 50, 55, 50, 52, 54, 49, 34, 44, 34, 108, 97, 121, 101, 114, 34, 58, 34, 111, 109, 118, 45, 98, 97, 115, 101, 34, 44, 34, 100, 97, 116, 97, 72, 97, 110, 100, 108, 101, 34, 58, 34, 57, 65, 53, 56, 67, 50, 54, 53, 65, 48, 55, 50, 56, 67, 68, 66, 57, 65, 68, 55, 57, 68, 57, 67, 53, 66, 55, 51, 50, 69, 50, 57, 46, 49, 34, 44, 34, 100, 97, 116, 97, 83, 105, 122, 101, 34, 58)... and [16284] more) (akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda) [akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda]
6864567A PUSH BatchingActorInputBoundary(forPort=akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda, fill=9/16, completed=false, canceled=false) -> akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda, SessionBytes([Session-3, TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256],ByteString(67, 68, 66, 57, 65, 68, 55, 57, 68, 57, 67, 53, 66, 55, 51, 50, 69, 50, 57, 46, 49, 34, 44, 34, 100, 97, 116, 97, 83, 105, 122, 101, 34, 58, 51, 54, 44, 34, 99, 104, 101, 99, 107, 115, 117, 109, 34, 58, 34, 69, 70, 56, 56, 68, 67, 49, 65, 67, 55, 57, 50, 70, 55, 68, 51, 50, 51, 56, 68, 69, 48, 69, 48, 53, 56, 68, 70, 57, 50, 70, 55, 34, 125, 10, 44, 123, 34, 118, 101, 114, 115, 105, 111, 110, 34, 58, 49, 44, 34, 112)... and [16284] more) (akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda) [akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda]
6864567A PUSH BatchingActorInputBoundary(forPort=akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda, fill=8/16, completed=false, canceled=false) -> akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda, SessionBytes([Session-3, TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256],ByteString(44, 34, 112, 97, 114, 116, 105, 116, 105, 111, 110, 34, 58, 34, 51, 48, 48, 50, 55, 50, 54, 53, 53, 34, 44, 34, 108, 97, 121, 101, 114, 34, 58, 34, 111, 109, 118, 45, 98, 97, 115, 101, 34, 44, 34, 100, 97, 116, 97, 72, 97, 110, 100, 108, 101, 34, 58, 34, 57, 65, 53, 56, 67, 50, 54, 53, 65, 48, 55, 50, 56, 67, 68, 66, 57, 65, 68, 55, 57, 68, 57, 67, 53, 66, 55, 51, 50, 69, 50, 57, 46, 49, 34, 44, 34, 100, 97, 116, 97, 83)... and [16284] more) (akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda) [akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda]
6864567A PUSH akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda -> akka.stream.impl.fusing.Collect$$anon$2@581f8f6, SessionBytes([Session-3, TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256],ByteString(44, 34, 112, 97, 114, 116, 105, 116, 105, 111, 110, 34, 58, 34, 51, 48, 48, 50, 55, 50, 54, 53, 53, 34, 44, 34, 108, 97, 121, 101, 114, 34, 58, 34, 111, 109, 118, 45, 98, 97, 115, 101, 34, 44, 34, 100, 97, 116, 97, 72, 97, 110, 100, 108, 101, 34, 58, 34, 57, 65, 53, 56, 67, 50, 54, 53, 65, 48, 55, 50, 56, 67, 68, 66, 57, 65, 68, 55, 57, 68, 57, 67, 53, 66, 55, 51, 50, 69, 50, 57, 46, 49, 34, 44, 34, 100, 97, 116, 97, 83)... and [16284] more) (akka.stream.impl.fusing.Collect$$anon$2@581f8f6) [akka.stream.impl.fusing.Collect$$anon$2@581f8f6]
6864567A onNext SessionBytes([Session-3, TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256],ByteString(34, 58, 34, 51, 48, 48, 50, 55, 52, 52, 57, 48, 34, 44, 34, 108, 97, 121, 101, 114, 34, 58, 34, 111, 109, 118, 45, 98, 97, 115, 101, 34, 44, 34, 100, 97, 116, 97, 72, 97, 110, 100, 108, 101, 34, 58, 34, 49, 68, 70, 68, 57, 52, 51, 65, 69, 69, 67, 65, 53, 70, 51, 52, 54, 55, 51, 66, 49, 70, 55, 55, 57, 52, 50, 51, 50, 67, 49, 48, 46, 50, 34, 44, 34, 100, 97, 116, 97, 83, 105, 122, 101, 34, 58, 49, 55, 51, 44, 34, 99)... and [16284] more) port=akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda
6864567A onNext SessionBytes([Session-3, TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256],ByteString(56, 67, 68, 66, 57, 65, 68, 55, 57, 68, 57, 67, 53, 66, 55, 51, 50, 69, 50, 57, 46, 50, 34, 44, 34, 100, 97, 116, 97, 83, 105, 122, 101, 34, 58, 51, 54, 44, 34, 99, 104, 101, 99, 107, 115, 117, 109, 34, 58, 34, 69, 70, 56, 56, 68, 67, 49, 65, 67, 55, 57, 50, 70, 55, 68, 51, 50, 51, 56, 68, 69, 48, 69, 48, 53, 56, 68, 70, 57, 50, 70, 55, 34, 125, 10, 44, 123, 34, 118, 101, 114, 115, 105, 111, 110, 34, 58, 49, 44, 34)... and [16284] more) port=akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda
6864567A onNext SessionBytes([Session-3, TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256],ByteString(97, 115, 101, 34, 44, 34, 100, 97, 116, 97, 72, 97, 110, 100, 108, 101, 34, 58, 34, 55, 49, 57, 68, 55, 56, 57, 69, 56, 55, 68, 50, 69, 68, 57, 57, 53, 68, 48, 68, 52, 55, 70, 67, 65, 70, 68, 67, 49, 65, 57, 68, 46, 50, 34, 44, 34, 100, 97, 116, 97, 83, 105, 122, 101, 34, 58, 50, 49, 49, 44, 34, 99, 104, 101, 99, 107, 115, 117, 109, 34, 58, 34, 55, 57, 49, 53, 55, 50, 49, 52, 66, 69, 65, 65, 69, 66, 51, 68, 51, 65)... and [16284] more) port=akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda
....
6864567A onNext SessionBytes([Session-3, TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256],ByteString(44, 34, 99, 104, 101, 99, 107, 115, 117, 109, 34, 58, 34, 51, 67, 66, 54, 50, 55, 51, 51, 69, 50, 55, 54, 67, 50, 65, 67, 67, 55, 65, 54, 53, 66, 54, 52, 70, 67, 65, 53, 49, 67, 69, 54, 34, 125, 10, 44, 123, 34, 118, 101, 114, 115, 105, 111, 110, 34, 58, 50, 44, 34, 112, 97, 114, 116, 105, 116, 105, 111, 110, 34, 58, 34, 51, 48, 48, 50, 55, 53, 52, 50, 54, 34, 44, 34, 108, 97, 121, 101, 114, 34, 58, 34, 111, 109, 118, 45, 98)... and [16284] more) port=akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda
6864567A onNext SessionBytes([Session-3, TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256],ByteString(34, 51, 48, 48, 50, 55, 53, 53, 50, 51, 34, 44, 34, 108, 97, 121, 101, 114, 34, 58, 34, 111, 109, 118, 45, 98, 97, 115, 101, 34, 44, 34, 100, 97, 116, 97, 72, 97, 110, 100, 108, 101, 34, 58, 34, 57, 65, 53, 56, 67, 50, 54, 53, 65, 48, 55, 50, 56, 67, 68, 66, 57, 65, 68, 55, 57, 68, 57, 67, 53, 66, 55, 51, 50, 69, 50, 57, 46, 49, 34, 44, 34, 100, 97, 116, 97, 83, 105, 122, 101, 34, 58, 51, 54, 44, 34, 99, 104, 101, 99)... and [16284] more) port=akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda
6864567A onNext SessionBytes([Session-3, TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256],ByteString(56, 67, 68, 66, 57, 65, 68, 55, 57, 68, 57, 67, 53, 66, 55, 51, 50, 69, 50, 57, 46, 49, 34, 44, 34, 100, 97, 116, 97, 83, 105, 122, 101, 34, 58, 51, 54, 44, 34, 99, 104, 101, 99, 107, 115, 117, 109, 34, 58, 34, 69, 70, 56, 56, 68, 67, 49, 65, 67, 55, 57, 50, 70, 55, 68, 51, 50, 51, 56, 68, 69, 48, 69, 48, 53, 56, 68, 70, 57, 50, 70, 55, 34, 125, 10, 44, 123, 34, 118, 101, 114, 115, 105, 111, 110, 34, 58, 50, 44, 34)... and [16284] more) port=akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda
6864567A onNext SessionBytes([Session-3, TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256],ByteString(67, 69, 54, 34, 125, 10, 44, 123, 34, 118, 101, 114, 115, 105, 111, 110, 34, 58, 49, 44, 34, 112, 97, 114, 116, 105, 116, 105, 111, 110, 34, 58, 34, 51, 48, 48, 50, 55, 53, 55, 48, 49, 34, 44, 34, 108, 97, 121, 101, 114, 34, 58, 34, 111, 109, 118, 45, 98, 97, 115, 101, 34, 44, 34, 100, 97, 116, 97, 72, 97, 110, 100, 108, 101, 34, 58, 34, 67, 53, 57, 55, 55, 49, 48, 51, 67, 67, 69, 57, 54, 52, 50, 52, 70, 70, 49, 69, 48, 52, 55)... and [10632] more) port=akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda
6864567A  onComplete port=akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda
6864567A PUSH BatchingActorInputBoundary(forPort=akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda, fill=11/16, completed=true, canceled=false) -> akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda, SessionBytes([Session-3, TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256],ByteString(34, 58, 34, 51, 48, 48, 50, 55, 52, 52, 57, 48, 34, 44, 34, 108, 97, 121, 101, 114, 34, 58, 34, 111, 109, 118, 45, 98, 97, 115, 101, 34, 44, 34, 100, 97, 116, 97, 72, 97, 110, 100, 108, 101, 34, 58, 34, 49, 68, 70, 68, 57, 52, 51, 65, 69, 69, 67, 65, 53, 70, 51, 52, 54, 55, 51, 66, 49, 70, 55, 55, 57, 52, 50, 51, 50, 67, 49, 48, 46, 50, 34, 44, 34, 100, 97, 116, 97, 83, 105, 122, 101, 34, 58, 49, 55, 51, 44, 34, 99)... and [16284] more) (akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda) [akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda]
6864567A PUSH BatchingActorInputBoundary(forPort=akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda, fill=10/16, completed=true, canceled=false) -> akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda, SessionBytes([Session-3, TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256],ByteString(56, 67, 68, 66, 57, 65, 68, 55, 57, 68, 57, 67, 53, 66, 55, 51, 50, 69, 50, 57, 46, 50, 34, 44, 34, 100, 97, 116, 97, 83, 105, 122, 101, 34, 58, 51, 54, 44, 34, 99, 104, 101, 99, 107, 115, 117, 109, 34, 58, 34, 69, 70, 56, 56, 68, 67, 49, 65, 67, 55, 57, 50, 70, 55, 68, 51, 50, 51, 56, 68, 69, 48, 69, 48, 53, 56, 68, 70, 57, 50, 70, 55, 34, 125, 10, 44, 123, 34, 118, 101, 114, 115, 105, 111, 110, 34, 58, 49, 44, 34)... and [16284] more) (akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda) [akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda]
6864567A PUSH BatchingActorInputBoundary(forPort=akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda, fill=6/16, completed=true, canceled=false) -> akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda, SessionBytes([Session-3, TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256],ByteString(97, 72, 97, 110, 100, 108, 101, 34, 58, 34, 57, 65, 53, 56, 67, 50, 54, 53, 65, 48, 55, 50, 56, 67, 68, 66, 57, 65, 68, 55, 57, 68, 57, 67, 53, 66, 55, 51, 50, 69, 50, 57, 46, 50, 34, 44, 34, 100, 97, 116, 97, 83, 105, 122, 101, 34, 58, 51, 54, 44, 34, 99, 104, 101, 99, 107, 115, 117, 109, 34, 58, 34, 69, 70, 56, 56, 68, 67, 49, 65, 67, 55, 57, 50, 70, 55, 68, 51, 50, 51, 56, 68, 69, 48, 69, 48, 53, 56, 68, 70)... and [16284] more) (akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda) [akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda]
6864567A PUSH BatchingActorInputBoundary(forPort=akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda, fill=2/16, completed=true, canceled=false) -> akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda, SessionBytes([Session-3, TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256],ByteString(34, 51, 48, 48, 50, 55, 53, 53, 50, 51, 34, 44, 34, 108, 97, 121, 101, 114, 34, 58, 34, 111, 109, 118, 45, 98, 97, 115, 101, 34, 44, 34, 100, 97, 116, 97, 72, 97, 110, 100, 108, 101, 34, 58, 34, 57, 65, 53, 56, 67, 50, 54, 53, 65, 48, 55, 50, 56, 67, 68, 66, 57, 65, 68, 55, 57, 68, 57, 67, 53, 66, 55, 51, 50, 69, 50, 57, 46, 49, 34, 44, 34, 100, 97, 116, 97, 83, 105, 122, 101, 34, 58, 51, 54, 44, 34, 99, 104, 101, 99)... and [16284] more) (akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda) [akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda]
6864567A PUSH BatchingActorInputBoundary(forPort=akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda, fill=1/16, completed=true, canceled=false) -> akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda, SessionBytes([Session-3, TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256],ByteString(56, 67, 68, 66, 57, 65, 68, 55, 57, 68, 57, 67, 53, 66, 55, 51, 50, 69, 50, 57, 46, 49, 34, 44, 34, 100, 97, 116, 97, 83, 105, 122, 101, 34, 58, 51, 54, 44, 34, 99, 104, 101, 99, 107, 115, 117, 109, 34, 58, 34, 69, 70, 56, 56, 68, 67, 49, 65, 67, 55, 57, 50, 70, 55, 68, 51, 50, 51, 56, 68, 69, 48, 69, 48, 53, 56, 68, 70, 57, 50, 70, 55, 34, 125, 10, 44, 123, 34, 118, 101, 114, 115, 105, 111, 110, 34, 58, 50, 44, 34)... and [16284] more) (akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda) [akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda]
6864567A   complete(Connection(19, akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda, BatchingActorInputBoundary(forPort=akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda, fill=0/16, completed=true, canceled=false), akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda, BatchingActorInputBoundary(forPort=akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda, fill=0/16, completed=true, canceled=false), 4, SessionBytes([Session-3, TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256],ByteString(67, 69, 54, 34, 125, 10, 44, 123, 34, 118, 101, 114, 115, 105, 111, 110, 34, 58, 49, 44, 34, 112, 97, 114, 116, 105, 116, 105, 111, 110, 34, 58, 34, 51, 48, 48, 50, 55, 53, 55, 48, 49, 34, 44, 34, 108, 97, 121, 101, 114, 34, 58, 34, 111, 109, 118, 45, 98, 97, 115, 101, 34, 44, 34, 100, 97, 116, 97, 72, 97, 110, 100, 108, 101, 34, 58, 34, 67, 53, 57, 55, 55, 49, 48, 51, 67, 67, 69, 57, 54, 52, 50, 52, 70, 70, 49, 69, 48, 52, 55)... and [10632] more))) [4]
6864567A PUSH BatchingActorInputBoundary(forPort=akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda, fill=0/16, completed=true, canceled=false) -> akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda, SessionBytes([Session-3, TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256],ByteString(67, 69, 54, 34, 125, 10, 44, 123, 34, 118, 101, 114, 115, 105, 111, 110, 34, 58, 49, 44, 34, 112, 97, 114, 116, 105, 116, 105, 111, 110, 34, 58, 34, 51, 48, 48, 50, 55, 53, 55, 48, 49, 34, 44, 34, 108, 97, 121, 101, 114, 34, 58, 34, 111, 109, 118, 45, 98, 97, 115, 101, 34, 44, 34, 100, 97, 116, 97, 72, 97, 110, 100, 108, 101, 34, 58, 34, 67, 53, 57, 55, 55, 49, 48, 51, 67, 67, 69, 57, 54, 52, 50, 52, 70, 70, 49, 69, 48, 52, 55)... and [10632] more) (akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda) [akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda]
6864567A PUSH akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda -> akka.stream.impl.fusing.Collect$$anon$2@581f8f6, SessionBytes([Session-3, TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256],ByteString(67, 69, 54, 34, 125, 10, 44, 123, 34, 118, 101, 114, 115, 105, 111, 110, 34, 58, 49, 44, 34, 112, 97, 114, 116, 105, 116, 105, 111, 110, 34, 58, 34, 51, 48, 48, 50, 55, 53, 55, 48, 49, 34, 44, 34, 108, 97, 121, 101, 114, 34, 58, 34, 111, 109, 118, 45, 98, 97, 115, 101, 34, 44, 34, 100, 97, 116, 97, 72, 97, 110, 100, 108, 101, 34, 58, 34, 67, 53, 57, 55, 55, 49, 48, 51, 67, 67, 69, 57, 54, 52, 50, 52, 70, 70, 49, 69, 48, 52, 55)... and [10632] more) (akka.stream.impl.fusing.Collect$$anon$2@581f8f6) [akka.stream.impl.fusing.Collect$$anon$2@581f8f6]
6864567A COMPLETE BatchingActorInputBoundary(forPort=akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda, fill=0/16, completed=true, canceled=false) -> akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda (akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda) [akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda]
6864567A   cancel(Connection(19, akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda, BatchingActorInputBoundary(forPort=akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda, fill=0/16, completed=true, canceled=false), akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda, BatchingActorInputBoundary(forPort=akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda, fill=0/16, completed=true, canceled=false), 49, Empty)) [49]
6864567A   complete(Connection(12, akka.stream.impl.fusing.Collect$$anon$2@581f8f6, akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda, akka.stream.impl.fusing.Collect$$anon$2@581f8f6, akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda, 1, Empty)) [1]
6864567A COMPLETE akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda -> akka.stream.impl.fusing.Collect$$anon$2@581f8f6 (akka.stream.impl.fusing.Collect$$anon$2@581f8f6) [akka.stream.impl.fusing.Collect$$anon$2@581f8f6]
6864567A   cancel(Connection(12, akka.stream.impl.fusing.Collect$$anon$2@581f8f6, akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda, akka.stream.impl.fusing.Collect$$anon$2@581f8f6, akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda, 49, Empty)) [49]

The completion at 6864567A onComplete port=akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda is triggered by:

2018-03-05 14:30:26,070 unwrap: status=OK handshake=NOT_HANDSHAKING remaining=2300 out=16384
2018-03-05 14:30:26,071 unwrap: status=BUFFER_UNDERFLOW handshake=NOT_HANDSHAKING remaining=2300 out=16384
2018-03-05 14:30:26,072 flushToUser
2018-03-05 14:30:26,073 bidirectional continue
6864567A onNext SessionBytes([Session-3, TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256],ByteString(44, 34, 99, 104, 101, 99, 107, 115, 117, 109, 34, 58, 34, 51, 67, 66, 54, 50, 55, 51, 51, 69, 50, 55, 54, 67, 50, 65, 67, 67, 55, 65, 54, 53, 66, 54, 52, 70, 67, 65, 53, 49, 67, 69, 54, 34, 125, 10, 44, 123, 34, 118, 101, 114, 115, 105, 111, 110, 34, 58, 50, 44, 34, 112, 97, 114, 116, 105, 116, 105, 111, 110, 34, 58, 34, 51, 48, 48, 50, 55, 53, 52, 50, 54, 34, 44, 34, 108, 97, 121, 101, 114, 34, 58, 34, 111, 109, 118, 45, 98)... and [16284] more) port=akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda
2018-03-05 14:30:26,075 bidirectional
2018-03-05 14:30:26,075 chopping from old chunk of 41318 into TransportIn (2300)
2018-03-05 14:30:26,075 unwrap: status=OK handshake=NOT_HANDSHAKING remaining=2300 out=16384
2018-03-05 14:30:26,075 unwrap: status=BUFFER_UNDERFLOW handshake=NOT_HANDSHAKING remaining=2300 out=16384
2018-03-05 14:30:26,075 flushToUser
2018-03-05 14:30:26,075 bidirectional continue
2018-03-05 14:30:26,075 bidirectional
2018-03-05 14:30:26,075 chopping from old chunk of 24905 into TransportIn (2300)
2018-03-05 14:30:26,075 unwrap: status=OK handshake=NOT_HANDSHAKING remaining=2300 out=16384
2018-03-05 14:30:26,075 unwrap: status=BUFFER_UNDERFLOW handshake=NOT_HANDSHAKING remaining=2300 out=16384
2018-03-05 14:30:26,075 flushToUser
2018-03-05 14:30:26,075 bidirectional continue
2018-03-05 14:30:26,075 bidirectional
2018-03-05 14:30:26,075 chopping from old chunk of 8492 into TransportIn (2300)
2018-03-05 14:30:26,075 unwrap: status=OK handshake=NOT_HANDSHAKING remaining=31 out=10732
2018-03-05 14:30:26,075 unwrap: status=CLOSED handshake=NEED_WRAP remaining=0 out=10732
2018-03-05 14:30:26,075 flushToUser
2018-03-05 14:30:26,075 bidirectional continue
2018-03-05 14:30:26,075 wrap: status=CLOSED handshake=NOT_HANDSHAKING remaining=0 out=31
2018-03-05 14:30:26,075 flushToTransport
2018-03-05 14:30:26,075 sending 31 bytes
2018-03-05 14:30:26,075 STOP Outbound Closed: true Inbound closed: true
2018-03-05 14:30:26,075 awaitingClose
2018-03-05 14:30:26,075 chopping from new chunk of 1440 into TransportIn (12120)
2018-03-05 14:30:26,075 unwrap: status=BUFFER_UNDERFLOW handshake=NEED_UNWRAP remaining=13560 out=0
2018-03-05 14:30:26,075 flushToUser
2018-03-05 14:30:26,075 postStop
6864567A ---------------- EXECUTE (32, 424850, 424850)() (running=19, shutdown=0,0,0,0,2,2,2,2,2,2,2,2,0,2,2,2,2,1,1,0,1(KeepGoing),2,0,2,0,0,2,1,0)
2018-03-05 14:30:26,075 awaitingClose
6864567A ---------------- (32, 424850, 424850)() (running=19, shutdown=0,0,0,0,2,2,2,2,2,2,2,2,0,2,2,2,2,1,1,0,1(KeepGoing),2,0,2,0,0,2,1,0)
6864567A onNext SessionBytes([Session-3, TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256],ByteString(34, 51, 48, 48, 50, 55, 53, 53, 50, 51, 34, 44, 34, 108, 97, 121, 101, 114, 34, 58, 34, 111, 109, 118, 45, 98, 97, 115, 101, 34, 44, 34, 100, 97, 116, 97, 72, 97, 110, 100, 108, 101, 34, 58, 34, 57, 65, 53, 56, 67, 50, 54, 53, 65, 48, 55, 50, 56, 67, 68, 66, 57, 65, 68, 55, 57, 68, 57, 67, 53, 66, 55, 51, 50, 69, 50, 57, 46, 49, 34, 44, 34, 100, 97, 116, 97, 83, 105, 122, 101, 34, 58, 51, 54, 44, 34, 99, 104, 101, 99)... and [16284] more) port=akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda
6864567A ---------------- EXECUTE (32, 424850, 424850)() (running=19, shutdown=0,0,0,0,2,2,2,2,2,2,2,2,0,2,2,2,2,1,1,0,1(KeepGoing),2,0,2,0,0,2,1,0)
2018-03-05 14:30:26,076 chopping from new chunk of 23040 into TransportIn (13560)
2018-03-05 14:30:26,076 unwrap: status=OK handshake=NEED_UNWRAP remaining=2300 out=0
6864567A ---------------- (32, 424850, 424850)() (running=19, shutdown=0,0,0,0,2,2,2,2,2,2,2,2,0,2,2,2,2,1,1,0,1(KeepGoing),2,0,2,0,0,2,1,0)
2018-03-05 14:30:26,076 unwrap: status=BUFFER_UNDERFLOW handshake=NEED_UNWRAP remaining=2300 out=0
2018-03-05 14:30:26,076 flushToUser
2018-03-05 14:30:26,076 awaitingClose
6864567A onNext SessionBytes([Session-3, TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256],ByteString(56, 67, 68, 66, 57, 65, 68, 55, 57, 68, 57, 67, 53, 66, 55, 51, 50, 69, 50, 57, 46, 49, 34, 44, 34, 100, 97, 116, 97, 83, 105, 122, 101, 34, 58, 51, 54, 44, 34, 99, 104, 101, 99, 107, 115, 117, 109, 34, 58, 34, 69, 70, 56, 56, 68, 67, 49, 65, 67, 55, 57, 50, 70, 55, 68, 51, 50, 51, 56, 68, 69, 48, 69, 48, 53, 56, 68, 70, 57, 50, 70, 55, 34, 125, 10, 44, 123, 34, 118, 101, 114, 115, 105, 111, 110, 34, 58, 50, 44, 34)... and [16284] more) port=akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda
2018-03-05 14:30:26,076 chopping from old chunk of 17887 into TransportIn (2300)
6864567A ---------------- EXECUTE (32, 424850, 424850)() (running=19, shutdown=0,0,0,0,2,2,2,2,2,2,2,2,0,2,2,2,2,1,1,0,1(KeepGoing),2,0,2,0,0,2,1,0)
6864567A ---------------- (32, 424850, 424850)() (running=19, shutdown=0,0,0,0,2,2,2,2,2,2,2,2,0,2,2,2,2,1,1,0,1(KeepGoing),2,0,2,0,0,2,1,0)
6864567A onNext SessionBytes([Session-3, TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256],ByteString(67, 69, 54, 34, 125, 10, 44, 123, 34, 118, 101, 114, 115, 105, 111, 110, 34, 58, 49, 44, 34, 112, 97, 114, 116, 105, 116, 105, 111, 110, 34, 58, 34, 51, 48, 48, 50, 55, 53, 55, 48, 49, 34, 44, 34, 108, 97, 121, 101, 114, 34, 58, 34, 111, 109, 118, 45, 98, 97, 115, 101, 34, 44, 34, 100, 97, 116, 97, 72, 97, 110, 100, 108, 101, 34, 58, 34, 67, 53, 57, 55, 55, 49, 48, 51, 67, 67, 69, 57, 54, 52, 50, 52, 70, 70, 49, 69, 48, 52, 55)... and [10632] more) port=akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda
2018-03-05 14:30:26,076 unwrap: status=OK handshake=NEED_UNWRAP remaining=2300 out=0
6864567A ---------------- EXECUTE (32, 424850, 424850)() (running=19, shutdown=0,0,0,0,2,2,2,2,2,2,2,2,0,2,2,2,2,1,1,0,1(KeepGoing),2,0,2,0,0,2,1,0)
2018-03-05 14:30:26,076 unwrap: status=BUFFER_UNDERFLOW handshake=NEED_UNWRAP remaining=2300 out=0
6864567A ---------------- (32, 424850, 424850)() (running=19, shutdown=0,0,0,0,2,2,2,2,2,2,2,2,0,2,2,2,2,1,1,0,1(KeepGoing),2,0,2,0,0,2,1,0)
2018-03-05 14:30:26,076 flushToUser
2018-03-05 14:30:26,076 awaitingClose
2018-03-05 14:30:26,076 chopping from old chunk of 1474 into TransportIn (2300)
2018-03-05 14:30:26,076 unwrap: status=BUFFER_UNDERFLOW handshake=NEED_UNWRAP remaining=3774 out=0
2018-03-05 14:30:26,076 flushToUser
6864567A onNext ByteString(21, 3, 3, 0, 26, 0, 0, 0, 0, 0, 0, 0, 2, 117, -118, 103, 92, 11, -99, 94, -98, -112, -78, -110, 38, 64, 124, -81, -108, 0, 64) port=akka.stream.impl.fusing.GraphStages$Identity$$anon$1@323eac6
2018-03-05 14:30:26,076 awaitingClose
6864567A ---------------- EXECUTE (32, 424850, 424851)(Connection(27, akka.stream.impl.fusing.GraphStages$Identity$$anon$1@323eac6, BatchingActorInputBoundary(forPort=akka.stream.impl.fusing.GraphStages$Identity$$anon$1@323eac6, fill=0/16, completed=false, canceled=false), akka.stream.impl.fusing.GraphStages$Identity$$anon$1@323eac6, BatchingActorInputBoundary(forPort=akka.stream.impl.fusing.GraphStages$Identity$$anon$1@323eac6, fill=0/16, completed=false, canceled=false), 4, ByteString(21, 3, 3, 0, 26, 0, 0, 0, 0, 0, 0, 0, 2, 117, -118, 103, 92, 11, -99, 94, -98, -112, -78, -110, 38, 64, 124, -81, -108, 0, 64))) (running=19, shutdown=0,0,0,0,2,2,2,2,2,2,2,2,0,2,2,2,2,1,1,0,1(KeepGoing),2,0,2,0,0,2,1,0)
6864567A PUSH BatchingActorInputBoundary(forPort=akka.stream.impl.fusing.GraphStages$Identity$$anon$1@323eac6, fill=0/16, completed=false, canceled=false) -> akka.stream.impl.fusing.GraphStages$Identity$$anon$1@323eac6, ByteString(21, 3, 3, 0, 26, 0, 0, 0, 0, 0, 0, 0, 2, 117, -118, 103, 92, 11, -99, 94, -98, -112, -78, -110, 38, 64, 124, -81, -108, 0, 64) (akka.stream.impl.fusing.GraphStages$Identity$$anon$1@323eac6) [akka.stream.impl.fusing.GraphStages$Identity$$anon$1@323eac6]
2018-03-05 14:30:26,077 chopping from new chunk of 7200 into TransportIn (3774)
6864567A PUSH akka.stream.impl.fusing.GraphStages$Identity$$anon$1@323eac6 -> akka.stream.impl.Timers$IdleTimeoutBidi$$anon$7@4b280175, ByteString(21, 3, 3, 0, 26, 0, 0, 0, 0, 0, 0, 0, 2, 117, -118, 103, 92, 11, -99, 94, -98, -112, -78, -110, 38, 64, 124, -81, -108, 0, 64) (akka.stream.impl.Timers$IdleTimeoutBidi$$anon$7$IdleBidiHandler@45089459) [akka.stream.impl.Timers$IdleTimeoutBidi$$anon$7@4b280175]
6864567A PUSH akka.stream.impl.Timers$IdleTimeoutBidi$$anon$7@4b280175 -> akka.stream.impl.fusing.MapError$$anon$13@725cda94, ByteString(21, 3, 3, 0, 26, 0, 0, 0, 0, 0, 0, 0, 2, 117, -118, 103, 92, 11, -99, 94, -98, -112, -78, -110, 38, 64, 124, -81, -108, 0, 64) (akka.stream.impl.fusing.MapError$$anon$13@725cda94) [akka.stream.impl.fusing.MapError$$anon$13@725cda94]
6864567A PUSH akka.stream.impl.fusing.MapError$$anon$13@725cda94 -> akka.stream.impl.io.TcpConnectionStage$TcpStreamLogic@574c8132, ByteString(21, 3, 3, 0, 26, 0, 0, 0, 0, 0, 0, 0, 2, 117, -118, 103, 92, 11, -99, 94, -98, -112, -78, -110, 38, 64, 124, -81, -108, 0, 64) (akka.stream.impl.io.TcpConnectionStage$TcpStreamLogic$$anon$6@513004d) [akka.stream.impl.io.TcpConnectionStage$TcpStreamLogic@574c8132]
2018-03-05 14:30:26,077 unwrap: status=BUFFER_UNDERFLOW handshake=NEED_UNWRAP remaining=10974 out=0
6864567A PULL akka.stream.impl.io.TcpConnectionStage$TcpStreamLogic@574c8132 -> akka.stream.impl.fusing.MapError$$anon$13@725cda94 (akka.stream.impl.fusing.MapError$$anon$13@725cda94) [akka.stream.impl.fusing.MapError$$anon$13@725cda94]
6864567A PULL akka.stream.impl.fusing.MapError$$anon$13@725cda94 -> akka.stream.impl.Timers$IdleTimeoutBidi$$anon$7@4b280175 (akka.stream.impl.Timers$IdleTimeoutBidi$$anon$7$IdleBidiHandler@45089459) [akka.stream.impl.Timers$IdleTimeoutBidi$$anon$7@4b280175]
6864567A PULL akka.stream.impl.Timers$IdleTimeoutBidi$$anon$7@4b280175 -> akka.stream.impl.fusing.GraphStages$Identity$$anon$1@323eac6 (akka.stream.impl.fusing.GraphStages$Identity$$anon$1@323eac6) [akka.stream.impl.fusing.GraphStages$Identity$$anon$1@323eac6]
2018-03-05 14:30:26,077 flushToUser
6864567A PULL akka.stream.impl.fusing.GraphStages$Identity$$anon$1@323eac6 -> BatchingActorInputBoundary(forPort=akka.stream.impl.fusing.GraphStages$Identity$$anon$1@323eac6, fill=0/16, completed=false, canceled=false) (BatchingActorInputBoundary(forPort=akka.stream.impl.fusing.GraphStages$Identity$$anon$1@323eac6, fill=0/16, completed=false, canceled=false)) [BatchingActorInputBoundary(forPort=akka.stream.impl.fusing.GraphStages$Identity$$anon$1@323eac6, fill=0/16, completed=false, canceled=false)]
6864567A ---------------- (32, 424851, 424851)() (running=19, shutdown=0,0,0,0,2,2,2,2,2,2,2,2,0,2,2,2,2,1,1,0,1(KeepGoing),2,0,2,0,0,2,1,0)
2018-03-05 14:30:26,077 awaitingClose
6864567A  cancel port=GraphStages$Identity$.out(672764020)
2018-03-05 14:30:26,077 chopping from new chunk of 1440 into TransportIn (10974)
2018-03-05 14:30:26,077 unwrap: status=BUFFER_UNDERFLOW handshake=NEED_UNWRAP remaining=12414 out=0
2018-03-05 14:30:26,077 flushToUser
6864567A   cancel(Connection(29, ActorOutputBoundary(port=GraphStages$Identity$.out(672764020), demand=15, finished=true), akka.stream.impl.fusing.GraphStages$Identity$$anon$1@657d7a6b, ActorOutputBoundary(port=GraphStages$Identity$.out(672764020), demand=15, finished=true), akka.stream.impl.fusing.GraphStages$Identity$$anon$1@657d7a6b, 8, Empty)) [8]
6864567A ---------------- EXECUTE (32, 424851, 424852)(Connection(29, ActorOutputBoundary(port=GraphStages$Identity$.out(672764020), demand=15, finished=true), akka.stream.impl.fusing.GraphStages$Identity$$anon$1@657d7a6b, ActorOutputBoundary(port=GraphStages$Identity$.out(672764020), demand=15, finished=true), akka.stream.impl.fusing.GraphStages$Identity$$anon$1@657d7a6b, 24, Empty)) (running=18, shutdown=0,0,0,0,2,2,2,2,2,2,2,2,0,2,2,2,2,0,1,0,1(KeepGoing),2,0,2,0,0,2,1,0)
6864567A CANCEL ActorOutputBoundary(port=GraphStages$Identity$.out(672764020), demand=15, finished=true) -> akka.stream.impl.fusing.GraphStages$Identity$$anon$1@657d7a6b (akka.stream.impl.fusing.GraphStages$Identity$$anon$1@657d7a6b) [akka.stream.impl.fusing.GraphStages$Identity$$anon$1@657d7a6b]
6864567A   cancel(Connection(18, akka.stream.impl.fusing.GraphStages$Identity$$anon$1@657d7a6b, akka.stream.impl.fusing.Map$$anon$9@35e44a4a, akka.stream.impl.fusing.GraphStages$Identity$$anon$1@657d7a6b, akka.stream.impl.fusing.Map$$anon$9@35e44a4a, 8, Empty)) [8]
6864567A   complete(Connection(29, ActorOutputBoundary(port=GraphStages$Identity$.out(672764020), demand=15, finished=true), akka.stream.impl.fusing.GraphStages$Identity$$anon$1@657d7a6b, ActorOutputBoundary(port=GraphStages$Identity$.out(672764020), demand=15, finished=true), akka.stream.impl.fusing.GraphStages$Identity$$anon$1@657d7a6b, 56, Empty)) [56]
6864567A CANCEL akka.stream.impl.fusing.GraphStages$Identity$$anon$1@657d7a6b -> akka.stream.impl.fusing.Map$$anon$9@35e44a4a (akka.stream.impl.fusing.Map$$anon$9@35e44a4a) [akka.stream.impl.fusing.Map$$anon$9@35e44a4a]
6864567A   cancel(Connection(11, akka.stream.impl.fusing.Map$$anon$9@35e44a4a, akka.stream.impl.fusing.MapError$$anon$13@464f99c9, akka.stream.impl.fusing.Map$$anon$9@35e44a4a, akka.stream.impl.fusing.MapError$$anon$13@464f99c9, 8, Empty)) [8]
6864567A   complete(Connection(18, akka.stream.impl.fusing.GraphStages$Identity$$anon$1@657d7a6b, akka.stream.impl.fusing.Map$$anon$9@35e44a4a, akka.stream.impl.fusing.GraphStages$Identity$$anon$1@657d7a6b, akka.stream.impl.fusing.Map$$anon$9@35e44a4a, 56, Empty)) [56]
6864567A CANCEL akka.stream.impl.fusing.Map$$anon$9@35e44a4a -> akka.stream.impl.fusing.MapError$$anon$13@464f99c9 (akka.stream.impl.fusing.MapError$$anon$13@464f99c9) [akka.stream.impl.fusing.MapError$$anon$13@464f99c9]
6864567A   cancel(Connection(10, akka.stream.impl.fusing.MapError$$anon$13@464f99c9, akka.stream.impl.fusing.FlattenMerge$$anon$5@7150858e, akka.stream.impl.fusing.MapError$$anon$13@464f99c9, akka.stream.impl.fusing.FlattenMerge$$anon$5$$anon$6@14ac2bc6, 8, Empty)) [8]
6864567A   complete(Connection(11, akka.stream.impl.fusing.Map$$anon$9@35e44a4a, akka.stream.impl.fusing.MapError$$anon$13@464f99c9, akka.stream.impl.fusing.Map$$anon$9@35e44a4a, akka.stream.impl.fusing.MapError$$anon$13@464f99c9, 56, Empty)) [56]
6864567A CANCEL akka.stream.impl.fusing.MapError$$anon$13@464f99c9 -> akka.stream.impl.fusing.FlattenMerge$$anon$5@7150858e (akka.stream.impl.fusing.FlattenMerge$$anon$5$$anon$6@14ac2bc6) [akka.stream.impl.fusing.FlattenMerge$$anon$5@7150858e]
6864567A   cancel(Connection(13, akka.stream.impl.fusing.FlattenMerge$$anon$5@7150858e, akka.stream.impl.fusing.Map$$anon$9@fdf0cee, akka.stream.impl.fusing.FlattenMerge$$anon$5$$anon$7@7ad6846e, akka.stream.impl.fusing.Map$$anon$9@fdf0cee, 8, Empty)) [8]
6864567A   complete(Connection(10, akka.stream.impl.fusing.MapError$$anon$13@464f99c9, akka.stream.impl.fusing.FlattenMerge$$anon$5@7150858e, akka.stream.impl.fusing.MapError$$anon$13@464f99c9, akka.stream.impl.fusing.FlattenMerge$$anon$5$$anon$6@14ac2bc6, 56, Empty)) [56]
6864567A CANCEL akka.stream.impl.fusing.FlattenMerge$$anon$5@7150858e -> akka.stream.impl.fusing.Map$$anon$9@fdf0cee (akka.stream.impl.fusing.Map$$anon$9@fdf0cee) [akka.stream.impl.fusing.Map$$anon$9@fdf0cee]
6864567A   cancel(Connection(14, akka.stream.impl.fusing.Map$$anon$9@fdf0cee, akka.http.impl.engine.client.OutgoingConnectionBlueprint$TerminationMerge$$anon$4@34f2ad1d, akka.stream.impl.fusing.Map$$anon$9@fdf0cee, EagerTerminateOutput, 8, Empty)) [8]
6864567A   complete(Connection(13, akka.stream.impl.fusing.FlattenMerge$$anon$5@7150858e, akka.stream.impl.fusing.Map$$anon$9@fdf0cee, akka.stream.impl.fusing.FlattenMerge$$anon$5$$anon$7@7ad6846e, akka.stream.impl.fusing.Map$$anon$9@fdf0cee, 56, Empty)) [56]
6864567A CANCEL akka.stream.impl.fusing.Map$$anon$9@fdf0cee -> akka.http.impl.engine.client.OutgoingConnectionBlueprint$TerminationMerge$$anon$4@34f2ad1d (EagerTerminateOutput) [akka.http.impl.engine.client.OutgoingConnectionBlueprint$TerminationMerge$$anon$4@34f2ad1d]
6864567A   cancel(Connection(5, akka.http.impl.engine.client.OutgoingConnectionBlueprint$TerminationMerge$$anon$4@34f2ad1d, akka.stream.scaladsl.Broadcast$$anon$5@52215c70, <function0>, akka.stream.scaladsl.Broadcast$$anon$5$$anon$17@2f40394, 56, Empty)) [56]
6864567A   cancel(Connection(6, akka.http.impl.engine.client.OutgoingConnectionBlueprint$TerminationMerge$$anon$4@34f2ad1d, akka.stream.scaladsl.Broadcast$$anon$5@3036675, akka.http.impl.engine.client.OutgoingConnectionBlueprint$TerminationMerge$$anon$4$$anon$5@2238b94d, akka.stream.scaladsl.Broadcast$$anon$5$$anon$17@7aa2b3a6, 8, HttpResponse(206 Partial Content,List(Date: Mon, 05 Mar 2018 12:55:53 GMT, Connection: keep-alive, Server: nginx-clojure, Access-Control-Allow-Headers: Authorization, Origin, X-Requested-With, Content-Type, Accept, Accept-Encoding, Accept-Language, Host, Referer, User-Agent, Access-Control-Allow-Methods: GET, POST, PUT, DELETE, OPTIONS, Access-Control-Allow-Origin: *, X-Transaction-Id: 5bd883e4-5fa4-44f8-806a-84e965bb1393, Cache-Control: max-age=3600, Accept-Ranges: bytes, Content-Range: bytes 7310887619-*/*, Server: DataService/Metadata-0.1.2419),HttpEntity.Chunked(application/json),HttpProtocol(HTTP/1.1)))) [8]
6864567A   complete(Connection(14, akka.stream.impl.fusing.Map$$anon$9@fdf0cee, akka.http.impl.engine.client.OutgoingConnectionBlueprint$TerminationMerge$$anon$4@34f2ad1d, akka.stream.impl.fusing.Map$$anon$9@fdf0cee, EagerTerminateOutput, 56, Empty)) [56]
6864567A CANCEL akka.http.impl.engine.client.OutgoingConnectionBlueprint$TerminationMerge$$anon$4@34f2ad1d -> akka.stream.scaladsl.Broadcast$$anon$5@3036675 (akka.stream.scaladsl.Broadcast$$anon$5$$anon$17@7aa2b3a6) [akka.stream.scaladsl.Broadcast$$anon$5@3036675]
6864567A   cancel(Connection(9, akka.stream.scaladsl.Broadcast$$anon$5@3036675, akka.http.impl.engine.client.OutgoingConnectionBlueprint$PrepareResponse$$anon$1@7d23057f, akka.stream.scaladsl.Broadcast$$anon$5@3036675, akka.http.impl.engine.client.OutgoingConnectionBlueprint$PrepareResponse$$anon$1@7d23057f, 8, Empty)) [8]
6864567A   complete(Connection(6, akka.http.impl.engine.client.OutgoingConnectionBlueprint$TerminationMerge$$anon$4@34f2ad1d, akka.stream.scaladsl.Broadcast$$anon$5@3036675, akka.http.impl.engine.client.OutgoingConnectionBlueprint$TerminationMerge$$anon$4$$anon$5@2238b94d, akka.stream.scaladsl.Broadcast$$anon$5$$anon$17@7aa2b3a6, 56, Empty)) [56]
6864567A   complete(Connection(2, akka.http.impl.util.One2OneBidiFlow$One2OneBidi$$anon$1@62065070, akka.stream.scaladsl.Broadcast$$anon$5@3036675, akka.http.impl.util.One2OneBidiFlow$One2OneBidi$$anon$1$$anon$4@5ecd3ca2, akka.stream.scaladsl.Broadcast$$anon$5$$anon$17@194b4bbf, 56, Empty)) [56]
6864567A CANCEL akka.stream.scaladsl.Broadcast$$anon$5@3036675 -> akka.http.impl.engine.client.OutgoingConnectionBlueprint$PrepareResponse$$anon$1@7d23057f (akka.http.impl.engine.client.OutgoingConnectionBlueprint$PrepareResponse$$anon$1@7d23057f) [akka.http.impl.engine.client.OutgoingConnectionBlueprint$PrepareResponse$$anon$1@7d23057f]
6864567A ---------------- (32, 424859, 424859)() (running=11, shutdown=0,0,0,0,0,2,0,0,0,2,0,0,0,1,2,0,2,0,1,0,1(KeepGoing),2,0,2,0,0,2,1,0)
6864567A  onComplete port=akka.stream.impl.fusing.GraphStages$Identity$$anon$1@323eac6
6864567A   complete(Connection(27, akka.stream.impl.fusing.GraphStages$Identity$$anon$1@323eac6, BatchingActorInputBoundary(forPort=akka.stream.impl.fusing.GraphStages$Identity$$anon$1@323eac6, fill=0/16, completed=true, canceled=false), akka.stream.impl.fusing.GraphStages$Identity$$anon$1@323eac6, BatchingActorInputBoundary(forPort=akka.stream.impl.fusing.GraphStages$Identity$$anon$1@323eac6, fill=0/16, completed=true, canceled=false), 8, Empty)) [8]
6864567A ---------------- EXECUTE (32, 424859, 424860)(Connection(27, akka.stream.impl.fusing.GraphStages$Identity$$anon$1@323eac6, BatchingActorInputBoundary(forPort=akka.stream.impl.fusing.GraphStages$Identity$$anon$1@323eac6, fill=0/16, completed=true, canceled=false), akka.stream.impl.fusing.GraphStages$Identity$$anon$1@323eac6, BatchingActorInputBoundary(forPort=akka.stream.impl.fusing.GraphStages$Identity$$anon$1@323eac6, fill=0/16, completed=true, canceled=false), 40, Empty)) (running=10, shutdown=0,0,0,0,0,2,0,0,0,2,0,0,0,1,2,0,2,0,1,0,1(KeepGoing),2,0,2,0,0,2,0,0)
6864567A COMPLETE BatchingActorInputBoundary(forPort=akka.stream.impl.fusing.GraphStages$Identity$$anon$1@323eac6, fill=0/16, completed=true, canceled=false) -> akka.stream.impl.fusing.GraphStages$Identity$$anon$1@323eac6 (akka.stream.impl.fusing.GraphStages$Identity$$anon$1@323eac6) [akka.stream.impl.fusing.GraphStages$Identity$$anon$1@323eac6]
6864567A   cancel(Connection(27, akka.stream.impl.fusing.GraphStages$Identity$$anon$1@323eac6, BatchingActorInputBoundary(forPort=akka.stream.impl.fusing.GraphStages$Identity$$anon$1@323eac6, fill=0/16, completed=true, canceled=false), akka.stream.impl.fusing.GraphStages$Identity$$anon$1@323eac6, BatchingActorInputBoundary(forPort=akka.stream.impl.fusing.GraphStages$Identity$$anon$1@323eac6, fill=0/16, completed=true, canceled=false), 56, Empty)) [56]
6864567A   complete(Connection(25, akka.stream.impl.Timers$IdleTimeoutBidi$$anon$7@4b280175, akka.stream.impl.fusing.GraphStages$Identity$$anon$1@323eac6, akka.stream.impl.Timers$IdleTimeoutBidi$$anon$7$IdleBidiHandler@45089459, akka.stream.impl.fusing.GraphStages$Identity$$anon$1@323eac6, 8, Empty)) [8]
6864567A COMPLETE akka.stream.impl.fusing.GraphStages$Identity$$anon$1@323eac6 -> akka.stream.impl.Timers$IdleTimeoutBidi$$anon$7@4b280175 (akka.stream.impl.Timers$IdleTimeoutBidi$$anon$7$IdleBidiHandler@45089459) [akka.stream.impl.Timers$IdleTimeoutBidi$$anon$7@4b280175]
6864567A   complete(Connection(22, akka.stream.impl.fusing.MapError$$anon$13@725cda94, akka.stream.impl.Timers$IdleTimeoutBidi$$anon$7@4b280175, akka.stream.impl.fusing.MapError$$anon$13@725cda94, akka.stream.impl.Timers$IdleTimeoutBidi$$anon$7$IdleBidiHandler@45089459, 8, Empty)) [8]
6864567A COMPLETE akka.stream.impl.Timers$IdleTimeoutBidi$$anon$7@4b280175 -> akka.stream.impl.fusing.MapError$$anon$13@725cda94 (akka.stream.impl.fusing.MapError$$anon$13@725cda94) [akka.stream.impl.fusing.MapError$$anon$13@725cda94]
6864567A   cancel(Connection(22, akka.stream.impl.fusing.MapError$$anon$13@725cda94, akka.stream.impl.Timers$IdleTimeoutBidi$$anon$7@4b280175, akka.stream.impl.fusing.MapError$$anon$13@725cda94, akka.stream.impl.Timers$IdleTimeoutBidi$$anon$7$IdleBidiHandler@45089459, 56, Empty)) [56]
6864567A   complete(Connection(21, akka.stream.impl.io.TcpConnectionStage$TcpStreamLogic@574c8132, akka.stream.impl.fusing.MapError$$anon$13@725cda94, akka.stream.impl.io.TcpConnectionStage$TcpStreamLogic$$anon$6@513004d, akka.stream.impl.fusing.MapError$$anon$13@725cda94, 8, Empty)) [8]
6864567A COMPLETE akka.stream.impl.fusing.MapError$$anon$13@725cda94 -> akka.stream.impl.io.TcpConnectionStage$TcpStreamLogic@574c8132 (akka.stream.impl.io.TcpConnectionStage$TcpStreamLogic$$anon$6@513004d) [akka.stream.impl.io.TcpConnectionStage$TcpStreamLogic@574c8132]
6864567A ---------------- (32, 424863, 424863)() (running=7, shutdown=0,0,0,0,0,2,0,0,0,2,0,0,0,1,2,0,2,0,1,0,0(KeepGoing),0,0,0,0,0,0,0,0)
6864567A  onComplete port=akka.stream.impl.fusing.GraphStages$Identity$$anon$1@1ee2cdda
6864567A ---------------- EXECUTE (32, 424863, 424863)() (running=7, shutdown=0,0,0,0,0,2,0,0,0,2,0,0,0,1,2,0,2,0,1,0,0(KeepGoing),0,0,0,0,0,0,0,0)

I don't know if I am interpreting the trace correctly, but it looks like after the TLS actor is stopped, the buffer of decoded session bytes is drained and the graph then completes normally.

Versions

The issue is reproducible with Akka 2.4.9 + akka-http 10.0.10 and with the latest Akka 2.5.11 + akka-http 10.1.0-RC2.

bug

All 6 comments

@kstokoz thanks for the report. To clarify, are you saying that when the server-side fails and closes the TCP connection either regularly or with an error, you don't see that error on the client side but the entity stream just completes regularly?

I have multiple traces, and it looks like the TLS session is closed when the client sends a byte string of 31 bytes.

2018-03-05 14:30:26,075 unwrap: status=OK handshake=NOT_HANDSHAKING remaining=31 out=10732
2018-03-05 14:30:26,075 unwrap: status=CLOSED handshake=NEED_WRAP remaining=0 out=10732
2018-03-05 14:30:26,075 flushToUser
2018-03-05 14:30:26,075 bidirectional continue
2018-03-05 14:30:26,075 wrap: status=CLOSED handshake=NOT_HANDSHAKING remaining=0 out=31
2018-03-05 14:30:26,075 flushToTransport
2018-03-05 14:30:26,075 sending 31 bytes
2018-03-05 14:30:26,075 STOP Outbound Closed: true Inbound closed: true
2018-03-05 14:30:26,075 awaitingClose
2018-03-05 14:30:26,075 chopping from new chunk of 1440 into TransportIn (12120)
2018-03-05 14:30:26,075 unwrap: status=BUFFER_UNDERFLOW handshake=NEED_UNWRAP remaining=13560 out=0
2018-03-05 14:30:26,075 flushToUser
2018-03-05 14:30:26,075 postStop
6864567A onNext ByteString(21, 3, 3, 0, 26, 0, 0, 0, 0, 0, 0, 0, 2, 117, -118, 103, 92, 11, -99, 94, -98, -112, -78, -110, 38, 64, 124, -81, -108, 0, 64) port=akka.stream.impl.fusing.GraphStages$Identity$$anon$1@323eac6
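The 31 bytes logged above start with 21, 3, 3, 0, 26. Read as a TLS record header, that is content type 21 (alert), protocol version 3.3 (TLS 1.2) and a payload length of 26, which together with the 5-byte header gives the 31 bytes seen in the trace. The payload is encrypted, so the alert itself cannot be read from the log; it is only consistent with the engine closing the session. A minimal sketch of that decoding follows; the object and helper names are illustrative and not part of Akka:

```scala
// Illustrative helper, not an Akka API: decodes the 5-byte TLS record header.
object TlsRecordHeader {
  final case class Header(contentType: Int, major: Int, minor: Int, length: Int)

  /** Parses the record header, or returns None if fewer than 5 bytes are given. */
  def parse(bytes: Array[Byte]): Option[Header] =
    if (bytes.length < 5) None
    else {
      // Bytes are signed on the JVM, so mask them to read them as unsigned.
      def u(i: Int): Int = bytes(i) & 0xff
      Some(Header(u(0), u(1), u(2), (u(3) << 8) | u(4)))
    }

  /** Record content types defined by TLS 1.2. */
  def contentTypeName(contentType: Int): String = contentType match {
    case 20    => "change_cipher_spec"
    case 21    => "alert"
    case 22    => "handshake"
    case 23    => "application_data"
    case other => s"unknown($other)"
  }

  // The byte string from the trace line above.
  val observed: Array[Byte] = Array[Int](
    21, 3, 3, 0, 26, 0, 0, 0, 0, 0, 0, 0, 2, 117, -118, 103, 92, 11, -99, 94,
    -98, -112, -78, -110, 38, 64, 124, -81, -108, 0, 64
  ).map(_.toByte)

  def main(args: Array[String]): Unit =
    parse(observed).foreach { h =>
      println(s"${contentTypeName(h.contentType)} v${h.major}.${h.minor} payload=${h.length} total=${observed.length}")
    }
}
```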

After that, the full graph completes stage by stage, and the HTTP entity stream does indeed complete normally without a LastChunk message being sent:

6814FE4C ASYNC OnNext(EntityChunk(Chunk(ByteString(44, 123, 34, 118, 101, 114, 115, 105, 111, 110, 34, 58, 49, 44, 34, 112, 97, 114, 116, 105, 116, 105, 111, 110, 34, 58, 34, 51, 48, 48, 50, 55, 53, 54, 55, 48, 34, 44, 34, 108, 97, 121, 101, 114, 34, 58, 34, 111, 109, 118, 45, 98, 97, 115, 101, 34, 44, 34, 100, 97, 116, 97, 72, 97, 110, 100, 108, 101, 34, 58, 34, 67, 53, 57, 55, 55, 49, 48, 51, 67, 67, 69, 57, 54, 52, 50, 52, 70, 70, 49, 69, 48, 52, 55, 66, 55, 53, 68, 67, 51)... and [16333] more,))) (<function1>) [akka.stream.impl.fusing.SubSource$$anon$4@7ed6f38e]
6814FE4C ---------------- EXECUTE (8, 260575, 260576)(Connection(6, akka.stream.impl.fusing.Collect$$anon$2@227aad05, akka.stream.impl.fusing.SubSource$$anon$4@7ed6f38e, akka.stream.impl.fusing.Collect$$anon$2@227aad05, akka.stream.impl.fusing.SubSource$$anon$4@7ed6f38e, 4, EntityChunk(Chunk(ByteString(44, 123, 34, 118, 101, 114, 115, 105, 111, 110, 34, 58, 49, 44, 34, 112, 97, 114, 116, 105, 116, 105, 111, 110, 34, 58, 34, 51, 48, 48, 50, 55, 53, 54, 55, 48, 34, 44, 34, 108, 97, 121, 101, 114, 34, 58, 34, 111, 109, 118, 45, 98, 97, 115, 101, 34, 44, 34, 100, 97, 116, 97, 72, 97, 110, 100, 108, 101, 34, 58, 34, 67, 53, 57, 55, 55, 49, 48, 51, 67, 67, 69, 57, 54, 52, 50, 52, 70, 70, 49, 69, 48, 52, 55, 66, 55, 53, 68, 67, 51)... and [16333] more,)))) (running=8, shutdown=1,2,2,2,2,2,2,1)
6814FE4C PUSH akka.stream.impl.fusing.SubSource$$anon$4@7ed6f38e -> akka.stream.impl.fusing.Collect$$anon$2@227aad05, EntityChunk(Chunk(ByteString(44, 123, 34, 118, 101, 114, 115, 105, 111, 110, 34, 58, 49, 44, 34, 112, 97, 114, 116, 105, 116, 105, 111, 110, 34, 58, 34, 51, 48, 48, 50, 55, 53, 54, 55, 48, 34, 44, 34, 108, 97, 121, 101, 114, 34, 58, 34, 111, 109, 118, 45, 98, 97, 115, 101, 34, 44, 34, 100, 97, 116, 97, 72, 97, 110, 100, 108, 101, 34, 58, 34, 67, 53, 57, 55, 55, 49, 48, 51, 67, 67, 69, 57, 54, 52, 50, 52, 70, 70, 49, 69, 48, 52, 55, 66, 55, 53, 68, 67, 51)... and [16333] more,)) (akka.stream.impl.fusing.Collect$$anon$2@227aad05) [akka.stream.impl.fusing.Collect$$anon$2@227aad05]
6814FE4C PULL akka.stream.impl.fusing.Collect$$anon$2@227aad05 -> akka.stream.impl.fusing.SubSource$$anon$4@7ed6f38e (akka.stream.impl.fusing.SubSource$$anon$4@7ed6f38e) [akka.stream.impl.fusing.SubSource$$anon$4@7ed6f38e]
6814FE4C ASYNC OnComplete (<function1>) [akka.stream.impl.fusing.SubSource$$anon$4@7ed6f38e]
6814FE4C   complete(Connection(6, akka.stream.impl.fusing.Collect$$anon$2@227aad05, akka.stream.impl.fusing.SubSource$$anon$4@7ed6f38e, akka.stream.impl.fusing.Collect$$anon$2@227aad05, akka.stream.impl.fusing.SubSource$$anon$4@7ed6f38e, 8, Empty)) [8]
6814FE4C ---------------- EXECUTE (8, 260577, 260578)(Connection(6, akka.stream.impl.fusing.Collect$$anon$2@227aad05, akka.stream.impl.fusing.SubSource$$anon$4@7ed6f38e, akka.stream.impl.fusing.Collect$$anon$2@227aad05, akka.stream.impl.fusing.SubSource$$anon$4@7ed6f38e, 40, Empty)) (running=7, shutdown=1,2,2,2,2,2,2,0)
6814FE4C COMPLETE akka.stream.impl.fusing.SubSource$$anon$4@7ed6f38e -> akka.stream.impl.fusing.Collect$$anon$2@227aad05 (akka.stream.impl.fusing.Collect$$anon$2@227aad05) [akka.stream.impl.fusing.Collect$$anon$2@227aad05]
6814FE4C   cancel(Connection(6, akka.stream.impl.fusing.Collect$$anon$2@227aad05, akka.stream.impl.fusing.SubSource$$anon$4@7ed6f38e, akka.stream.impl.fusing.Collect$$anon$2@227aad05, akka.stream.impl.fusing.SubSource$$anon$4@7ed6f38e, 56, Empty)) [56]

@jrudolph I spent all day today running downloads and comparing Akka Streams traces to find the culprit. So far I have been able to isolate it to this line:

https://github.com/akka/akka-http/blob/59e5fdb24c6b6457b9b3156abfa652f32a4fd4b1/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala#L322

When I modified onUpstreamFinish to look like this:

  override def onUpstreamFinish(): Unit =
    if (waitingForMethod)
      completeStage()
    else {
      if (parser.onUpstreamFinish())
        completeStage()
      else
        parser.onPull() match {
          case EntityChunk(chunk) if !chunk.isLastChunk ⇒
            failStage(EntityStreamException("Entity stream truncation"))
          case EntityStreamError(info) ⇒
            failStage(EntityStreamException(info))
          case elem ⇒
            emit(responseOut, elem :: Nil, () ⇒ completeStage())
        }
    }

I was able to consume the full 40 GB response successfully for the first time.

In all my traces for the failing cases, onUpstreamFinish completed normally after emitting an entity chunk, with no check that the chunk was actually the last one. I think it would be beneficial to add such a check.
The explicit case for EntityStreamError above was still required: without it, EntityStreamError(info) was emitted but never consumed by the next stage (I assume it ended up in the inter-stage buffers), and the entity stream completed successfully even though it should not have.

I'm not very familiar with akka-http internals and I'm not sure the snippet above is the best way to address this issue, but it worked for me. Maybe there is a better solution that ensures upstream stages consume all pending messages before completing normally, and that fails if some condition is not met, for example if the last consumed entity chunk is not actually the last chunk.

Thanks, @kstokoz, great analysis.

The explicit case for EntityStreamError above was still required: without it, EntityStreamError(info) was emitted but never consumed by the next stage (I assume it ended up in the inter-stage buffers), and the entity stream completed successfully even though it should not have.

I think we should rather analyze why the EntityStreamError isn't handled properly. I have the suspicion it might be related to the dreaded cancellation / completion race.

I agree it looks like a race condition, as it does not happen every time. Some truncation errors are reported properly, others are not.
Looking at the traces, I can see that some stages, like BatchingActorInputBoundary, are not marked as completed while their buffers are non-empty. I would assume the stages that consume the entity stream should behave similarly.

My feeling is that the issue may lie in the asynchronous nature of PrepareResponse and SubSource/SubSink.
If you can give me hints on where to search and dig, I can do a few more runs to see whether they help address this issue as well.

As I said, the reproducer is quite stable, but unfortunately it takes a long time to run before the problem surfaces.

@jrudolph I was finally able to get to the bottom of the issue.

What is happening:

  1. In the ResponseParsingMerge stage, drainParser collects the parser output and, on NeedMoreData, registers getNextData as the follow-up handler.

  2. Later, dataIn finishes (the sessionBytes input, due to an SSL error) and onUpstreamFinish is called. responseOut is not available, so an EmittingSingle handler is registered.

  3. On the next pull, getNextData completes the stage immediately because dataIn is closed. The EmittingSingle handler registered on responseOut in step 2 is effectively lost.
    https://github.com/akka/akka-http/blob/59e5fdb24c6b6457b9b3156abfa652f32a4fd4b1/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala#L335-L339
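The three steps above can be illustrated with a toy model in plain Scala. This is only a sketch under simplifying assumptions: ToyStage, PendingEmit and the finishAfterPendingEmits flag are hypothetical names, not Akka or akka-http APIs, and the "fixed" variant is one conceivable guard rather than the change that was actually made in akka-http.

```scala
import scala.collection.mutable

// Toy model only: none of these names are Akka APIs.
object LostEmitModel {

  /** A queued emission: the element to push and a callback to run afterwards. */
  final case class PendingEmit(element: String, andThen: () => Unit)

  final class ToyStage(finishAfterPendingEmits: Boolean) {
    private val pending = mutable.Queue.empty[PendingEmit]
    private var dataInClosed = false
    private var completed = false
    val delivered = mutable.ListBuffer.empty[String]

    // Completing the stage discards whatever is still queued.
    private def completeStage(): Unit = {
      completed = true
      pending.clear()
    }

    // Counterpart of getNextData: completes once the input is closed.
    // With the hypothetical guard enabled it waits until the queue is empty.
    private def getNextData(): Unit =
      if (dataInClosed && (!finishAfterPendingEmits || pending.isEmpty)) completeStage()

    /** Step 1: parser output is queued, with getNextData as the follow-up. */
    def drainParser(chunk: String): Unit =
      pending.enqueue(PendingEmit(chunk, () => getNextData()))

    /** Step 2: the input finishes without downstream demand; the final element is queued. */
    def onUpstreamFinish(lastElement: String): Unit = {
      dataInClosed = true
      pending.enqueue(PendingEmit(lastElement, () => completeStage()))
    }

    /** Step 3: downstream pulls one element. */
    def onPull(): Unit =
      if (!completed && pending.nonEmpty) {
        val next = pending.dequeue()
        delivered += next.element
        next.andThen()
      }

    def isCompleted: Boolean = completed
  }

  /** Replays the three steps and returns what downstream actually received. */
  def run(finishAfterPendingEmits: Boolean): List[String] = {
    val stage = new ToyStage(finishAfterPendingEmits)
    stage.drainParser("EntityChunk")
    stage.onUpstreamFinish("EntityStreamError(Entity stream truncation)")
    while (!stage.isCompleted) stage.onPull()
    stage.delivered.toList
  }
}
```

In this model, run(finishAfterPendingEmits = false) delivers only the entity chunk, mirroring the silent completion seen in the traces, while run(finishAfterPendingEmits = true) also delivers the truncation error.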

Here is a reproducer:

package akka.http.impl.engine.client

import akka.http.impl.engine.client.OutgoingConnectionBlueprint.ResponseParsingMerge
import akka.http.impl.engine.parsing.HttpResponseParser.ResponseContext
import akka.http.impl.engine.parsing.ParserOutput.EntityStreamError
import akka.http.impl.engine.parsing.{HttpHeaderParser, HttpResponseParser, ParserOutput}
import akka.http.scaladsl.model._
import akka.http.scaladsl.settings.ParserSettings
import akka.stream.TLSProtocol.SessionBytes
import akka.stream.javadsl.RunnableGraph
import akka.stream.scaladsl.{GraphDSL, Sink, Source}
import akka.stream.testkit.{TestPublisher, TestSubscriber}
import akka.stream.{ActorMaterializer, Attributes, ClosedShape}
import akka.testkit.AkkaSpec
import akka.util.ByteString

class ResponseParsingMergeSpec extends AkkaSpec {

  val parserSettings = ParserSettings(system)

  "The ResponseParsingMerge stage" should {

    "not lose entity truncation messages" in {
      implicit val mat = ActorMaterializer()

      val inBypassProbe = TestPublisher.manualProbe[OutgoingConnectionBlueprint.BypassData]()
      val inSessionBytesProbe = TestPublisher.manualProbe[SessionBytes]()
      val responseProbe = TestSubscriber.manualProbe[List[ParserOutput.ResponseOutput]]

      val responseParsingMerge: ResponseParsingMerge = {
        val rootParser = new HttpResponseParser(parserSettings, HttpHeaderParser(parserSettings, log))
        new ResponseParsingMerge(rootParser)
      }

      RunnableGraph.fromGraph(
        GraphDSL.create(){ implicit b ⇒
          import GraphDSL.Implicits._
          val parsingMerge = b.add(responseParsingMerge)

          Source.fromPublisher(inBypassProbe) ~> parsingMerge.in1
          Source.fromPublisher(inSessionBytesProbe) ~> parsingMerge.in0
          parsingMerge.out ~> Sink.fromSubscriber(responseProbe)

          ClosedShape
        }.withAttributes(Attributes.inputBuffer(1, 1))
      ).run(mat)

      val inSessionBytesSub = inSessionBytesProbe.expectSubscription()
      val inBypassSub = inBypassProbe.expectSubscription()
      val responseSub = responseProbe.expectSubscription()

      responseSub.request(1)
      inSessionBytesSub.expectRequest(1)

      inBypassSub.sendNext(ResponseContext(HttpMethods.GET, None))
      inSessionBytesSub.sendNext(SessionBytes(null, ByteString(
        """HTTP/1.1 200 OK
        |Transfer-Encoding: chunked
        |Connection: lalelu
        |Content-Type: application/pdf
        |Server: spray-can
        |
        |3
        |abc
        |10
        |0123456789ABCDEF
        |3
        |abc
        |10
        |0123456789ABCDEF""".stripMargin
      )))

      inSessionBytesSub.sendNext(SessionBytes(null, ByteString(
        """
        |3
        |abc
        |10
        |0123456789ABCDEF
        |3
        |abc
        |10
        |0123456789ABCDEF""".stripMargin
      )))

      inSessionBytesSub.sendComplete()
      responseSub.request(1)
      responseProbe.expectNext()
      responseProbe.expectNext()

      //current behaviour
      //responseProbe.expectComplete()

      // expected behaviour
      responseSub.request(1)
      val chunk = responseProbe.expectNext()
      assert(chunk.last == EntityStreamError(ErrorInfo("Entity stream truncation")))
    }

  }

}

Also, I think it would be safer to call drainParser in onUpstreamFinish too, as parser.onPull() can theoretically return a valid chunk while the entity stream truncation error would come next:

https://github.com/akka/akka-http/blob/59e5fdb24c6b6457b9b3156abfa652f32a4fd4b1/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala#L322
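The difference between a single parser.onPull() and a full drain can be sketched with a simplified stand-in for the parser. The types below only borrow the names EntityChunk, EntityStreamError and StreamEnd for readability; they are hypothetical local definitions, and the Iterator stands in for the real HttpResponseParser.

```scala
import scala.annotation.tailrec

// Simplified stand-in for the parser output; not the akka-http types.
object DrainOnFinishModel {
  sealed trait Output
  final case class EntityChunk(data: String) extends Output
  final case class EntityStreamError(summary: String) extends Output
  case object StreamEnd extends Output

  /** Single pull: takes only the next parser output and ignores the rest. */
  def pullOnce(parser: Iterator[Output]): List[Output] =
    if (parser.hasNext) List(parser.next()) else Nil

  /** Full drain: keeps pulling until the parser reports the end of the stream. */
  @tailrec
  def drain(parser: Iterator[Output], acc: List[Output] = Nil): List[Output] =
    if (!parser.hasNext) acc.reverse
    else parser.next() match {
      case StreamEnd => acc.reverse
      case output    => drain(parser, output :: acc)
    }

  /** A valid chunk is still buffered, with the truncation error right behind it. */
  def bufferedOutputs: Iterator[Output] =
    Iterator(EntityChunk("abc"), EntityStreamError("Entity stream truncation"), StreamEnd)
}
```

With these inputs, pullOnce returns only the valid chunk, so a stage that completes right after it would never see the error, whereas drain returns the chunk followed by the truncation error.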
