Distributed: Cluster hangs with a few tasks in "processing" state but no cpu load on any workers

Created on 9 Apr 2021 · 9 Comments · Source: dask/distributed

This problem is stochastic. It seems to occur more frequently when there is more sharing of data between workers. map_overlap calls seem particularly problematic.

Cluster is set up using dask-jobqueue.LSFCluster and dask.distributed.Client

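from dask.distributed import Client
from dask_jobqueue import LSFCluster

# cores, ncpus, memory, mem, walltime, env_extra, kwargs and njobs are defined elsewhere
cluster = LSFCluster(
    cores=cores, ncpus=ncpus, memory=memory, mem=mem,
    walltime=walltime,
    env_extra=env_extra,
    **kwargs,
)
client = Client(cluster)
cluster.scale(jobs=njobs)  # number of jobs (workers)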

Workers are all allocated properly, bash scripts invoking LSF all seem fine. The task graph starts to execute, but then gets hung up and sits indefinitely in this type of state:

[Screenshot: Screen Shot 2021-04-09 at 12 26 36 PM]

[Screenshot: Screen Shot 2021-04-09 at 12 27 24 PM]

No worker shows meaningful CPU activity (2-4% on all workers). env_extra above sets all MKL, BLAS, and OpenMP environment variables to 2 threads per core (which should be fine with hyper-threading?).
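
For readers unfamiliar with env_extra, here is a minimal sketch of what such a list might look like. The actual list used in this job is not shown in the post, so the helper name and the choice of variables are assumptions; the variable names themselves are the usual OpenMP, MKL, and OpenBLAS thread controls.

```python
from typing import List


def thread_limit_exports(threads_per_core: int) -> List[str]:
    """Build shell lines that cap native thread pools (hypothetical helper).

    Each returned string is meant to be one env_extra entry, i.e. one line
    executed in the job script before the worker starts.
    """
    variables = (
        "OMP_NUM_THREADS",       # OpenMP
        "MKL_NUM_THREADS",       # Intel MKL
        "OPENBLAS_NUM_THREADS",  # OpenBLAS
    )
    return [f"export {name}={threads_per_core}" for name in variables]


# Assumption: 2 threads per core, as described in the post.
env_extra = thread_limit_exports(2)
```

The resulting list would be passed as env_extra=env_extra to LSFCluster, as in the setup code above.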

When I click on the red task on the left of the graph I see:
hung_cluster_last_task_left.pdf

When I click on the red task on the right of the graph (second to last column) I see:
hung_cluster_last_task.pdf

For the red task on the right, the two "workers with data" show:

[Screenshot: Screen Shot 2021-04-09 at 12 28 30 PM]

[Screenshot: Screen Shot 2021-04-09 at 12 28 32 PM]

I've let these hang for upwards of 30 minutes with no meaningful cpu activity on any workers before killing the cluster manually. I can't let it run any longer because I'm paying for cluster time so I don't know if it's just (intractably) slow or totally hung. Comparatively the entire rest of the task graph was executed in less than 180 seconds.

Any pointers as to what could be causing this or how to permanently avoid it would be really appreciated.

  • Dask version: 2020.12.0
  • Python version: 3.8.5
  • Operating System: CentOS
  • Install method (conda, pip, source): pip

All 9 comments

Sorry for the late response. My first suggestion is that you upgrade dask and distributed. This is kind of a shot in the dark, but 2020.12.0 had some pretty big changes with the scheduler so it does seem possible that there would be issues.

I will also ping @jrbourbeau on this in case he has encountered anything similar.

I agree with @jsignell that it would be good to confirm whether or not the issue is still present when using the latest dask and distributed releases (version 2021.04.0). If you still observe the same hanging behavior, the next place I would look is the worker and scheduler logs, to see if there is any relevant information there.
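
One way to confirm which releases are actually installed in a given environment is sketched below, using only the standard library. The package list is an assumption chosen for this thread. As far as I know, distributed's Client.get_versions(check=True) can additionally compare versions across client, scheduler, and workers, but that needs a running cluster.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Dict, Optional, Sequence


def installed_versions(
    packages: Sequence[str] = ("dask", "distributed", "dask-jobqueue", "tornado"),
) -> Dict[str, Optional[str]]:
    """Return the installed version of each package, or None if it is missing."""
    found: Dict[str, Optional[str]] = {}
    for name in packages:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found


if __name__ == "__main__":
    for name, ver in installed_versions().items():
        print(f"{name}: {ver or 'not installed'}")
```

Running this in the same environment that the LSF jobs activate helps rule out a mismatch between the client and the workers.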

Thanks for the replies @jsignell and @jrbourbeau, and of course thanks for dask and dask-distributed to begin with!

I have confirmed that I still get hanging behavior with 2021.04.0

I have also checked the logs and found something. It was difficult at first since the problem was mostly occurring for jobs with hundreds of workers and there were lots of logs to comb through without really knowing what to look for.

But I ran a simpler job with only 1 worker with 4 cpu cores and 2 processes and it was really helpful:

distributed.nanny - INFO -         Start Nanny at: 'tcp://10.36.111.40:26395'
distributed.nanny - INFO -         Start Nanny at: 'tcp://10.36.111.40:28603'
distributed.worker - INFO -       Start worker at:   tcp://10.36.111.40:15535
distributed.worker - INFO -          Listening to:   tcp://10.36.111.40:15535
distributed.worker - INFO -       Start worker at:   tcp://10.36.111.40:26721
distributed.worker - INFO -          dashboard at:         10.36.111.40:39249
distributed.worker - INFO -          Listening to:   tcp://10.36.111.40:26721
distributed.worker - INFO - Waiting to connect to:   tcp://10.36.110.12:37123
distributed.worker - INFO -          dashboard at:         10.36.111.40:35299
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Waiting to connect to:   tcp://10.36.110.12:37123
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          2
distributed.worker - INFO -                Memory:                  27.94 GiB
distributed.worker - INFO -               Threads:                          2
distributed.worker - INFO -       Local Directory: /scratch/fleishmang/dask-worker-space/worker-20inj_a7
distributed.worker - INFO -                Memory:                  27.94 GiB
distributed.worker - INFO -       Local Directory: /scratch/fleishmang/dask-worker-space/worker-ju9nhbqw
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to:   tcp://10.36.110.12:37123
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.worker - INFO -         Registered to:   tcp://10.36.110.12:37123
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
IBM Spectrum LSF 10.1.0.0 build 532214, Oct 16 2019
Copyright International Business Machines Corp. 1992, 2016.
US Government Users Restricted Rights - Use, duplication or disclosure restricted by GSA ADP Schedule Contract with IBM Corp.

  binary type: linux3.10-glibc2.17-x86_64
IBM Spectrum LSF 10.1.0.0 build 532214, Oct 16 2019
Copyright International Business Machines Corp. 1992, 2016.
US Government Users Restricted Rights - Use, duplication or disclosure restricted by GSA ADP Schedule Contract with IBM Corp.

  binary type: linux3.10-glibc2.17-x86_64
distributed.worker - INFO - Closing worker gracefully: tcp://10.36.111.40:26721
distributed.worker - INFO - Closing worker gracefully: tcp://10.36.111.40:15535
distributed.worker - INFO - Stopping worker at tcp://10.36.111.40:26721
distributed.worker - ERROR - failed during get data with tcp://10.36.111.40:15535 -> tcp://10.36.111.40:26721
Traceback (most recent call last):
  File "/groups/scicompsoft/home/fleishmang/bin/miniconda3/envs/cs_test/lib/python3.8/site-packages/tornado/iostream.py", line 971, in _handle_write
    num_bytes = self.write_to_fd(self._write_buffer.peek(size))
  File "/groups/scicompsoft/home/fleishmang/bin/miniconda3/envs/cs_test/lib/python3.8/site-packages/tornado/iostream.py", line 1148, in write_to_fd
    return self.socket.send(data)  # type: ignore
BrokenPipeError: [Errno 32] Broken pipe

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/groups/scicompsoft/home/fleishmang/bin/miniconda3/envs/cs_test/lib/python3.8/site-packages/distributed/worker.py", line 1366, in get_data
    response = await comm.read(deserializers=serializers)
  File "/groups/scicompsoft/home/fleishmang/bin/miniconda3/envs/cs_test/lib/python3.8/site-packages/distributed/comm/tcp.py", line 206, in read
    convert_stream_closed_error(self, e)
  File "/groups/scicompsoft/home/fleishmang/bin/miniconda3/envs/cs_test/lib/python3.8/site-packages/distributed/comm/tcp.py", line 124, in convert_stream_closed_error
    raise CommClosedError(
distributed.comm.core.CommClosedError: in <closed TCP>: BrokenPipeError: [Errno 32] Broken pipe
distributed.utils_comm - INFO - Retrying get_data_from_worker after exception in attempt 0/10: in <closed TCP>: Stream is closed
distributed.nanny - INFO - Worker closed
distributed.nanny - WARNING - Restarting worker
distributed.worker - INFO -       Start worker at:   tcp://10.36.111.40:25607
distributed.worker - INFO -          Listening to:   tcp://10.36.111.40:25607
distributed.worker - INFO -          dashboard at:         10.36.111.40:35905
distributed.worker - INFO - Waiting to connect to:   tcp://10.36.110.12:37123
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          2
distributed.worker - INFO -                Memory:                  27.94 GiB
distributed.worker - INFO -       Local Directory: /scratch/fleishmang/dask-worker-space/worker-2o34ymo8
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to:   tcp://10.36.110.12:37123
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection

About halfway down are two lines that say Closing worker gracefully (once for each of the processes). I don't know why the processes would be restarted. The maximum memory use for each process is very comfortably below 10GB and each process has been nominally allocated with 30GB.

It seems that both workers were closed, losing intermediate results required for subsequent tasks. An attempt at a data transfer to facilitate one of those tasks then fails. Then the scheduler tries to restart the workers, which is successful - but now the cluster is hung. The dashboard still says the (lost) intermediates are either in the "released" or "memory" state and the dependent tasks are just waiting. The restarted workers show 2-4% cpu load indefinitely.
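
With hundreds of workers, pulling just these lifecycle events out of each log file makes the sequence easier to see. The following is a rough sketch, not part of distributed: the marker strings are copied from the log excerpt above, and the function name and regular expression are my own assumptions about the log line layout.

```python
import re
from typing import Iterable, List, Tuple

# Substrings copied from the log excerpt above.
LIFECYCLE_MARKERS = (
    "Closing worker gracefully",
    "Stopping worker",
    "failed during get data",
    "Retrying get_data_from_worker",
    "Worker closed",
    "Restarting worker",
)

# Assumed layout: "<logger> - <LEVEL> - <message>", as seen in the excerpt.
LOG_LINE = re.compile(
    r"^(?P<logger>distributed\.[\w.]+) - (?P<level>[A-Z]+) - (?P<message>.*)$"
)


def lifecycle_events(lines: Iterable[str]) -> List[Tuple[int, str, str]]:
    """Return (line number, level, message) for worker lifecycle log lines."""
    events = []
    for number, line in enumerate(lines, start=1):
        match = LOG_LINE.match(line.rstrip("\n"))
        if match is None:
            continue  # tracebacks, LSF banners, blank lines
        message = match["message"].strip()
        if any(marker in message for marker in LIFECYCLE_MARKERS):
            events.append((number, match["level"], message))
    return events
```

Usage would be something like lifecycle_events(open(path)) for each worker log, then sorting or grouping the results by worker address.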

So I'm not sure why the scheduler closes the workers? And I assumed gracefully meant intermediate results would either be stored elsewhere before closing (though in this case all workers were restarted) - or scheduled to be recomputed after workers restart.

(I'm not being critical, I just want to understand how things work and I'm willing to help out with fixes if given some guidance - I've never looked at any of the distributed source; assuming it's not something in my own code of course - which is still possible).

That is really helpful extra info @GFleishman! I am going to transfer this issue to https://github.com/dask/distributed where it'll get more eyes from the hard-core distributed people. I think Closing worker gracefully just means without throwing a loud KilledWorker error, but I might be wrong about that.

We've seen very similar problems with map_overlap (running on Kubernetes). In the end we prepended and appended data to each partition with our own logic to circumvent it...
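
To make the workaround concrete: the idea is to give each partition a few extra elements from its neighbours, run the function, then trim the extras. The sketch below shows that idea on a plain Python list in a single process. It is not the commenter's code, and all names in it (map_with_manual_halo, moving_sum) are hypothetical.

```python
from typing import Callable, List, Sequence


def map_with_manual_halo(
    data: Sequence[float],
    chunk_size: int,
    depth: int,
    func: Callable[[List[float]], List[float]],
) -> List[float]:
    """Apply func chunk by chunk, padding each chunk with `depth` neighbouring
    items on both sides and trimming them from the result.

    func must return a list of the same length as its input.
    """
    out: List[float] = []
    n = len(data)
    for start in range(0, n, chunk_size):
        stop = min(start + chunk_size, n)
        lo = max(start - depth, 0)   # prepend from the previous chunk
        hi = min(stop + depth, n)    # append from the next chunk
        result = func(list(data[lo:hi]))
        left = start - lo            # number of prepended items to drop
        out.extend(result[left:left + (stop - start)])
    return out


def moving_sum(values: List[float]) -> List[float]:
    """3-point moving sum, truncated at the ends (example kernel)."""
    n = len(values)
    return [sum(values[max(i - 1, 0):min(i + 2, n)]) for i in range(n)]


data = list(range(10))
chunked = map_with_manual_halo(data, chunk_size=4, depth=1, func=moving_sum)
```

With a halo at least as deep as the kernel's reach, the chunked result matches applying the kernel to the whole sequence at once.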

Hi @rubenvdg - thanks for the comment. This does happen most often with map_overlap, but it also occurs for me with map_blocks on very large arrays: (40000, 500, 2048, 1024), which is (time, z, y, x) for 4D imaging data.
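
For a sense of scale, the size of such an array can be worked out as below. The dtype is not stated in the thread, so 2 bytes per voxel (e.g. uint16) is purely an assumption.

```python
import math

shape = (40000, 500, 2048, 1024)  # (time, z, y, x), from the comment above
bytes_per_voxel = 2               # ASSUMPTION: uint16 data; dtype is not stated

n_voxels = math.prod(shape)
total_bytes = n_voxels * bytes_per_voxel

print(f"{n_voxels:.3e} voxels, {total_bytes / 2**40:.1f} TiB at {bytes_per_voxel} bytes each")
```

At this size the array can never sit in memory on one machine, so every run depends on data being passed between workers.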

Same here. In our case it also happened on map_partitions, although with map_overlap the problem seemed to happen more often. Was difficult to reproduce, so was never able to make a good issue for it.

I agree that it's difficult to reproduce. On several occasions I've convinced myself that the problem was solved only to find out on the next big run that it wasn't. I think it must have something to do with worker to worker communication of dependencies and/or task states, which can be disrupted for a number of reasons, and then potentially is not reset properly by the scheduler. That's all speculation, but the error logs and behavior so far point that way. On some lucky runs I think it's possible that the disrupting events just don't occur (e.g. might be dependent on network traffic).

@fjetter I think this may be another instance of the issues you're working on.
