Prefect: Handling non-pickleable exceptions

Created on 20 Mar 2020 · 14 Comments · Source: PrefectHQ/prefect

Not sure if you would agree that this is a bug rather than unsupported behavior.

Google libraries tend to include non-pickleable Client objects in the exceptions they raise (who knows why). Running the following flow locally gives a nice, clear error:

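from google.cloud import bigquery
from prefect import Flow, task

@task
def bq_fail():
    bq = bigquery.Client()
    # Fails server-side: the table name has no dataset and no default dataset is set
    bq.query('select 1 from nonexistent_table').result()
    return True

with Flow('f') as f:
    bq_fail()

f.run()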

...
google.api_core.exceptions.BadRequest: 400 Table name "nonexistent_table" missing dataset while no default dataset is set in the request.

(job ID: 69b11ec8-312b-49c4-91c9-aae701b7b8cf)

  -----Query Job SQL Follows-----

    |    .    |    .    |    .    |
   1:select 1 from nonexistent_table
    |    .    |    .    |    .    |
[2020-03-20 18:21:39,312] INFO - prefect.TaskRunner | Task 'bq_fail': finished task run for task with final state: 'Failed'
INFO:prefect.TaskRunner:Task 'bq_fail': finished task run for task with final state: 'Failed'
[2020-03-20 18:21:39,313] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
INFO:prefect.FlowRunner:Flow run FAILED: some reference tasks failed.

But running it on a Dask cluster gives a much crazier failure:

import distributed
from prefect.engine.executors import DaskExecutor
client = distributed.Client()
f.run(executor=DaskExecutor(client.scheduler.address))

...
Could not serialize object of type Failed.
Traceback (most recent call last):
  File "/Users/brett/model/.venv/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 38, in dumps
    result = pickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
AttributeError: Can't pickle local object 'if_exception_type.<locals>.if_exception_type_predicate'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/brett/model/.venv/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 191, in serialize
    header, frames = dumps(x, context=context) if wants_context else dumps(x)
  File "/Users/brett/model/.venv/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 58, in pickle_dumps
    return {"serializer": "pickle"}, [pickle.dumps(x)]
  File "/Users/brett/model/.venv/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 51, in dumps
    return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
  File "/Users/brett/model/.venv/lib/python3.7/site-packages/cloudpickle/cloudpickle.py", line 1125, in dumps
    cp.dump(obj)
  File "/Users/brett/model/.venv/lib/python3.7/site-packages/cloudpickle/cloudpickle.py", line 482, in dump
    return Pickler.dump(self, obj)
  File "/usr/local/Cellar/python/3.7.7/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 437, in dump
    self.save(obj)
  File "/usr/local/Cellar/python/3.7.7/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 549, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/local/Cellar/python/3.7.7/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 662, in save_reduce
    save(state)
  File "/usr/local/Cellar/python/3.7.7/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/local/Cellar/python/3.7.7/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/usr/local/Cellar/python/3.7.7/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 885, in _batch_setitems
    save(v)
  File "/usr/local/Cellar/python/3.7.7/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 549, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/local/Cellar/python/3.7.7/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 662, in save_reduce
    save(state)
  File "/usr/local/Cellar/python/3.7.7/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/local/Cellar/python/3.7.7/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/usr/local/Cellar/python/3.7.7/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 885, in _batch_setitems
    save(v)
  File "/usr/local/Cellar/python/3.7.7/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 549, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/local/Cellar/python/3.7.7/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 662, in save_reduce
    save(state)
  File "/usr/local/Cellar/python/3.7.7/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/local/Cellar/python/3.7.7/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/usr/local/Cellar/python/3.7.7/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 885, in _batch_setitems
    save(v)
  File "/usr/local/Cellar/python/3.7.7/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 549, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/local/Cellar/python/3.7.7/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 662, in save_reduce
    save(state)
  File "/usr/local/Cellar/python/3.7.7/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/local/Cellar/python/3.7.7/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/usr/local/Cellar/python/3.7.7/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 885, in _batch_setitems
    save(v)
  File "/usr/local/Cellar/python/3.7.7/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 524, in save
    rv = reduce(self.proto)
  File "/Users/brett/model/.venv/lib/python3.7/site-packages/google/cloud/client.py", line 144, in __getstate__
    "Clients have non-trivial state that is local and unpickleable.",
_pickle.PicklingError: Pickling client objects is explicitly not supported.
Clients have non-trivial state that is local and unpickleable.

This is actually a heavily abbreviated version of the real error: the full output contains about 10 variations of the above and swamps all other output in the logs.

I understand that Google is the underlying culprit here, but it seems like Prefect could handle this case more gracefully, maybe by just catching a PickleError and re-raising it as something more informative?

bug

All 14 comments

Very interesting; for the sake of documentation, currently any unexpected exceptions raised during a task run are packaged up as the result of the corresponding Failed state. This means that if the exception is unpickleable, Dask can't send that state to any downstream dependencies (or back to the Flow Runner, for that matter).
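To make this failure mode concrete, here is a minimal sketch using only the standard library. FailedState and ClientHoldingError are hypothetical stand-ins (not Prefect or Google classes), and a threading.Lock plays the role of the unpickleable client: once the exception is stored as a state's result, pickling the state fails too.

```python
import pickle
import threading
from dataclasses import dataclass


class ClientHoldingError(Exception):
    """Hypothetical exception that keeps a reference to a live client-like object."""

    def __init__(self, message, client):
        super().__init__(message)
        self.client = client  # stands in for a Client holding locks/sockets


@dataclass
class FailedState:
    """Hypothetical stand-in for a Failed state: the exception is its result."""
    message: str
    result: BaseException


def make_failed_state():
    try:
        raise ClientHoldingError("400 Bad Request", client=threading.Lock())
    except Exception as exc:
        # The raw exception object is attached to the state as its result
        return FailedState(message="Task failed", result=exc)


state = make_failed_state()
try:
    pickle.dumps(state)
    state_is_shippable = True
except (TypeError, pickle.PicklingError) as err:
    # Pickling the state pickles the exception's __dict__, which holds the lock
    state_is_shippable = False
    print(f"cannot ship state: {err}")
```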

I _do_ think we want to preserve the exception somehow so that people can implement state handlers / downstream trigger logic which act on the exception type (e.g., if isinstance(state.result, ValueError):). So I think there are two action items we might consider:

  • remove all traceback information from the exception prior to attaching it to the state (hopefully this makes most exceptions pickleable)
  • within the DaskExecutor have some logic for explicitly handling pickle errors
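A minimal sketch of action item 1, using a hypothetical strip_exception helper (not a Prefect API). One caveat: the standard pickle protocol reduces an exception to its class, args and __dict__, so __traceback__, __cause__ and __context__ are not pickled anyway; clearing them matters more for traceback-aware serializers such as tblib and for releasing the frames they keep alive, and it cannot fix unpickleable attributes or classes.

```python
import pickle


def strip_exception(exc: BaseException) -> BaseException:
    """Clear traceback and exception-chain references on ``exc`` in place."""
    exc.__traceback__ = None          # frames, and the locals they hold, are released
    exc.__cause__ = None              # set by ``raise ... from other``
    exc.__context__ = None            # set implicitly when raised inside ``except``
    exc.__suppress_context__ = False
    return exc


try:
    try:
        raise KeyError("original")
    except KeyError as first:
        raise ValueError("wrapped") from first
except ValueError as exc:
    stripped = strip_exception(exc)

# A plain ValueError round-trips; its class and args are all pickle needs
restored = pickle.loads(pickle.dumps(stripped))
print(stripped.__traceback__, stripped.__cause__, stripped.__context__)  # None None None
```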

FWIW @bnaul your exact example didn't seem to cause problems for me (not a big deal, I've seen these errors before):

import cloudpickle
from google.cloud import bigquery

try:
    bq = bigquery.Client()
    bq.query('select 1 from nonexistent_table').result()
except Exception as exc:
    print(exc)
    vv = exc

cloudpickle.dumps(vv)  # worked

Quick addendum: it's actually not the pickling that fails but the unpickling:

import cloudpickle
from google.cloud import bigquery

try:
    bq = bigquery.Client()
    bq.query('select 1 from nonexistent_table').result()
except Exception as exc:
    print(exc)
    vv = exc

cloudpickle.loads(cloudpickle.dumps(vv))  # fails on the loads step
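Since pickling can succeed while unpickling fails, a useful check needs the full round trip. A minimal sketch with the standard library (cloudpickle.loads/cloudpickle.dumps could be swapped in); survives_round_trip and NeedsExtraArgs are hypothetical names, the latter mimicking an exception that pickles fine but cannot be rebuilt.

```python
import pickle


def survives_round_trip(obj) -> bool:
    """Return True only if ``obj`` can be both pickled *and* unpickled."""
    try:
        pickle.loads(pickle.dumps(obj))
    except Exception:
        return False
    return True


class NeedsExtraArgs(Exception):
    """Hypothetical exception whose __init__ needs more than it passes to super()."""

    def __init__(self, source, ename):
        super().__init__(f"{ename} in {source}")  # self.args holds only the summary
        self.source = source
        self.ename = ename


exc = NeedsExtraArgs("cell 4", "NameError")
pickle.dumps(exc)                 # pickling succeeds...
print(survives_round_trip(exc))   # ...but unpickling raises TypeError, so this prints False
```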

Another library that causes issues in this way is papermill, for executing Jupyter notebooks. For a notebook with a simple error:

import papermill as pm
try:
    pm.execute_notebook("daily_ods/Quality.ipynb", "/tmp/out.ipynb")
except Exception as e:
    import cloudpickle
    cloudpickle.loads(cloudpickle.dumps(e))
...
~/model/.venv/lib/python3.7/site-packages/papermill/execute.py in raise_for_execution_errors(nb, output_path)
    187         write_ipynb(nb, output_path)
--> 188         raise error

PapermillExecutionError:
---------------------------------------------------------------------------
Exception encountered at "In [4]":
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
<ipython-input-4-5c467476a5e8> in <module>
----> 3 start_date - pd.Timedelta(days=7)

NameError: name 'start_date' is not defined

During handling of the above exception, another exception occurred:

TypeError                                 Traceback (most recent call last)
<ipython-input-18-39b0a7a82220> in <module>
      3 except Exception as e:
      4     import cloudpickle
----> 5     cloudpickle.loads(cloudpickle.dumps(e))

TypeError: __init__() missing 4 required positional arguments: 'source', 'ename', 'evalue', and 'traceback'

Again, papermill is probably doing something weird with its traceback, but regardless, an exception that displays fine locally yet breaks when pickled seems like a foreseeable case.
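The TypeError above is consistent with how exceptions are pickled by default: on load the class is called again with self.args, so if __init__ requires arguments that were never passed through to super().__init__, unpickling fails. A minimal sketch of one fix, on a hypothetical ExecutionError class (not papermill's actual implementation), is to define __reduce__ so pickle rebuilds the exception from its real constructor arguments.

```python
import pickle


class ExecutionError(Exception):
    """Hypothetical exception modelled on the failure above: several required args."""

    def __init__(self, source, ename, evalue):
        super().__init__(f"{ename}: {evalue}")  # only the summary lands in self.args
        self.source = source
        self.ename = ename
        self.evalue = evalue

    def __reduce__(self):
        # Rebuild from the full constructor arguments instead of the default
        # (type(self), self.args, self.__dict__), which would call __init__ with one arg
        return (type(self), (self.source, self.ename, self.evalue))


err = ExecutionError("start_date - pd.Timedelta(days=7)", "NameError",
                     "name 'start_date' is not defined")
clone = pickle.loads(pickle.dumps(err))
print(clone.ename, str(clone))
```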

Maybe the best compromise is to just try/except pickle/unpickle before re-raising to Dask, and if there's a pickling problem then drop the traceback info like you suggested?
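A minimal sketch of that compromise, using stdlib pickle in place of Dask's serializer. make_shippable and UnpicklableExceptionProxy are hypothetical names, and HoldsLock is a made-up exception with an unpickleable attribute. If the round trip fails, the exception is replaced by a picklable proxy that keeps the original type name, message and formatted traceback as text, so logs stay informative; isinstance checks against the original type will no longer match, though.

```python
import pickle
import threading
import traceback


class UnpicklableExceptionProxy(Exception):
    """Hypothetical picklable stand-in for an exception that cannot round-trip."""

    def __init__(self, type_name, message, formatted_tb):
        # Pass everything to super() so default pickling can call cls(*args)
        super().__init__(type_name, message, formatted_tb)
        self.type_name = type_name
        self.message = message
        self.formatted_tb = formatted_tb

    def __str__(self):
        return f"{self.type_name}: {self.message}"


def make_shippable(exc):
    try:
        pickle.loads(pickle.dumps(exc))
        return exc  # already safe to send as-is
    except Exception:
        pass
    tb_text = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
    return UnpicklableExceptionProxy(type(exc).__qualname__, str(exc), tb_text)


class HoldsLock(Exception):
    def __init__(self, msg):
        super().__init__(msg)
        self.lock = threading.Lock()  # makes pickling fail


try:
    raise HoldsLock("boom")
except HoldsLock as e:
    safe = make_shippable(e)

restored = pickle.loads(pickle.dumps(safe))
print(restored)  # HoldsLock: boom
```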

Just putting a note here from core discussions, particularly with @jcrist: on the Dask side, machinery already exists to raise this correctly, and Prefect is probably getting in the way :) Our current recommendation for proceeding with this issue is more or less as Chris described in https://github.com/PrefectHQ/prefect/issues/2178#issuecomment-602110820, but I want to reiterate that for action item 2 we probably need to give the DaskExecutor separate handling for these particular pickle/unpickle exceptions, one that falls back on (or reuses) how Dask raises them (we may need to peek into Dask to see what Prefect can expect).

We are running into a similar issue, where a failed boto call leads to the same Python stack overflow in DaskExecutor as @bnaul's example.

Looking through the botocore message I found the following comment in the ClientError exception that is triggering:

# Subclasses of ClientError's are dynamically generated and
# cannot be pickled unless they are attributes of a
# module. So at the very least return a ClientError back.

Not 100% sure how to interpret this, but it makes me think that stripping trace-back info won't resolve for this exception at least.
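One way to read that comment: stdlib pickle stores a class by reference (its module plus qualified name) and looks it up again on load, so a class built at runtime with no matching module-level attribute cannot be pickled at all, and stripping the traceback does not change that. A minimal sketch with a hypothetical factory (botocore's real one is not reproduced here); cloudpickle can serialize some non-importable classes by value, so results there may differ.

```python
import pickle


def make_error_class(code):
    """Hypothetical factory mimicking per-error-code exception classes built at runtime."""
    return type(code, (Exception,), {})


# The variable name differs from the class's __qualname__ ("ThrottlingException")
ThrottlingError = make_error_class("ThrottlingException")
err = ThrottlingError("Rate exceeded")

try:
    pickle.dumps(err)
    pickled = True
except Exception as exc:
    # pickle looks up "ThrottlingException" on the class's module and finds nothing
    pickled = False
    print(type(exc).__name__, exc)
```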

As a workaround, I tried re-raising an error as per the suggestion from @jcrist on Slack yesterday:

class MyBotoError(Exception):
    pass
def mytask(...):
    try:
        some_boto_thing()
    except SomeBotoError as exc:
        raise MyBotoError(str(exc))

The only change I made was to re-raise a RuntimeError rather than a custom exception class. Are there any other suggestions for a workaround here? This is fairly problematic, as it crashes the entire Python environment, not just a failed flow run.

This is fairly problematic as it crashes the entire python environment, not just a failed flow run.

This is news to me. Do you have a reproducible example that shows this? What do you mean by "crashes the environment"?

Not 100% sure how to interpret this, but it makes me think that stripping trace-back info won't resolve for this exception at least.

Yeah, in Dask we've seen a few types of unpickleable exceptions:

  • Exceptions that add unpickleable attributes (rare)
  • Dynamically generated exceptions (common for some reason in libraries from google and amazon)

Dask properly handles unpickleable exceptions (we use tblib for tracebacks, and have fallbacks for unpickleable exceptions and tracebacks in the rare case of failure); the error here comes from Prefect failing to do the same.

Anyway, I'll elevate this issue in our priorities and try to get a fix out soon. Thanks for the update.

I'm having trouble reproducing this error reliably because I have to wait on AWS to fail so it triggers that dynamically generated boto exception - I can't manually raise that exception because I don't know how to generate the dynamic object (might just be me but I can't follow this dynamically generated boto code).

I tried making a stand-alone exception that can't be unpickled - I tested raising the exception and manually pickling the exception object and when I manually unpickle it fails in the same way. However, weirdly, this does not cause the same behaviour when I raise it in the Task and run in DaskExecutor as the ClientError one does. Perhaps there is something I'm missing here? Happy to post my PoC code if that helps.

By "crashes the environment" I mean the Python process running the Prefect app crashes. The error message looks like this:

| INFO | bvj13xsiy5lo | prefect.FlowRunner | None | Beginning Flow run for 'XXXXX'
Fatal Python error: Cannot recover from stack overflow.

Thread 0x0000000116197dc0 (most recent call first):
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 485 in save
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 885 in _batch_setitems
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 659 in save_reduce
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 549 in save
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 885 in _batch_setitems
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 659 in save_reduce
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 549 in save
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 885 in _batch_setitems
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 659 in save_reduce
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 549 in save
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 885 in _batch_setitems
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 659 in save_reduce
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 549 in save
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 885 in _batch_setitems
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 659 in save_reduce
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 549 in save
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 843 in _batch_appends
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 819 in save_list
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 504 in save
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 885 in _batch_setitems
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 659 in save_reduce
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 549 in save
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 885 in _batch_setitems
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 859 in save_dict
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 504 in save
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 885 in _batch_setitems
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 859 in save_dict
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/pickle.py", line 504 in save
<many many lines redacted>

By crashes the environment I mean the python prefect app. The error message looks like this:

Hmmm, that's odd. It looks like the pickle recursed for some reason, which overflowed Python's stack, and we indeed cannot recover from a stack overflow. Anyway, I expect this situation to be resolvable. For now, as I suggested on Slack, you can catch boto errors and re-raise them. If you want to apply this everywhere, you might do it with a decorator (untested):

from functools import wraps

def fix_boto_exceptions(func):
    @wraps(func)
    def inner(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except BaseBotoException as exc:    # I don't know what this class is called, but you probably do
            raise MyBotoException(str(exc))
    return inner

@task
@fix_boto_exceptions
def mytask(...):
    some_boto_thing()
    some_other_boto_thing()

So this workaround is exactly what I tried: my code is almost identical to yours, except I raise RuntimeError instead of MyBotoException. It still fails in exactly the same way, so it seems like one of the following:

1) It's not just the latest exception that is being stored, but rather some kind of exception chain that includes the previous boto exception. (I have no idea whether this is even a possibility)
2) This is actually triggered by a different exception somewhere inside boto, not the one we are aware of and catching. (This is hard for me to debug because Python crashes before my debugger hits any unhandled-exception or similar breakpoint.) The exception happens only very rarely, so I could keep trying to hit it, add a breakpoint somewhere in Prefect before the flow is restarted, and inspect the bits that are going to be deserialised into the retry flow in the hope of identifying the object, but I'd need some guidance on where that breakpoint should be and where I can find the pickled exceptions.
3) Something unrelated to the exception.

One thing that is extremely puzzling to me is why my UnpicklableException test case doesn't trigger the same behaviour.

Just to expand on this, perhaps the following code is easier to follow:

from prefect.engine.executors import DaskExecutor
from prefect.engine import signals
from prefect.core import Task
from prefect import Flow
import datetime
import pickle
import collections


class AttrDict(collections.UserDict):
    """ A dictionary that can be accessed via attributes """

    def __setattr__(self, key, value):
        if key == "data":
            super().__setattr__("data", value)
        else:
            self.data[key] = value

    def __getattr__(self, key):
        if key in self.data:
            return self.data[key]
        else:
            raise AttributeError

    def __delattr__(self, key):
        del self.data[key]


class UnPicklableException(Exception):
    # This is my wildly noddy way to make an exception that will pickle but not unpickle
    def __init__(self):
        self.data = AttrDict({"key": "value"})
        self.message = "Can't unpickle me"
        super().__init__()


def test_exception():
    try:
        raise UnPicklableException()
    except UnPicklableException as e:
        print(type(e))
        pe = pickle.dumps(e)
        pickle.loads(pe)  # This will trigger a recursion depth exceeded


# Test that the exception actually won't unpickle
# test_exception()


class FailingTask(Task):
    def run(self) -> str:
        try:
            # In my actual code we make boto call here
            raise UnPicklableException()
        except UnPicklableException:
            # In the actual code we catch botocore.exceptions.ClientError here
            # We are handling and signaling FAIL so that our exception management system doesn't raise a new ticket for something that we expect will happen regularly.
            self.logger.warning(
                "Exception was triggered, going to signal FAIL and let prefect retry this"
            )
            raise signals.FAIL()
        return "test"


failing_task = FailingTask(max_retries=3, retry_delay=datetime.timedelta(seconds=1))

with Flow("test_flow") as flow:
    failing_task()

executor = DaskExecutor(cluster_kwargs={"processes": False})

# If the Exception pickling was the cause of this issue I would expect this to fail, but it works just fine
flow.run(executor=executor)

It's not just the latest exception that is being stored, but rather some kind of exception chain that includes the previous boto exception. (I have no idea whether this is even a possibility)

Ah, good catch. Python supports chained exceptions, so the previous exception is attached to the raised MyBotoException through its __context__ attribute (and, with raise ... from, its __cause__ attribute). Can you try this one?

from functools import wraps

def fix_boto_exceptions(func):
    @wraps(func)
    def inner(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except BaseBotoException as exc:    # I don't know what this class is called, but you probably do
            raise MyBotoException(str(exc)) from None
    return inner

@task
@fix_boto_exceptions
def mytask(...):
    some_boto_thing()
    some_other_boto_thing()

Still not ideal, but I believe this should work for you.

Tried from None, but unfortunately I just got another stack overflow 😞

Do you have any thoughts on why my UnpicklableException test case is not failing? It's so hard to debug when you can't reproduce the issue...

Edit: looking at what from None does, it sets __suppress_context__ to True, but the base exception is still stored in __context__. I'm trying to figure out how to empty __context__ here; will update if I find anything.

try:
    raise RuntimeError("base")
except RuntimeError as e:
    print(e.__cause__)
    print(e.__context__)
    print(e.__suppress_context__)
    try:
        raise ValueError("next") from None
    except ValueError as ve:
        print(ve.__cause__)
        print(ve.__context__)
        print(ve.__suppress_context__)
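For comparison, a minimal sketch (plain Python, hypothetical function names) of two ways to raise a replacement exception: raise ... from None only sets __suppress_context__ while __context__ still holds the original, whereas raising after the except block has finished leaves __context__ empty, because no exception is being handled at that point.

```python
def raise_with_from_none():
    try:
        raise RuntimeError("base")
    except RuntimeError:
        raise ValueError("next") from None


def raise_after_except():
    try:
        raise RuntimeError("base")
    except RuntimeError as exc:
        message = str(exc)  # ``exc`` itself is unbound once this block ends
    # No exception is being handled here, so __context__ stays None
    raise ValueError(message)


results = {}
for fn in (raise_with_from_none, raise_after_except):
    try:
        fn()
    except ValueError as err:
        results[fn.__name__] = (err.__context__, err.__suppress_context__)
        print(fn.__name__, repr(err.__context__), err.__suppress_context__)
```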

Ok, I think a solution might be something like this:

from functools import wraps

def fix_boto_exceptions(func):
    @wraps(func)
    def inner(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except BaseBotoException as exc:    # I don't know what this class is called, but you probably do
            exception_message = str(exc)
        # `exc` is unbound once the except block ends, so use the saved message;
        # raising here, outside the except block, leaves __context__ empty
        raise MyBotoException(exception_message)
    return inner

@task
@fix_boto_exceptions
def mytask(...):
    some_boto_thing()
    some_other_boto_thing()

It is even uglier, but if it works I'm happy! Will report back.

Just got the same stack overflow using the code like above. I'm not sure where to go from here.

Just checking in to see if there has been any progress here? I'd love to turn our retries on again!

Thanks!

No progress yet - someone will comment here when things have changed.
