I have a Flow that checks for something partway through (e.g. whether a file exists).
If it exists -> stop the flow right there with a success state.
If it does not -> proceed to execute the rest of the flow.
Currently this is not possible (to my knowledge). Discussed with @cicdw here: https://stackoverflow.com/questions/64208746/can-a-flow-be-terminated-if-a-specific-task-fails
Request: make it possible to stop a flow early (before all tasks have run) with a chosen state, based on some condition.
I believe this was added since then in #3497
See https://github.com/PrefectHQ/prefect/blob/master/src/prefect/tasks/prefect/flow_run_cancel.py
That only cancels a flow run, which will stop execution but won't end with a "success" state.
@snenkov, without more info, the way I'd handle this is by encoding the check explicitly in your flow as a conditional. That said, your description of the issue above (stop early with a success state on a conditional) doesn't match the description in the Stack Overflow question (stop early if a single task fails), so I'm not sure what you want.
If you encode your task dependencies so that downstream tasks only run if upstream tasks succeed (and aren't skipped), then you can do this all as part of a flow run. Here's an example flow:
from prefect import Flow, task, case, Parameter

@task
def is_even(x):
    return x % 2 == 0

@task
def inc(x):
    return x + 1

@task
def echo(x):
    print(x)

with Flow("test") as flow:
    x = Parameter("x")
    # Do some work unconditionally
    y = inc(x)
    # Only run the remaining tasks if y is even
    with case(is_even(y), True):
        echo("Conditional tasks were run, since y was even")
        z = inc(y)

flow.run(x=2)  # y = x + 1 = 3, so the remaining tasks don't run
Output:
[2020-10-20 20:09:34] INFO - prefect.FlowRunner | Beginning Flow run for 'test'
[2020-10-20 20:09:34] INFO - prefect.TaskRunner | Task 'x': Starting task run...
[2020-10-20 20:09:34] INFO - prefect.TaskRunner | Task 'x': Finished task run for task with final state: 'Success'
[2020-10-20 20:09:34] INFO - prefect.TaskRunner | Task 'inc': Starting task run...
[2020-10-20 20:09:34] INFO - prefect.TaskRunner | Task 'inc': Finished task run for task with final state: 'Success'
[2020-10-20 20:09:34] INFO - prefect.TaskRunner | Task 'is_even': Starting task run...
[2020-10-20 20:09:34] INFO - prefect.TaskRunner | Task 'is_even': Finished task run for task with final state: 'Success'
[2020-10-20 20:09:35] INFO - prefect.TaskRunner | Task 'case(True)': Starting task run...
[2020-10-20 20:09:35] INFO - prefect.TaskRunner | SKIP signal raised: SKIP('Provided value "False" did not match "True"')
[2020-10-20 20:09:35] INFO - prefect.TaskRunner | Task 'case(True)': Finished task run for task with final state: 'Skipped'
[2020-10-20 20:09:35] INFO - prefect.TaskRunner | Task 'echo': Starting task run...
[2020-10-20 20:09:35] INFO - prefect.TaskRunner | Task 'echo': Finished task run for task with final state: 'Skipped'
[2020-10-20 20:09:35] INFO - prefect.TaskRunner | Task 'inc': Starting task run...
[2020-10-20 20:09:35] INFO - prefect.TaskRunner | Task 'inc': Finished task run for task with final state: 'Skipped'
[2020-10-20 20:09:35] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
If the conditional task (is_even in the example above) fails, the downstream tasks also wouldn't run, since an upstream task failed (downstream tasks aren't triggered after an upstream failure by default). That should satisfy the request in your Stack Overflow post.
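To make the propagation rule above concrete, here is a toy model in plain Python. It is only a sketch of the behaviour visible in the log, not Prefect's actual runner: the names State, run_step and run_pipeline are hypothetical, and the only facts taken from the thread are that a skip propagates downstream and that a run with skipped tasks still ends in success.

```python
from enum import Enum


class State(Enum):
    SUCCESS = "Success"
    SKIPPED = "Skipped"
    FAILED = "Failed"


def run_step(func, upstream):
    """Run func only if every upstream step succeeded and none was skipped."""
    if any(s is State.FAILED for s in upstream):
        return State.FAILED   # an upstream failure blocks this step
    if any(s is State.SKIPPED for s in upstream):
        return State.SKIPPED  # a skip propagates downstream
    try:
        func()
        return State.SUCCESS
    except Exception:
        return State.FAILED


def run_pipeline(y):
    """Toy version of the example flow: the tail only runs when y is even."""
    gate = State.SUCCESS if y % 2 == 0 else State.SKIPPED
    echo_state = run_step(lambda: None, [gate])
    inc_state = run_step(lambda: y + 1, [gate])
    states = [gate, echo_state, inc_state]
    # Skipped counts as a successful outcome; only a failure fails the run.
    overall = State.FAILED if State.FAILED in states else State.SUCCESS
    return states, overall
```

Calling run_pipeline(3) mirrors the log above: the gate and both downstream steps end up skipped, yet the overall result is still a success.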
Thanks again @jcrist
I guess my question here is related to the one here too: https://github.com/PrefectHQ/prefect/discussions/3545
One issue I stumbled upon (I don't know whether it's a bug or I'm simply doing something wrong): when I combine the conditional case with .map, things break somewhere.
Here is my code:
import prefect
from prefect import task, Flow, Parameter, case

@task
def check_task(input):
    if len(input) == 1:
        raise prefect.engine.signals.SKIP()
    else:
        return (1, 'a', 3)

@task
def main_task(input_result: tuple) -> tuple:
    logger = prefect.context.get("logger")
    logger.info('Starts the main_task')
    int_1 = input_result[0]
    int_2 = input_result[1]
    int_3 = input_result[2]
    logger.info(f'Inputs {int_1} {int_2} {int_3}')

@task
def is_tuple(x):
    return isinstance(x, tuple)

@task
def dummy_task():
    logger = prefect.context.get("logger")
    logger.info('This should not print with the main_task.')

def prepare_flow() -> Flow:
    with Flow("example") as flow:
        input = Parameter("input", required=True)
        check_result = check_task.map(input)
        with case(is_tuple.map(check_result), True):
            final_result = main_task.map(check_result)
        with case(is_tuple.map(check_result), False):
            final_result_2 = dummy_task()
    return flow
I use the following run parameters:
{
"input": ["a", "ab"]
}
The first mapped run should use "a" and skip all tasks after check_task. The second mapped run should use "ab" and run the part of the flow where the check evaluates to true:
with case(is_tuple(check_result), True):
    final_result = main_task(check_result)
What ends up happening, however, is that everything is skipped.
Here is my flow diagram.
What's going wrong here?

PS. When I run each of the list values separately, without the map, everything works as expected.
Maps don't automatically propagate. In the above, you map check_task over input, but the remaining case calls aren't applied to the individual results of the map; they're applied to the whole mapped result (the full list(map(check_task, input)), not its elements).
I think what you're looking for here is a way to map a set of tasks across a number of inputs, which is what apply_map is for.
See https://docs.prefect.io/core/concepts/mapping.html#complex-mapped-pipelines for more info.
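As a rough plain-Python analogy (not Prefect code, and assuming the case check behaves like an ordinary equality comparison), the sketch below contrasts comparing the whole mapped result against a value with branching per element. The SKIPPED sentinel and the branch helper are hypothetical stand-ins for a skipped mapped child and for the function handed to apply_map.

```python
SKIPPED = object()  # hypothetical marker for a skipped mapped child


def is_tuple(x):
    return isinstance(x, tuple)


# Stand-in for check_task.map(["a", "ab"]): the first element was skipped.
check_result = [SKIPPED, (1, "a", 3)]

# Roughly what is_tuple.map(check_result) yields: one boolean per element.
mapped_cond = [is_tuple(r) for r in check_result]

# Comparing the whole list to a single value never matches,
# so neither the True branch nor the False branch would run.
matches_true = mapped_cond == True
matches_false = mapped_cond == False


def branch(item):
    """Per-element branching, analogous to the function given to apply_map."""
    if item is SKIPPED:
        return "skipped"
    return "main_task" if is_tuple(item) else "dummy_task"


per_element = [branch(r) for r in check_result]
```

Here both matches_true and matches_false are False, which corresponds to every branch being skipped, while per_element makes a separate decision for each input.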
The following does what you're asking for I think:
import prefect
from prefect import task, Flow, Parameter, case, apply_map

@task
def check_task(input):
    if len(input) == 1:
        raise prefect.engine.signals.SKIP()
    else:
        return (1, 'a', 3)

@task
def main_task(input_result: tuple) -> tuple:
    logger = prefect.context.get("logger")
    logger.info('Starts the main_task')
    int_1 = input_result[0]
    int_2 = input_result[1]
    int_3 = input_result[2]
    logger.info(f'Inputs {int_1} {int_2} {int_3}')

@task
def is_tuple(x):
    return isinstance(x, tuple)

@task
def dummy_task():
    logger = prefect.context.get("logger")
    logger.info('This should not print with the main_task.')

def subflow(check_result):
    cond = is_tuple(check_result)
    with case(cond, True):
        return main_task(check_result)
    # NOTE: the return above exits subflow, so the branch below is never
    # reached as written (dummy_task does not appear in the log output).
    with case(cond, False):
        return dummy_task()

with Flow("example") as flow:
    input = Parameter("input", required=True)
    check_result = check_task.map(input)
    # Apply the whole subflow to each element of the mapped result
    apply_map(subflow, check_result)

flow.run(input=["a", "ab"])
Output:
[2020-10-20 20:39:50] INFO - prefect.FlowRunner | Beginning Flow run for 'example'
[2020-10-20 20:39:50] INFO - prefect.TaskRunner | Task 'input': Starting task run...
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | Task 'input': Finished task run for task with final state: 'Success'
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | Task 'check_task': Starting task run...
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | Task 'check_task': Finished task run for task with final state: 'Mapped'
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | Task 'check_task[0]': Starting task run...
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | SKIP signal raised: SKIP(None)
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | Task 'check_task[0]': Finished task run for task with final state: 'Skipped'
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | Task 'check_task[1]': Starting task run...
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | Task 'check_task[1]': Finished task run for task with final state: 'Success'
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | Task 'is_tuple': Starting task run...
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | Task 'is_tuple': Finished task run for task with final state: 'Mapped'
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | Task 'is_tuple[0]': Starting task run...
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | Task 'is_tuple[0]': Finished task run for task with final state: 'Skipped'
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | Task 'is_tuple[1]': Starting task run...
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | Task 'is_tuple[1]': Finished task run for task with final state: 'Success'
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | Task 'case(True)': Starting task run...
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | Task 'case(True)': Finished task run for task with final state: 'Mapped'
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | Task 'case(True)[0]': Starting task run...
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | Task 'case(True)[0]': Finished task run for task with final state: 'Skipped'
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | Task 'case(True)[1]': Starting task run...
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | Task 'case(True)[1]': Finished task run for task with final state: 'Success'
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | Task 'main_task': Starting task run...
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | Task 'main_task': Finished task run for task with final state: 'Mapped'
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | Task 'main_task[0]': Starting task run...
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | Task 'main_task[0]': Finished task run for task with final state: 'Skipped'
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | Task 'main_task[1]': Starting task run...
[2020-10-20 20:39:51] INFO - prefect.main_task[1] | Starts the main_task
[2020-10-20 20:39:51] INFO - prefect.main_task[1] | Inputs 1 a 3
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | Task 'main_task[1]': Finished task run for task with final state: 'Success'
[2020-10-20 20:39:51] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
This looks resolved, let us know if you have any other issues :)