Prefect: Question: Retry on mapped task

Created on 31 Aug 2019 · 2 Comments · Source: PrefectHQ/prefect

Description

A clear description of the bug

I'm experimenting and trying to figure out whether I am doing something incorrectly or whether this isn't possible. I have a mapped task with retries attached to it. If any instance of that mapped task fails, all downstream tasks fail, even though the failed instance is only in a Retrying state.

Expected Behavior

What did you expect to happen instead?

When an instance of the mapped task fails, it should be retried. Downstream tasks should run once all instances are successful. If an instance is still unsuccessful after max_retries is reached, the rest of the Flow should fail.

Reproduction

A minimal example that exhibits the behavior.

Code:

from datetime import timedelta
from random import random

from prefect import Flow, context, task


@task(max_retries=3, retry_delay=timedelta(seconds=2))
def add_ten(data: float):
    if random() < 0.33:
        raise ValueError("Random error")
    return data + 10


@task
def sum_data(d):
    log = context.get("logger")
    log.info(f"Sum of data {sum(d) / len(d)}")


with Flow("Mapped Tasks Should Retry on Fail") as flow:
    d = add_ten.map(data=[1, 2, 3])
    sum_data(d)

flow.run()

Output:

[2019-08-30 22:54:31,086] INFO - prefect.FlowRunner | Beginning Flow run for 'Mapped Tasks Should Retry on Fail'
[2019-08-30 22:54:31,086] INFO - prefect.FlowRunner | Starting flow run.
[2019-08-30 22:54:31,091] INFO - prefect.TaskRunner | Task '1': Starting task run...
[2019-08-30 22:54:31,091] INFO - prefect.TaskRunner | Task '1': finished task run for task with final state: 'Success'
[2019-08-30 22:54:31,092] INFO - prefect.TaskRunner | Task '3': Starting task run...
[2019-08-30 22:54:31,092] INFO - prefect.TaskRunner | Task '3': finished task run for task with final state: 'Success'
[2019-08-30 22:54:31,092] INFO - prefect.TaskRunner | Task '2': Starting task run...
[2019-08-30 22:54:31,093] INFO - prefect.TaskRunner | Task '2': finished task run for task with final state: 'Success'
[2019-08-30 22:54:31,093] INFO - prefect.TaskRunner | Task 'List': Starting task run...
[2019-08-30 22:54:31,093] INFO - prefect.TaskRunner | Task 'List': finished task run for task with final state: 'Success'
[2019-08-30 22:54:31,094] INFO - prefect.TaskRunner | Task 'add_ten': Starting task run...
[2019-08-30 22:54:31,095] INFO - prefect.TaskRunner | Task 'add_ten[0]': Starting task run...
[2019-08-30 22:54:31,095] INFO - prefect.TaskRunner | Task 'add_ten[0]': finished task run for task with final state: 'Success'
[2019-08-30 22:54:31,095] INFO - prefect.TaskRunner | Task 'add_ten[2]': Starting task run...
[2019-08-30 22:54:31,096] INFO - prefect.TaskRunner | Task 'add_ten[2]': finished task run for task with final state: 'Success'
[2019-08-30 22:54:31,096] INFO - prefect.TaskRunner | Task 'add_ten[1]': Starting task run...
[2019-08-30 22:54:31,096] INFO - prefect.TaskRunner | Unexpected error: ValueError('Random error')
[2019-08-30 22:54:31,096] INFO - prefect.TaskRunner | Task 'add_ten[1]': finished task run for task with final state: 'Retrying'
[2019-08-30 22:54:31,097] INFO - prefect.TaskRunner | Task 'add_ten': finished task run for task with final state: 'Mapped'
[2019-08-30 22:54:31,097] INFO - prefect.TaskRunner | Task 'sum_data': Starting task run...
[2019-08-30 22:54:31,097] INFO - prefect.TaskRunner | Task 'sum_data': finished task run for task with final state: 'TriggerFailed'
[2019-08-30 22:54:31,098] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
[2019-08-30 22:54:31,098] INFO - prefect.Flow | Waiting for next available Task run at 2019-08-30T22:54:33.096859+00:00

To expand on what I believe the ideal behavior would be: in the above output, the add_ten instance for index 1 raised ValueError("Random error") and entered the Retrying state, yet the downstream sum_data task immediately finished as TriggerFailed and the Flow run was reported as FAILED. The add_ten task is defined with three retries, though. The instances for indices 0 and 2 succeeded on the first try, so only add_ten[1] needs to be retried, and that retry should happen before the Flow continues to the downstream sum_data task.
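To make the expected semantics concrete, here is a minimal plain-Python sketch that does not use Prefect at all. The names map_with_retries and make_flaky_add_ten are hypothetical helpers invented for this illustration, not part of any library. It assumes that max_retries counts retries after the first attempt, and it leaves out the retry delay for brevity. It only models the ordering I would expect: every mapped instance is retried until it succeeds or runs out of retries, and only then does the downstream reduce step run.

```python
from typing import Callable, Iterable, List, Sequence


def map_with_retries(
    fn: Callable[[float], float],
    items: Sequence[float],
    max_retries: int,
) -> List[float]:
    """Apply fn to each item, retrying a failed item up to max_retries times.

    Hypothetical helper: models the expected behavior only, not Prefect's
    actual implementation. The retry delay is intentionally omitted.
    """
    results: List[float] = []
    for index, item in enumerate(items):
        retries_used = 0
        while True:
            try:
                results.append(fn(item))
                break  # this instance succeeded; move on to the next one
            except Exception as exc:
                if retries_used >= max_retries:
                    # Retries exhausted: the whole run fails, so the
                    # downstream step is never reached.
                    raise RuntimeError(
                        f"mapped instance [{index}] failed after "
                        f"{max_retries} retries"
                    ) from exc
                retries_used += 1
    return results


def make_flaky_add_ten(fail_once_for: Iterable[float]) -> Callable[[float], float]:
    """Build an add_ten that fails exactly once for each listed input.

    Deterministic stand-in for the random failure in the reproduction above.
    """
    pending = set(fail_once_for)

    def add_ten(data: float) -> float:
        if data in pending:
            pending.discard(data)
            raise ValueError("Random error")
        return data + 10

    return add_ten


if __name__ == "__main__":
    # The instance for input 2 fails once, is retried, and then succeeds.
    mapped = map_with_retries(make_flaky_add_ten({2}), [1, 2, 3], max_retries=3)
    # The downstream step only runs after every mapped instance has finished.
    print(f"Sum of data {sum(mapped)}")
```

In this sketch the downstream sum is only computed once all three instances have returned a value, which is the ordering I expected from the Flow above.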

Environment

Any additional information about your environment

OS: macOS 10.14.6
Env Manager: conda 4.6.14
Python: 3.7.4

prefect installed with pip
prefect: 0.6.1

bug

All 2 comments

Hi @JacksonMaxfield - you are absolutely correct, this is a bug!! Fix incoming. And thank you for the clear reproducible example.

Thanks for the incredibly quick response and fix!
