Prefect: Behaviour differs from documentation: state.result & flow.get_tasks

Created on 23 Apr 2020 · 4 Comments · Source: PrefectHQ/prefect

Prefect Version: 0.10.4

Current behavior

Minor issue: the following behaviour doesn't match what the documentation on result handling describes:

>>> task_ref = flow.get_tasks[0]
>>> state = flow.run()
>>> state.result[task_ref]._result  # a Task State's Result contains the Task's return value
<Result: 1>

Instead, Prefect requires that task_ref be obtained via a name or tag lookup (flow.get_tasks(name='<name>') instead of flow.get_tasks[0]) and then extracted from the returned list (state.result[task_ref[0]] instead of state.result[task_ref]):

>>> task_ref = flow.get_tasks(name='<name>')
>>> state = flow.run()
>>> state.result[task_ref[0]]._result  # a Task State's Result contains the Task's return value
<Result: 1>
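Both lookups follow from ordinary Python dictionary rules: a flow run's state.result maps each Task object to its State, so the key has to be a single hashable task, not the list that flow.get_tasks(...) returns. The sketch below does not import Prefect; FakeTask is a hypothetical stand-in for prefect.Task, and the dict stands in for state.result:

```python
class FakeTask:
    """Hypothetical stand-in for prefect.Task, hashed by object identity."""

    def __init__(self, name):
        self.name = name

    def __repr__(self):
        return f"<Task: {self.name}>"


load_data = FakeTask("load data")

# Stand-in for state.result after flow.run(): a dict keyed by task objects.
state_result = {load_data: "<Success: Task run succeeded.>"}

# A lookup by name yields a list, since several tasks may share a name.
task_ref = [t for t in state_result if t.name == "load data"]

lookup_error = None
try:
    state_result[task_ref]  # a list is unhashable, so it cannot be used as a dict key
except TypeError as exc:
    lookup_error = str(exc)  # "unhashable type: 'list'"

task_state = state_result[task_ref[0]]  # unpack the single match, then look it up
```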

and also the documentation on Local Debugging, which indexes state.result by the task's function name (gotcha):

from prefect import Flow, task

@task
def gotcha():
    tup = ('a', ['b'])
    try:
        tup[1] += ['c']
    except TypeError:
        assert len(tup[1]) == 1

flow = Flow(name="tuples", tasks=[gotcha])

state = flow.run()
state.result # {<Task: gotcha>: Failed("Unexpected error: AssertionError()")}

failed_state = state.result[gotcha]
raise failed_state.result

Examples

flow.get_tasks:

from prefect import Flow, task
import pandas as pd

@task(name='load data')
def _load_data() -> pd.DataFrame:
    return pd.DataFrame({'a': [1, 2, 3]})


with Flow('preprocess-VO') as flow:
    _load_data()

state = flow.run()
task_ref = flow.get_tasks[0]

Raises:

TypeError: 'method' object is not subscriptable

state.result[task_ref]

from prefect import Flow, task
import pandas as pd

@task(name='load data')
def _load_data() -> pd.DataFrame:
    return pd.DataFrame({'a': [1, 2, 3]})


with Flow('preprocess-VO') as flow:
    _load_data()

state = flow.run()
task_ref = flow.get_tasks(name='load data')
state.result[task_ref]

Raises:

TypeError: unhashable type: 'list'

state.result[<function name>]

from prefect import Flow, task
import pandas as pd

@task(name='load data')
def _load_data() -> pd.DataFrame:
    return pd.DataFrame({'a': [1, 2, 3]})


with Flow('preprocess-VO') as flow:
    _load_data()

state = flow.run()
task_ref = flow.get_tasks(name='load data')
state.result[_load_data]

Raises:

KeyError: <Task: load data>

Proposed behavior

It might be more readable to adapt Prefect so that it behaves as the documentation describes:

  • state.result[task_ref] can be used instead of state.result[task_ref[0]]
  • state.result[<func name>] works
  • flow.get_tasks[0] returns a task reference

Additionally, it would be great if function names could be passed to either state.result[<func name>] (EDIT: I see now that this functionality does exist) or flow.get_tasks(<func name>) as an alternative to adding tags and/or names to each task, to avoid having to name every function twice.


Thanks for creating and maintaining Prefect; really enjoying the library so far!

Labels: docs, enhancement, good first issue

All 4 comments

Thanks for the detailed info @rdmolony! Glad you are enjoying prefect so far :)

Responding to each of your points in detail below, but the very terse tl;dr response to your 3 proposed changes from my POV is "no", "yes", "no" :)

state.result[task_ref] can be used instead of state.result[task_ref[0]]

Given that task_ref here is defined as task_ref = flow.get_tasks(**kwargs), I think your suggestion boils down to the flow.get_tasks() method returning a single Task instance rather than a List[Task] as it does now (see the signature at https://github.com/PrefectHQ/prefect/blob/b923e6d85770e019e19f95c40df3b4ad822d9002/src/prefect/core/flow.py#L223-L229).

We need to return a list because the method can be called with no parameters (and thus return all the tasks of the flow). Meanwhile, since task names are not required to be unique within a flow (see below), it is not safe to return a single task even in the specific case where a task name is passed as a parameter.

Overall I think we prefer to have users unpack their task references if they get them from flow.get_tasks() based on what they know about how they called it, instead of changing the return type based on the number of values returned, which would be a breaking change.
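If you know a name is unique in your flow, one way to do that unpacking explicitly is a small helper that fails loudly on zero or multiple matches. This is a sketch under assumptions: get_single_task is a hypothetical helper (not part of Prefect), FakeTask stands in for prefect.Task, and get_tasks below only mimics the "always return a list" behaviour of flow.get_tasks(name=...):

```python
from typing import List, TypeVar

T = TypeVar("T")


def get_single_task(matches: List[T]) -> T:
    """Hypothetical helper (not a Prefect API): unpack a list that must hold exactly one task."""
    if len(matches) != 1:
        raise ValueError(f"expected exactly one task, found {len(matches)}: {matches}")
    return matches[0]


class FakeTask:
    """Hypothetical stand-in for prefect.Task."""

    def __init__(self, name):
        self.name = name

    def __repr__(self):
        return f"<Task: {self.name}>"


all_tasks = [FakeTask("load data"), FakeTask("load data"), FakeTask("clean data")]


def get_tasks(name=None):
    """Stand-in for flow.get_tasks(name=...): always returns a list, possibly empty."""
    return [t for t in all_tasks if name is None or t.name == name]


clean = get_single_task(get_tasks(name="clean data"))  # one match: returned directly

duplicate_error = None
try:
    get_single_task(get_tasks(name="load data"))  # two tasks share this name
except ValueError as exc:
    duplicate_error = str(exc)
```

The return type of get_tasks never changes; the caller states the "exactly one" expectation at the call site, which is the duplicate-name case shown in the transcript below.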

In [17]: %paste                             
from prefect import Flow, task

@task(name='load data')
def _load_data() -> dict:
    return {'a': [1, 2, 3]}

@task(name='load data')
def _load_data_2() -> dict:
    return {'b': [9, 8, 7]}


with Flow('preprocess-VO') as flow:
    _load_data()
    _load_data_2()

## -- End pasted text --

In [18]: state = flow.run()                 
[2020-04-29 20:10:07] INFO - prefect.FlowRunner | Beginning Flow run for 'preprocess-VO'
[2020-04-29 20:10:07] INFO - prefect.FlowRunner | Starting flow run.
[2020-04-29 20:10:07] INFO - prefect.TaskRunner | Task 'load data': Starting task run...
[2020-04-29 20:10:07] INFO - prefect.TaskRunner | Task 'load data': finished task run for task with final state: 'Success'
[2020-04-29 20:10:07] INFO - prefect.TaskRunner | Task 'load data': Starting task run...
[2020-04-29 20:10:07] INFO - prefect.TaskRunner | Task 'load data': finished task run for task with final state: 'Success'
[2020-04-29 20:10:07] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded

In [19]: task_ref = flow.get_tasks(name='load data')                                    

In [20]: task_ref                           
Out[20]: [<Task: load data>, <Task: load data>]

state.result[<func name>] works

I think this one is an issue of perceived parity between the imperative API and the functional API. It 'works' 'automatically' with the imperative API because the task objects you pass in (already bound to names matching the function names) are the ones added to the flow, whereas in the functional API calling a task adds a copy of it to the flow, so we depend on the user to keep a reference to that copy. In the following examples, the first and third 'work' as you expect.

1

# given the following
with Flow('example 3') as flow:
    _load_data = _load_data()

# then
In [3]: state.result[_load_data]            
Out[3]: <Success: "Task run succeeded.">

2

# given the following
with Flow('preprocess-VO') as flow:
    _load_data()

# then
In [23]: state.result[_load_data]           
---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
<ipython-input-23-83dbaff0ab77> in <module>
----> 1 state.result[_load_data]

KeyError: <Task: load data>

3

# given the following
imperative_flow = Flow(name="imperative version", tasks=[_load_data])
state2 = imperative_flow.run()

# then
In [26]: state2.result[_load_data]          
Out[26]: <Success: "Task run succeeded.">

Without knowing 100% the technical reasons for not supporting it, enabling style number 2 seems like an enhancement that could have its own ticket if it's still of interest.

flow.get_tasks[0] returns a task reference

I think the mix-up here is that flow.get_tasks[0] should be flow.get_tasks()[0] (a method call) in the docs you were referencing. So a change from:

In [3]: flow.get_tasks[0]                   
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-3-b64d918e68aa> in <module>
----> 1 flow.get_tasks[0]

TypeError: 'method' object is not subscriptable

to

In [4]: flow.get_tasks()[0]                 
Out[4]: <Task: get_type>

In [5]: state.result[flow.get_tasks()[0]]   
Out[5]: <Success: "Task run succeeded.">

For this one I think we don't want to change Prefect, but fix the typo in the docs.
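The TypeError is plain Python behaviour: get_tasks is a method, so flow.get_tasks is a bound method object, which has to be called before its result can be indexed. A self-contained illustration with a hypothetical ToyFlow class (not Prefect):

```python
class ToyFlow:
    """Hypothetical stand-in exposing a get_tasks() method that returns a list."""

    def __init__(self, tasks):
        self._tasks = list(tasks)

    def get_tasks(self):
        return list(self._tasks)


flow = ToyFlow(["get_type", "load data"])

error = None
try:
    flow.get_tasks[0]  # subscripting the bound method itself
except TypeError as exc:
    error = str(exc)  # "'method' object is not subscriptable"

first = flow.get_tasks()[0]  # call first, then index the returned list
```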

Hi @lauralorenz, thanks a lot for your extremely detailed feedback!

The rationale for returning lists in issue 1 (state.result[task_ref] can be used instead of state.result[task_ref[0]]) and for just fixing the typo in issue 3 (flow.get_tasks[0] returns a task reference) makes perfect sense, thanks! Repeating the 'load data' string in task names is potentially a very powerful feature for seeing loaded input quickly.

Why my suggestion for issue 2 state.result[<func name>] (below) would be problematic

I drafted the idea below before realising that it would cause problems...

I wanted to find a way to get the result of any function used in a Flow by its function name, as I often use the pipeop pipe operator >> to skip intermediate variable names. However, if a function is used more than once in a Flow, this approach will cause issues... Probably best to stick with defining task names with the decorator @task(name='<NAME>'), calling them with task_ref = flow.get_tasks(name='<NAME>') and finally task_result = state.result[task_ref[0]]._result.value, even if it is a bit long-winded.

What do you think?


What I was thinking...

With regard to issue 2 (state.result[<func name>]), I currently use the pipe operator >> from pipeop to avoid intermediate variable names where possible, like so:

from pipeop import pipes
from prefect import Flow, task

@task
def _load_data() -> dict:
    return {'a': [1, 2, 3]}

# etc...

@pipes
def data_etl_flow():

    # given the following
    with Flow('example 3') as flow:
        clean_data = (
            _load_data()
            >> _do_something_to_data()
            # ...
            >> _data_cleaned()
        )

    return flow

# then
In [1]: flow = data_etl_flow()
In [2]: state = flow.run()
In [3]: state.result[_load_data]            
Out[3]: KeyError: ...

With this type of workflow, the functional API approach of referencing intermediate variable names directly is obviously an issue. I like the imperative approach for this reason, but it's maybe not as flexible as a pipe operator if the flow involves loading and cleaning two datasets, which could be done in parallel:

from pipeop import pipes
from prefect import Flow, task

@task
def _load_data() -> dict:
    return {'a': [1, 2, 3]}

# etc...

@pipes
def data_etl_flow():

    # given the following
    with Flow('example 3') as flow:
        clean_data = (
            _load_data()
            >> _do_something_to_data()
            # ...
            >> _data_cleaned()
        )

        clean_other_data = (
            _load_other_data()
            >> _do_something_to_other_data()
            # ...
            >> _other_data_cleaned()
        )

        merged_data = _merge(clean_data, clean_other_data)

    return flow

# then
In [1]: flow = data_etl_flow()
In [2]: state = flow.run()
In [3]: state.result[_load_data]            
Out[3]: KeyError: ...

My main aim in raising all of the above issues was to find a way to get the result of any function used in a Flow by passing any of the function names used in the flow (e.g. _do_something_to_data, _do_something_to_other_data) directly to state.result[<NAME>].
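One way to sketch that lookup-by-name idea while keeping the duplicate-name problem visible is to group results by task name and always return a list per name (in Prefect 0.x a @task's name defaults to its function's name). results_by_name is a hypothetical helper, not a Prefect API; FakeTask stands in for prefect.Task, and the dict stands in for state.result, whose values would be State objects rather than the plain values used here:

```python
from collections import defaultdict
from typing import Any, Dict, List


class FakeTask:
    """Hypothetical stand-in for prefect.Task; only the .name attribute matters here."""

    def __init__(self, name):
        self.name = name


def results_by_name(state_result: Dict[Any, Any]) -> Dict[str, List[Any]]:
    """Hypothetical helper: group per-task results by task name.

    Each name maps to a list, because the same function used twice in a flow
    produces two distinct tasks with the same name.
    """
    grouped: Dict[str, List[Any]] = defaultdict(list)
    for task, result in state_result.items():
        grouped[task.name].append(result)
    return dict(grouped)


# Stand-in for state.result after a run in which _clean was used twice.
fake_state_result = {
    FakeTask("_load_data"): {"a": [1, 2, 3]},
    FakeTask("_clean"): {"a": [1, 2]},
    FakeTask("_clean"): {"a": [1]},
}
by_name = results_by_name(fake_state_result)
```

Returning a list for every name means the caller still has to pick one result when a function appears more than once, which is the same trade-off as flow.get_tasks(name=...).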

Hi @rdmolony, just closing the loop on this! We fixed the docs mistake in a commit in #2534, which auto-closed this issue. As for the ultimate request, if you still feel strongly about it I would suggest opening a new issue just for that. That said, I do think it is best to be explicit in the flow code, and even if we pursue something more implicit there, we'll eventually run into the trouble of duplicate task names, as you say, if a task is used more than once.

Thanks for all the help @lauralorenz, really appreciate it! Can't emphasise enough how much easier prefect has made my data wrangling process


No need to post another issue on this from my end; I've found a handy workaround for quick checks in IPython. Instead of the ugly method described earlier:

"Probably best to stick with defining task names with the decorator @task(name='<NAME>'), calling them with task_ref = flow.get_tasks(name='<NAME>') and finally task_result = state.result[task_ref[0]]._result.value, even if it is a bit long-winded."

I now do tasks = flow.get_tasks() and use state.result[tasks[<index of task I care about>]].result.
