Prefect: can't reference an upstream_task by Task

Created on 4 Sep 2020 · 4 Comments · Source: PrefectHQ/prefect

Description


I am trying to reference an upstream_task by passing the Task object, as is done in the example here.

In my example below task_A doesn't return anything, so in the flow I do not want to assign it to a variable.

from prefect import task, Flow

@task
def task_A(dct):
    dct['a'] = 1


@task
def task_B(dct):
    return dct['a']

with Flow("functional-example") as flow:
    dct = {}
    task_A(dct)
    result = task_B(dct, upstream_tasks=[task_A])

flow.run()

gives

[2020-09-04 13:27:51] INFO - prefect.FlowRunner | Beginning Flow run for 'functional-example'
[2020-09-04 13:27:51] INFO - prefect.TaskRunner | Task 'task_A': Starting task run...
[2020-09-04 13:27:51] INFO - prefect.TaskRunner | Task 'task_A': finished task run for task with final state: 'Success'
[2020-09-04 13:27:51] INFO - prefect.TaskRunner | Task 'task_A': Starting task run...
[2020-09-04 13:27:51] ERROR - prefect.TaskRunner | Unexpected error: TypeError("task_A() missing 1 required positional argument: 'dct'")
Traceback (most recent call last):
  File "/gscratch/home/a-banijh/miniconda3/envs/majoanalysis/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/gscratch/home/a-banijh/miniconda3/envs/majoanalysis/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 822, in get_task_run_state
    value = timeout_handler(
  File "/gscratch/home/a-banijh/miniconda3/envs/majoanalysis/lib/python3.8/site-packages/prefect/utilities/executors.py", line 188, in timeout_handler
    return fn(*args, **kwargs)
TypeError: task_A() missing 1 required positional argument: 'dct'
[2020-09-04 13:27:51] INFO - prefect.TaskRunner | Task 'task_A': finished task run for task with final state: 'Failed'
[2020-09-04 13:27:51] INFO - prefect.TaskRunner | Task 'task_B': Starting task run...
[2020-09-04 13:27:51] INFO - prefect.TaskRunner | Task 'task_B': finished task run for task with final state: 'TriggerFailed'
[2020-09-04 13:27:51] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
<Failed: "Some reference tasks failed.">

Expected Behavior


The example works.

Reproduction

Environment

(majoanalysis) QUANTUM-NFS-SERVER-001  ➜  ~  prefect diagnostics
{
  "config_overrides": {},
  "env_vars": [],
  "system_information": {
    "platform": "Linux-4.15.0-1066-azure-x86_64-with-glibc2.10",
    "prefect_version": "0.13.5",
    "python_version": "3.8.1"
  }
}

All 4 comments

Hi @basnijholt - this is a common source of confusion: when you call task_A, a copy is made. Therefore task_A(dct) and task_A are actually two distinct tasks (copies of each other; see this doc for more details). You should instead keep a reference to the object returned by task_A(dct), regardless of whether the task returns anything:

with Flow("functional-example") as flow:
    dct = {}
    out_A = task_A(dct)
    result = task_B(dct, upstream_tasks=[out_A])

Also, just in case this toy example is a pattern you intend to implement, you should not rely on side effects between your tasks as you do here. A flow like this will not always run as expected, and cannot be restarted from failure.
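To picture the copy-on-call behavior described above, here is a toy model in plain Python. It is only a sketch: ToyTask and set_a are hypothetical names and this is not Prefect's actual Task class. It shows why the original, never-called task object fails with a "missing argument" error when it is run on its own, which matches the second task_A run in the log.

```python
import copy


class ToyTask:
    """Toy stand-in for a task object; NOT Prefect's real Task class."""

    def __init__(self, fn):
        self.fn = fn
        self.bound_args = None  # None means "never called with arguments"

    def __call__(self, *args):
        # Calling the task returns a *copy* with the arguments attached;
        # the original object is left unbound.
        new = copy.copy(self)
        new.bound_args = args
        return new

    def run(self):
        return self.fn(*(self.bound_args or ()))


def set_a(dct):
    dct["a"] = 1


task_A = ToyTask(set_a)
out_A = task_A({})

is_same_object = out_A is task_A  # the call produced a new object
out_A.run()                       # works: the copy knows its argument

try:
    task_A.run()                  # the original has no argument bound
    error = None
except TypeError as exc:
    error = str(exc)
```

In this model, only out_A carries the argument, so it is the object to pass along as an upstream dependency.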

@cicdw, thanks for explaining!

What precisely do you mean by

A flow like this will not always run as expected, and cannot be restarted from failure.

Why would it not run as expected? And how else should one do it? Avoid in-place operations altogether?

If, for example, you ever need parallelism in your Flow, your tasks might run in entirely different processes (or even on different machines!). In that case each task is referencing a different dictionary. Prefect encourages you to always write code that could be distributed or parallelized.

Also, if you ever run this against a Prefect backend and want to recover from failure, there is no way to reconstruct the dictionary (Prefect doesn't even know it exists, because it's not a tracked data dependency).

This situation is precisely what dataflow is intended for:

from prefect import task, Flow

@task
def task_A(dct):
    dct['a'] = 1
    return dct


@task
def task_B(dct):
    return dct['a']

with Flow("functional-example") as flow:
    dct = {}
    a = task_A(dct)
    result = task_B(a)

flow.run()
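The point about separate processes can be simulated without Prefect. The sketch below assumes that shipping a call to another process amounts to serializing its arguments and its result, and mimics that with a pickle round trip. run_remotely, mutate_in_place and mutate_and_return are hypothetical helpers for illustration, not part of any Prefect API.

```python
import pickle


def run_remotely(fn, *args):
    """Hypothetical helper: simulate running `fn` in another process.

    Arguments are pickled and unpickled, so `fn` receives copies,
    and the return value travels back the same way.
    """
    copied_args = pickle.loads(pickle.dumps(args))
    result = fn(*copied_args)
    return pickle.loads(pickle.dumps(result))


def mutate_in_place(dct):
    dct["a"] = 1  # side effect only, nothing is returned


def mutate_and_return(dct):
    dct["a"] = 1
    return dct  # the updated data is passed on explicitly


original = {}
run_remotely(mutate_in_place, original)
side_effect_visible = "a" in original  # did the caller's dict change?

returned = run_remotely(mutate_and_return, {})
```

Under this assumption the in-place change never reaches the caller's dictionary, while the returned value does carry the update, which is why the dataflow version passes the result of task_A into task_B.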

That's clear, thanks a lot!
