Prefect: Task Retries tutorial hangs

Created on 15 Mar 2019 · 2 Comments · Source: PrefectHQ/prefect

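from datetime import timedelta
from time import sleep

import prefect
import requests
from prefect import Flow, task

@task
def create_payload():
    "Performs expensive computation to create / return a URL"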

    sleep(5) # for whatever reason, getting to this point takes a long time
    return 'http://www.google.com'


@task(max_retries=1, retry_delay=timedelta(minutes=10))
def ping_external_service(url):
    "Performs a simple GET request to the provided URL, and returns the text of the response."

    if prefect.context.get("fail"):
        raise ValueError("Request failed with status code 418.")
    else:
        r = requests.get(url)
        return r.text

with Flow(name="retry example") as f:
    text = ping_external_service(create_payload)

%%time
with prefect.context(fail=True) as ctx:
    flow_state = f.run(context=ctx)

According to the tutorial (https://docs.prefect.io/guide/tutorials/task-retries.html), the wall time of this Flow run should be 5.01s. The docs state:

As expected, the flow run took 5 seconds due to the create_payload task. We can now inspect both the state of the flow as well as the state of its tasks.

So the docs appear to suggest that control should return to the user with a Flow state of Pending, after which they can examine flow_state. However, when I execute this code (from the provided sample notebook task-retries.ipynb), f.run(context=ctx) takes more than 10 minutes, and the notebook hangs while logging "Waiting for next available Task run at [10 minutes from now]".

The full log for this Flow run is:

[2019-03-15 17:53:19,360] INFO - prefect.FlowRunner | Beginning Flow run for 'retry example'
[2019-03-15 17:53:19,363] INFO - prefect.FlowRunner | Starting flow run.
[2019-03-15 17:53:19,365] DEBUG - prefect.FlowRunner | Flow 'retry example': Handling state change from Scheduled to Running
[2019-03-15 17:53:19,372] INFO - prefect.TaskRunner | Task 'create_payload': Starting task run...
[2019-03-15 17:53:19,375] DEBUG - prefect.TaskRunner | Task 'create_payload': Handling state change from Pending to Running
[2019-03-15 17:53:19,376] DEBUG - prefect.TaskRunner | Task 'create_payload': Calling task.run() method...
[2019-03-15 17:53:24,382] DEBUG - prefect.TaskRunner | Task 'create_payload': Handling state change from Running to Success
[2019-03-15 17:53:24,385] INFO - prefect.TaskRunner | Task 'create_payload': finished task run for task with final state: 'Success'
[2019-03-15 17:53:24,389] INFO - prefect.TaskRunner | Task 'ping_external_service': Starting task run...
[2019-03-15 17:53:24,394] DEBUG - prefect.TaskRunner | Task 'ping_external_service': Handling state change from Pending to Running
[2019-03-15 17:53:24,400] DEBUG - prefect.TaskRunner | Task 'ping_external_service': Calling task.run() method...
[2019-03-15 17:53:24,402] INFO - prefect.TaskRunner | Unexpected error: ValueError('Request failed with status code 418.',)
[2019-03-15 17:53:24,405] DEBUG - prefect.TaskRunner | Task 'ping_external_service': Handling state change from Running to Failed
[2019-03-15 17:53:24,407] DEBUG - prefect.TaskRunner | Task 'ping_external_service': Handling state change from Failed to Retrying
[2019-03-15 17:53:24,410] INFO - prefect.TaskRunner | Task 'ping_external_service': finished task run for task with final state: 'Retrying'
[2019-03-15 17:53:24,418] INFO - prefect.FlowRunner | Flow run RUNNING: terminal tasks are incomplete.
[2019-03-15 17:53:24,425] DEBUG - prefect.FlowRunner | Flow 'retry example': Handling state change from Running to Running
[2019-03-15 17:53:24,428] INFO - prefect.Flow | Waiting for next available Task run at 2019-03-15T18:03:24.407309+00:00
[2019-03-15 18:03:24,513] INFO - prefect.FlowRunner | Beginning Flow run for 'retry example'
[2019-03-15 18:03:24,525] INFO - prefect.TaskRunner | Task 'ping_external_service': Starting task run...
[2019-03-15 18:03:24,529] DEBUG - prefect.TaskRunner | Task 'ping_external_service': Handling state change from Retrying to Running
[2019-03-15 18:03:24,533] DEBUG - prefect.TaskRunner | Task 'ping_external_service': Calling task.run() method...
[2019-03-15 18:03:24,535] INFO - prefect.TaskRunner | Unexpected error: ValueError('Request failed with status code 418.',)
[2019-03-15 18:03:24,539] DEBUG - prefect.TaskRunner | Task 'ping_external_service': Handling state change from Running to Failed
[2019-03-15 18:03:24,542] INFO - prefect.TaskRunner | Task 'ping_external_service': finished task run for task with final state: 'Failed'
[2019-03-15 18:03:24,552] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
[2019-03-15 18:03:24,554] DEBUG - prefect.FlowRunner | Flow 'retry example': Handling state change from Running to Failed
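The 10-minute wait in the log is exactly the retry_delay configured on ping_external_service: the retry is scheduled 10 minutes after the task entered Retrying. A quick standard-library check using the timestamps copied from the log (assuming the log's own timestamps are UTC, which the +00:00 retry time suggests):

```python
from datetime import datetime, timedelta, timezone

# When ping_external_service moved from Failed to Retrying (log line timestamp,
# millisecond precision; assumed to be UTC, consistent with the +00:00 time below).
entered_retrying = datetime(2019, 3, 15, 17, 53, 24, 407000, tzinfo=timezone.utc)

# The retry time that flow.run said it was waiting for.
scheduled_retry = datetime.fromisoformat("2019-03-15T18:03:24.407309+00:00")

# retry_delay configured on ping_external_service.
retry_delay = timedelta(minutes=10)

gap = scheduled_retry - entered_retrying
print(gap)  # 0:10:00.000309

# The gap equals retry_delay up to the log's millisecond rounding.
print(abs(gap - retry_delay) < timedelta(milliseconds=1))  # True
```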

Once the run finally finishes, examining flow_state with:

print("Flow state: {}\n".format(flow_state))
print("Flow results: {}".format(flow_state.result))

produces:

Flow state: Failed("Some reference tasks failed.")

Flow results: {<Task: ping_external_service>: Failed("Unexpected error: ValueError('Request failed with status code 418.',)"), <Task: create_payload>: Success("Task run succeeded.")}
Comments

We are about to begin going through the tutorials / docs with a fine-toothed comb in preparation for a full open source release, so this is a very relevant issue for us. Thanks, @ntaylorwss!

flow.run now performs a fully stateful, scheduled execution, so it waits for and performs retries on schedule. Originally this was not the case: it returned control immediately, as the tutorial claims.
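To make the difference concrete, here is a toy model in plain Python. It is not Prefect's implementation, and ToyTask, run_once, and run_scheduled are hypothetical names. run_once mimics a single execution cycle that hands back a Retrying state immediately, while run_scheduled keeps going and sleeps until each retry is due, as flow.run now does. The sleep function is injected so the example runs without actually waiting 10 minutes.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Callable, Optional


# Toy model only: NOT Prefect's implementation, just the retry semantics described above.
@dataclass
class ToyTask:
    fn: Callable[[], object]               # the work the task performs
    max_retries: int = 0                   # retries allowed after the first attempt
    retry_delay: timedelta = timedelta(0)  # wait before each retry
    attempts: int = 0
    state: str = "Pending"
    retry_at: Optional[datetime] = None    # when the next retry is due, if any


def run_once(task: ToyTask, now: datetime) -> str:
    """A single execution cycle: run the task once and schedule a retry on failure."""
    task.attempts += 1
    try:
        task.fn()
        task.state = "Success"
    except Exception:
        if task.attempts <= task.max_retries:
            task.state = "Retrying"
            task.retry_at = now + task.retry_delay
        else:
            task.state = "Failed"
    return task.state


def run_scheduled(task: ToyTask, now: datetime, sleep: Callable[[float], object]) -> str:
    """Keep cycling until a terminal state, sleeping until each scheduled retry is due."""
    state = run_once(task, now)
    while state == "Retrying":
        # With time.sleep this would block, like the hanging notebook.
        sleep((task.retry_at - now).total_seconds())
        now = task.retry_at
        state = run_once(task, now)
    return state


def always_fail() -> None:
    raise ValueError("Request failed with status code 418.")


start = datetime(2019, 3, 15, 17, 53, 24)
slept = []  # records each requested sleep duration, in seconds

# Single cycle: control comes back right away with the task in Retrying.
print(run_once(ToyTask(always_fail, 1, timedelta(minutes=10)), start))  # Retrying

# Scheduled: waits out retry_delay, retries once, then ends Failed.
print(run_scheduled(ToyTask(always_fail, 1, timedelta(minutes=10)), start, slept.append))  # Failed
print(slept)  # [600.0]
```

With max_retries=1 and a 10-minute retry_delay, the scheduled version sleeps 600 seconds once before ending in Failed, which mirrors the log in the original report.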

For your own knowledge, to replicate the execution pattern of the tutorial, you can use a FlowRunner directly:

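from prefect.engine.flow_runner import FlowRunner

# Run inside the context block so that fail=True is still set when the flow runs
with prefect.context(fail=True) as ctx:
    flow_state = FlowRunner(flow=f).run(return_tasks=f.tasks, context=ctx)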

which will only perform a single execution cycle (no retries).

You don't need to do this though, and we will update the tutorial to address this behavior change!

What @cicdw wrote will replicate the current tutorial docs (they were written when flow.run() simply forwarded to a FlowRunner), but if you want to use flow.run() and have it finish in about 5 seconds, just change

@task(max_retries=1, retry_delay=timedelta(minutes=10))
def ping_external_service(url):

to

@task
def ping_external_service(url):

and the task will no longer be retried, so there is no retry_delay to wait out (that is where the 10-minute delay came from!)
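As a rough rule of thumb (an approximation for this example, not something Prefect computes), when the retried task keeps failing, flow.run() cannot finish sooner than the upstream work plus max_retries × retry_delay. The helper below, min_wall_time, is a hypothetical name; it plugs in the numbers from this issue and ignores the time the task itself takes to run:

```python
from datetime import timedelta


def min_wall_time(upstream: timedelta, max_retries: int, retry_delay: timedelta) -> timedelta:
    """Rough lower bound on flow.run() wall time when the retried task fails every time:
    the upstream work plus one retry_delay wait per allowed retry."""
    return upstream + max_retries * retry_delay


payload = timedelta(seconds=5)  # sleep(5) in create_payload

# @task(max_retries=1, retry_delay=timedelta(minutes=10))
with_retries = min_wall_time(payload, 1, timedelta(minutes=10))
# plain @task: no retries, nothing to wait for
without_retries = min_wall_time(payload, 0, timedelta(0))

print(with_retries)     # 0:10:05
print(without_retries)  # 0:00:05
```

That is close to what the log shows: the run started at 17:53:19 and finished at 18:03:24.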

Sorry for the inconvenience and thanks for going through so carefully!
