Prefect: Trigger failure in Task.map

Created on 27 Apr 2020 · 5 Comments · Source: PrefectHQ/prefect

Description

Triggers don't seem to be called when the Task is invoked with .map(). Instead, .map() seems to try to process the exception value.

Expected Behavior

Expect the trigger to function in the same way as it does for a non-mapped Task.

Reproduction

from typing import Dict, List

from prefect import Flow, Task
from prefect.triggers import all_successful


class ExtractDummyTask(Task):
    def run(self) -> List[Dict]:
        records = [
            {"data": "dummy"},
            {"data": "dummier"},
            {"data": "dummiest"},
        ]
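        # Fail on purpose so the downstream triggers are exercised;
        # the return below is intentionally unreachable.
        raise RuntimeError("my test error")
        return records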


class TransformDummyTask(Task):
    def run(self, data: List) -> List:
        return data


class LoadDummyTask(Task):
    def run(self, data: List) -> None:
        print(f"Data: {data}")


extract_dummy = ExtractDummyTask(trigger=all_successful)
transform_dummy = TransformDummyTask(trigger=all_successful)
load_dummy = LoadDummyTask(trigger=all_successful)

# Create dummy flow
with Flow("dummy_flow") as flow:
    # Extract
    values = extract_dummy()

    # Transform
    # transformed_values = transform_dummy(data=values) # This works as expected
    transformed_values = transform_dummy.map(data=values)

    # Load
    load_dummy(data=transformed_values)

flow.run()

Environment

Tried this locally on macOS as well as on a local Dask cluster; same behaviour in both.

bug

All 5 comments

This is definitely a bug, but for anyone reading along at home I'd like to provide some context for why this was originally implemented as _intentional_ behavior: if a user writes a multi-level mapped Flow such as:

from prefect import task, Flow

@task
def return_stuff():
    return [0, 1, 2]

@task
def divide(x):
    return 1 / x

@task
def subtract(y):
    return y - 1

with Flow("multi-level mapping") as flow:
    final = subtract.map(divide.map(return_stuff))

flow_state = flow.run()

print(flow_state.result[final].map_states)

# [<TriggerFailed: "Trigger was "all_successful" but some of the upstream tasks failed.">,
#  <Success: "Task run succeeded.">,
#  <Success: "Task run succeeded.">]

Note that the second level of mapped tasks ran, and some even succeeded; this is only possible because each child task was allowed to check its own trigger. If the parent had checked the trigger, it would have decided that the second level of mapped tasks was not ready to proceed (because there was an upstream failure).
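
The per-child trigger check described above can be mimicked in plain Python. This is only a toy model under stated assumptions, not Prefect's engine: the `State` class, the `all_successful` function, and the `run_mapped` helper are hypothetical stand-ins written for illustration, and only the state names mirror the output shown above.

```python
from dataclasses import dataclass
from typing import Any, Callable, List


@dataclass
class State:
    """Toy stand-in for a task run state (hypothetical, not Prefect's State)."""
    name: str          # "Success", "Failed", or "TriggerFailed"
    result: Any = None

    def is_successful(self) -> bool:
        return self.name == "Success"


def all_successful(upstream: List[State]) -> bool:
    """Toy trigger: pass only if every upstream state succeeded."""
    return all(s.is_successful() for s in upstream)


def run_mapped(fn: Callable[[Any], Any], upstream: List[State]) -> List[State]:
    """Run one child per upstream state; each child checks its own trigger."""
    children = []
    for parent_state in upstream:
        # The trigger sees only this child's upstream, not the whole level.
        if not all_successful([parent_state]):
            children.append(State("TriggerFailed"))
            continue
        try:
            children.append(State("Success", fn(parent_state.result)))
        except Exception as exc:
            children.append(State("Failed", exc))
    return children


inputs = [State("Success", x) for x in [0, 1, 2]]
divided = run_mapped(lambda x: 1 / x, inputs)     # first child fails (1 / 0)
final = run_mapped(lambda y: y - 1, divided)      # second level still runs
print([s.name for s in final])
```

In this sketch a single parent-level `all_successful(divided)` call returns `False`, so checking the trigger once for the whole level would have stopped every second-level child from running.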

The trick will be to find a way to fix the bug here while still supporting this multi-level situation.

We think this may be affected by https://github.com/PrefectHQ/prefect/issues/2298, the fix for which is in the 0.11.0 release branch, so let's revisit this in light of how that fix may change things.

Hey, I don’t think 2298 affects this… if I understand correctly, 2298 changes the trigger function signature, but the problem here is that the trigger function isn’t evaluated at the parent level when using a mapped task.

Am currently running 0.11.4 and seeing the same behaviour as the original post.
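
To make the parent-level point concrete, here is a rough sketch in plain Python. It assumes, for illustration only, that the failed upstream hands its exception object to the mapped task as its value; the two helper functions are hypothetical and are not part of Prefect's API or a description of how Prefect fixed this.

```python
from typing import Any, Callable


def map_without_parent_check(fn: Callable[[Any], Any], upstream_result: Any):
    """Iterate whatever the upstream produced, even an exception object."""
    return [fn(item) for item in upstream_result]


def map_with_parent_check(fn: Callable[[Any], Any], upstream_result: Any):
    """Evaluate an all_successful-style check once, before mapping."""
    if isinstance(upstream_result, Exception):
        return "TriggerFailed"
    return [fn(item) for item in upstream_result]


# Stand-in for the value left behind by the failing extract task.
failed_upstream = RuntimeError("my test error")

try:
    map_without_parent_check(lambda d: d, failed_upstream)
    outcome = "ran"
except TypeError as exc:
    # An exception instance is not iterable, so mapping over it breaks.
    outcome = f"TypeError: {exc}"

print(outcome)
print(map_with_parent_check(lambda d: d, failed_upstream))
```

The first call fails while trying to iterate the exception, which is roughly the symptom in the original post; the second stops at the check and never tries to map.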

Hi @andywaugh, would you be willing to run your Flow against master? I think the recent mapping refactor will resolve this, but it would be good to confirm.

Hey @cicdw - thanks for the swift reply. Yep, have just re-run against master and it seems the refactor has indeed resolved the issue.
