I've heard from a contributor that mapped tasks can behave unstably. The way I heard it was:
I also found reports of this in our Slack history (archived here: https://github.com/PrefectHQ/prefect/issues/2655) that suggested a link to specific deployment environments and to high-volume mapped pipelines.
Note: could this be related to https://github.com/PrefectHQ/prefect/issues/2430?
What did you expect to happen instead?
The upstream mapped task is Failed, and the downstream mapped task does not run.
A minimal example that exhibits the behavior.
I have not observed it myself yet, but based on the Slack thread, a high-volume mapped task on an unstable network using DaskKubernetesEnvironment seems to be the best way to reproduce it.
Any additional information about your environment
Optionally run prefect diagnostics from the command line and paste the information here
I think this is possibly solved by #2646 but I'm not sure how to confirm.
Without a reproducible example, I'm not sure how to progress on this, especially since it may have been resolved by the mapping refactor. +0.5 on closing if others are ok with it, since we don't have an immediate action plan or reproducer.
I think this just happened to me: https://cloud.prefect.io/prefect/task-run/2cf36073-939b-47c5-9d51-a94089889e1b?logId=
@dylanbhughes do you have a Result configured for either your Flow or for any of your tasks?
Just a GCS ResultHandler.
Here's another example: {staging_url}prefect-staging/flow-run/be28eebe-06c5-4cb9-b0b4-516722416ffd?logId=
Another: {staging_url}/prefect-staging/task-run/fe1c7aac-c7ae-48de-9c97-5fba8c1a3f95?logId=
Finally, I'll say: this looks unresolved by the mapping refactor. What I'm seeing is that if there's a failure to communicate with Cloud, or the run is restarted by the Zombie Killer, mapped tasks in the same level can return None when resurrected.
I'm digging into this - I have a theory.
Good news everyone! I have a reproducible example of this behavior. @jcrist it's for your favorite part of the codebase - results! It's specific to the following situation:
It appears that the data produced by the mapped children that succeeded before the zombie death is _not_ properly rehydrated when the process is resurrected for a retry.
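To make that failure mode concrete, here is a toy model in plain Python. It is not Prefect code: `ChildState`, `first_attempt`, and `resume` are hypothetical names. It assumes that only each finished child's completion flag and result location survive the crash while the in-memory value is lost, so finished children come back as `None` unless their results are read back from storage. It is deliberately simplified and is not meant to match any particular log output.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class ChildState:
    """Hypothetical record of one mapped child, as it survives a crash."""
    index: int
    finished: bool
    value: Any = None               # in-memory result; lost when the process dies
    location: Optional[str] = None  # where the result was persisted


def first_attempt(items: List[Any], crash_at: int, store: Dict[str, Any]) -> List[ChildState]:
    """Run children in order, persisting each result, until the process 'dies' at crash_at."""
    states = []
    for i, x in enumerate(items):
        if i == crash_at:
            break  # simulate the flow runner being killed mid-map
        loc = f"result-{i}"
        store[loc] = x
        states.append(ChildState(i, True, x, loc))
    return states


def resume(items: List[Any], saved: List[ChildState], store: Dict[str, Any], rehydrate: bool) -> List[Any]:
    """Build the reducer's input after a restart.

    Only the finished flag and location survive, so finished children come back
    with value=None unless rehydrate=True reads their results from the store.
    Unfinished children are simply re-run (here: they return their input).
    """
    restored = {s.index: ChildState(s.index, s.finished, None, s.location) for s in saved}
    out = []
    for i, x in enumerate(items):
        st = restored.get(i)
        if st is not None and st.finished:
            out.append(store[st.location] if rehydrate else st.value)
        else:
            out.append(x)
    return out


if __name__ == "__main__":
    items, store = list(range(10)), {}
    saved = first_attempt(items, crash_at=5, store=store)
    print(resume(items, saved, store, rehydrate=False))  # [None, None, None, None, None, 5, 6, 7, 8, 9]
    print(resume(items, saved, store, rehydrate=True))   # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

The point of the model is that persisting results is not enough on its own: something on the resumed side has to read them back for every already-finished child.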
Here's the flow I used locally to test:
```python
import os
import time
from datetime import timedelta

import prefect
from prefect import task, Flow


@task
def return_list():
    # Log the PID so the process can be located and killed by hand later
    prefect.context['logger'].debug(f'PID: {os.getpid()}')
    return list(range(10))


@task(max_retries=2, retry_delay=timedelta(seconds=0))
def map_task(x):
    if x == 5:
        # Pause long enough to kill the flow runner and heartbeat processes
        prefect.context['logger'].critical('Waiting: do it! do it!')
        time.sleep(20)
    return x


@task
def reducer(ll):
    # Log every value the reducer receives from the mapped children
    msg = '\n'.join("{i}: {v}".format(i=i, v=v) for i, v in enumerate(ll))
    prefect.context['logger'].debug(msg)


with Flow("zombie") as flow:
    reducer(map_task.map(return_list))
```
Whenever I saw the "Waiting" log, I killed both the flow runner process and the task's heartbeat process. After waiting for Cloud to do its thing, I then saw:
```
[2020-07-10 03:26:31] 807-- DEBUG - prefect.CloudTaskRunner | Task 'reducer': Calling task.run() method...
[2020-07-10 03:26:31] 29-- DEBUG - prefect.reducer | 0: None
1: None
2: None
3: None
4: None
5: None
6: None
7: 7
8: 8
9: 9
```
It appears that our `load_results` logic doesn't quite work whenever the immediate upstream task is mapped. I can resolve this tomorrow 👍
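One plausible shape of that bug, sketched as a hypothetical model (`State`, `load_naive`, and `load_mapped_aware` are illustrative names, not Prefect's actual API or implementation): a mapped parent has no result location of its own, so a loader that only inspects the immediate upstream state never touches the children, and their values stay `None`. Recursing into the child states rehydrates each one from its own location.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, List, Optional


@dataclass
class State:
    """Hypothetical task state: a value, where it was persisted, and mapped children."""
    value: Any = None
    location: Optional[str] = None
    map_states: List["State"] = field(default_factory=list)  # non-empty => mapped parent


def load_naive(state: State, read: Callable[[str], Any]) -> Any:
    # Rehydrates only the state it is given; a mapped parent has no location,
    # so nothing is read and its children keep value=None.
    if state.value is None and state.location is not None:
        state.value = read(state.location)
    return state.value


def load_mapped_aware(state: State, read: Callable[[str], Any]) -> Any:
    # Recurse into mapped children so each one is rehydrated from its own location.
    if state.map_states:
        return [load_mapped_aware(child, read) for child in state.map_states]
    return load_naive(state, read)


if __name__ == "__main__":
    store = {f"loc-{i}": i for i in range(3)}
    parent = State(map_states=[State(location=f"loc-{i}") for i in range(3)])
    load_naive(parent, store.__getitem__)
    print([c.value for c in parent.map_states])  # [None, None, None]
    load_mapped_aware(parent, store.__getitem__)
    print([c.value for c in parent.map_states])  # [0, 1, 2]
```

Whether the real fix looks like this depends on how `load_results` walks upstream states; the sketch only shows why a mapped upstream needs per-child handling.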