Prefect: Terminating a flow (conditionally) without executing all tasks.

Created on 20 Oct 2020 · 5 Comments · Source: PrefectHQ/prefect

Current behavior

Suppose I have a Flow that checks for something midway through (e.g. whether a file exists).
If it exists -> stop the flow there and then with a success state.
If it does not -> proceed to execute the rest of the flow.

Currently this is not possible (to my knowledge). Discussed with @cicdw here: https://stackoverflow.com/questions/64208746/can-a-flow-be-terminated-if-a-specific-task-fails

Proposed behavior

Make it possible to stop a flow (before reaching all tasks) with some state based on some condition.

enhancement

All 5 comments

That only cancels a flow run, which will stop execution but won't end with a "success" state.


@snenkov, without more info, the way I'd handle this is by encoding the check explicitly in your flow as a conditional. Your description of the issue above (stop early with a success state on a conditional) doesn't match the description in the Stack Overflow question (stop early if a single task fails) though, so I'm not sure what you want.

If you encode your task dependencies in such a way that downstream tasks should only run if upstream tasks succeed (and aren't skipped), then you can do this all as part of a flow run. Here's an example flow:

from prefect import Flow, task, case, Parameter


@task
def is_even(x):
    return x % 2 == 0


@task
def inc(x):
    return x + 1


@task
def echo(x):
    print(x)


with Flow("test") as flow:
    x = Parameter("x")

    # Do some work unconditionally
    y = inc(x)

    # Only run the remaining tasks if y is even
    with case(is_even(y), True):
        echo("Conditional tasks were run, since y was even")
        z = inc(y)


flow.run(x=2)  # y = x + 1 = 3, so the remaining tasks don't run

Output:

[2020-10-20 20:09:34] INFO - prefect.FlowRunner | Beginning Flow run for 'test'
[2020-10-20 20:09:34] INFO - prefect.TaskRunner | Task 'x': Starting task run...
[2020-10-20 20:09:34] INFO - prefect.TaskRunner | Task 'x': Finished task run for task with final state: 'Success'
[2020-10-20 20:09:34] INFO - prefect.TaskRunner | Task 'inc': Starting task run...
[2020-10-20 20:09:34] INFO - prefect.TaskRunner | Task 'inc': Finished task run for task with final state: 'Success'
[2020-10-20 20:09:34] INFO - prefect.TaskRunner | Task 'is_even': Starting task run...
[2020-10-20 20:09:34] INFO - prefect.TaskRunner | Task 'is_even': Finished task run for task with final state: 'Success'
[2020-10-20 20:09:35] INFO - prefect.TaskRunner | Task 'case(True)': Starting task run...
[2020-10-20 20:09:35] INFO - prefect.TaskRunner | SKIP signal raised: SKIP('Provided value "False" did not match "True"')
[2020-10-20 20:09:35] INFO - prefect.TaskRunner | Task 'case(True)': Finished task run for task with final state: 'Skipped'
[2020-10-20 20:09:35] INFO - prefect.TaskRunner | Task 'echo': Starting task run...
[2020-10-20 20:09:35] INFO - prefect.TaskRunner | Task 'echo': Finished task run for task with final state: 'Skipped'
[2020-10-20 20:09:35] INFO - prefect.TaskRunner | Task 'inc': Starting task run...
[2020-10-20 20:09:35] INFO - prefect.TaskRunner | Task 'inc': Finished task run for task with final state: 'Skipped'
[2020-10-20 20:09:35] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded

If the conditional task (is_even in the example above) fails, the downstream tasks also won't run, since an upstream task failed (downstream tasks aren't triggered by default in that case), which should satisfy the request in your Stack Overflow post.
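For anyone who wants to see why the run above still ends in SUCCESS, here is a toy model in plain Python. It is not Prefect's implementation, only a sketch of the propagation rules described in this comment, under two assumptions: a skipped upstream task skips its downstream tasks, and a failed upstream task keeps its downstream tasks from being triggered. The names run_toy_flow, flow_succeeded and Skip are hypothetical; the state strings mirror the log above, and 'TriggerFailed' is my label for "not triggered because an upstream task failed".

```python
from typing import Callable, Dict, List

SUCCESS = "Success"
SKIPPED = "Skipped"
FAILED = "Failed"
TRIGGER_FAILED = "TriggerFailed"


class Skip(Exception):
    """Hypothetical stand-in for a SKIP signal raised inside a task."""


def run_toy_flow(
    tasks: Dict[str, Callable[[], None]],
    upstream: Dict[str, List[str]],
) -> Dict[str, str]:
    """Run tasks in the order given (assumed to be dependency order)."""
    states: Dict[str, str] = {}
    for name, fn in tasks.items():
        upstream_states = [states[u] for u in upstream.get(name, [])]
        if any(s in (FAILED, TRIGGER_FAILED) for s in upstream_states):
            # An upstream failure means this task is never triggered.
            states[name] = TRIGGER_FAILED
        elif any(s == SKIPPED for s in upstream_states):
            # An upstream skip propagates downstream.
            states[name] = SKIPPED
        else:
            try:
                fn()
                states[name] = SUCCESS
            except Skip:
                states[name] = SKIPPED
            except Exception:
                states[name] = FAILED
    return states


def flow_succeeded(states: Dict[str, str], terminal: List[str]) -> bool:
    """Treat a skipped terminal task as a success, as in the log above."""
    return all(states[t] in (SUCCESS, SKIPPED) for t in terminal)


def gate() -> None:
    raise Skip("condition not met")


def broken() -> None:
    raise ValueError("boom")


upstream = {"gate": ["inc"], "echo": ["gate"], "inc_again": ["gate"]}
noop = lambda: None

# Condition not met: everything after the gate is skipped, flow still succeeds.
states = run_toy_flow(
    {"inc": noop, "gate": gate, "echo": noop, "inc_again": noop}, upstream
)

# Conditional task fails: downstream tasks are not triggered, flow fails.
failed_states = run_toy_flow(
    {"inc": noop, "gate": broken, "echo": noop, "inc_again": noop}, upstream
)
```

In the first run the gate plays the role of case(True) in the example flow, which is why echo and the second inc end up Skipped while the flow as a whole is reported as successful.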

Thanks again @jcrist
I guess my question here is related to the one here too: https://github.com/PrefectHQ/prefect/discussions/3545

One issue I stumbled upon, and I don't know whether it's a bug or I am simply doing something wrong: when I combine the conditional case with a .map, things break apart somewhere.

Here is my code:

import prefect
from prefect import task, Flow, Parameter, case


@task
def check_task(input):
    if len(input) == 1:
        raise prefect.engine.signals.SKIP()
    else:
        return (1, 'a', 3)


@task
def main_task(input_result: tuple) -> tuple:
    logger = prefect.context.get("logger")
    logger.info('Starts the main_task')
    int_1 = input_result[0]
    int_2 = input_result[1]
    int_3 = input_result[2]
    logger.info(f'Inputs {int_1} {int_2} {int_3}')


@task
def is_tuple(x):
    return isinstance(x, tuple)



@task
def dummy_task():
    logger = prefect.context.get("logger")
    logger.info('This should not print with the main_task.')


def prepare_flow() -> Flow:
    with Flow("example") as flow:

        input = Parameter("input", required=True)
        check_result = check_task.map(input)

        with case(is_tuple.map(check_result), True):
            final_result = main_task.map(check_result)

        with case(is_tuple.map(check_result), False):
            final_result_2 = dummy_task()

    return flow

I use the following run parameters:

{
  "input": ["a", "ab"]
} 

The first mapped run should use "a" and skip all tasks after check_task; the second should use "ab" and run the part of the flow where the check evaluates to true:

        with case(is_tuple(check_result), True):
            final_result = main_task(check_result)

What ends up happening, however, is that everything is skipped.
Here is my flow diagram (image not preserved).

What's going wrong here?

PS. When I run each of the list values separately, without the map, everything works as expected.

Maps don't automatically propagate. In the above, you map over the input to create check_task, but the remaining case calls aren't applied to the individual results of the map; they're applied to the total mapped result (the full list(map(check_task, input)), not its elements).

I think what you're looking for here is a way to map a set of tasks across a number of inputs, which is what apply_map is for.
See https://docs.prefect.io/core/concepts/mapping.html#complex-mapped-pipelines for more info.
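The difference can be seen in plain Python, without Prefect. This is only an analogy under my own assumptions: check and is_tuple are ordinary functions standing in for the tasks, None stands in for a skipped result, and the equality checks at the end are my reading of why neither case branch matched in the flow above.

```python
def check(value):
    # Stand-in for check_task: None plays the role of a skipped result.
    return None if len(value) == 1 else (1, "a", 3)


def is_tuple(x):
    return isinstance(x, tuple)


inputs = ["a", "ab"]

# Roughly what check_task.map(input) produces: one result per element.
check_results = [check(v) for v in inputs]

# A non-mapped call downstream sees the whole list, not its elements.
whole_list_is_tuple = is_tuple(check_results)

# A mapped call runs once per element.
per_element = [is_tuple(r) for r in check_results]

# Comparing the whole list against True or False never matches,
# so a condition built this way selects neither branch.
matches_true = per_element == True
matches_false = per_element == False

print(whole_list_is_tuple, per_element, matches_true, matches_false)
```

The point is that the condition has to be evaluated once per element, and everything inside the branch has to be mapped along with it.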

The following does what you're asking for, I think:

import prefect
from prefect import task, Flow, Parameter, case, apply_map


@task
def check_task(input):
    if len(input) == 1:
        raise prefect.engine.signals.SKIP()
    else:
        return (1, 'a', 3)


@task
def main_task(input_result: tuple) -> tuple:
    logger = prefect.context.get("logger")
    logger.info('Starts the main_task')
    int_1 = input_result[0]
    int_2 = input_result[1]
    int_3 = input_result[2]
    logger.info(f'Inputs {int_1} {int_2} {int_3}')


@task
def is_tuple(x):
    return isinstance(x, tuple)


@task
def dummy_task():
    logger = prefect.context.get("logger")
    logger.info('This should not print with the main_task.')


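def subflow(check_result):
    # Called via apply_map, so every task added here is mapped over check_result.
    cond = is_tuple(check_result)
    with case(cond, True):
        # Note: this return ends subflow while the flow is being built, so the
        # case(cond, False) branch below is never added (the log has no dummy_task).
        return main_task(check_result)
    with case(cond, False):
        return dummy_task()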

with Flow("example") as flow:

    input = Parameter("input", required=True)
    check_result = check_task.map(input)
    apply_map(subflow, check_result)


flow.run(input=["a", "ab"])

Output:

[2020-10-20 20:39:50] INFO - prefect.FlowRunner | Beginning Flow run for 'example'
[2020-10-20 20:39:50] INFO - prefect.TaskRunner | Task 'input': Starting task run...
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | Task 'input': Finished task run for task with final state: 'Success'
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | Task 'check_task': Starting task run...
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | Task 'check_task': Finished task run for task with final state: 'Mapped'
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | Task 'check_task[0]': Starting task run...
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | SKIP signal raised: SKIP(None)
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | Task 'check_task[0]': Finished task run for task with final state: 'Skipped'
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | Task 'check_task[1]': Starting task run...
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | Task 'check_task[1]': Finished task run for task with final state: 'Success'
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | Task 'is_tuple': Starting task run...
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | Task 'is_tuple': Finished task run for task with final state: 'Mapped'
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | Task 'is_tuple[0]': Starting task run...
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | Task 'is_tuple[0]': Finished task run for task with final state: 'Skipped'
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | Task 'is_tuple[1]': Starting task run...
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | Task 'is_tuple[1]': Finished task run for task with final state: 'Success'
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | Task 'case(True)': Starting task run...
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | Task 'case(True)': Finished task run for task with final state: 'Mapped'
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | Task 'case(True)[0]': Starting task run...
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | Task 'case(True)[0]': Finished task run for task with final state: 'Skipped'
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | Task 'case(True)[1]': Starting task run...
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | Task 'case(True)[1]': Finished task run for task with final state: 'Success'
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | Task 'main_task': Starting task run...
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | Task 'main_task': Finished task run for task with final state: 'Mapped'
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | Task 'main_task[0]': Starting task run...
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | Task 'main_task[0]': Finished task run for task with final state: 'Skipped'
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | Task 'main_task[1]': Starting task run...
[2020-10-20 20:39:51] INFO - prefect.main_task[1] | Starts the main_task
[2020-10-20 20:39:51] INFO - prefect.main_task[1] | Inputs 1 a 3
[2020-10-20 20:39:51] INFO - prefect.TaskRunner | Task 'main_task[1]': Finished task run for task with final state: 'Success'
[2020-10-20 20:39:51] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded

This looks resolved, let us know if you have any other issues :)
