While debugging an unrelated issue I've found some strange behaviour when a worker connects to a scheduler but the scheduler is not able to route back to the worker's address.
I expect this may happen to users trying to run in containers, on the cloud or generally in unusual network conditions.
We can reproduce this using Docker for Desktop. By default it is not possible for a host machine (my MacBook) to route to containers inside the container network. However it is possible for containers to route to IP addresses on the LAN, so they can connect to services on my laptop.
$ dask-scheduler
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Local Directory: /var/folders/0l/fmwbqvqn1tq96xf20rlz6xmm0000gp/T/scheduler-4hoeyo6f
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO - Scheduler at: tcp://192.168.0.27:8786
distributed.scheduler - INFO - dashboard at: :8787
Take a note of my LAN IP (192.168.0.27 in this example).
Start the worker in a docker container connecting to my LAN scheduler address.
$ docker run --rm -it --name worker daskdev/dask:2.9.1 dask-worker tcp://192.168.0.27:8786
Unable to find image 'daskdev/dask:2.9.1' locally
2.9.1: Pulling from daskdev/dask
b8f262c62ec6: Already exists
0a43c0154f16: Already exists
906d7b5da8fb: Already exists
03a506d38579: Pull complete
154ee2d747b3: Pull complete
d4c2e8bc6ff3: Pull complete
Digest: sha256:1e9e5c093497b65445d978737da96893005a7f24a493a9d4df382b8cf6351c15
Status: Downloaded newer image for daskdev/dask:2.9.1
+ '[' '' ']'
+ '[' -e /opt/app/environment.yml ']'
+ echo 'no environment.yml'
no environment.yml
+ '[' '' ']'
+ '[' '' ']'
+ exec dask-worker tcp://192.168.0.27:8786
distributed.nanny - INFO - Start Nanny at: 'tcp://172.17.0.2:35939'
distributed.dashboard.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: pip install jupyter-server-proxy
distributed.worker - INFO - Start worker at: tcp://172.17.0.2:43861
distributed.worker - INFO - Listening to: tcp://172.17.0.2:43861
distributed.worker - INFO - dashboard at: 172.17.0.2:43361
distributed.worker - INFO - Waiting to connect to: tcp://192.168.0.27:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 6
distributed.worker - INFO - Memory: 2.10 GB
distributed.worker - INFO - Local Directory: /worker-n5_mqtoc
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Registered to: tcp://192.168.0.27:8786
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
Note that the worker has started listening on its container IP, tcp://172.17.0.2:43861, which is not routable from the scheduler.
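To confirm the routing problem independently of Dask, a plain TCP probe from the scheduler's host can help. The following is a minimal sketch using only the standard library; `can_reach` is a hypothetical helper name, not part of `distributed`, and the 3 second timeout is an arbitrary assumption.

```python
import socket
from urllib.parse import urlparse


def can_reach(address: str, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to a 'tcp://host:port' address succeeds.

    Hypothetical diagnostic helper; it only tests basic TCP reachability and
    knows nothing about the Dask protocol.
    """
    parsed = urlparse(address)
    try:
        # create_connection raises OSError (including timeouts) on failure.
        with socket.create_connection((parsed.hostname, parsed.port), timeout=timeout):
            return True
    except OSError:
        return False
```

Running `can_reach("tcp://172.17.0.2:43861")` on the laptop would be expected to return `False` in the setup described here, while `can_reach("tcp://192.168.0.27:8786")` from inside the container would be expected to return `True`.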
If I look at the dashboard I can see my worker.

In an IPython session on my laptop I run the following code. This code will hang for a while and then begin showing errors.
_This works and returns 11 if I start the worker on my laptop rather than in the container._
```python
from distributed import Client
client = Client("tcp://192.168.0.27:8786")
client.submit(lambda x: x + 1, 10).result()
```
The Client gives the following error.
distributed.client - WARNING - Couldn't gather 1 keys, rescheduling {'lambda-ad6d4ec86bc36fbb11841f10ac714071': ('tcp://172.17.0.2:43861',)}
The scheduler logs the same error.
distributed.scheduler - ERROR - Couldn't gather keys {'lambda-ad6d4ec86bc36fbb11841f10ac714071': ['tcp://172.17.0.2:43861']} state: ['memory'] workers: ['tcp://172.17.0.2:43861']
NoneType: None
It also has many iterations of:
distributed.scheduler - INFO - Register worker <Worker 'tcp://172.17.0.2:43861', name: tcp://172.17.0.2:43861, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://172.17.0.2:43861
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Remove worker <Worker 'tcp://172.17.0.2:43861', name: tcp://172.17.0.2:43861, memory: 0, processing: 0>
distributed.core - INFO - Removing comms to tcp://172.17.0.2:43861
The worker shows broken connection errors and then eventually closes.
distributed.worker - INFO - Connection to scheduler broken. Reconnecting...
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Registered to: tcp://192.168.0.27:8786
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.worker - INFO - Connection to scheduler broken. Reconnecting...
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Registered to: tcp://192.168.0.27:8786
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Registered to: tcp://192.168.0.27:8786
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Registered to: tcp://192.168.0.27:8786
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Registered to: tcp://192.168.0.27:8786
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.worker - INFO - Connection to scheduler broken. Reconnecting...
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Registered to: tcp://192.168.0.27:8786
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Registered to: tcp://192.168.0.27:8786
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.worker - INFO - Connection to scheduler broken. Reconnecting...
distributed.worker - INFO - Connection to scheduler broken. Reconnecting...
distributed.worker - INFO - Stopping worker at tcp://172.17.0.2:43861
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Registered to: tcp://192.168.0.27:8786
distributed.worker - INFO - -------------------------------------------------
distributed.nanny - INFO - Worker closed
distributed.core - INFO - Starting established connection
distributed.nanny - INFO - Closing Nanny at 'tcp://172.17.0.2:35939'
distributed.dask_worker - INFO - End worker
If a worker connects to a scheduler but the scheduler cannot open a connection back to the worker, the worker should fail to connect.
Yeah, I can see how that could happen. For background, the handshake looks like this:
So because they both have to open a connection to the other, we can get into this weird situation where one side thinks that things are probably ok. It should, as you suggest, probably remove the worker after the initial connection fails.
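The idea of refusing a worker that cannot be dialled back can be sketched roughly as follows. This is not how `distributed` implements registration; `worker_is_reachable` and `register_worker` are hypothetical names, the `workers` set stands in for real scheduler state, and the timeout is an assumption.

```python
import asyncio


async def worker_is_reachable(host: str, port: int, timeout: float = 3.0) -> bool:
    """Try to open (and immediately close) a TCP connection back to a worker."""
    try:
        reader, writer = await asyncio.wait_for(
            asyncio.open_connection(host, port), timeout=timeout
        )
    except (OSError, asyncio.TimeoutError):
        return False
    writer.close()
    await writer.wait_closed()
    return True


async def register_worker(workers: set, host: str, port: int) -> bool:
    """Hypothetical registration step: only accept workers we can dial back."""
    if not await worker_is_reachable(host, port):
        # Reject up front instead of registering and failing on the first gather.
        return False
    workers.add((host, port))
    return True
```

With a check like this, the container worker in the example above would be rejected at registration time rather than appearing healthy on the dashboard and then cycling through register and remove.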
@mrocklin @jacobtomlinson I see the same kind of errors on dask-yarn when I run a long-running dask dataframe.apply. Any idea how to find out why the worker hangs like this?
No, unfortunately not. This issue arises independently of yarn or dask dataframe. If you have another issue I encourage you to raise a new issue, ideally with a minimal reproducible example if you're able to create one.
@mrocklin Thank you for the quick response! I'll try to create a minimal reproducible example.
Is there any way of adding some logs in distributed to find out which worker is hanging and why?
Workers emit logs, so presumably you could look at them. Dask doesn't manage moving logs around though; typically one uses whatever system one launched Dask with for that.
@mrocklin I found the worker logs to be pretty limited in information. This is what I see for each worker
LogType:dask.worker.log
Log Upload Time:Tue Jun 16 20:14:09 +0000 2020
LogLength:1299
Log Contents:
distributed.nanny - INFO - Start Nanny at: 'tcp://10.210.128.71:46333'
distributed.worker - INFO - Start worker at: tcp://10.210.128.71:37887
distributed.worker - INFO - Listening to: tcp://10.210.128.71:37887
distributed.worker - INFO - dashboard at: 10.210.128.71:45221
distributed.worker - INFO - Waiting to connect to: tcp://10.210.128.89:38787
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 4
distributed.worker - INFO - Memory: 51.54 GB
distributed.worker - INFO - Local Directory: /mnt/yarn/usercache/hadoop/appcache/application_1592333223687_0004/container_1592333223687_0004_01_000030/dask-worker-space/worker-j6x4ihzx
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Registered to: tcp://10.210.128.89:38787
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.worker - INFO - Connection to scheduler broken. Reconnecting...
distributed.worker - INFO - Stopping worker at tcp://10.210.128.71:37887
distributed.nanny - INFO - Worker closed
Master log looks like this
20/06/16 20:08:43 INFO skein.ApplicationMaster: RUNNING: dask.worker_29 on container_1592333223687_0004_01_000031
20/06/16 20:14:08 INFO skein.ApplicationMaster: Shutting down: Exception in submitted dask application, see logs for more details
20/06/16 20:14:08 INFO skein.ApplicationMaster: Unregistering application with status FAILED
20/06/16 20:14:08 INFO impl.AMRMClientImpl: Waiting for application to be successfully unregistered.
20/06/16 20:14:08 INFO skein.ApplicationMaster: Deleted application directory hdfs://ip-10-210-128-89.datascience.internal:8020/user/hadoop/.skein/application_1592333223687_0004
20/06/16 20:14:08 INFO skein.ApplicationMaster: WebUI server shut down
20/06/16 20:14:08 INFO skein.ApplicationMaster: gRPC server shut down
I agree, that's not very informative. It looks like either the scheduler went down, or some network connection was severed by hardware. At least we know that the worker itself wasn't under any sort of stress.
@mrocklin Got it, makes sense. I'm still trying to get a minimum reproducible example. In the master logs, when it says see logs for more details, which logs is it referring to?
I'm not sure what you mean by master logs. There are no logs other than what you've found though. You can increase verbosity to DEBUG level if you want, but I doubt that you'll get much more information than what you have today.
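One way to raise verbosity, sketched here with the standard library `logging` module, is to lower the level on the `distributed` logger hierarchy. This assumes the code runs inside the process whose logs you want (scheduler or worker), and that nothing else reconfigures logging afterwards; child loggers that have their own explicit level keep it.

```python
import logging

# Attach a basic stderr handler with a format similar to the logs above.
logging.basicConfig(format="%(name)s - %(levelname)s - %(message)s")

# Child loggers such as "distributed.worker" inherit this level unless
# they have been given an explicit level of their own.
logging.getLogger("distributed").setLevel(logging.DEBUG)
```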
@mrocklin By master logs, I meant scheduler logs. In the yarn logs there's a log for each worker and one for the scheduler. The scheduler log has LogType:application.master.log and the worker logs have LogType:dask.worker.log. I was asking if there are any other logs because the master/scheduler log said "see logs for more details".
I see. Will try with DEBUG level logs. Do these logs eliminate the possibility that it is a code error in the function that is passed to Dataframe.apply or is that still possible?