distributed.comm.core.CommClosedError: in <closed TCP>: Stream is closed

Created on 9 Oct 2019  Â·  15Comments  Â·  Source: dask/distributed

I have 2 machines: a worker machine and a scheduler machine.

The worker machine is centos 7 with python3.7 and dask-distributed 2.5.2.

The scheduler machine has a docker container running. The docker container has the same version of python and dask, and incidentally, it is also a centos 7 image.

I start the scheduler docker container with this docker-compose yaml:

    version: '3.7'
    services:
      service1:
        image: ...
        container_name: ...
        network_mode: bridge
        env_file:
          - ~/.env
        ports:
          - "8888:8888"
          - "9796:9796"
          - "9797:9797"
        volumes:
          ...
        command: jupyter lab --ip=0.0.0.0 --port=8888 --no-browser --allow-root

(Notice I'm mapping the two ports needed for a scheduler to run.)

When I start up the a dask-worker on the worker box and a dask-scheduler in the docker container. everything seems to initiate correctly except after a little bit I get this error:

[root@510b0c5af190 web]# my_project.py run distributed_workflow
Traceback (most recent call last):
  File "/conda/lib/python3.7/site-packages/distributed/comm/tcp.py", line 237, in write
    stream.write(b)
  File "/conda/lib/python3.7/site-packages/tornado/iostream.py", line 546, in write
    self._check_closed()
  File "/conda/lib/python3.7/site-packages/tornado/iostream.py", line 1009, in _check_closed
    raise StreamClosedError(real_error=self.error)
tornado.iostream.StreamClosedError: Stream is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/conda/bin/sql_server", line 11, in <module>
    load_entry_point('sql-server', 'console_scripts', 'sql_server')()
  File "/conda/lib/python3.7/site-packages/click/core.py", line 764, in __call__
    return self.main(*args, **kwargs)
  File "/conda/lib/python3.7/site-packages/click/core.py", line 717, in main
    rv = self.invoke(ctx)
  File "/conda/lib/python3.7/site-packages/click/core.py", line 1137, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/conda/lib/python3.7/site-packages/click/core.py", line 956, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/conda/lib/python3.7/site-packages/click/core.py", line 555, in invoke
    return callback(*args, **kwargs)
  File "/app/sql_server/cli/sql_server.py", line 28, in run
    daily(True)
  File "/app/sql_server/cli/run/daily.py", line 166, in daily
    verbose=True,
  File "/wcf/spartans/spartans/spartans.py", line 116, in __enter__
    self.open()
  File "/wcf/spartans/spartans/spartans.py", line 123, in open
    self._start_agents()
  File "/wcf/spartans/spartans/spartans.py", line 179, in _start_agents
    set_as_default=False)
  File "/conda/lib/python3.7/site-packages/distributed/client.py", line 715, in __init__
    self.start(timeout=timeout)
  File "/conda/lib/python3.7/site-packages/distributed/client.py", line 880, in start
    sync(self.loop, self._start, **kwargs)
  File "/conda/lib/python3.7/site-packages/distributed/utils.py", line 333, in sync
    raise exc.with_traceback(tb)
  File "/conda/lib/python3.7/site-packages/distributed/utils.py", line 317, in f
    result[0] = yield future
  File "/conda/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/conda/lib/python3.7/site-packages/distributed/client.py", line 973, in _start
    await self._ensure_connected(timeout=timeout)
  File "/conda/lib/python3.7/site-packages/distributed/client.py", line 1040, in _ensure_connected
    {"op": "register-client", "client": self.id, "reply": False}
  File "/conda/lib/python3.7/site-packages/tornado/gen.py", line 748, in run
    yielded = self.gen.send(value)
  File "/conda/lib/python3.7/site-packages/distributed/comm/tcp.py", line 254, in write
    convert_stream_closed_error(self, e)
  File "/conda/lib/python3.7/site-packages/distributed/comm/tcp.py", line 132, in convert_stream_closed_error
    raise CommClosedError("in %s: %s" % (obj, exc))
distributed.comm.core.CommClosedError: in <closed TCP>: Stream is closed

So I investigate the logs. The worker log looks like this:

(base) [worker@worker-03 tmp]$ cat worker_output.txt
distributed.nanny - INFO -         Start Nanny at: 'tcp://10.1.25.3:43111'
distributed.diskutils - INFO - Found stale lock file and directory '/home/worker/worker-h8jhtnng', purging
distributed.worker - INFO -       Start worker at:      tcp://10.1.25.3:42739
distributed.worker - INFO -          Listening to:      tcp://10.1.25.3:42739
distributed.worker - INFO -          dashboard at:            10.1.25.3:40970
distributed.worker - INFO - Waiting to connect to:   tcp://scheduler.myco.com:9796
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          4
distributed.worker - INFO -                Memory:                   33.53 GB
distributed.worker - INFO -       Local Directory: /home/worker/worker-mf3marrd
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to:   tcp://scheduler.myco.com:9796
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection

