Akka-http: Error using handleWebSocketMessages

Created on 3 Apr 2020 · 5 Comments · Source: akka/akka-http

I ran into the following error when using handleWebSocketMessages together with Source.queue, on Akka 2.6.4 and akka-http 10.1.11:

23:05:32.564 [test-server-akka.actor.default-dispatcher-7] ERROR akka.actor.ActorSystemImpl - Websocket handler failed with Processor actor [Actor[akka://test-server/system/Materializers/StreamSupervisor-1/flow-0-1-fanoutPublisherSink#622867610]] terminated abruptly
akka.stream.AbruptTerminationException: Processor actor [Actor[akka://test-server/system/Materializers/StreamSupervisor-1/flow-0-1-fanoutPublisherSink#622867610]] terminated abruptly

I created a minimal example to reproduce this issue:

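import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.{actor => classic}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.TextMessage
import akka.http.scaladsl.server.Directives._
import akka.stream.{Materializer, OverflowStrategy}
import akka.stream.scaladsl.{Flow, Sink, Source}
import com.typesafe.config.ConfigFactory
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

object TestServer extends App {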

  private val config = ConfigFactory.parseString(
    """
      |
      | akka.loglevel = "DEBUG"
      |
      |""".stripMargin)


  implicit val system: ActorSystem[Nothing] = ActorSystem[Nothing](Behaviors.empty, "test-server", config)
  implicit val classicSystem: classic.ActorSystem = system.toClassic
  implicit val executionContext: ExecutionContext = system.executionContext
  implicit val materializer: Materializer = Materializer(classicSystem)

  val (queue, source) = Source.queue[Int](50, OverflowStrategy.fail).preMaterialize()

  system.scheduler.scheduleWithFixedDelay(0.seconds, 1.second)(() => queue.offer(1))
  val wsHandler = path("ws") {
    handleWebSocketMessages(Flow.fromSinkAndSource(Sink.ignore, source.map(_ => TextMessage("Hey!"))))
  }

  Http().bindAndHandle(wsHandler, "127.0.0.1", port = 3030)
}

The error does not appear with the previous Akka and akka-http versions (akkaVersion = "2.6.3" and akkaHttpVersion = "10.1.10").

For a complete repository (with both server and client): https://github.com/nrainhart/akka-http-ws-handler-with-source-queue


All 5 comments

When does the log message show? This kind of message will be shown if there are streams that are still running when the actor system is terminated.

The message appears right after a client connects to the server. The connection between client and server is then closed.

On a side note, adding a consumer for the source (e.g. source.runWith(Sink.ignore)) also prevents this error from occurring.

I couldn't reproduce the exact behavior with the code you shared.

As you observed, adding a Sink.ignore to the queue can help. That's because preMaterialize creates a fan-out piece that shuts down after all current consumers have cancelled. A more explicit way of stating that you want to keep the queue alive would be to use a BroadcastHub instead:

  val (queue, source0) =
    Source.queue[Int](50, OverflowStrategy.fail)
      .toMat(BroadcastHub.sink)(Keep.both)
      .run()

Also note that the slowest consumer will eventually backpressure the whole stream (if the slowest consumer is slower than the producer and once all buffers have been filled). To decouple the clients from each other, you can use a buffer element behind the broadcast like this:

// Adds a buffer that will be materialized per client. When a client cannot keep
// up, it will be dropped. You can also consider other OverflowStrategies,
// though OverflowStrategy.backpressure is not recommended as that will constrain
// throughput to the slowest consumer again
val source = source0.buffer(1000 /* Tune as needed */, OverflowStrategy.fail)
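Putting the two snippets above together with the WebSocket route from the original report might look roughly like the following. This is only a sketch under assumptions: it uses a classic ActorSystem instead of the typed one from the report, assumes Akka 2.6.x with akka-http 10.1.x, and the names HubServerSketch and clientFlow are made up for illustration.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.http.scaladsl.server.Directives._
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{BroadcastHub, Flow, Keep, Sink, Source}

import scala.concurrent.duration._

object HubServerSketch {

  // Hypothetical helper: builds the flow for one WebSocket connection.
  // Incoming messages are ignored; outgoing messages come from the shared hub.
  // The buffer is materialized once per client, so a client that cannot keep
  // up fails on its own instead of slowing down everyone else.
  def clientFlow(hubSource: Source[Int, NotUsed]): Flow[Message, Message, NotUsed] =
    Flow.fromSinkAndSource(
      Sink.ignore,
      hubSource
        .buffer(1000 /* tune as needed */, OverflowStrategy.fail)
        .map(_ => TextMessage("Hey!")))

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("test-server")
    import system.dispatcher // execution context for the scheduler

    // The queue and the hub are materialized once, independently of any
    // client connection. Each use of hubSource attaches a new consumer.
    val (queue, hubSource) =
      Source.queue[Int](50, OverflowStrategy.fail)
        .toMat(BroadcastHub.sink[Int])(Keep.both)
        .run()

    system.scheduler.scheduleWithFixedDelay(0.seconds, 1.second)(() => queue.offer(1))

    val route = path("ws") {
      handleWebSocketMessages(clientFlow(hubSource))
    }

    Http().bindAndHandle(route, "127.0.0.1", port = 3030)
  }
}
```

One thing to keep in mind with this setup: according to the BroadcastHub documentation, the hub does not drop elements while no consumer is attached but backpressures the producer instead. With OverflowStrategy.fail on the queue, that probably means the queue can overflow if no client connects for a long time, so a dropping strategy on the queue or a permanently attached Sink.ignore consumer may be worth considering.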

Thank you for your explanation @jrudolph. I suspected that behavior from preMaterialize, but it's good to have a confirmation. I went with the option you suggested to solve the problem in the latest version of akka-http.

If you want to look more into the issue with the different versions, I'm happy to provide more information. If not, feel free to close the issue.

I figured out what's going on, see https://github.com/akka/akka/issues/28926. The reason is that fanout publishers (and also preMaterialize) by default have a subscription timeout of 5 seconds for the first subscriber. That's also why adding the first subscriber early will fix the issue.

I'm closing this as a "duplicate" of the new akka issue.
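For readers who want to keep preMaterialize, the "add the first subscriber early" workaround mentioned in this thread could be sketched as follows. It is a minimal illustration rather than a recommended design: it assumes a classic ActorSystem on Akka 2.6.x, and EarlySubscriberSketch and start are hypothetical names.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Sink, Source, SourceQueueWithComplete}

object EarlySubscriberSketch {

  // Hypothetical helper: pre-materializes the queue and immediately attaches
  // a draining subscriber, so the fan-out publisher gets its first subscriber
  // long before the subscription timeout described above can fire.
  def start()(implicit system: ActorSystem): (SourceQueueWithComplete[Int], Source[Int, NotUsed]) = {
    val (queue, source) =
      Source.queue[Int](50, OverflowStrategy.fail).preMaterialize()

    // This subscriber never cancels, so the fan-out piece also stays alive
    // after all "real" consumers have disconnected.
    source.runWith(Sink.ignore)

    (queue, source)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("early-subscriber-sketch")
    val (queue, source) = start()

    // A consumer that attaches later sees elements offered after it subscribed.
    source.runForeach(n => println(s"got $n"))
    queue.offer(1)
  }
}
```

Because the draining subscriber consumes everything, elements offered while no other consumer is attached are discarded, which matches the behavior observed with source.runWith(Sink.ignore) in the comment above.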
