Google-cloud-python: Pub/Sub: future.result() doesn't raise exceptions from callback

Created on 16 Aug 2018 · 3 Comments · Source: googleapis/google-cloud-python

Hi. I am trying to set up a simple client that subscribes to a Pub/Sub subscription and consumes messages. I have a problem handling exceptions raised in the callback method: I would like to handle them in the main thread, because those exceptions are impossible to recover from and the app should crash rather than keep consuming messages. In the docstring of the Pub/Sub client's subscribe method I found the following:

This method starts the receiver in the background and returns a
        *Future* representing its execution. Waiting on the future (calling
        ``result()``) will block forever or until a non-recoverable error
        is encountered (such as loss of network connectivity). Cancelling the
        future will signal the process to shutdown gracefully and exit.

And example code snippet:

from google.cloud import pubsub_v1

subscriber_client = pubsub_v1.SubscriberClient()

# existing subscription
subscription = subscriber_client.subscription_path(
    'my-project-id', 'my-subscription')

def callback(message):
    print(message)
    message.ack()

# subscribe() is a method of the client instance created above
future = subscriber_client.subscribe(
    subscription, callback)

try:
    future.result()
except KeyboardInterrupt:
    future.cancel()

Unfortunately, I cannot get my code to catch any exceptions and cancel the future. I am pretty sure I am doing something wrong.

OS type and version: macOS High Sierra (10.13.6)
Python version: Python 3.6.5
Google Cloud Lib versions:

google-api-core (1.3.0)
google-auth (1.5.1)
google-cloud-pubsub (0.37.1)
googleapis-common-protos (1.5.3)
grpc-google-iam-v1 (0.11.4)
grpcio (1.14.1)
protobuf (3.6.1)

Stacktrace:

INFO:__main__:Message {
  data: b'{"id":"02121f45-4608-4b4d-858e-31004d6ea12e","proj...'
  attributes: {
    "buildId": "02121f45-4608-4b4d-858e-31004d6ea12e",
    "status": "WORKING"
  }
}
ERROR:google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager:Top-level exception occurred in callback while processing a message
Traceback (most recent call last):
  File "/Users/gruzewski/.pyenv/versions/3.6.5/lib/python3.6/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py", line 63, in _wrap_callback_errors
    callback(message)
  File "/Users/gruzewski/Library/Preferences/PyCharm2018.1/scratches/scratch.py", line 12, in callback
    raise RuntimeError('Test')
RuntimeError: Test
INFO:__main__:Message {
  data: b'{"id":"02121f45-4608-4b4d-858e-31004d6ea12e","proj...'
  attributes: {
    "buildId": "02121f45-4608-4b4d-858e-31004d6ea12e",
    "status": "SUCCESS"
  }
}
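
The traceback above shows the exception being logged inside `_wrap_callback_errors`, on a worker thread, after which the next message is processed as usual. The sketch below is a simplified illustration of why such an exception never reaches `future.result()`. Note that `wrap_callback_errors` here is a hypothetical stand-in written for this example, not the library's actual code, and a `ThreadPoolExecutor` stands in for the subscriber's worker threads.

```python
import logging
from concurrent.futures import ThreadPoolExecutor

logger = logging.getLogger("sketch")


def wrap_callback_errors(callback, message):
    """Hypothetical stand-in for the library's wrapper: log and swallow."""
    try:
        callback(message)
    except Exception:
        # The exception is logged here and propagates no further.
        logger.exception("Top-level exception occurred in callback")


def failing_callback(message):
    raise RuntimeError("Test")


def run_once():
    """Run the wrapped callback on a worker thread; report what the caller sees."""
    with ThreadPoolExecutor(max_workers=1) as pool:
        worker_future = pool.submit(wrap_callback_errors, failing_callback, "msg")
        # Both values are None: the RuntimeError was swallowed by the wrapper.
        return worker_future.result(), worker_future.exception()


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    print(run_once())
```

Because the wrapper catches everything, the caller waiting on the future has no way to learn that the callback failed unless the callback itself reports it.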

Steps to reproduce:

1. Run the example code.
2. Turn off wifi.

Code example:

import logging

from google.cloud import pubsub_v1


logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def callback(message):
    logger.info(message)
    raise RuntimeError('Test')


def main():
    subscriber = pubsub_v1.SubscriberClient()

    future = subscriber.subscribe('subscription/path', callback=callback)

    try:
        future.result()
        logger.error('I should not be here.')
    except RuntimeError as e:
        logger.exception('Exception while listening for messages. Shutting down. Exception: {}'.format(e))
        future.cancel()
        exit(1)

if __name__ == "__main__":
    main()

All 3 comments

The future only bubbles exceptions that occur with the actual API call, not with individual callbacks. You'll need to do your own error handling to capture those, something like:

def callback(message):
    try:
        ...
    except Exception as e:
        # Set some flag/mutex/condition to signal the main thread.
        ...

Closing for now, but happy to answer any follow-up questions.
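
One way to fill in the "flag/mutex/condition" suggested above is to record the exception and set a `threading.Event` that the main thread waits on. This is only a minimal sketch under stated assumptions: `CallbackFailure`, `guard`, and `demo` are hypothetical names made up for this example, and a plain `threading.Thread` stands in for the subscriber's worker thread so the snippet runs without Pub/Sub.

```python
import threading


class CallbackFailure:
    """Hypothetical helper: records the first callback exception and wakes the main thread."""

    def __init__(self):
        self._event = threading.Event()
        self._lock = threading.Lock()
        self.error = None

    def guard(self, callback):
        """Wrap `callback` so any exception is recorded instead of being lost."""
        def guarded(message):
            try:
                callback(message)
            except Exception as exc:
                with self._lock:
                    if self.error is None:  # keep only the first failure
                        self.error = exc
                self._event.set()
        return guarded

    def wait(self, timeout=None):
        """Block until a callback fails; return the exception, or None on timeout."""
        self._event.wait(timeout)
        return self.error


def demo():
    failure = CallbackFailure()

    def callback(message):
        raise RuntimeError("cannot process %r" % (message,))

    guarded = failure.guard(callback)
    # A plain thread stands in for the subscriber's worker thread.
    worker = threading.Thread(target=guarded, args=("payload",))
    worker.start()
    error = failure.wait(timeout=5)
    worker.join()
    return error


if __name__ == "__main__":
    print(repr(demo()))
```

With the real client, the idea would be to pass `failure.guard(callback)` to `subscriber.subscribe(...)`, call `failure.wait()` in the main thread, and then call `future.cancel()` and re-raise the returned exception.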

Thanks @theacodes! Looks like that did the job. For anyone interested, this is what I did in the end.

import logging
from threading import Event

from google.cloud import pubsub_v1

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def callback(message, event):
    logger.info(message)
    event.set()


def main():
    subscriber = pubsub_v1.SubscriberClient()

    event = Event()

    def callback_wrapper(message):
        callback(message, event)

    future = subscriber.subscribe('subscription/path', callback=callback_wrapper)

    event.wait()
    logger.error('Got event. Shutting down.')
    future.cancel()
    exit(1)

if __name__ == "__main__":
    main()
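
A possible refinement of the approach above: waiting only on the event means errors raised by the future itself (the API-level ones) are no longer seen by the main thread. The sketch below polls both. It assumes the streaming future supports `result(timeout=...)` and raises `concurrent.futures.TimeoutError` on timeout, the way a standard `concurrent.futures.Future` does; `wait_for_failure` and `demo` are hypothetical names, and a plain `concurrent.futures.Future` is used as a stand-in so the snippet runs without Pub/Sub.

```python
import concurrent.futures
import threading


def wait_for_failure(future, failed, poll_seconds=0.1):
    """Block until `future` finishes or the `failed` event is set.

    Returns "callback" if a callback signalled a failure; otherwise returns
    whatever future.result() returns, or raises whatever it raises.
    """
    while True:
        if failed.is_set():
            future.cancel()
            return "callback"
        try:
            return future.result(timeout=poll_seconds)
        except concurrent.futures.TimeoutError:
            continue  # nothing happened yet; check the event again


def demo():
    future = concurrent.futures.Future()  # stand-in for the streaming pull future
    failed = threading.Event()
    # Simulate a callback failing on a worker thread shortly after start.
    timer = threading.Timer(0.2, failed.set)
    timer.start()
    outcome = wait_for_failure(future, failed)
    timer.join()
    return outcome, future.cancelled()


if __name__ == "__main__":
    print(demo())
```

In this arrangement the callback would call `failed.set()` only inside its `except` block, so the subscriber keeps running while messages are processed successfully.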

Glad you got it worked out!
