Akka: Reconnect Websocket

Created on 27 Sep 2016  路  3Comments  路  Source: akka/akka

val sink: Sink[Message, Future[Done]] = Sink.foreach(s => ())
val bufferSize = 100
val source = Source.actorRef[Message](bufferSize, OverflowStrategy.dropHead)
val flow = Flow.fromSinkAndSourceMat(sink, source)(Keep.right)
val (upgradeResponse, actor) =
  Http().singleWebSocketRequest(
    WebSocketRequest(host + "/console"),
    flow
  )

I would like to try to reconnect the websocket when the connection closes. I can't find a way to this by fiddling around and/or reading the documentation.
Did I miss something in the docs?

http

Most helpful comment

If anyone ever wonders the full example is below:

    def runWebSocketConnection(flow: Flow[Message, Message, Future[Done]]): Unit = {
      Http().singleWebSocketRequest(
        new WebSocketRequest(Uri("ws://some.address.here")),
        Flow[Message].alsoTo(Sink.onComplete(_ => runWebSocketConnection(flow))).via(flow))
    }

    runWebSocketConnection(subscriptionFlow)

    Thread.sleep(1000000000000000L)

The function is not exactly recursive, as it completes (hence the need for sleep), and the second execution is delayed on the onComplete handler, so will not overflow the stack.

All 3 comments

Please ask questions on https://groups.google.com/forum/#!forum/akka-user or gitter.im/akka/akka
Issue tracker is reserved for issues.

you can add an flow.alsoTo(Sink.onComplete( ... => reconnect())) where def reconnect is the establishing of the websocket connection.

We also have various retry facilities, here https://github.com/akka/akka-stream-contrib/blob/master/contrib/src/main/scala/akka/stream/contrib/Retry.scala

Depends what exact semantics you want. Let's keep questions to mailing list and chat though (and Akka HTTP tickets to http://github.comakka/akka-http ) :)
You're welcome to ask over there!

Here is a link to the Akka User question that spawned from this, for reference.

@ktoso Feel free to look at the question. We couldn't get flow.alsoTo(Sink.onComplete(...)) to work.

If anyone ever wonders the full example is below:

    def runWebSocketConnection(flow: Flow[Message, Message, Future[Done]]): Unit = {
      Http().singleWebSocketRequest(
        new WebSocketRequest(Uri("ws://some.address.here")),
        Flow[Message].alsoTo(Sink.onComplete(_ => runWebSocketConnection(flow))).via(flow))
    }

    runWebSocketConnection(subscriptionFlow)

    Thread.sleep(1000000000000000L)

The function is not exactly recursive, as it completes (hence the need for sleep), and the second execution is delayed on the onComplete handler, so will not overflow the stack.

Was this page helpful?
0 / 5 - 0 ratings