Describe the bug
A Multi pipeline returned to RESTEasy often causes an exception, even though both the database and Kafka operations are executed on a worker thread.
Code:
@POST
@Consumes(MediaType.MULTIPART_FORM_DATA)
@Produces(MediaType.APPLICATION_JSON)
@RolesAllowed("file-upload")
public Uni<List<Long>> upload(
        @Nonnull @Context SecurityContext securityContext,
        Map<String, byte[]> files
) {
    return Multi
        .createFrom().iterable(files.entrySet())
        .onItem().produceMulti(entry -> extractFiles(securityContext.getUserPrincipal().getName(), entry))
        .concatenate()
        .onItem().invoke(fileEntity -> persist(fileEntity)).subscribeOn(Infrastructure.getDefaultWorkerPool())
        .emitOn(Infrastructure.getDefaultWorkerPool())
        .onItem().produceCompletionStage(fileEntity -> sendMessage(fileEntity)).concatenate()
        .collectItems().asList()
        .ifNoItem().after(Duration.ofSeconds(200)).failWith(
            () -> new ServerErrorException("Kafka not responding", Response.Status.GATEWAY_TIMEOUT));
}
sendMessage returns CompletableFuture:
final CompletableFuture<Void> voidCompletableFuture = new CompletableFuture<>();
OutgoingKafkaRecord<Long, ProcessingStatus> message =
    KafkaRecord.of(fileEntity.getId(), status)
        .withAck(() -> {
            // Complete the future once the record has been acknowledged.
            voidCompletableFuture.complete(null);
            return voidCompletableFuture;
        });
emitter.send(message);
return voidCompletableFuture.thenApply(ignored -> status.getId());
Exception:
[vert.x-eventloop-thread-2] [SynchronousDispatcher.java:545] - RESTEASY002020: Unhandled asynchronous exception, sending back 500: javax.ws.rs.ProcessingException: RESTEASY008205: JSON Binding serialization error java.lang.IllegalStateException: UT000126: Attempted to do blocking IO from the IO thread. This is prohibited as it may result in deadlocks
It seems that the error does not happen when emitter.send is used without the future.
Expected behavior
Resteasy should just execute the request as all blocking operations are moved to worker threads.
Actual behavior
Resteasy complains about a blocking operation.
Environment (please complete the following information):
uname -a or ver: Microsoft Windows [Version 10.0.19041.153]
java -version: 1.8
mvnw --version or gradlew --version: Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
Additional context
@cescoffier https://groups.google.com/forum/#!topic/smallrye/J4fEeQYfM5w
/cc @cescoffier @stuartwdouglas @FroMage
First, I would simplify the code:
@POST
@Consumes(MediaType.MULTIPART_FORM_DATA)
@Produces(MediaType.APPLICATION_JSON)
@RolesAllowed("file-upload")
public Uni<List<Long>> upload(
        @Nonnull @Context SecurityContext securityContext,
        Map<String, byte[]> files
) {
    return Multi
        .createFrom().iterable(files.entrySet())
        .emitOn(Infrastructure.getDefaultWorkerPool())
        .onItem().produceMulti(entry -> extractFiles(securityContext.getUserPrincipal().getName(), entry))
        .concatenate()
        .onItem().invoke(fileEntity -> persist(fileEntity))
        .onItem().produceCompletionStage(fileEntity -> sendMessage(fileEntity)).concatenate()
        .emitOn(Infrastructure.getDefaultWorkerPool()) // The ack is on the event loop.
        .collectItems().asList()
        .ifNoItem().after(Duration.ofSeconds(200)).failWith(
            () -> new ServerErrorException("Kafka not responding", Response.Status.GATEWAY_TIMEOUT));
}
If we do blocking IO it means we must have a filter that disabled async IO. But I was pretty sure I would detect and reject it. Is this with quarkus-undertow or without?
Can we get the full stack trace?
Sorry, I forgot to attach it. Here it is.
First, I would simplify the code:
@cescoffier onItem instead of subscribeOn? Does it matter where I put it because in your example it is not immediately before persist?
If we do blocking IO it means we must have a filter that disabled async IO. But I was pretty sure I would detect and reject it. Is this with quarkus-undertow or without?
I have this dependency for WebSockets; it seems that it is used for REST, too, although I do not need it there.
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-undertow-websockets</artifactId>
</dependency>
onItem is about receiving items.
subscribeOn changes the thread used during the subscription.
I removed a few subscribeOn and emitOn.
And yes, location matters. emitOn constrains the thread used to dispatch the event downstream (next line).
Yeah, I see the gzip interceptor which forces blocking IO.
onItem is about receiving items.
subscribeOn changes the thread used during the subscription.
I removed a few subscribeOn and emitOn.
And yes, location matters. emitOn constrains the thread used to dispatch the event downstream (next line).
Isn't it needed for "persist" (database operation)? "extractFiles" just unzips from byte[] so it is probably not needed there.
Yeah, I see the gzip interceptor which forces blocking IO.
I removed @GZIP, but it still happens (without the gzip part in the stack trace).
Can you show me the new stack trace then?
I already changed it because of the issues, but I will try to reproduce it again, maybe later today.
@FroMage here is the log.
log.txt
I still see GZIP:
at org.jboss.resteasy.plugins.interceptors.GZIPEncodingInterceptor.aroundWriteTo(GZIPEncodingInterceptor.java:103)
You are right, it disappeared just from the cause "java.lang.IllegalStateException" (java.util.zip.GZIPOutputStream).
I am experiencing the same issue when I define the following in application.properties:
quarkus.resteasy.gzip.enabled=true
quarkus.resteasy.gzip.max-input=10M
The stack trace is attached:
stacktrace.txt
My question is: shouldn't the request be handled by a worker thread when using JAX-RS? Or is a Mutiny JAX-RS route handled on an IO thread, like Reactive Routes?
JAX-RS methods returning Mutiny types are still called on the worker thread pool. However, depending on what you do in the method, you may get back to an event loop (typically if you use a reactive client).
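To illustrate how a pipeline can "get back to an event loop", here is a minimal JDK-only sketch. It is not Quarkus or Mutiny code: the single thread named fake-eventloop-0 is a hypothetical stand-in for a Vert.x event loop, and the class and method names are made up for the example. It shows that a continuation attached to a pending CompletableFuture runs on whichever thread completes it, which is presumably what happens when the Kafka ack completes the future on the event loop.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ContinuationThreadDemo {

    // Returns the name of the thread that ran the continuation.
    static String continuationThread() {
        // Hypothetical stand-in for an event loop: one named thread.
        ExecutorService fakeEventLoop =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-eventloop-0"));
        try {
            CompletableFuture<Void> ack = new CompletableFuture<>();
            // Attach the continuation while the future is still pending.
            CompletableFuture<String> seenOn =
                    ack.thenApply(ignored -> Thread.currentThread().getName());
            // The "ack" arrives later, on the fake event loop thread.
            fakeEventLoop.execute(() -> ack.complete(null));
            return seenOn.join();
        } finally {
            fakeEventLoop.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("continuation ran on: " + continuationThread());
    }
}
```

Anything chained after such a future, including the response serialization, stays on that thread unless something explicitly moves it elsewhere.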
@stuartwdouglas can you remind me what you told me about being able to run gzip in an async fashion?
@cescoffier Ok then, correct me if I am wrong, I have to use an emitOn to go back to a worker thread. Which executor do I have to use to get back to a JaxRS worker thread?
You can use java.util.zip.Deflater and java.util.zip.Inflater.
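A rough sketch of what using these two classes directly could look like, working on in-memory byte arrays rather than on a blocking output stream. The class and method names are hypothetical, and this is not how RESTEasy's gzip interceptor is implemented. Also note that a default Deflater emits the zlib format, not the gzip container, so a real gzip response would still need the gzip header and trailer.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

final class InMemoryDeflateDemo {

    // Compresses the whole input in memory; no stream that could block is involved.
    static byte[] compress(byte[] input) {
        Deflater deflater = new Deflater();
        try {
            deflater.setInput(input);
            deflater.finish();
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            while (!deflater.finished()) {
                int n = deflater.deflate(chunk);
                out.write(chunk, 0, n);
            }
            return out.toByteArray();
        } finally {
            deflater.end(); // release native resources
        }
    }

    // Reverses compress(); rejects truncated or malformed input.
    static byte[] decompress(byte[] compressed) {
        Inflater inflater = new Inflater();
        try {
            inflater.setInput(compressed);
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            while (!inflater.finished()) {
                int n = inflater.inflate(chunk);
                if (n == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
                    throw new IllegalArgumentException("truncated or unsupported input");
                }
                out.write(chunk, 0, n);
            }
            return out.toByteArray();
        } catch (DataFormatException e) {
            throw new IllegalArgumentException("malformed input", e);
        } finally {
            inflater.end();
        }
    }

    public static void main(String[] args) {
        byte[] original = "hello hello hello hello".getBytes(StandardCharsets.UTF_8);
        byte[] restored = decompress(compress(original));
        System.out.println(new String(restored, StandardCharsets.UTF_8));
    }
}
```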
@akoufa you can use Infrastructure.getDefaultExecutor()
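The effect of hopping back to a worker executor can be sketched with plain JDK futures. This is only an analogy for emitOn, not Mutiny code: the two named single-thread executors and the class name are hypothetical, and thenApplyAsync with an explicit executor plays the role of the hop. Only the stage placed after the hop moves to the worker thread; the stage before it still runs on the thread that completed the future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class HopBackToWorkerDemo {

    // Returns { thread of the stage before the hop, thread of the stage after it }.
    static String[] stageThreads() {
        // Hypothetical stand-ins for an event loop and a worker pool.
        ExecutorService fakeEventLoop =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-eventloop-0"));
        ExecutorService fakeWorker =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-worker-0"));
        try {
            CompletableFuture<Void> ack = new CompletableFuture<>();
            // Stage before the hop: runs on the thread that completes the ack.
            CompletableFuture<String> before =
                    ack.thenApply(ignored -> Thread.currentThread().getName());
            // Stage after the hop: explicitly dispatched to the worker executor.
            CompletableFuture<String> after =
                    before.thenApplyAsync(ignored -> Thread.currentThread().getName(), fakeWorker);
            fakeEventLoop.execute(() -> ack.complete(null));
            return new String[] { before.join(), after.join() };
        } finally {
            fakeEventLoop.shutdown();
            fakeWorker.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] threads = stageThreads();
        System.out.println("before hop: " + threads[0] + ", after hop: " + threads[1]);
    }
}
```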
@FroMage @stuartwdouglas Can't the Quarkus gzip support that is enabled through application.properties be changed to be async instead of blocking?
Shouldn't Quarkus make sure that gzip is run on the correct thread? I don't think this should be something users need to know about.
@stuartwdouglas thanks.
Shouldn't Quarkus make sure that gzip is run on the correct thread? I don't think this should be something users need to know about.
Well yeah, if we can make gzip work with async IO, it will be automatic.
Maybe I don't get all the stuff that comes together here. But as far as I understood, it would be fixable by changing the thread to the jaxrs worker thread again, am I right? If so, couldn't quarkus switch the thread of the Uni/Multi back to the jaxrs thread before doing any more jaxrs/response handling/gzip stuff with it?
Hi, any update on this, please? I also have the same issue. Is it possible to enhance Quarkus so that gzip also works with async IO?
No update so far, I haven't had time to look at it. If you want to contribute I could help you?