and my scheduler log (inside my docker container, on the scheduler.myco.com machine) looks like this:

[root@510b0c5af190 web]# cat /tmp/worker_output.txt 
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Local Directory:    /tmp/scheduler-lq7oa5sc
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:     tcp://172.17.0.2:9796
distributed.scheduler - INFO -   dashboard at:                     :9797
distributed.scheduler - INFO - Register tcp://10.1.25.3:42739
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.1.25.3:42739
distributed.core - INFO - Starting established connection

Now, there are no errors in the logs. Indeed, I inspect the running processes and I see this:

Worker machine

worker  8974  1.6  0.1 394088 45044 ?        Sl   16:04   0:14 /data/anaconda3/bin/python /data/anaconda3/bin/dask-worker scheduler.myco.com:9796 --dashboard-prefix my_workflow

Scheduler container:

root        33  1.4  1.4 670884 115056 pts/0   Sl   16:04   0:15 /conda/bin/python /conda/bin/dask-scheduler --port 9796 --dashboard-address 172.17.0.2:9797 --dashboard-prefix my_workflow

Notice the 172.17.0.2 address is the address inside the scheduler container. If I try to initiate the dask-address as the hostname of the host machine instead I get this error [Errno 99] Cannot assign requested address presumably because the port 9797 is already taken by the docker container.

Anyway. These processes are still running, yet to my knowledge, they're not working on the workflow I tried to pass to the worker. Can you please help me understand what I'm doing wrong to produce the error above?

Most helpful comment

I too am facing this issue and it won't go away with dask/distributed 2.20

All 15 comments

What's in distributed_workflow? When does the error occur, when something is happening, or when the cluster is idle, or at the end of the script?

it seems to error on or during the

def distributed_workflow():
    ...
    # workflow is a dictionary with this structure {'task_name': (function(), arguments)}
    finished = client.get(workflow, 'workflow_end_task')

It seems to send a partial workflow or something then errors. I know this because as it begins running I look at the dask dashboard -> graph and see only part of the workflow then it errors.

Its as if the communication fails during the distribution of the workflow to the workers. but as you can see above, the processes seem like they are connected. Anyway, it happens during that client.get call.

Is there something I don't understand about working with Docker? I've been able to get this process to work fine on the bare machines. But when I introduce docker this error happens.

I'm not too familiar with Docker's networking, so I don't know.

If I were you, I would try gradually simplifying your script to make it easier to see where things break.

I'm hitting the same problem running Dask outside a container. I do not think it's related to Docker or the container configuration.

@alobbs thanks for the info, that's helpful. Are you able to share a reproducible example?

@TomAugspurger, I'm afraid I do not recall what I was doing, although I can tell you it's been a one time issue. My gut feeling is that it was due to the fact I had clients running that had been waiting for workers to get available and I was meddling with them (shutting dask-worker down and relaunching in different servers).

I'm having a similar issues outside containers:

tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x1096bbd10>>, <Task finished coro=<Worker.heartbeat() done, defined at /Users/dorota/miniconda3/envs/pydra_dask_py37/lib/python3.7/site-packages/distributed/worker.py:881> exception=CommClosedError('in <closed TCP>: Stream is closed')>)
Traceback (most recent call last):
  File "/Users/dorota/miniconda3/envs/pydra_dask_py37/lib/python3.7/site-packages/distributed/comm/tcp.py", line 188, in read
    n_frames = await stream.read_bytes(8)
