Triggers don't seem to be called when a task is invoked with .map(). Instead, the mapped task seems to try to process the upstream exception value as its input.
I'd expect the trigger to function in the same way as it does for a non-mapped task.
from typing import List, Dict

from prefect import Flow
from prefect import Task
from prefect.engine.cache_validators import all_parameters
from prefect.triggers import all_successful


class ExtractDummyTask(Task):
    def run(self) -> List[Dict]:
        records = [
            {"data": "dummy"},
            {"data": "dummier"},
            {"data": "dummiest"},
        ]
        raise RuntimeError("my test error")
        return records


class TransformDummyTask(Task):
    def run(self, data: List) -> List:
        return data


class LoadDummyTask(Task):
    def run(self, data: List) -> None:
        print(f"Data: {data}")


extract_dummy = ExtractDummyTask(trigger=all_successful)
transform_dummy = TransformDummyTask(trigger=all_successful)
load_dummy = LoadDummyTask(trigger=all_successful)

# Create dummy flow
with Flow("dummy_flow") as flow:
    # Extract
    values = extract_dummy()

    # Transform
    # transformed_values = transform_dummy(data=values)  # This works as expected
    transformed_values = transform_dummy.map(data=values)

    # Load
    load_dummy(data=transformed_values)

flow.run()
Tried this locally on mac as well as local dask - same behaviour in both.
This is definitely a bug, but for anyone reading along at home I'd like to provide some context for why this was originally implemented as _intentional_ behavior: if a user writes a multi-level mapped Flow such as:
from prefect import task, Flow


@task
def return_stuff():
    return [0, 1, 2]


@task
def divide(x):
    return 1 / x


@task
def subtract(y):
    return y - 1


with Flow("multi-level mapping") as flow:
    final = subtract.map(divide.map(return_stuff))

flow_state = flow.run()
print(flow_state.result[final].map_states)
# [<TriggerFailed: "Trigger was "all_successful" but some of the upstream tasks failed.">,
#  <Success: "Task run succeeded.">,
#  <Success: "Task run succeeded.">]
Note that the second level of mapped tasks ran, and some even succeeded; this is only possible because each child task was allowed to check its own trigger. If the parent had checked the trigger, it would have decided that the second level of mapped tasks was not ready to proceed (because there was an upstream failure).
The trick will be to figure out a way to fix the bug here while still satisfying this multi-level situation.
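For anyone who wants to see the trade-off without installing anything, here is a rough pure-Python sketch of the two strategies. It is not Prefect's engine code: `run_child`, `map_per_child`, and `map_parent_checked` are hypothetical names made up for illustration, and a plain `RuntimeError` stands in for a `TriggerFailed` state.

```python
from typing import Callable, List, Union

# A result is either a value or the exception recorded by a failed child.
Result = Union[float, Exception]


def run_child(fn: Callable[[float], float], upstream: Result) -> Result:
    """Run one mapped child after checking its own all_successful-style trigger."""
    if isinstance(upstream, Exception):
        # Hypothetical stand-in for a TriggerFailed state.
        return RuntimeError("trigger failed: upstream child did not succeed")
    try:
        return fn(upstream)
    except Exception as exc:
        # A failing child records its error instead of raising.
        return exc


def map_per_child(fn: Callable[[float], float], upstreams: List[Result]) -> List[Result]:
    """Each child checks only its own upstream, so healthy branches still run."""
    return [run_child(fn, u) for u in upstreams]


def map_parent_checked(fn: Callable[[float], float], upstreams: List[Result]) -> List[Result]:
    """The parent checks every upstream at once; a single failure blocks all children."""
    if any(isinstance(u, Exception) for u in upstreams):
        return [RuntimeError("trigger failed: an upstream child did not succeed")
                for _ in upstreams]
    return [run_child(fn, u) for u in upstreams]


# Mirrors subtract.map(divide.map(return_stuff)) from the Flow above.
divided = map_per_child(lambda x: 1 / x, [0, 1, 2])
per_child = map_per_child(lambda y: y - 1, divided)
parent_checked = map_parent_checked(lambda y: y - 1, divided)
```

With per-child checks only the branch downstream of the division by zero is blocked, while the parent-level check blocks all three children, which is the behavior the multi-level example relies on avoiding.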
We think this may be affected by https://github.com/PrefectHQ/prefect/issues/2298, the fix for which is in the 0.11.0 release branch, so let's revisit this once we know how that change affects the behavior here.
Hey, I don't think 2298 affects this... if I understand correctly, 2298 changes the trigger function signature, but the problem here is that the trigger function isn't evaluated at the parent level when using a mapped task.
Am currently running 0.11.4 and seeing the same behaviour as the original post.
Hi @andywaugh would you be willing to run your Flow against master? I think the recent mapping refactor will resolve this but it would be good to confirm.
Hey @cicdw - thanks for the swift reply. Yep, have just re-run against master and it seems the refactor has indeed resolved the issue.