Distributed: Can't use Queues w/ `processes=False`

Created on 29 Aug 2018 · 8 Comments · Source: dask/distributed

Goal: Share a few small queues between tasks under various Client configurations.

Issue: Using distributed 1.22.0,

from dask.distributed import Client, Queue

client = Client(processes=False)
q = Queue()

def put():
    q.put(55)

def get():
    print(q.get())

fut = client.submit(put)
res = client.submit(get)

results in

Traceback (most recent call last):
  File ".../python3.6/site-packages/distributed/queues.py", line 261, in __setstate__
    assert client.address == address
AttributeError: 'Client' object has no attribute 'address'

...

File ".../python3.6/site-packages/distributed/comm/core.py", line 178, in _raise
    raise IOError(msg)
OSError: Timed out trying to connect to 'inproc://10.0.0.41/16785/7' after 10 s: connect() didn't finish in time

This works as intended when processes=True. I also can't use queue.Queue because it won't be pickleable, so I was hoping dask.distributed.Queue would allow me to work around that.
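
The pickling limitation mentioned above can be confirmed with the standard library alone. This is a minimal sketch; the helper name `is_picklable` is made up for illustration and is not part of dask or distributed:

```python
import pickle
import queue


def is_picklable(obj):
    """Return True if obj survives pickle.dumps, False otherwise."""
    try:
        pickle.dumps(obj)
    except (TypeError, pickle.PicklingError):
        return False
    return True


local_q = queue.Queue()
local_q.put(55)

# queue.Queue holds threading locks internally, which pickle refuses to serialize.
print(is_picklable(local_q))
# A plain list holding the same item pickles without trouble.
print(is_picklable([55]))
```

This only shows why a stdlib queue cannot be shipped to a task as an argument or closure; it says nothing about how dask.distributed.Queue serializes itself.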

All 8 comments

Replicated. Thanks for the minimal bug report Chris. I'll take a look.

I'm also curious, what is your intended application here?

Thanks!

Ultimately, I want to control how many jobs (of differing types) are running simultaneously, ideally without lots of custom logic for how the jobs are executed (synchronously, multiprocessing, distributed, etc.).

Queues seemed like the simplest non-custom interface for this (I can rely on q.get(timeout=x) blocking, set a maxsize, etc.).
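
To make the idea concrete, here is a rough sketch of a queue used as a pool of tokens to cap concurrency. It uses the stdlib queue.Queue with threads rather than dask.distributed.Queue, and the limit, the job count, and the names `MAX_RUNNING`, `tokens`, and `job` are all assumptions chosen for illustration:

```python
import queue
import threading
import time

MAX_RUNNING = 2  # assumed limit, chosen for illustration

# Pre-fill a bounded queue with one token per allowed concurrent job.
tokens = queue.Queue(maxsize=MAX_RUNNING)
for _ in range(MAX_RUNNING):
    tokens.put(None)

lock = threading.Lock()
running = 0  # jobs currently holding a token
peak = 0     # highest value of `running` observed


def job(i):
    global running, peak
    # Blocks until a slot is free; raises queue.Empty if none frees up in 5 s.
    tokens.get(timeout=5)
    try:
        with lock:
            running += 1
            peak = max(peak, running)
        time.sleep(0.05)  # stand-in for real work
        with lock:
            running -= 1
    finally:
        tokens.put(None)  # hand the slot back


threads = [threading.Thread(target=job, args=(i,)) for i in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(peak <= MAX_RUNNING)
```

Taking a token before the work and returning it in a `finally` block is what keeps the number of simultaneously running jobs at or below the queue's capacity. Whether the same pattern behaves identically with a queue shared across distributed workers is not something this sketch demonstrates.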

Possibly resolved in https://github.com/dask/distributed/pull/2221

This is also the sort of thing for which it would be nice to have an explicit Semaphore object. There is https://github.com/dask/distributed/issues/2007 if you ever feel like diving into the guts of the distributed scheduler. It's a decent entry issue.

Awesome, thanks! I will check out that issue and see where it leads me.

Re: #2221, it definitely fixes the problem when processes=False. However, I think it might introduce a new issue when processes=True and cleanup occurs:

from dask.distributed import Client, Queue

client = Client(processes=True)
q = Queue()

def put():
    q.put(55)

def get():
    return q.get()

fut = client.submit(put)
res = client.submit(get)
x = res.result() # correctly returns 55

q.close()
client.close() # <--- the offending line

results in

Exception ignored in: <generator object add_client at 0x1192fcd00>
RuntimeError: generator ignored GeneratorExit
Future exception was never retrieved
future: <Future finished exception=CommClosedError('in <closed TCP>: Stream is closed',)>
Traceback (most recent call last):
  File ".../distributed/distributed/comm/tcp.py", line 177, in read
    n_frames = yield stream.read_bytes(8)
  File ".../python3.6/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
tornado.iostream.StreamClosedError: Stream is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File ".../python3.6/site-packages/tornado/gen.py", line 1141, in run
    yielded = self.gen.throw(*exc_info)
  File ".../distributed/distributed/comm/tcp.py", line 198, in read
    convert_stream_closed_error(self, e)
  File ".../distributed/distributed/comm/tcp.py", line 126, in convert_stream_closed_error
    raise CommClosedError("in %s: %s" % (obj, exc))
distributed.comm.core.CommClosedError: in <closed TCP>: Stream is closed

I verified that with the same environment, this error does not occur when working off 1.22.0 and off master.

I suspect that this was an issue previously but happened intermittently, so you didn't always notice it. It's annoying, but innocuous. https://github.com/dask/distributed/issues/1969

Ah interesting - thanks!

I have a test suite which creates / closes lots of small dask.distributed.Clients and I _think_ when distributed.comm.core.CommClosedError: in <closed TCP>: Stream is closed is raised, some processes aren't cleaned up correctly, resulting in the test suite eventually shutting down with OSError: [Errno 24] Too many open files. I'll look into it more and comment on that issue if I find anything.

Are you using our test harness?
https://distributed.readthedocs.io/en/latest/develop.html#writing-tests

Hmmm no I'm not, it didn't seem to support easily switching between processes=True vs. processes=False. I actually created some fixtures and was able to resolve my situation now that I know distributed.comm.core.CommClosedError was a red herring - thank you!