tornado.iostream.StreamClosedError: Stream is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/dorota/miniconda3/envs/pydra_dask_py37/lib/python3.7/site-packages/tornado/ioloop.py", line 743, in _run_callback
    ret = callback()
  File "/Users/dorota/miniconda3/envs/pydra_dask_py37/lib/python3.7/site-packages/tornado/ioloop.py", line 767, in _discard_future_result
    future.result()
  File "/Users/dorota/miniconda3/envs/pydra_dask_py37/lib/python3.7/site-packages/distributed/worker.py", line 918, in heartbeat
    raise e
  File "/Users/dorota/miniconda3/envs/pydra_dask_py37/lib/python3.7/site-packages/distributed/worker.py", line 891, in heartbeat
    metrics=await self.get_metrics(),
  File "/Users/dorota/miniconda3/envs/pydra_dask_py37/lib/python3.7/site-packages/distributed/utils_comm.py", line 391, in retry_operation
    operation=operation,
  File "/Users/dorota/miniconda3/envs/pydra_dask_py37/lib/python3.7/site-packages/distributed/utils_comm.py", line 379, in retry
    return await coro()
  File "/Users/dorota/miniconda3/envs/pydra_dask_py37/lib/python3.7/site-packages/distributed/core.py", line 757, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "/Users/dorota/miniconda3/envs/pydra_dask_py37/lib/python3.7/site-packages/distributed/core.py", line 540, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/Users/dorota/miniconda3/envs/pydra_dask_py37/lib/python3.7/site-packages/distributed/comm/tcp.py", line 208, in read
    convert_stream_closed_error(self, e)
  File "/Users/dorota/miniconda3/envs/pydra_dask_py37/lib/python3.7/site-packages/distributed/comm/tcp.py", line 123, in convert_stream_closed_error
    raise CommClosedError("in %s: %s" % (obj, exc))
distributed.comm.core.CommClosedError: in <closed TCP>: Stream is closed

i'm running a test set with pytest and it happens "randomly". Was not able to reproduce running a single test. And my tests are actually reported as Passed...

OSX, py3.7, dask 2.10.1, distributed 2.10.0

Another vote for addressing this issue. As others have said, it's inconsistent and difficult to reproduce, but it's definitely there. We are currently messing with setting the number of workers and memory allocation for each worker as that has fixed similar issues in the past.

dask==2.9.2
distributed==2.6.0

I've done a bit of experimentation and it looks like it is fixed by setting memory limit per worker and the number of workers to something other than their default values. I have absolutely no idea why this fixes it.

I'd encourage those running into this issue to

1) Try upgrading to the latest release of distributed to see if the problem still persists. There were recent updates to how CommClosedErrors are handled when raised while workers are heartbeating. It's not clear that this will totally resolve the issue, but the traceback in https://github.com/dask/distributed/issues/3129#issuecomment-589235528 indicates that a CommClosedError is raised during a worker heartbeat.

2) If possible, provide a minimal reproducible example. This drastically increases the chance a maintainer will be able to debug the issue.

Hi,

