A clear description of the bug
Version 0.10.2 -- and carried into 0.10.3 -- introduced an explicit Merge in the visualization.

What did you expect to happen instead?
I expected a single Merge as I defined it as merged_result:
Tested in 0.9.8 and 0.10.1:

A minimal example that exhibits the behavior.
Given this code:
from prefect import task, Flow
from prefect.tasks.control_flow import ifelse, merge
@task
def poll_for_new_records():
pass
@task
def is_poll_result_empty(polled_data):
if polled_data:
return True
else:
return False
@task
def skip_right_past_it_all():
pass
@task
def extract():
pass
@task
def transform(payload):
pass
@task
def load(payload):
pass
with Flow("Test ETL") as flow:
p = poll_for_new_records()
e = extract()
ifelse(is_poll_result_empty(p), skip_right_past_it_all, e)
t = transform(e)
l = load(t)
merged_result = merge(skip_right_past_it_all, l)
flow.visualize()
Any additional information about your environment
Anaconda, with an environment of Python 3.7, and Prefect 0.9.1 through 0.10.3 tested.
Optionally run prefect diagnostics from the command line and paste the information here
{
"config_overrides": {},
"env_vars": [],
"system_information": {
"platform": "Windows-10-10.0.18362-SP0",
"prefect_version": "0.10.3",
"python_version": "3.7.7"
}
}
Related to #2310
cc @jcrist
Edit: ignore this, I hadn't thought things totally through. The issue here is about *visualizing the flow, not errors in the execution of the flow (afaict). The code I wrote in this comment also breaks some other invariants and should be equally ignored.*
~Hmmm, this is tricky. I think what we'd like to work is:~
from prefect import task, Flow
from prefect.tasks.control_flow import ifelse, merge
@task
def poll_for_new_records():
pass
@task
def is_poll_result_empty(polled_data):
if polled_data:
return True
else:
return False
@task
def skip_right_past_it_all():
pass
@task
def extract():
pass
@task
def transform(payload):
pass
@task
def load(payload):
pass
with Flow("Test ETL") as flow:
p = poll_for_new_records()
e = extract()
t = transform(e)
l = load(t)
merged_result = ifelse(
is_poll_result_empty(p),
skip_right_past_it_all,
l,
)
flow.visualize()
~But this doesn't work because the condition isn't set as an upstream task of extract(). I'm not sure what the best fix is here. cc @cicdw, since they originally filed #1735. We may want to revert #2310, I'm not sure if we can get the behavior we want with a functional api.~
Thinking more about user api here for expressing complex dependency trees and conditions, I wonder if we'd be better served by a contextmanager api for expressing conditions and branches. Perhaps something like:
with Flow("Test ETL") as flow:
p = poll_for_new_records()
with case(p, True):
skip_right_past_it_all()
with case(p, False):
e = extract()
t = transform(e)
l = load(t)
this makes it easier to express sets of tasks that should be run on each branch (rather than just a single task with dependencies that you may or may not want to skip). I think it's also fairly readable, but that's more of an opinion.
Just a thought. Apologies if this has been discussed before, I'm fairly new to working on Prefect.
As long as the original Flow is still allowing for the correct data exchange and still responding to upstream states correctly, I don't think this is an issue / bug (please correct me if there is actually something unexpected happening at runtime though).
There are a few places where Prefect has to introduce new tasks on behalf of the user (the ifelse tasks are another good example where 3 tasks are introduced). I do agree that this can be a little surprising, and occasionally even inefficient so I'd really like to take up @jcrist 's suggestion and try to make this interface as simple and flexible and possible.
In the meantime I'll close this issue in favor of a new issue containing the API changes @jcrist has in mind 馃憤 and we can make a broader push to simplify / clarify the branching / merging interface
I've also added these as examples for thought. These flows work in Prefect as diagrammed in BPMN: the exceptions on process is a flow process_handler, and the non-interrupting events on each task are task handlers. It works really well. I added a second diagram that I say is 2% more clunky, because my expectation is that I would have two "ends", end1 and end2. Which again, works pretty okay in Prefect as-is: it throws a warning, but used to work. No action required, just FYI.
Thanks for the hard work. It's pretty amazing to be able to lay out a detailed idea for customers in BPMN, and turn around and make it happen in Python.
Aside, in BPMN I use the conditional flow quite a lot instead of a full exclusive gateway when I'm only looking at a single case and a default.

This next version is a touch less clean because of the jumping end (essentially what I model with ifelse/merge.

My 2c here:
I disagree with the new way of adding a new Merge task for switch or ifelse. On my case, some flows broke because there were new terminal tasks on the new task. I have change in favor of using reference tasks instead, but I dislike how my graph now has a section that makes little sense.
More importantly, the result part is unnecessary when using switch or ifelse as triggers for sections of a graph.
What I plan to do on my side, is to rescue the old switch function:
from typing import Dict, Any
import prefect
from prefect import Task
from prefect.tasks.control_flow.conditional import CompareValue
def old_switch(condition: Task, cases: Dict[Any, Task]) -> None:
"""... the docstring ..."""
with prefect.tags("switch"):
for value, task in cases.items():
task = prefect.utilities.tasks.as_task(task)
match_condition = CompareValue(value=value).bind(value=condition)
task.set_dependencies(upstream_tasks=[match_condition])
Since the change is only that missing return merge(*cases.values()), wouldn't it make sense to parametrize this with
def switch(condition: Task, cases: Dict[Any, Task], return_result: bool=True) -> ...
?
I think @jcrist's new API is extremely interesting. Expressing conditionals without relying on runtime Python syntax is so hard, but that almost feels natural:
# python
if condition == a:
do_a()
elif condition == b:
do_b()
# prefect
with case(condition, a):
do_a()
with case(condition, b):
do_b()
The only hiccup would be respecting the order of the case statements (and I just realized we may only do that implicitly in our current switch implementation thanks to ordered dictionary semantics)
HOWEVER I actually think the issue @dojeda raises should be considered a bug; Prefect has implicit reliance on terminal tasks, so introducing a new one without warning has behavioral implications. I confess I didn't think about this when we discussed the original PR!
@dojeda / @staylorx we decided to revert this change here: https://github.com/PrefectHQ/prefect/pull/2379 and will be cutting a 0.10.4 release with this patch included.
We'll also be working on a more ergonomic case / switch / merge API to follow up on all this -- thanks for the quick feedback!
Happy to help @cicdw and @jcrist if/when I can. My team and I have really glommed on to this tool; thank you for the hard work and rapid turnaround.
Proposal for new conditional api is here: https://github.com/PrefectHQ/prefect/issues/2381.
Thanks for the quick responses by the prefect team and collaborators!
Most helpful comment
Happy to help @cicdw and @jcrist if/when I can. My team and I have really glommed on to this tool; thank you for the hard work and rapid turnaround.