Vavr: Chained futures keep executing although future was cancelled

Created on 10 Jan 2020 · 12 Comments · Source: vavr-io/vavr

Further investigation of #2551 shows that CompletableFutures behave differently in a Spring application.

The following code using CompletableFutures stops before the thenRun method is executed:

CompletableFuture<Void> f = CompletableFuture.runAsync(() -> {
    for (int i = 0; i < 10; i++) {
        try {
            System.out.println(i);
            Thread.sleep(200);
        } catch (InterruptedException e) {
        }
    }
}).thenRun(() -> {
    for (int i = 0; i < 10; i++) {
        try {
            System.out.println(i);
            Thread.sleep(200);
        } catch (InterruptedException e) {
        }
    }
});
Thread.sleep(500);

f.cancel(true);
try{
    f.get();
}
catch(CancellationException | ExecutionException e){
    System.out.println("future cancelled");
}
System.out.println("end");

Output:

1
2
future cancelled
end
3
4
5
6
7
8
9
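
The trailing numbers after "end" are expected with the JDK: the Javadoc of CompletableFuture.cancel states that mayInterruptIfRunning has no effect, so the first stage keeps running, while the thenRun action is skipped because its stage is already cancelled. A minimal sketch of that behavior, using only JDK classes (the class name and the latch standing in for the sleeping loop are illustrative, not taken from the report):

```java
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

final class CancelDependentDemo {

    /** Returns {firstStageFinished, firstStageCancelled, dependentCancelled, secondActionRan}. */
    static boolean[] run() {
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean firstFinished = new AtomicBoolean();
        AtomicBoolean secondRan = new AtomicBoolean();

        CompletableFuture<Void> first = CompletableFuture.runAsync(() -> {
            try {
                release.await();              // stands in for the sleeping loop
                firstFinished.set(true);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        CompletableFuture<Void> dependent = first.thenRun(() -> secondRan.set(true));

        dependent.cancel(true);               // cancels only the dependent stage
        release.countDown();                  // let the first stage run to completion
        first.join();

        return new boolean[] {
            firstFinished.get(), first.isCancelled(), dependent.isCancelled(), secondRan.get()
        };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // prints "[true, false, true, false]"
    }
}
```

The first stage finishes normally and is never marked as cancelled; only the dependent stage is cancelled, and its action never runs.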

However, when using the vavr future implementation, the second part is executed as well:

Future<Void> f = Future.run(() -> {
    int i = 1;
    while(i < 10){
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
        }
        System.out.println(i++);
    }
}).andThen(v -> {
    int i = 1;
    while(i < 10){
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
        }
        System.out.println(i++);
    }
});
Thread.sleep(500);
f.cancel();
try{
    f.get();
}
catch(CancellationException e){
    System.out.println("future cancelled");
}

System.out.println("end");

Output:

1
2
future cancelled
end
3
4
5
6
7
8
9
1
2
3
4
5
6
7
8
9
bug · help wanted · «vavr-concurrent»

All 12 comments

I did run your example and ended up with a correct result:

1
2
future cancelled
end

We might be dealing with a subtle concurrency issue. I will investigate in the upcoming days.
Which version are you using?

We are using version 0.10.2.

Did you run the example in a Spring application? Using a plain Java application, I also get the correct result.

Thanks for reporting!
It seems odd that a Spring application behaves differently. Does it change the default Java ExecutorService at runtime? I will check that...

Spring doesn't seem to be the issue here.
You can reproduce the problem in a plain Java application by adding a Thread.sleep at the end so that the main thread doesn't terminate; you then get the same result as reported, as shown below.
```java
public static void main(String[] args) throws InterruptedException {
    Future<Void> f = Future.run(() -> {
        int i = 1;
        while (i < 10) {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
            }
            System.out.println(i++);
        }
    }).andThen(v -> {
        int i = 1;
        while (i < 10) {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
            }
            System.out.println(i++);
        }
    });
    Thread.sleep(500);
    f.cancel();
    try {
        f.get();
    } catch (CancellationException e) {
        System.out.println("future cancelled");
    }

    System.out.println("end");
    // Keep the main thread alive so the background tasks can keep printing.
    Thread.sleep(10000);
}
```
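
One possible explanation for why the run without the final sleep looked correct: assuming the futures run on ForkJoinPool.commonPool() (my assumption about the default executor in 0.10.x, not confirmed in this thread), the workers are daemon threads, so the JVM exits as soon as main returns and the remaining output is simply never printed. A small JDK-only check of the daemon flag (the class name is illustrative):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;

final class CommonPoolDaemonCheck {

    /** Reports whether a task handed to the common pool runs on a daemon thread. */
    static boolean workerIsDaemon() {
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        // execute() hands the task to a pool worker; the caller only waits on the result.
        ForkJoinPool.commonPool().execute(
                () -> result.complete(Thread.currentThread().isDaemon()));
        return result.join();
    }

    public static void main(String[] args) {
        System.out.println("common-pool worker is daemon: " + workerIsDaemon());
    }
}
```

Daemon threads do not keep the JVM alive, which is why the extra Thread.sleep in main is needed to observe the remaining output.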

I did further research. Please refer to the updated code fragment:

Future<Void> f1 = Future.run(() -> {
            int i = 1;
            while (i < 10) {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    System.out.println("f1 : InterruptedException");
                    throw new RuntimeException(e);
                }
                System.out.println("f1 : " + i++);
            }
        });
        Future<Void> f2 = f1.andThen(
                v -> v.onSuccess(aVoid -> {
                    System.out.println("Value recieved from f1 :  , continuing with task" + v.toString());
                    int i = 1;
                    while (i < 10) {
                        try {
                            Thread.sleep(200);
                        } catch (InterruptedException e) {
                        }
                        System.out.println("f2 : " + i++);
                    }
                }).onFailure(throwable -> {
                    System.out.println("Failure recieved from f1 : " + throwable.toString());
                }));
        Thread.sleep(4000);
        boolean ans = f2.cancel();
        try {
            f2.get();
        } catch (CancellationException e) {
            System.out.println("future cancelled");
        }
        System.out.println("end");
        Thread.sleep(1000000);

Here we have two future instances: f1 is the original future and f2 is the one returned by andThen.
If we call cancel on f2, this is the same scenario as the OP's example. The output is:

f1 : 1
f1 : 2
future cancelled
end
f1 : 3
f1 : 4
f1 : 5
f1 : 6
f1 : 7
f1 : 8
f1 : 9
Value recieved from f1 :  , continuing with taskSuccess(null)
f2 : 1
f2 : 2
f2 : 3
f2 : 4
f2 : 5
f2 : 6
f2 : 7
f2 : 8
f2 : 9

But if we change f2.cancel() to f1.cancel(), it works as intended. The output in that case is:

f1 : 1
f1 : 2
f1 : InterruptedException
Failure received from f1 : java.util.concurrent.CancellationException
future canceled
end

Thus, calling cancel() on the future returned by andThen (f2) does not affect f1. Since f1 then completes normally, it passes its success value to f2, and the andThen action runs as usual.

https://github.com/vavr-io/vavr/blob/0b7a72c8c801999538dda1e3413a6063aadab934/src/main/java/io/vavr/concurrent/Future.java#L793-L801

To refer back to my previous comment: f1 is the Future of the original async task.
The code linked above returns f2, which is the Future of an async task that registers the action to be performed after f1 has finished.
The andThen call should actually return f1.
That would fix the issue.

If we do want to return f2, then we could change the code like this:

default Future<T> andThen(Consumer<? super Try<T>> action) {
        Objects.requireNonNull(action, "action is null");
        return Future.<T>run(executor(), complete ->
                onComplete(t -> {
                    Try.run(() -> action.accept(t));
                    complete.with(t);
                })
        ).onFailure(throwable -> {
            if (throwable instanceof CancellationException)
                cancel();
        });
    }

This code adds a callback to f2 which makes sure that, if f2 is cancelled, the cancellation is passed on to f1.
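
The same propagation idea can be tried without Vavr. The following is only a sketch on top of the JDK's CompletableFuture; the helper name andThen and the class name are hypothetical, and unlike the Vavr code it does not swallow exceptions thrown by the action:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

final class PropagatingAndThen {

    /**
     * Hypothetical helper: runs the action after source completes and
     * forwards a cancellation of the returned future back to source.
     */
    static <T> CompletableFuture<T> andThen(CompletableFuture<T> source,
                                            Consumer<? super T> action) {
        CompletableFuture<T> derived = source.thenApply(value -> {
            action.accept(value);
            return value;
        });
        derived.whenComplete((value, error) -> {
            if (derived.isCancelled()) {
                source.cancel(true);      // propagate the cancellation upstream
            }
        });
        return derived;
    }

    public static void main(String[] args) {
        CompletableFuture<String> source = new CompletableFuture<>();
        CompletableFuture<String> derived = andThen(source, v -> System.out.println("got " + v));
        derived.cancel(true);
        System.out.println("source cancelled: " + source.isCancelled()); // prints "source cancelled: true"
    }
}
```

Note that with CompletableFuture this only marks the source as cancelled; a task that is already running is not interrupted.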

@charvakcpatel007 That looks great! Are you volunteering for a PR? 😊

@danieldietrich I wanted to ask which approach makes more sense:
1) Returning the original future, or
2) Adding a callback and still returning the child future as before.

@charvakcpatel007 I think there might be a better implementation. Could you please try the following:

default Future<T> andThen(Consumer<? super Try<T>> action) {
    Objects.requireNonNull(action, "action is null");
    return run(executor(), complete ->
            onComplete(t -> {
                if (!isCancelled()) {
                    Try.run(() -> action.accept(t));
                }
                complete.with(t);
            })
    );
}

Thx!

Hi,
Right now, the action passed to andThen is executed even if the future is cancelled; in that case the Try it receives holds a java.util.concurrent.CancellationException. That behavior seems correct, because if the original future is cancelled, the actions in andThen should receive a Try with the failure exception.
Your suggestion would change that.
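
For comparison, the JDK behaves in a similar way for callbacks registered directly on the cancelled future: a whenComplete callback still runs and is handed the CancellationException. A minimal JDK-only sketch (the class and method names are illustrative):

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

final class CallbackSeesCancellation {

    /** Returns the error that a whenComplete callback observes after cancel(). */
    static Throwable observedError() {
        CompletableFuture<String> original = new CompletableFuture<>();
        AtomicReference<Throwable> seen = new AtomicReference<>();
        original.whenComplete((value, error) -> seen.set(error));
        original.cancel(true);            // the callback runs now, in the calling thread
        return seen.get();
    }

    public static void main(String[] args) {
        boolean sawCancellation = observedError() instanceof CancellationException;
        System.out.println(sawCancellation); // prints "true"
    }
}
```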

The bug, though, is a different one.

If you look at the code snippet above, there are two futures:
f1 -> represents the return value of the original task
f2 -> represents the return value of the task that registers the action on f1. It doesn't represent the action that was passed as the argument of andThen. (The difference is that one is the Future of adding the action, and the other is the action itself.)

There are multiple ways to solve this:
1) Make sure that when f2 is cancelled, f1 is cancelled as well.

default Future<T> andThen(Consumer<? super Try<T>> action) {
        Objects.requireNonNull(action, "action is null");
        return Future.<T>run(executor(), complete ->
                onComplete(t -> {
                    Try.run(() -> action.accept(t));
                    complete.with(t);
                })
        ).onFailure(throwable -> {
            if (throwable instanceof CancellationException)
                cancel();
        });
    }

2) Return the original future rather than the future that represents adding the action. Then there is no need to propagate anything, since the returned future is f1 itself.

default Future<T> andThen(Consumer<? super Try<T>> action) {
        Objects.requireNonNull(action, "action is null");
        run(executor(), complete ->
                onComplete(t -> {
                    Try.run(() -> action.accept(t));
                    complete.with(t);
                })
        );
        return this;
    }

3) Make adding the action a synchronous operation, so there is no f2 in this case.

default Future<T> andThen(Consumer<? super Try<T>> action) {
        onComplete(t -> {
            Try.run(() -> action.accept(t));
        });
        return this;
    }

Please let me know your suggestions @danieldietrich

Thanks for your analysis, @charvakcpatel007!

We align to Scala, so I double-checked Scala's Future.andThen implementation:

(Screenshot of Scala's Future.andThen implementation, taken 2020-07-06)

Our current andThen implementation is close to that of Scala's Future.

andThen executes the side-effecting action, 'ignores' a possible exception of that action and returns a new Future instance with the original result.

The problem is that our Future implementation completes with a Failure(CancellationException) if the Future is cancelled.

Cancellation means that further processing should be stopped. I think the correct solution is to _fix_ FutureImpl so that no callback is called on cancellation. Namely, onComplete, onSuccess and onFailure must not be called. That way subsequent Futures aren't executed.
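
To make the proposed semantics concrete, here is a toy sketch of a future that drops its callbacks when it is cancelled. MiniFuture is a hypothetical class written for illustration only; it is not Vavr's FutureImpl and not the code from the PR:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical toy future: callbacks are never invoked once it is cancelled. */
final class MiniFuture<T> {
    private final List<Consumer<? super T>> callbacks = new ArrayList<>();
    private boolean completed;
    private boolean cancelled;
    private T value;

    void onComplete(Consumer<? super T> callback) {
        boolean runNow;
        synchronized (this) {
            if (cancelled) {
                return;                   // cancelled: the callback is dropped
            }
            runNow = completed;
            if (!runNow) {
                callbacks.add(callback);
            }
        }
        if (runNow) {
            callback.accept(value);       // already completed: run immediately
        }
    }

    boolean complete(T result) {
        List<Consumer<? super T>> pending;
        synchronized (this) {
            if (completed || cancelled) {
                return false;             // a cancelled future cannot complete
            }
            completed = true;
            value = result;
            pending = new ArrayList<>(callbacks);
            callbacks.clear();
        }
        pending.forEach(cb -> cb.accept(result));
        return true;
    }

    synchronized boolean cancel() {
        if (completed || cancelled) {
            return false;
        }
        cancelled = true;
        callbacks.clear();                // pending callbacks will never run
        return true;
    }

    public static void main(String[] args) {
        MiniFuture<String> future = new MiniFuture<>();
        List<String> log = new ArrayList<>();
        future.onComplete(log::add);
        future.cancel();
        future.complete("late result");
        System.out.println(log);          // prints "[]"
    }
}
```

With these semantics a chained andThen never sees a result from a cancelled future, so nothing downstream is executed.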

@danieldietrich Added a PR. Seems like this much should suffice. If it seems good then I can add tests covering this scenario.
