Further investigation of #2551 shows that CompletableFutures behave differently in a Spring application.
The following code using CompletableFutures stops before the thenRun method is executed:
CompletableFuture<Void> f = CompletableFuture.runAsync(() -> {
    for (int i = 0; i < 10; i++) {
        try {
            System.out.println(i);
            Thread.sleep(200);
        } catch (InterruptedException e) {
        }
    }
}).thenRun(() -> {
    for (int i = 0; i < 10; i++) {
        try {
            System.out.println(i);
            Thread.sleep(200);
        } catch (InterruptedException e) {
        }
    }
});
Thread.sleep(500);
f.cancel(true);
try {
    f.get();
} catch (CancellationException | ExecutionException e) {
    System.out.println("future cancelled");
}
System.out.println("end");
Output:
1
2
future cancelled
end
3
4
5
6
7
8
9
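The CompletableFuture behaviour above can also be checked without relying on sleep timings. The following is a minimal sketch, not the code from the report: the class name `CancelDependentDemo` and the latch are illustrative assumptions. It relies on two properties of `CompletableFuture`: the `mayInterruptIfRunning` argument of `cancel` is documented to have no effect, and cancelling the future returned by `thenRun` does not cancel the stage it depends on.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

class CancelDependentDemo {

    /** Returns { first stage finished, thenRun action ran, first stage cancelled }. */
    static boolean[] run() {
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean firstFinished = new AtomicBoolean();
        AtomicBoolean secondRan = new AtomicBoolean();

        CompletableFuture<Void> first = CompletableFuture.runAsync(() -> {
            try {
                release.await();            // stands in for the sleeping loop
                firstFinished.set(true);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        CompletableFuture<Void> second = first.thenRun(() -> secondRan.set(true));

        second.cancel(true);                // completes only 'second', with a CancellationException
        release.countDown();                // let the first stage finish
        first.join();                       // returns normally because 'first' was never cancelled

        return new boolean[] { firstFinished.get(), secondRan.get(), first.isCancelled() };
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("first stage finished:  " + r[0]);
        System.out.println("thenRun action ran:    " + r[1]);
        System.out.println("first stage cancelled: " + r[2]);
    }
}
```

In this sketch the first stage runs to completion while the `thenRun` action is skipped, which matches the output pasted above.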
However, when using the Vavr Future implementation, the second part is executed as well:
Future<Void> f = Future.run(() -> {
    int i = 1;
    while (i < 10) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
        }
        System.out.println(i++);
    }
}).andThen(v -> {
    int i = 1;
    while (i < 10) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
        }
        System.out.println(i++);
    }
});
Thread.sleep(500);
f.cancel();
try {
    f.get();
} catch (CancellationException e) {
    System.out.println("future cancelled");
}
System.out.println("end");
Output:
1
2
future cancelled
end
3
4
5
6
7
8
9
1
2
3
4
5
6
7
8
9
I ran your example and got the correct result:
1
2
future cancelled
end
We might be dealing with a subtle concurrency issue. I will investigate in the coming days.
Which version are you using?
We are using version 0.10.2.
Did you run the example in a Spring application? In a plain Java application, I also get the correct result.
Thanks for reporting!
It seems odd that a Spring application behaves differently. Does it change the default Java ExecutorService at runtime? I will check that...
Spring doesn't seem to be the issue here.
You can reproduce the problem in a plain Java application by adding a Thread.sleep at the end so that the main thread doesn't terminate; you then get the same result, as shown below.
```java
public static void main(String[] args) throws InterruptedException {
    Future<Void> f = Future.run(() -> {
        int i = 1;
        while (i < 10) {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
            }
            System.out.println(i++);
        }
    }).andThen(v -> {
        int i = 1;
        while (i < 10) {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
            }
            System.out.println(i++);
        }
    });
    Thread.sleep(500);
    f.cancel();
    try {
        f.get();
    } catch (CancellationException e) {
        System.out.println("future cancelled");
    }
    System.out.println("end");
    // Keep the main thread alive so the asynchronous tasks can keep printing.
    Thread.sleep(10000);
}
```
I did some further research. Please refer to the updated code fragment:
Future<Void> f1 = Future.run(() -> {
    int i = 1;
    while (i < 10) {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            System.out.println("f1 : InterruptedException");
            throw new RuntimeException(e);
        }
        System.out.println("f1 : " + i++);
    }
});
Future<Void> f2 = f1.andThen(
    v -> v.onSuccess(aVoid -> {
        System.out.println("Value recieved from f1 : , continuing with task" + v.toString());
        int i = 1;
        while (i < 10) {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
            }
            System.out.println("f2 : " + i++);
        }
    }).onFailure(throwable -> {
        System.out.println("Failure recieved from f1 : " + throwable.toString());
    }));
Thread.sleep(4000);
boolean ans = f2.cancel();
try {
    f2.get();
} catch (CancellationException e) {
    System.out.println("future cancelled");
}
System.out.println("end");
Thread.sleep(1000000);
Here we have two Future instances: f1 is the original and f2 is the one returned by andThen.
Calling cancel on f2 is the same scenario as the OP's example. The output is:
f1 : 1
f1 : 2
future cancelled
end
f1 : 3
f1 : 4
f1 : 5
f1 : 6
f1 : 7
f1 : 8
f1 : 9
Value recieved from f1 : , continuing with taskSuccess(null)
f2 : 1
f2 : 2
f2 : 3
f2 : 4
f2 : 5
f2 : 6
f2 : 7
f2 : 8
f2 : 9
But if we change f2.cancel() to f1.cancel(), it works as intended. The output in that case is:
f1 : 1
f1 : 2
f1 : InterruptedException
Failure received from f1 : java.util.concurrent.CancellationException
future canceled
end
Thus, calling cancel() on f2, the future returned by andThen, does not affect f1. Since f1 then completes normally, it passes its success value to f2, which runs its action as usual.
To refer back to my previous comment: f1 is the Future of the original async task.
The code above returns f2, which is the Future of an async task that registers the action to be performed after f1 has finished.
The andThen call should actually return f1.
That would fix the issue.
If we do want to return f2, we can change the code like this:
default Future<T> andThen(Consumer<? super Try<T>> action) {
    Objects.requireNonNull(action, "action is null");
    return Future.<T>run(executor(), complete ->
        onComplete(t -> {
            Try.run(() -> action.accept(t));
            complete.with(t);
        })
    ).onFailure(throwable -> {
        if (throwable instanceof CancellationException) {
            cancel();
        }
    });
}
This code adds a callback to f2 that makes sure that if f2 is cancelled, the cancellation is passed on to f1.
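The same propagation idea can be tried with plain java.util.concurrent, without Vavr. This is only an analogy built on `CompletableFuture`, assuming a hypothetical helper named `thenRunPropagating`; it is not the Vavr patch above. Note that `CompletableFuture.cancel` never interrupts a running task, so in this sketch propagation only marks the upstream future as cancelled.

```java
import java.util.concurrent.CompletableFuture;

class PropagatingCancel {

    /** Hypothetical helper: like thenRun, but cancelling the result also cancels 'source'. */
    static <T> CompletableFuture<Void> thenRunPropagating(CompletableFuture<T> source, Runnable action) {
        CompletableFuture<Void> dependent = source.thenRun(action);
        dependent.whenComplete((ignored, error) -> {
            // Only a direct cancel() of 'dependent' makes isCancelled() true here;
            // a failure coming from 'source' arrives wrapped and is not propagated back.
            if (dependent.isCancelled()) {
                source.cancel(false);
            }
        });
        return dependent;
    }

    public static void main(String[] args) {
        CompletableFuture<String> f1 = new CompletableFuture<>(); // stands in for a still-running task
        CompletableFuture<Void> f2 = thenRunPropagating(f1, () -> System.out.println("action"));
        f2.cancel(false);
        System.out.println("f1 cancelled: " + f1.isCancelled());
    }
}
```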
@charvakcpatel007 That looks great! Are you volunteering for a PR? 😊
@danieldietrich I wanted to ask which approach makes more sense:
1) Returning the original Future, or
2) Adding a callback and still returning the child Future.
@charvakcpatel007 I think there might be a better implementation. Could you please try the following:
default Future<T> andThen(Consumer<? super Try<T>> action) {
    Objects.requireNonNull(action, "action is null");
    return run(executor(), complete ->
        onComplete(t -> {
            if (!isCancelled()) {
                Try.run(() -> action.accept(t));
            }
            complete.with(t);
        })
    );
}
Thx!
Hi,
Right now, the action passed to andThen is executed even if the future is cancelled; in that case the Try holds a java.util.concurrent.CancellationException. That behavior seems correct, because if the original future is cancelled, the actions in andThen should receive a Try with the failure exception.
Your suggestion would change that.
The actual bug is a different one, though.
If you look at the code snippet above, there are two futures:
f1 -> represents the return value of the original task
f2 -> represents the return value of the task that adds the action to f1. It doesn't represent the action that was passed as the argument of andThen. (The difference is that one is the Future of adding the action, and the other is the action itself.)
There are multiple ways to solve this:
1) Make sure that when f2 is cancelled, f1 is also cancelled.
default Future<T> andThen(Consumer<? super Try<T>> action) {
    Objects.requireNonNull(action, "action is null");
    return Future.<T>run(executor(), complete ->
        onComplete(t -> {
            Try.run(() -> action.accept(t));
            complete.with(t);
        })
    ).onFailure(throwable -> {
        if (throwable instanceof CancellationException) {
            cancel();
        }
    });
}
2) Return the original future rather than the future that represents adding the action. Nothing needs to be propagated, since the returned future is now f1 itself.
default Future<T> andThen(Consumer<? super Try<T>> action) {
    Objects.requireNonNull(action, "action is null");
    run(executor(), complete ->
        onComplete(t -> {
            Try.run(() -> action.accept(t));
            complete.with(t);
        })
    );
    return this;
}
3) Make adding the action a synchronous operation, so there is no f2 in this case.
default Future<T> andThen(Consumer<? super Try<T>> action) {
    onComplete(t -> {
        Try.run(() -> action.accept(t));
    });
    return this;
}
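Options 2 and 3 both hand the original future back to the caller, so a later cancel reaches the original task directly. As a rough illustration with the JDK's `CompletableFuture` instead of Vavr, the sketch below registers a side effect and returns the same instance. The class and the helper `andThen` are hypothetical names chosen to mirror the discussion, not Vavr code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

class AndThenReturningSource {

    /** Hypothetical helper: registers a side effect and returns the same future. */
    static <T> CompletableFuture<T> andThen(CompletableFuture<T> source,
                                            BiConsumer<? super T, ? super Throwable> action) {
        source.whenComplete((value, error) -> {
            try {
                action.accept(value, error);
            } catch (RuntimeException ignored) {
                // Roughly like Try.run(...) in the snippets above: a failing action is ignored.
            }
        });
        return source;
    }

    public static void main(String[] args) {
        CompletableFuture<String> f1 = new CompletableFuture<>(); // stands in for a still-running task
        AtomicReference<Throwable> seen = new AtomicReference<>();
        CompletableFuture<String> f2 = andThen(f1, (value, error) -> seen.set(error));
        f2.cancel(false);
        System.out.println("same instance: " + (f1 == f2));
        System.out.println("f1 cancelled:  " + f1.isCancelled());
        System.out.println("action saw:    " + seen.get());
    }
}
```

Because `f2` is `f1`, no propagation logic is needed, and the action still receives the cancellation as a failure, which is the behavior described as correct above.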
Please let me know your suggestions @danieldietrich
Thanks for your analysis, @charvakcpatel007!
We align to Scala, so I double-checked Scala's Future.andThen implementation:

Our current andThen implementation is close to that of Scala's Future.
andThen executes the side-effecting action, 'ignores' a possible exception of that action and returns a new Future instance with the original result.
The problem is that our Future implementation completes with a Failure(CancellationException) if the Future is cancelled.
Cancellation means that further processing should be stopped. I think the correct solution is to _fix_ FutureImpl so that no callback is called on cancellation. Namely, onComplete, onSuccess and onFailure must not be called. That way, subsequent Futures aren't executed.
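To make the proposed semantics concrete, here is a small self-contained sketch of a future whose callbacks are never invoked once it has been cancelled. `MiniFuture` is a hypothetical class written only for illustration; it is not Vavr's FutureImpl, and it leaves out executors, failures and a blocking get.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class MiniFuture<T> {
    private final List<Consumer<T>> callbacks = new ArrayList<>();
    private boolean done;        // completed with a value or cancelled
    private boolean cancelled;
    private T value;

    /** Runs the callback on completion; never runs it once the future is cancelled. */
    synchronized void onComplete(Consumer<T> callback) {
        if (cancelled) {
            return;                        // cancelled: drop the callback
        }
        if (done) {
            callback.accept(value);        // already completed: run immediately
        } else {
            callbacks.add(callback);
        }
    }

    /** Completes the future unless it is already done; returns whether it took effect. */
    synchronized boolean complete(T result) {
        if (done) {
            return false;
        }
        done = true;
        value = result;
        // Simplification: callbacks run while holding the lock.
        callbacks.forEach(cb -> cb.accept(result));
        callbacks.clear();
        return true;
    }

    /** Cancels the future and discards pending callbacks without calling them. */
    synchronized boolean cancel() {
        if (done) {
            return false;
        }
        done = true;
        cancelled = true;
        callbacks.clear();
        return true;
    }

    synchronized boolean isCancelled() {
        return cancelled;
    }

    public static void main(String[] args) {
        MiniFuture<String> future = new MiniFuture<>();
        future.onComplete(v -> System.out.println("callback: " + v));
        future.cancel();
        System.out.println("completed after cancel: " + future.complete("late"));
        System.out.println("cancelled: " + future.isCancelled());
    }
}
```

With these semantics, a chained action registered through onComplete simply never starts after a cancel, so nothing downstream has to inspect a CancellationException.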
@danieldietrich Added a PR. It seems like this much should suffice. If it looks good, I can add tests covering this scenario.