Hi. I am trying to set up a simple client that subscribes to a Pub/Sub subscription and consumes messages. I have a problem handling exceptions raised in the callback method: I would like to handle them in the main thread, because those exceptions are impossible to recover from, so the app should crash and stop consuming messages. In the docstring of the Pub/Sub client's subscribe method I found this:
This method starts the receiver in the background and returns a
*Future* representing its execution. Waiting on the future (calling
``result()``) will block forever or until a non-recoverable error
is encountered (such as loss of network connectivity). Cancelling the
future will signal the process to shutdown gracefully and exit.
And the example code snippet:
from google.cloud.pubsub_v1 import subscriber

subscriber_client = pubsub.SubscriberClient()

# existing subscription
subscription = subscriber_client.subscription_path(
    'my-project-id', 'my-subscription')

def callback(message):
    print(message)
    message.ack()

future = subscriber.subscribe(
    subscription, callback)

try:
    future.result()
except KeyboardInterrupt:
    future.cancel()
Unfortunately I cannot get my code to capture any exceptions and cancel the future. I am pretty sure I am doing something wrong.
OS type and version: macOS High Sierra (10.13.6)
Python version: Python 3.6.5
Google Cloud Lib versions:
google-api-core (1.3.0)
google-auth (1.5.1)
google-cloud-pubsub (0.37.1)
googleapis-common-protos (1.5.3)
grpc-google-iam-v1 (0.11.4)
grpcio (1.14.1)
protobuf (3.6.1)
Stacktrace:
INFO:__main__:Message {
  data: b'{"id":"02121f45-4608-4b4d-858e-31004d6ea12e","proj...'
  attributes: {
    "buildId": "02121f45-4608-4b4d-858e-31004d6ea12e",
    "status": "WORKING"
  }
}
ERROR:google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager:Top-level exception occurred in callback while processing a message
Traceback (most recent call last):
  File "/Users/gruzewski/.pyenv/versions/3.6.5/lib/python3.6/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py", line 63, in _wrap_callback_errors
    callback(message)
  File "/Users/gruzewski/Library/Preferences/PyCharm2018.1/scratches/scratch.py", line 12, in callback
    raise RuntimeError('Test')
RuntimeError: Test
INFO:__main__:Message {
  data: b'{"id":"02121f45-4608-4b4d-858e-31004d6ea12e","proj...'
  attributes: {
    "buildId": "02121f45-4608-4b4d-858e-31004d6ea12e",
    "status": "SUCCESS"
  }
}
Steps to reproduce:
1. Run the example code.
2. Turn off wifi.
Code example:
import logging

from google.cloud import pubsub_v1

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def callback(message):
    logger.info(message)
    raise RuntimeError('Test')


def main():
    subscriber = pubsub_v1.SubscriberClient()
    future = subscriber.subscribe('subscription/path', callback=callback)
    try:
        future.result()
        logger.error('I should not be here.')
    except RuntimeError as e:
        logger.exception('Exception during listing for messages. Shutting down. Exception: {}'.format(e))
        future.cancel()
        exit(1)


if __name__ == "__main__":
    main()
The future only bubbles exceptions that occur with the actual API call, not with individual callbacks. You'll need to do your own error handling to capture those, something like:
def callback(message):
    try:
        ...
    except Exception as e:
        # set some flag/mutex/condition to signal the main thread.
        ...
Closing for now, but happy to answer any follow-up questions.
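To illustrate the suggestion above, here is a minimal, self-contained sketch of the flag approach. It does not use the Pub/Sub client: `wrap_callback_errors` is a hypothetical, simplified stand-in for the library wrapper seen in the stack trace (the real one may differ), and a plain thread plays the role of the subscriber's background worker. The callback sets a `threading.Event` before re-raising, so the main thread learns about the failure even though the wrapper logs and swallows the exception.

```python
import logging
import threading

logger = logging.getLogger(__name__)


def wrap_callback_errors(callback, message):
    """Hypothetical stand-in for the library's wrapper: log and swallow."""
    try:
        callback(message)
    except Exception:
        logger.exception("Top-level exception occurred in callback")


# The flag the main thread waits on.
failed = threading.Event()


def callback(message):
    try:
        raise RuntimeError("Test")  # stands in for real message handling
    except Exception:
        failed.set()  # tell the main thread something unrecoverable happened
        raise  # still let the wrapper log the traceback


# Simulate the library invoking the callback on a background thread.
worker = threading.Thread(target=wrap_callback_errors, args=(callback, "msg"))
worker.start()
worker.join()

# The exception never reaches this thread, but the flag does.
signalled = failed.wait(timeout=1)
```

In a real client the main thread would presumably wait on `failed` instead of calling `future.result()`, and cancel the future once the flag is set.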
Thanks @theacodes! Looks like that did the job. For everyone interested, this is what I did in the end.
import logging
from threading import Event

from google.cloud import pubsub_v1

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def callback(message, event):
    logger.info(message)
    # Signal the main thread that it should shut down.
    event.set()


def main():
    subscriber = pubsub_v1.SubscriberClient()
    event = Event()

    def callback_wrapper(message):
        callback(message, event)

    future = subscriber.subscribe('subscription/path', callback=callback_wrapper)
    # Block the main thread until a callback sets the event.
    event.wait()
    # logger.error rather than logger.exception: no exception is being handled here.
    logger.error('Got event. Shutting down.')
    future.cancel()
    exit(1)


if __name__ == "__main__":
    main()
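The solution above sets the event on every message, which stands in for the failure case. A possible variation, sketched here under assumptions, signals only when the handler actually raises and hands the exception itself to the main thread through a `queue.Queue`, so it can be re-raised there. The names `consume_until_failure` and `handler` are hypothetical and not part of the library; the only client calls used are `subscribe(..., callback=...)`, `message.ack()` and `future.cancel()`, as in the code above.

```python
import queue


def consume_until_failure(subscriber, subscription_path, handler):
    """Block until `handler` raises, then cancel the future and re-raise.

    `subscriber` is expected to behave like pubsub_v1.SubscriberClient
    (assumption: only subscribe(), message.ack() and future.cancel() are used).
    """
    failures = queue.Queue()

    def callback(message):
        try:
            handler(message)
            message.ack()  # acknowledge only messages that were handled
        except Exception as exc:
            # Hand the exception over to the main thread instead of raising it
            # here, where the library would only log it.
            failures.put(exc)

    future = subscriber.subscribe(subscription_path, callback=callback)
    exc = failures.get()  # blocks the calling thread until a callback fails
    future.cancel()
    raise exc
```

Called from `main()` with `pubsub_v1.SubscriberClient()` and the subscription path, this would let an ordinary `try`/`except RuntimeError` around the call do what the original example expected from `future.result()`.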
Glad you got it worked out!