Let's say we have a task like this (pseudo code below):
```python
import requests
from datetime import timedelta

from prefect import task, Flow
from prefect.engine.executors import LocalDaskExecutor
from prefect.environments import LocalEnvironment

@task(max_retries=3, retry_delay=timedelta(minutes=5))
def get_url_page(url_page: str):
    response = requests.get(url_page)
    response.raise_for_status()  # raise on 4xx/5xx so Prefect retries the task
    return response
```
If we build and execute this flow like so:
```python
num_workers = 1  # not given in the original; 1 worker makes the stall easiest to see

with Flow('Example') as flow:
    all_url_pages = ['link_to_page1', 'link_to_page2', 'link_to_page3']  # ....
    url_page_results = get_url_page.map(all_url_pages)

flow.environment = LocalEnvironment(
    labels=[], executor=LocalDaskExecutor(scheduler="threads", num_workers=num_workers),
)
```
Then, if one of the requests fails and the task waits 5 minutes to be retried, none of the other URLs mapped to this task are executed in the meantime, and the worker sits idle.
Ideally, when a single run of the mapped task fails, the worker should move on to the next one while waiting for the retry.
This makes a lot of sense! I'm not sure off the top of my head why it's behaving that way but I'll try to dig into it soon.
I am unable to reproduce this. Do you have working code?
I can send a minimum viable example later today.
I also failed to reproduce this with the given code (after adding import requests, from datetime import timedelta, and num_workers=1).
This has to do with how retrying is currently implemented in prefect. If a task retries and the retry time is < 10 min (hardcoded), the retry wait will be handled in the TaskRunner, which will take a slot in the running executor (e.g. taking a thread in a LocalDaskExecutor). Fixing this would require some larger changes to handle retries at the FlowRunner level, so other work could be done while the task is waiting to retry. I agree that this would be useful to do, but it's not a quick thing to fix.
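A minimal, Prefect-free sketch of why this stalls the map, assuming a single worker thread: the retry sleep runs inside the worker, so the queued URLs cannot start until the failing one finishes. make_flaky_fetch and run_with_inline_retries are hypothetical helpers, RuntimeError stands in for requests' HTTPError, and the 5-minute delay is shrunk to a fraction of a second.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def make_flaky_fetch(failures):
    """Hypothetical stand-in for get_url_page: raises until failures[url] hits 0."""
    def fetch(url):
        if failures.get(url, 0) > 0:
            failures[url] -= 1
            raise RuntimeError(f"{url} not available yet")
        return f"content of {url}"
    return fetch


def run_with_inline_retries(urls, fetch, workers, retry_delay, max_retries):
    """Map fetch over urls on a thread pool, sleeping between retries inside
    the worker thread (the pattern described above for the TaskRunner)."""
    order = []  # URLs in the order they succeeded

    def attempt_loop(url):
        for attempt in range(max_retries + 1):
            try:
                result = fetch(url)
                order.append(url)  # list.append is thread-safe in CPython
                return result
            except RuntimeError:
                if attempt == max_retries:
                    raise
                time.sleep(retry_delay)  # this worker thread is blocked meanwhile

    with ThreadPoolExecutor(max_workers=workers) as pool:
        list(pool.map(attempt_loop, urls))
    return order
```

With one worker and page1 failing once, the result is ["page1", "page2", "page3"]: page2 and page3 only start after page1's retry delay has fully elapsed.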
Some context: if we don't wait within the Task Runner, then the retry delay isn't really being respected (as the entire graph will need to be visited before returning to the retrying task, which generally takes time).
When running against a Prefect backend, if we don't retry within the runner, then that will result in a new Agent job submission at the retry time which gets _really_ inefficient as the number of retries grows.
Right. The retry delay is more of a minimum bound, though (we can't guarantee that the task will retry exactly after that delay; that depends on how the OS/runtime schedules threads). Handling it at the FlowRunner level would be possible with some changes to the graph traversal, but quick retries would definitely be slowed down, as there'd be a lot more overhead. The most efficient way to handle these would be some new component that runs in every Prefect worker process and handles worker-local scheduling, but right now that's all abstracted away in the Executor backends, so we don't really have that option (and adding it would be nontrivial).
We could do a quick fix for this for DaskExecutor-based execution alone (not LocalDaskExecutor) by having a retrying task secede from the threadpool while waiting, unblocking other threads. I'm hesitant to do this, though, as:
DaskExecutor-specific behavior inside the TaskRunner, neither one sounds particularly pleasant.
I don't see any benefit in changing the current behavior, especially if all solutions involve increasing complexity.
I definitely don't see a benefit of the more complicated solution. The DaskExecutor-only fix would rely on dask-specific behavior to handle this, but would probably be a fairly small diff in the Prefect codebase. I still don't think it's worth it at this point; I was just trying to get my thoughts on the issue out.
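The "secede while waiting" idea can be illustrated without dask. The sketch below is only a loose stdlib analogy, assuming worker capacity is modelled as a threading.Semaphore with a fixed number of slots: a retrying job hands its slot back during the sleep and takes one again before the next attempt. It does not reproduce how dask.distributed's secede()/rejoin() actually manage a worker's thread pool, and run_with_slot_release and make_flaky_fetch are hypothetical names.

```python
import threading
import time


def make_flaky_fetch(failures):
    """Hypothetical fetch that raises until failures[url] reaches 0."""
    def fetch(url):
        if failures.get(url, 0) > 0:
            failures[url] -= 1
            raise RuntimeError(f"{url} not available yet")
        return f"content of {url}"
    return fetch


def run_with_slot_release(urls, fetch, slots, retry_delay, max_retries):
    """Run one thread per URL, with at most `slots` of them doing work at once.

    A job that needs to retry releases its slot while it sleeps and acquires
    one again before the next attempt (a rough analogue of secede/rejoin).
    """
    sem = threading.Semaphore(slots)
    lock = threading.Lock()
    order = []  # URLs in the order they succeeded

    def job(url):
        sem.acquire()
        try:
            for attempt in range(max_retries + 1):
                try:
                    fetch(url)
                except RuntimeError:
                    if attempt == max_retries:
                        raise
                    sem.release()            # give the slot to another job
                    time.sleep(retry_delay)
                    sem.acquire()            # wait for a free slot again
                else:
                    with lock:
                        order.append(url)
                    return
        finally:
            sem.release()                    # the job always holds a slot here

    threads = [threading.Thread(target=job, args=(url,)) for url in urls]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return order
```

With a single slot and page1 failing once, page1 now finishes last, because page2 and page3 use the freed slot while it waits.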
Do you still need code to reproduce?
I still think there is a lot of added value. Sometimes, when requesting many URLs (hundreds, thousands, or more), if one fails (e.g. the endpoint is not yet available and will be in a few minutes or hours), it makes sense to keep requesting the rest of the URLs in the meantime; or at least to have the option to specify such behaviour (this would actually be ideal), keeping the current behaviour as the default.
@snenkov It may be useful if you provided the code for the future, but per the discussion above we're not going to be able to address this right now--the implications are too complex. I'll leave this issue open and hopefully we can address it down the road.
We also run flows with mapped tasks that frequently comprise thousands of task runs. Therefore, this creates significant delays and costs on our side as well.
I don't have a deep understanding of prefect yet, but here is a possible conceptual solution from my perspective:
Couldn't an easy solution be to just re-queue the failed task run at the end of the map?
That way, retries would be managed within the map (not at the flow level?) and would therefore run relatively soon, without delaying the other tasks in the map...
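A rough sketch of that re-queue idea on a single worker, assuming a constant retry delay: a failed item goes to the back of a deque together with a not-before timestamp, so the rest of the map keeps running, and the delay still acts as a lower bound (the concern raised earlier in the thread). The names are hypothetical; this is not how Prefect schedules mapped tasks.

```python
import time
from collections import deque


def make_flaky_fetch(failures):
    """Hypothetical fetch that raises until failures[url] reaches 0."""
    def fetch(url):
        if failures.get(url, 0) > 0:
            failures[url] -= 1
            raise RuntimeError(f"{url} not available yet")
        return f"content of {url}"
    return fetch


def run_map_with_requeue(urls, fetch, retry_delay, max_retries):
    """Process mapped items on one worker, moving failed items to the back.

    Entries are (url, attempt, not_before). With a constant retry_delay,
    entries are appended in non-decreasing not_before order, so the front
    of the deque is always the earliest item allowed to run.
    """
    start = time.monotonic()
    queue = deque((url, 0, start) for url in urls)
    results, order = {}, []
    while queue:
        url, attempt, not_before = queue.popleft()
        wait = not_before - time.monotonic()
        if wait > 0:
            # Everything left is a pending retry; wait out the remaining delay.
            time.sleep(wait)
        try:
            results[url] = fetch(url)
            order.append(url)
        except RuntimeError:
            if attempt >= max_retries:
                raise
            queue.append((url, attempt + 1, time.monotonic() + retry_delay))
    return results, order
```

With page1 failing once, the success order is ["page2", "page3", "page1"]. A plain FIFO suffices only because the delay is constant; per-task delays would call for a priority queue (e.g. heapq) keyed on not_before instead.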