Akka: (Java) Infinite Loop in PoolRouterBuilder.withBroadcastPredicate

Created on 25 Feb 2021  路  3Comments  路  Source: akka/akka

Using (from Java) com.typesafe.akka:akka-actor-typed_2.13:2.6.12 I observe the following behavior:

Using the Worker from the Routers documentation, trying to use withBroadcastPredicatefor a PoolRouterleads to it being stuck in the infinite while-loop in PoolRouterBuilder.withBroadcastPredicate, no matter what the predicate is:

  • msg -> msg instanceof <SomeType> // infinite loop
  • msg -> true // infinite loop
  • msg -> false // infinite loop

Debugging reveals that the loop will not terminate - which is not that surprising, because it contains no actual return. This Java code can be used to reproduce the problem:

public class PoolRouterDemo
{
   public static void main(final String[] args)
   {
      final Behavior<Worker.Command> guardian = Behaviors.setup(PoolRouterDemo::createPoolRouter);

      final ActorSystem system = ActorSystem.create(guardian, "poolRouterSystem");
   }

   private static Behavior<Worker.Command> createPoolRouter(final ActorContext<Worker.Command> context)
   {
      // A simple Worker behavior
      final Behavior<Worker.Command> workerBehavior = Worker.create();

      // Make sure the router restart on failure
      final Behavior<Worker.Command> supervisedBehavior = Behaviors.supervise(workerBehavior).onFailure(SupervisorStrategy.restart());

      // Create a pool router with the given Dispatcher
      final PoolRouter<Worker.Command> pool = Routers.pool(4, supervisedBehavior)
            .withRouteeProps(DispatcherSelector.sameAsParent())
            .withRoundRobinRouting()
            .withBroadcastPredicate(msg -> false); // Cannot do that, mysteriously freezes the app

      // Code will NOT reach here, stuck in an infinite loop in the last method

      // Spawn a new actor with this pool
      final ActorRef<Worker.Command> router = context.spawn(pool, "worker-pool");

      // Behavior of the user guardian: Relay all messages to router
      return Behaviors.receive(Worker.Command.class)
            .onAnyMessage(msg -> {
               router.tell(msg);
               return Behaviors.same();
            })
            .build();
   }
}

public class Worker
{
   interface Command
   {
   }

   static class DoLog implements Command
   {
      public final String text;

      public DoLog(final String text)
      {
         this.text = text;
      }
   }

   static final Behavior<Command> create()
   {
      return Behaviors.setup(
            context -> {
               context.getLog().info("Starting worker");

               return Behaviors.receive(Command.class)
                     .onMessage(DoLog.class, doLog -> onDoLog(context, doLog))
                     .build();
            });
   }

   private static Behavior<Command> onDoLog(final ActorContext<Command> context, final DoLog doLog)
   {
      context.getLog().info("({}) Got message {}", context.getSelf().path(), doLog.text);
      return Behaviors.same();
   }
}
1 - triaged bug typed

All 3 comments

Ouch, thanks for reporting.

I have some time now and will have a look and send a fix.

Bug is here: https://github.com/akka/akka/blob/b8c79f86958399b7f68104a21689488162b5e04c/akka-actor-typed/src/main/scala/akka/actor/typed/internal/routing/PoolRouterImpl.scala#L56

We are recursively calling withBroadcastPredicate.

We only have compile time tests for Java, so we never run that code.

Was this page helpful?
0 / 5 - 0 ratings

Related issues

synox picture synox  路  3Comments

patriknw picture patriknw  路  3Comments

armanbilge picture armanbilge  路  3Comments

Daxten picture Daxten  路  3Comments

johanandren picture johanandren  路  3Comments