I ran into the same issue with a graph I created using Dask Futures.
It turns out that it doesn't affect the result in my case and I highly suspect that in my case it's when a task is writing a file (.csv in my case) and it takes longer than a predefined timeout (I checked in the dask params and you indeed have comm.timeouts.connect set to 10s but not convince it's this one as I tried to set it to 30s and it continued crashing). Anyway my guess is that during a long task the worker can't communicate then the scheduler see it as dead breaks the connection.
I found a workaround by changing the number of retries in comm.retry.count - called in distributed.utils_comm - to 2.
No more connection issues. Again I am sure it depends on your code / how long it runs / a task runs

Hope it helps.

python 3.6.9
dask 2.9.1
distributed 2.9.1
tornado 6.0.3

PS: maybe that has been fixed by a more recent version of distributed, but my company still runs w 2.9.1

Tasks can take a long time (several hours is common) that is unrelated to the timeout (unless your task holds the GIL, which is uncommon unless you're wrapping custom C code). Regardless, upgrading would be good.

I too am facing this issue and it won't go away with dask/distributed 2.20

This error can happen for many reasons. Are you able to provide a minimal
example?

On Tue, Jul 7, 2020 at 7:44 AM Navjot Kukreja notifications@github.com
wrote:

I too am facing this issue and it won't go away with dask/distributed 2.20

—
You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub
https://github.com/dask/distributed/issues/3129#issuecomment-654830905,
or unsubscribe
https://github.com/notifications/unsubscribe-auth/AAKAOIUWEP44CLNJNLVPP43R2MKCZANCNFSM4I6X24QQ
.

I created a reproducible example. It isn't minimal but fairly compact. Based on the stacktrace the error seems to have a different code path than what previous people have reported.

The first run is usually successful but executing it a second time immediately after it leads to the error. I have no dask config files.

I cannot reproduce it with pure=False and I can also not reproduce it when setting n_workers=2 with no adapt().

import random
import string
from pathlib import Path

import dask
from dask.distributed import Client, LocalCluster


def dummy(*args, **kwargs):
    import time

    time.sleep(10)
    return True


def test():
    delayed_function = dask.delayed(dummy, pure=True)
    targets = [
        delayed_function(delayed_function),
        delayed_function(delayed_function),
        delayed_function(delayed_function, delayed_function),
    ]

    random_suffix = "".join(random.choices(string.ascii_letters + string.digits, k=10))
    with LocalCluster(
        local_directory=Path(".") / f"dask_{random_suffix}",
        threads_per_worker=1,
        processes=True,
        n_workers=1,
    ) as cluster:
        cluster.adapt(minimum=1, maximum=2)
        with Client(cluster) as client:
            print(dask.compute(*targets, scheduler=client))


if __name__ == "__main__":
    test()

Typical output

$ python test_tcp.py
(True, True, True)
Task was destroyed but it is pending!
task: <Task pending name='Task-152' coro=<AdaptiveCore.adapt() done, defined at /pool01/home/twoertwe/.cache/pypoetry/virtualenvs/panam-zGchuJUy-py3.8/lib/python3.8/site-packages/distributed/deploy/adaptive_core.py:179> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f7c6f4a75b0>()]> cb=[IOLoop.add_future.<locals>.<lambda>() at /pool01/home/twoertwe/.cache/pypoetry/virtualenvs/panam-zGchuJUy-py3.8/lib/python3.8/site-packages/tornado/ioloop.py:688]>


$ python test_tcp.py
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x7f475e2d68e0>>, <Task finished name='Task-65' coro=<Worker.close() done, defined at /pool01/home/twoertwe/.cache/pypoetry/virtualenvs/panam-zGchuJUy-py3.8/lib/python3.8/site-packages/distributed/worker.py:1169> exception=CommClosedError('in <closed TCP>: Stream is closed')>)
Traceback (most recent call last):
  File "/pool01/home/twoertwe/.cache/pypoetry/virtualenvs/panam-zGchuJUy-py3.8/lib/python3.8/site-packages/distributed/comm/tcp.py", line 187, in read
    n_frames = await stream.read_bytes(8)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/pool01/home/twoertwe/.cache/pypoetry/virtualenvs/panam-zGchuJUy-py3.8/lib/python3.8/site-packages/tornado/ioloop.py", line 741, in _run_callback
    ret = callback()
  File "/pool01/home/twoertwe/.cache/pypoetry/virtualenvs/panam-zGchuJUy-py3.8/lib/python3.8/site-packages/tornado/ioloop.py", line 765, in _discard_future_result
    future.result()
  File "/pool01/home/twoertwe/.cache/pypoetry/virtualenvs/panam-zGchuJUy-py3.8/lib/python3.8/site-packages/distributed/worker.py", line 1193, in close
    await r.close_gracefully()
  File "/pool01/home/twoertwe/.cache/pypoetry/virtualenvs/panam-zGchuJUy-py3.8/lib/python3.8/site-packages/distributed/core.py", line 858, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "/pool01/home/twoertwe/.cache/pypoetry/virtualenvs/panam-zGchuJUy-py3.8/lib/python3.8/site-packages/distributed/core.py", line 641, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/pool01/home/twoertwe/.cache/pypoetry/virtualenvs/panam-zGchuJUy-py3.8/lib/python3.8/site-packages/distributed/comm/tcp.py", line 202, in read
    convert_stream_closed_error(self, e)
  File "/pool01/home/twoertwe/.cache/pypoetry/virtualenvs/panam-zGchuJUy-py3.8/lib/python3.8/site-packages/distributed/comm/tcp.py", line 126, in convert_stream_closed_error
    raise CommClosedError("in %s: %s" % (obj, exc)) from exc
distributed.comm.core.CommClosedError: in <closed TCP>: Stream is closed
distributed.nanny - WARNING - Worker process still alive after 3 seconds, killing
(True, True, True)
Task was destroyed but it is pending!
task: <Task pending name='Task-152' coro=<AdaptiveCore.adapt() done, defined at /pool01/home/twoertwe/.cache/pypoetry/virtualenvs/panam-zGchuJUy-py3.8/lib/python3.8/site-packages/distributed/deploy/adaptive_core.py:179> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f0c8548f070>()]> cb=[IOLoop.add_future.<locals>.<lambda>() at /pool01/home/twoertwe/.cache/pypoetry/virtualenvs/panam-zGchuJUy-py3.8/lib/python3.8/site-packages/tornado/ioloop.py:688]>

edit:
python 3.8.6
dask 2020.12.0
distributed 2021.01.1
tornado 6.1

I assume that my example is also reproducible without adapt(). I'm not using adapt() in my real code anymore and I'm occasionally still getting the above error. If I get the error, I always seem to get it at the beginning. Maybe it is related to starting new workers?

Was this page helpful?
0 / 5 - 0 ratings