Prefect: Ifelse introduces Merge task in visualization and flow in 0.10.2

Created on 21 Apr 2020 · 11 Comments · Source: PrefectHQ/prefect

Description


Version 0.10.2 introduced an explicit Merge task in the visualization, and the behavior carried over into 0.10.3.

[Screenshot: ETL Example flow visualization in Prefect 0.10.2]

Expected Behavior


I expected a single Merge task, the one I defined as merged_result.

Tested in 0.9.8 and 0.10.1:
[Screenshot: ETL Example flow visualization in Prefect 0.9.8]

Reproduction


Given this code:

from prefect import task, Flow
from prefect.tasks.control_flow import ifelse, merge

@task
def poll_for_new_records():
    pass

@task
def is_poll_result_empty(polled_data):
    # True when the poll returned no records, matching the task's name
    return not polled_data

@task
def skip_right_past_it_all():
    pass

@task
def extract():
    pass

@task
def transform(payload):
    pass

@task
def load(payload):
    pass

with Flow("Test ETL") as flow:

    p = poll_for_new_records()

    e = extract()
    ifelse(is_poll_result_empty(p), skip_right_past_it_all, e)

    t = transform(e)
    l = load(t)

    merged_result = merge(skip_right_past_it_all, l)

flow.visualize()

Environment

Anaconda environment with Python 3.7; tested with Prefect 0.9.1 through 0.10.3.

Output of prefect diagnostics:
{
    "config_overrides": {},
    "env_vars": [],
    "system_information": {
        "platform": "Windows-10-10.0.18362-SP0",
        "prefect_version": "0.10.3",
        "python_version": "3.7.7"
    }
}



Related to #2310

cc @jcrist

Edit: ignore this, I hadn't thought things totally through. The issue here is about visualizing the flow, not errors in the execution of the flow (afaict). The code I wrote in this comment also breaks some other invariants and should be equally ignored.

~Hmmm, this is tricky. I think what we'd like to work is:~

from prefect import task, Flow
from prefect.tasks.control_flow import ifelse, merge

@task
def poll_for_new_records():
    pass

@task
def is_poll_result_empty(polled_data):
    # True when the poll returned no records, matching the task's name
    return not polled_data

@task
def skip_right_past_it_all():
    pass

@task
def extract():
    pass

@task
def transform(payload):
    pass

@task
def load(payload):
    pass

with Flow("Test ETL") as flow:

    p = poll_for_new_records()

    e = extract()
    t = transform(e)
    l = load(t)
    merged_result = ifelse(
        is_poll_result_empty(p),
        skip_right_past_it_all,
        l,
    )

flow.visualize()

~But this doesn't work because the condition isn't set as an upstream task of extract(). I'm not sure what the best fix is here. cc @cicdw, since they originally filed #1735. We may want to revert #2310; I'm not sure we can get the behavior we want with a functional API.~

Thinking more about the user API for expressing complex dependency trees and conditions, I wonder if we'd be better served by a context-manager API for expressing conditions and branches. Perhaps something like:

with Flow("Test ETL") as flow:

    p = poll_for_new_records()

    with case(p, True):
        skip_right_past_it_all()

    with case(p, False):
        e = extract()
        t = transform(e)
        l = load(t)

This makes it easier to express sets of tasks that should run on each branch (rather than just a single task with dependencies that you may or may not want to skip). I think it's also fairly readable, but that's more a matter of opinion.

Just a thought. Apologies if this has been discussed before, I'm fairly new to working on Prefect.
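To make the proposed context-manager semantics concrete, here is a minimal, stdlib-only toy model. It is not Prefect's implementation: ToyFlow, add_task, and the "condition==value" comparison nodes are all hypothetical. The idea it illustrates is that every task created inside a case block picks up a comparison node for the enclosing condition as an upstream dependency, so a whole group of tasks is gated at once instead of one task at a time.

```python
from contextlib import contextmanager
from typing import Dict, Iterator, List, Set, Tuple


class ToyFlow:
    """Hypothetical stand-in for a flow: maps each task name to its upstream task names."""

    def __init__(self) -> None:
        self.upstream: Dict[str, Set[str]] = {}
        # Stack of active (condition, value) pairs; the innermost case is last.
        self._active_cases: List[Tuple[str, object]] = []

    def add_task(self, name: str, *upstream: str) -> str:
        deps = set(upstream)
        for condition, value in self._active_cases:
            # One comparison node per (condition, value); it depends on the condition task.
            compare = f"{condition}=={value!r}"
            self.upstream.setdefault(compare, {condition})
            deps.add(compare)
        self.upstream[name] = deps
        return name

    @contextmanager
    def case(self, condition: str, value: object) -> Iterator[None]:
        # Tasks added while this block is open get the comparison node as upstream.
        self._active_cases.append((condition, value))
        try:
            yield
        finally:
            self._active_cases.pop()


# Mirrors the proposal above, using task names instead of task calls.
flow = ToyFlow()
p = flow.add_task("poll_for_new_records")

with flow.case(p, True):
    flow.add_task("skip_right_past_it_all")

with flow.case(p, False):
    e = flow.add_task("extract")
    t = flow.add_task("transform", e)
    flow.add_task("load", t)

print(sorted(flow.upstream["transform"]))  # ['extract', 'poll_for_new_records==False']
```

Because the comparison node is attached to every task in the block, an engine that skips downstream of a failed comparison would skip the whole branch without the user wiring each dependency by hand.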

As long as the original flow still exchanges data correctly and still responds to upstream states correctly, I don't think this is a bug (please correct me if something unexpected is actually happening at runtime, though).

There are a few places where Prefect has to introduce new tasks on behalf of the user (the ifelse tasks are another good example, where 3 tasks are introduced). I do agree that this can be a little surprising, and occasionally even inefficient, so I'd really like to take up @jcrist's suggestion and try to make this interface as simple and flexible as possible.

In the meantime, I'll close this issue in favor of a new issue containing the API changes @jcrist has in mind 👍, and we can make a broader push to simplify and clarify the branching/merging interface.

I've also added these as examples for thought. These flows work in Prefect as diagrammed in BPMN: the exception event on the process is a flow process_handler, and the non-interrupting events on each task are task handlers. It works really well. I added a second diagram that I'd call 2% more clunky, because I expect to have two "ends", end1 and end2. Again, this works reasonably well in Prefect as-is: it throws a warning, but it used to work. No action required, just FYI.

Thanks for the hard work. It's pretty amazing to be able to lay out a detailed idea for customers in BPMN, and turn around and make it happen in Python.

As an aside, in BPMN I use the conditional flow quite a lot instead of a full exclusive gateway when I'm only looking at a single case and a default.

[Diagram: BPMN workflow, very clean]

This next version is a touch less clean because of the jumping end (essentially what I model with ifelse/merge).

[Diagram: BPMN workflow, 2% more clunky]

My 2c here:

I disagree with the new behavior of adding a Merge task for switch or ifelse. In my case, some flows broke because the new Merge tasks became new terminal tasks. I have changed those flows to use reference tasks instead, but I dislike how my graph now has a section that makes little sense.
More importantly, the result part is unnecessary when using switch or ifelse as triggers for sections of a graph.
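To see why an extra Merge can break a flow, here is a stdlib-only sketch that computes terminal tasks (tasks with no downstream dependents) on a graph loosely modelled on the reproduction above. All names are hypothetical; compare_true/compare_false stand in for the comparison tasks switch creates. In Prefect 0.x a flow's reference tasks, which determine its final state, default to its terminal tasks, so the merge returned by ifelse quietly becomes a second reference task.

```python
from typing import Dict, Set


def terminal_tasks(upstream: Dict[str, Set[str]]) -> Set[str]:
    """Tasks that no other task lists as upstream, i.e. tasks with no downstream."""
    has_downstream = set().union(*upstream.values())
    return set(upstream) - has_downstream


# Hypothetical task -> upstream-tasks map loosely modelled on the reproduction flow.
flow_graph: Dict[str, Set[str]] = {
    "poll_for_new_records": set(),
    "is_poll_result_empty": {"poll_for_new_records"},
    "compare_true": {"is_poll_result_empty"},
    "compare_false": {"is_poll_result_empty"},
    "skip_right_past_it_all": {"compare_true"},
    "extract": {"compare_false"},
    "transform": {"extract"},
    "load": {"transform"},
    "merged_result": {"skip_right_past_it_all", "load"},
}

before = terminal_tasks(flow_graph)

# 0.10.2-style ifelse additionally returns merge(true_task, false_task).
after = terminal_tasks({**flow_graph, "implicit_merge": {"skip_right_past_it_all", "extract"}})

print(sorted(before))  # ['merged_result']
print(sorted(after))   # ['implicit_merge', 'merged_result']
```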

What I plan to do on my side is to rescue the old switch function:

from typing import Dict, Any

import prefect
from prefect import Task
from prefect.tasks.control_flow.conditional import CompareValue

def old_switch(condition: Task, cases: Dict[Any, Task]) -> None:
    """... the docstring ..."""

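    # No value is returned, so no extra Merge task is added to the flow
    with prefect.tags("switch"):
        for value, task in cases.items():
            task = prefect.utilities.tasks.as_task(task)
            # Gate each branch on a CompareValue task that checks condition == value
            match_condition = CompareValue(value=value).bind(value=condition)
            task.set_dependencies(upstream_tasks=[match_condition])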

Since the only difference is that missing return merge(*cases.values()), wouldn't it make sense to parametrize switch with

def switch(condition: Task, cases: Dict[Any, Task], return_result: bool = True) -> ...

?
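A stdlib-only toy of what such a flag would control. toy_switch, its dict-of-sets graph, and the node names are hypothetical stand-ins for Prefect's real switch, and the Optional[str] return type is an assumption made only for this toy.

```python
from typing import Any, Dict, Optional, Set

Graph = Dict[str, Set[str]]  # task name -> names of its upstream tasks


def toy_switch(
    graph: Graph, condition: str, cases: Dict[Any, str], return_result: bool = True
) -> Optional[str]:
    """Graph-only model of switch with a hypothetical return_result flag."""
    for value, task in cases.items():
        # Each branch is gated on a comparison of the condition against its value.
        compare = f"CompareValue({value!r})"
        graph.setdefault(compare, set()).add(condition)
        graph.setdefault(task, set()).add(compare)
    if not return_result:
        return None  # pre-0.10.2 behavior: no extra task is added
    graph["Merge"] = set(cases.values())
    return "Merge"


old_style: Graph = {"condition": set()}
new_style: Graph = {"condition": set()}
toy_switch(old_style, "condition", {True: "a", False: "b"}, return_result=False)
toy_switch(new_style, "condition", {True: "a", False: "b"})

print(sorted(set(new_style) - set(old_style)))  # ['Merge']
```

With the default of True, existing callers that rely on the returned result keep working, while trigger-only uses can opt out of the extra terminal task.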

I think @jcrist's new API is extremely interesting. Expressing conditionals without relying on runtime Python syntax is hard, but this almost feels natural:

# python
if condition == a:
    do_a()

elif condition == b:
    do_b()


# prefect
with case(condition, a):
    do_a()

with case(condition, b):
    do_b()

The only hiccup would be respecting the order of the case statements (and I just realized that our current switch implementation may only do that implicitly, thanks to ordered dictionary semantics).
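On the ordering point: since Python 3.7, dict insertion order is a language guarantee, so iterating cases.items() follows the order in which the cases were written. A context-manager API could record order explicitly instead. The sketch below contrasts the two; the case function here is a hypothetical recorder, not Prefect's API.

```python
from contextlib import contextmanager
from typing import Any, Iterator, List

# Dict-based cases, as switch takes them: iteration follows insertion order.
cases = {"b": "do_b", "a": "do_a"}
dict_order = list(cases)

# Context-manager cases (hypothetical): order is the order the with-blocks run in.
case_order: List[Any] = []


@contextmanager
def case(condition: Any, value: Any) -> Iterator[None]:
    case_order.append(value)  # record this branch in source order
    yield


with case("condition", "b"):
    pass

with case("condition", "a"):
    pass

print(dict_order, case_order)  # ['b', 'a'] ['b', 'a']
```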

HOWEVER, I actually think the issue @dojeda raises should be considered a bug: Prefect implicitly relies on terminal tasks, so introducing a new one without warning has behavioral implications. I confess I didn't think about this when we discussed the original PR!

@dojeda / @staylorx we decided to revert this change here: https://github.com/PrefectHQ/prefect/pull/2379 and will be cutting a 0.10.4 release with this patch included.

We'll also be working on a more ergonomic case / switch / merge API to follow up on all this -- thanks for the quick feedback!

Happy to help @cicdw and @jcrist if/when I can. My team and I have really glommed on to this tool; thank you for the hard work and rapid turnaround.

Proposal for new conditional api is here: https://github.com/PrefectHQ/prefect/issues/2381.

Thanks for the quick responses by the prefect team and collaborators!
