I have the same situation as described in #2485. Using version 0.26.0.
A listener for failed state is logging an error message when stopping an async subscriber:
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@779f4acb rejected from java.util.concurrent.ScheduledThreadPoolExecutor@7fb36e19[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 41]
Although this error is described as harmless in the referenced issue, it pollutes our logs during shutdown and deployments. The only workaround I can find is to detect whether the from state is STOPPING and ignore any failure in that case. But I feel this could miss real errors if they were to occur during shutdown.
Are there any suggested ways to better handle this?
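One way to narrow the workaround described above, so that it hides less, is to ignore a failure only when the from state is STOPPING and the failure is (or is caused by) a RejectedExecutionException. The following is a minimal sketch, not the library's recommended approach: the class `ShutdownNoiseFilter` and its method are hypothetical, and the nested `State` enum is a local stand-in for the state type passed to the listener's failed callback so that the example compiles with the JDK alone.

```java
import java.util.concurrent.RejectedExecutionException;

/** Hypothetical helper, not part of any Google library. */
final class ShutdownNoiseFilter {

    /** Local stand-in for the service state passed to the failed callback (assumption). */
    enum State { NEW, STARTING, RUNNING, STOPPING, TERMINATED, FAILED }

    /**
     * Returns true only when the service was already stopping and the failure
     * is, or is caused by, a RejectedExecutionException.
     */
    static boolean isShutdownNoise(State from, Throwable failure) {
        if (from != State.STOPPING) {
            return false;
        }
        // Walk the cause chain, since the rejection may be wrapped.
        for (Throwable t = failure; t != null; t = t.getCause()) {
            if (t instanceof RejectedExecutionException) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Throwable noise = new RejectedExecutionException("executor terminated");
        Throwable real = new IllegalStateException("stream broke");
        System.out.println(isShutdownNoise(State.STOPPING, noise)); // true
        System.out.println(isShutdownNoise(State.STOPPING, real));  // false
        System.out.println(isShutdownNoise(State.RUNNING, noise));  // false
    }
}
```

A failed listener could call such a check and log at debug level when it returns true, and at error level otherwise. Other exception types raised during shutdown would still be reported.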
@rcoy-v Would you be able to share a little more of the stack trace with us? It would be really helpful to see what's trying to execute.
Stack trace:
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@7d9f7028 rejected from java.util.concurrent.ScheduledThreadPoolExecutor@35e8a713[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 32]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)
at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
at java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622)
at java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668)
at com.google.common.util.concurrent.ListenerCallQueue.execute(ListenerCallQueue.java:87)
at com.google.common.util.concurrent.AbstractService.executeListeners(AbstractService.java:479)
at com.google.common.util.concurrent.AbstractService.stopAsync(AbstractService.java:256)
at com.google.api.core.AbstractApiService.stopAsync(AbstractApiService.java:130)
at com.google.cloud.pubsub.v1.Subscriber.stopConnections(Subscriber.java:471)
at com.google.cloud.pubsub.v1.Subscriber.stopAllStreamingConnections(Subscriber.java:447)
at com.google.cloud.pubsub.v1.Subscriber.doStop(Subscriber.java:293)
at com.google.api.core.AbstractApiService$InnerService.doStop(AbstractApiService.java:154)
at com.google.common.util.concurrent.AbstractService.stopAsync(AbstractService.java:242)
at com.google.api.core.AbstractApiService.stopAsync(AbstractApiService.java:130)
at com.myapp.Application.shutdown(Application.kt:45)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:369)
at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeDestroyMethods(InitDestroyAnnotationBeanPostProcessor.java:327)
at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeDestruction(InitDestroyAnnotationBeanPostProcessor.java:155)
at org.springframework.beans.factory.support.DisposableBeanAdapter.destroy(DisposableBeanAdapter.java:240)
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.destroyBean(DefaultSingletonBeanRegistry.java:576)
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.destroySingleton(DefaultSingletonBeanRegistry.java:552)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.destroySingleton(DefaultListableBeanFactory.java:953)
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.destroySingletons(DefaultSingletonBeanRegistry.java:521)
at org.springframework.beans.factory.support.FactoryBeanRegistrySupport.destroySingletons(FactoryBeanRegistrySupport.java:227)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.destroySingletons(DefaultListableBeanFactory.java:960)
at org.springframework.context.support.AbstractApplicationContext.destroyBeans(AbstractApplicationContext.java:1035)
at org.springframework.context.support.AbstractApplicationContext.doClose(AbstractApplicationContext.java:1011)
at org.springframework.context.support.AbstractApplicationContext$1.run(AbstractApplicationContext.java:933)
The stack trace is not coming from the subscriber, but a listener implementing failed.
We are running this with Spring Boot. The parts related to Google pubsub are still pretty basic, closely following the async example here.
Testing this further, the error is logged if the app is stopped by SIGINT/SIGTERM. If I programmatically stop the subscriber after a sleep of 5 seconds, no error is logged. To be clear, subscriber.stopAsync is called in both situations. I am still investigating whether this is an issue with our Spring Boot app not handling shutdown properly, or with the Google client library.
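For reference, the top frames of the stack trace above can be reproduced with the JDK alone: a ScheduledThreadPoolExecutor that has already terminated rejects any new task under its default AbortPolicy. The sketch below shows only that mechanism. It does not explain why the executor is already terminated when stopAsync notifies listeners in the library, and the class name `RejectedAfterShutdownDemo` is hypothetical.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo class; uses only java.util.concurrent. */
final class RejectedAfterShutdownDemo {

    /** Returns true if submitting to an already terminated executor is rejected. */
    static boolean rejectedAfterShutdown() throws InterruptedException {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        executor.shutdown();
        // No tasks were queued, so termination completes almost immediately.
        executor.awaitTermination(1, TimeUnit.SECONDS);
        try {
            // Same entry point as ScheduledThreadPoolExecutor.execute in the trace.
            executor.execute(() -> { });
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("rejected: " + rejectedAfterShutdown()); // rejected: true
    }
}
```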
I'm having the same problem with the latest version of pubsub, 0.30.0-beta. Here is my complete stack trace (don't mind the Clojure code).
I can reproduce at any time by running a kill -2 <pid> on the process (which is what happens for gracefully shutting down docker containers).
If you need any more info please let me know.
java.lang.IllegalStateException: Expected the service InnerService [FAILED] to be TERMINATED, but the service has FAILED
at com.google.common.util.concurrent.AbstractService.checkCurrentState(AbstractService.java:328)
at com.google.common.util.concurrent.AbstractService.awaitTerminated(AbstractService.java:293)
at com.google.api.core.AbstractApiService.awaitTerminated(AbstractApiService.java:104)
at com.oscaro.clj_gcloud.sub$shutdown_BANG_.invokeStatic(sub.clj:127)
at com.oscaro.clj_gcloud.sub$shutdown_BANG_.invoke(sub.clj:123)
at punchcard.components.pubsub$eval1042$fn__1044$fn__1046.invoke(pubsub.clj:77)
at punchcard.components.pubsub$eval1042$fn__1044.invoke(pubsub.clj:76)
at clojure.lang.MultiFn.invoke(MultiFn.java:233)
at integrant.core$try_run_action.invokeStatic(core.cljc:176)
at integrant.core$try_run_action.invoke(core.cljc:174)
at integrant.core$run_loop.invokeStatic(core.cljc:184)
at integrant.core$run_loop.invoke(core.cljc:180)
at integrant.core$reverse_run_BANG_.invokeStatic(core.cljc:201)
at integrant.core$reverse_run_BANG_.invoke(core.cljc:195)
at integrant.core$halt_BANG_.invokeStatic(core.cljc:325)
at integrant.core$halt_BANG_.invoke(core.cljc:319)
at integrant.core$halt_BANG_.invokeStatic(core.cljc:322)
at integrant.core$halt_BANG_.invoke(core.cljc:319)
at punchcard.core$shutdown.invokeStatic(core.clj:33)
at punchcard.core$shutdown.invoke(core.clj:30)
at punchcard.core$_main$fn__28729.invoke(core.clj:56)
at punchcard.core.proxy$java.lang.Thread$ff19274a.run(Unknown Source)
Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@78d98c1e rejected from java.util.concurrent.ScheduledThreadPoolExecutor@4d743a6b[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 4]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)
at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
at java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622)
at java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668)
at com.google.common.util.concurrent.ListenerCallQueue.execute(ListenerCallQueue.java:87)
at com.google.common.util.concurrent.AbstractService.executeListeners(AbstractService.java:479)
at com.google.common.util.concurrent.AbstractService.stopAsync(AbstractService.java:256)
at com.google.api.core.AbstractApiService.stopAsync(AbstractApiService.java:129)
at com.google.cloud.pubsub.v1.Subscriber.stopConnections(Subscriber.java:506)
at com.google.cloud.pubsub.v1.Subscriber.stopAllStreamingConnections(Subscriber.java:482)
at com.google.cloud.pubsub.v1.Subscriber.doStop(Subscriber.java:319)
at com.google.api.core.AbstractApiService$InnerService.doStop(AbstractApiService.java:153)
at com.google.common.util.concurrent.AbstractService.stopAsync(AbstractService.java:242)
at com.google.api.core.AbstractApiService.stopAsync(AbstractApiService.java:129)
... 19 common frames omitted
@neuromantik33 This is rather surprising. In Subscriber, we shut down all connections before shutting down the executor, so all callbacks should have been scheduled before the executor shuts down. (You have the option to provide your own executor; in that case Subscriber doesn't shut it down.)
Is it possible that you're providing an executor and shutting it down before calling stopAsync?
@pongad As far as I know, I'm not providing any executor except the one required by com.google.api.core.ApiService$Listener, which is not a real one (it is a direct executor), and I'm definitely not shutting it down beforehand. Here is a snippet of my Clojure code that illustrates my Subscriber factory. I've also included my shutdown function, which is invoked from a shutdown hook in my application. If you'd like a Java project that reproduces this bug, I'll try to find the time, but I'm not doing anything exotic that isn't in the docs. I've also been using the pubsub emulator, which exhibits the same behaviour (hence the setChannelProvider).
(def ^:private direct-executor (MoreExecutors/directExecutor))

(defn- create-subscriber!
  "Returns a new subscriber using the default configuration."
  ^Subscriber
  [{:keys [project-id credentials channel-provider]} sub receiver]
  (let [sub-name (make-sub-name project-id sub)
        builder  (cond-> (Subscriber/defaultBuilder sub-name receiver)
                   credentials      (.setCredentialsProvider (c/fixed-credentials credentials))
                   channel-provider (.setChannelProvider channel-provider))]
    (log/info "Creating subscriber:" (->clj sub-name))
    (doto (.build builder)
      (.addListener (proxy [ApiService$Listener] []
                      (failed [from ex]
                        (log/error ex "An unexpected error occurred")))
                    direct-executor))))

(defn shutdown!
  "Shuts down the subscriber and waits for the latter to reach the terminated state."
  [^Subscriber subscriber]
  (log/info "Shutting down:" (->clj subscriber))
  (-> subscriber .stopAsync .awaitTerminated))
I found the problem. I'll put the diagnosis in the PR description.
@pongad Is there a timeframe when this fix will be available? 1.16.0 of gax-java is still not used in a released version of pubsub, unless I'm mistaken. It seems I should not explicitly use a different version of gax-java that is out of step with google-cloud-java either.