Dask: distributed.scheduler: User asked for computation on lost data

Created on 29 Sep 2020 · 4 Comments · Source: dask/dask

I am running streaming jobs on Dask that require the Dask workers (and the scheduler) to run 24/7.

The jobs had been running fine for ~15 days when one of them stopped processing data. The Dask diagnostic dashboard no longer shows any new tasks, and a few tasks, possibly from around the time the failure started, still seem to be held by the workers/scheduler.

There was no error thrown by the main job script, but I see the following logs.

Scheduler logs:

distributed.scheduler - INFO - User asked for computation on lost data, window_accumulator-0454303e4b0c61495b0fad8d69ae2316
distributed.scheduler - INFO - User asked for computation on lost data, getitem-8bc404e0acbfe056579b6d83bd3d022e
distributed.scheduler - INFO - User asked for computation on lost data, getitem-e904b7155b8e74e306ae5166073d08ca
distributed.scheduler - INFO - User asked for computation on lost data, func-e8a8972816cfd9015c24d0ac4a6fd4ee
distributed.scheduler - INFO - User asked for computation on lost data, intermediate_aggs-a68480d5864d86a80b1e538f7cdb4279
distributed.scheduler - INFO - User asked for computation on lost data, global_CCU_aggs-e86eee4f7747770f1ab30b64f85afb46
distributed.scheduler - INFO - User asked for computation on lost data, lambda-d79780224b8c68d9cac725d0e87f9cb2
distributed.scheduler - INFO - User asked for computation on lost data, lambda-a5f6a80a2ea3a1f37f8730ace20c6583
distributed.scheduler - INFO - User asked for computation on lost data, write_to_kafka-f83de7f1acd5d25560c9709a5e797fbe
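When the scheduler log contains many of these messages, it can help to see which task names (the part of each key before the hash) are affected. The helper below is a hypothetical, stdlib-only sketch; it assumes keys follow the `name-<32 hex characters>` pattern visible in the lines above, and in practice you would read the text from the scheduler's log file instead of a string.

```python
import re
from collections import Counter

# A few of the scheduler log lines quoted above (truncated to three for brevity)
LOG = """\
distributed.scheduler - INFO - User asked for computation on lost data, window_accumulator-0454303e4b0c61495b0fad8d69ae2316
distributed.scheduler - INFO - User asked for computation on lost data, getitem-8bc404e0acbfe056579b6d83bd3d022e
distributed.scheduler - INFO - User asked for computation on lost data, getitem-e904b7155b8e74e306ae5166073d08ca
"""

# Captures the key that follows the "lost data" message
LOST_RE = re.compile(r"User asked for computation on lost data, (\S+)")
# Assumption: keys end in "-<32 hex chars>"; strip that token to get the task name
TOKEN_RE = re.compile(r"-[0-9a-f]{32}$")


def lost_keys_by_prefix(text):
    """Count lost keys per task-name prefix (hypothetical helper)."""
    counts = Counter()
    for line in text.splitlines():
        match = LOST_RE.search(line)
        if match:
            counts[TOKEN_RE.sub("", match.group(1))] += 1
    return counts


print(lost_keys_by_prefix(LOG))
```

Grouping by prefix shows at a glance whether the lost keys come from one stage of the pipeline or are spread across all of them.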

Worker logs:

distributed.core - INFO - Event loop was unresponsive in Worker for 10.04s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
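The warning says the worker's event loop could not run for about 10 seconds. The stdlib-only sketch below is not distributed's monitoring code; it only illustrates the effect. A monitor coroutine records how late it wakes up, and a hypothetical `blocking_work` call made directly on the loop (standing in for a long-running, GIL-holding function) delays every other coroutine by roughly the blocking time.

```python
import asyncio
import time


def blocking_work(seconds):
    # Hypothetical stand-in for a long-running function called on the event loop
    time.sleep(seconds)


async def monitor(interval, stop, gaps):
    # Record how much later than requested each wake-up happens
    last = time.monotonic()
    while not stop.is_set():
        await asyncio.sleep(interval)
        now = time.monotonic()
        gaps.append(now - last - interval)
        last = now


async def main():
    stop = asyncio.Event()
    gaps = []
    task = asyncio.create_task(monitor(0.05, stop, gaps))
    await asyncio.sleep(0.2)  # the monitor ticks normally here
    blocking_work(0.5)        # blocks the whole loop, so the monitor cannot wake up
    await asyncio.sleep(0.2)
    stop.set()
    await task
    return max(gaps)


worst = asyncio.run(main())
print(f"event loop was unresponsive for ~{worst:.2f}s")
```

Running such work in a thread or process (for example with `loop.run_in_executor`) keeps the loop free only to the extent that the work releases the GIL.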

A point worth mentioning about this specific job is that it's a rolling 8-hour window job (i.e., it involves a lot of data being kept in memory, shuffled across workers, and continuously replaced by "newer" data as it comes in).
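To make that memory pattern concrete, here is a single-process sketch of a time-based rolling window. `RollingWindow` is a hypothetical class, not the poster's job code, and it assumes timestamps arrive in order. Each new item evicts items older than the window span, so the amount of retained data stays bounded while its contents keep changing; in the Dask job, the equivalent data would additionally be spread and shuffled across workers.

```python
from collections import deque


class RollingWindow:
    """Keep (timestamp, value) pairs no older than `span` seconds (hypothetical)."""

    def __init__(self, span):
        self.span = span
        self.items = deque()

    def add(self, ts, value):
        self.items.append((ts, value))
        # Evict everything that has fallen out of the window; this is the
        # "continuously replaced by newer data" step (assumes in-order timestamps)
        while self.items and self.items[0][0] <= ts - self.span:
            self.items.popleft()

    def values(self):
        return [value for _, value in self.items]


HOUR = 3600
w = RollingWindow(span=8 * HOUR)
for hour in range(10):
    w.add(hour * HOUR, hour)
print(w.values())  # only the most recent 8 hours remain: [2, 3, 4, 5, 6, 7, 8, 9]
```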

[Note] I have already tuned quite a few other configs in ~/.config/dask/distributed.yaml according to my needs, such as comm.timeouts and worker.memory (the fractions at which workers should spill, pause, etc.), and they all seem to have done a pretty good job so far. :)
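The worker.memory settings mentioned above are fractions of each worker's memory limit. The sketch below turns them into absolute byte thresholds for a hypothetical 16 GiB worker, using distributed's default fractions (target 0.60, spill 0.70, pause 0.80, terminate 0.95); the poster's tuned values may differ. It is plain arithmetic, not a call into Dask.

```python
# Default worker.memory fractions in distributed (the poster's tuned values may differ)
DEFAULT_FRACTIONS = {
    "target": 0.60,     # start spilling managed data to disk
    "spill": 0.70,      # spill based on process memory
    "pause": 0.80,      # stop running new tasks on this worker
    "terminate": 0.95,  # the nanny restarts the worker
}


def memory_thresholds(memory_limit_bytes, fractions=DEFAULT_FRACTIONS):
    """Convert fractional thresholds into byte counts."""
    return {name: int(memory_limit_bytes * frac) for name, frac in fractions.items()}


GiB = 2**30
for name, nbytes in memory_thresholds(16 * GiB).items():  # hypothetical 16 GiB worker
    print(f"{name:>9}: {nbytes / GiB:.1f} GiB")
```

Seeing the absolute numbers makes it easier to check them against the memory graphs on the dashboard; a worker that reaches the terminate threshold is restarted and loses whatever it held in memory.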

  1. I was hoping someone could point me to some possible causes of these kinds of scheduler and worker logs, although I know that debugging this without a minimal reproducer would be difficult.

  2. Is there a scheduler/worker config I could use to alleviate or eliminate these issues going forward? Is there a _complete_ list (with more detailed descriptions) of the configs for distributed available somewhere?

  3. I am curious to know more about the following configs. They might not be relevant to the problem above, but I would still appreciate help understanding them, since a rolling, memory-intensive job like the one described above might benefit from specific settings:

scheduler:
  default-data-size: 1kiB
  unknown-task-duration: 500ms  # Default duration for all tasks with unknown durations ("15m", "2h")
  default-task-durations:  # How long we expect function names to run ("1h", "1s") (helps for long tasks)
    rechunk-split: 1us
    shuffle-split: 1us

comm:
  retry:  # some operations (such as gathering data) are subject to re-tries with the below parameters
    count: 0  # the maximum retry attempts. 0 disables re-trying.
  offload: 100MiB  # Size after which we choose to offload serialization to another thread
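Dask refers to nested YAML settings like these by dotted paths, which is also how the configuration reference names them and what `dask.config.get` accepts (for example `dask.config.get("distributed.comm.offload")`). The stdlib sketch below flattens the snippet above into those paths without depending on Dask. It assumes the snippet sits under the top-level `distributed:` key, as it does in ~/.config/dask/distributed.yaml; `flatten` is a hypothetical helper.

```python
def flatten(config, prefix=""):
    """Turn a nested dict into {"a.b.c": value} pairs (hypothetical helper)."""
    flat = {}
    for key, value in config.items():
        path = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten(value, path))
        else:
            flat[path] = value
    return flat


# The snippet above, assuming it is nested under "distributed:"
config = {
    "distributed": {
        "scheduler": {
            "default-data-size": "1kiB",
            "unknown-task-duration": "500ms",
            "default-task-durations": {"rechunk-split": "1us", "shuffle-split": "1us"},
        },
        "comm": {"retry": {"count": 0}, "offload": "100MiB"},
    }
}

for key, value in flatten(config).items():
    print(key, "=", value)
```

The printed paths (e.g. `distributed.scheduler.default-task-durations.rechunk-split`) are the names to search for in the configuration reference.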

Any help would be much appreciated!

All 4 comments

Well, I can help with #2! https://docs.dask.org/en/latest/configuration-reference.html has a list and description for all the dask and distributed config options. It is populated from https://github.com/dask/distributed/blob/master/distributed/distributed-schema.yaml if you prefer to read it in that format.

The idea with default-task-durations is that you can set the expected durations of tasks by their names. So if you have a task named "read-parquet" that normally takes 1 minute, you can set that there. retry controls how many times certain operations (such as gathering data) are retried if they fail or time out. You can set both the count and the delay between tries.
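A simplified sketch of that name-based lookup: a task key such as `read-parquet-<hash>` is reduced to its name, looked up in a table of expected durations, and falls back to the unknown-task-duration value (500ms in the config above) otherwise. The table values and helper names are hypothetical; Dask's own `dask.utils.key_split` handles more key shapes than this regex does, and this sketch ignores any learning from measured runtimes.

```python
import re

# Hypothetical values in the spirit of scheduler.default-task-durations (seconds)
DEFAULT_TASK_DURATIONS = {"read-parquet": 60.0, "rechunk-split": 1e-6}
UNKNOWN_TASK_DURATION = 0.5  # "500ms"


def task_prefix(key):
    # Simplified: drop a trailing "-<32 hex chars>" token
    return re.sub(r"-[0-9a-f]{32}$", "", key)


def expected_duration(key):
    """Expected runtime for a task key, falling back to the unknown-task default."""
    return DEFAULT_TASK_DURATIONS.get(task_prefix(key), UNKNOWN_TASK_DURATION)


print(expected_duration("read-parquet-" + "a" * 32))  # 60.0
print(expected_duration("getitem-" + "b" * 32))       # 0.5
```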

Thank you so much, @jsignell. The links and the descriptions help a lot.

Would you happen to know more about the scheduler.work-stealing and comm.offload configs?

I am trying to think of configs that may interact with long-running GIL-holding functions, or with large chunks of data moving across workers, and that could eventually lead to the "User asked for computation on lost data" message.

I'm also wondering whether admin.pdb-on-err or worker.profile.low-level might help in debugging what is happening in the scheduler, since the "User asked for computation on lost data" message appears in the scheduler logs.
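The comm.offload description in the config above ("Size after which we choose to offload serialization to another thread") can be sketched with asyncio alone. This is not distributed's implementation: `OFFLOAD_THRESHOLD`, `serialize` and the use of `pickle` are assumptions for illustration. Small payloads are serialized inline on the event loop, while large ones are handed to a thread so the coroutine awaits instead of blocking.

```python
import asyncio
import pickle

OFFLOAD_THRESHOLD = 100 * 2**20  # hypothetical stand-in for comm.offload: 100MiB


async def serialize(obj, size_hint):
    """Pickle small objects inline; hand large ones to a worker thread."""
    if size_hint < OFFLOAD_THRESHOLD:
        return pickle.dumps(obj)
    loop = asyncio.get_running_loop()
    # The coroutine awaits the default thread pool instead of calling pickle.dumps
    # directly; whether other callbacks actually run meanwhile depends on the
    # serializer releasing the GIL while it works
    return await loop.run_in_executor(None, pickle.dumps, obj)


data = {"x": list(range(10))}
small = asyncio.run(serialize(data, size_hint=10))
large = asyncio.run(serialize(data, size_hint=OFFLOAD_THRESHOLD))
print(pickle.loads(small) == pickle.loads(large) == data)
```

The threshold trades thread hand-off overhead for small messages against long event-loop stalls for large ones, which is the kind of stall the worker warning above reports.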

I don't have any particular insight, unfortunately, but if you haven't already read them, you might find https://distributed.dask.org/en/latest/work-stealing.html and https://distributed.dask.org/en/latest/worker.html#memory-management helpful.
