Goal: Share a few small queues between tasks under various Client configurations.
Issue: Using distributed 1.22.0,
from dask.distributed import Client, Queue
client = Client(processes=False)
q = Queue()
def put():
    q.put(55)
def get():
    print(q.get())
fut = client.submit(put)
res = client.submit(get)
results in
Traceback (most recent call last):
  File ".../python3.6/site-packages/distributed/queues.py", line 261, in __setstate__
    assert client.address == address
AttributeError: 'Client' object has no attribute 'address'
...
  File ".../python3.6/site-packages/distributed/comm/core.py", line 178, in _raise
    raise IOError(msg)
OSError: Timed out trying to connect to 'inproc://10.0.0.41/16785/7' after 10 s: connect() didn't finish in time
This works as intended when processes=True. I also can't use queue.Queue because it won't be pickleable, so I was hoping dask.distributed.Queue would allow me to work around that.
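For reference, the pickling limitation of the standard-library queue can be checked with a short sketch. The helper name `is_picklable` is hypothetical and only used for illustration; the check uses nothing beyond `pickle` and `queue` from the standard library.

```python
import pickle
import queue


def is_picklable(obj):
    """Return True if obj survives pickle.dumps, False otherwise (illustrative helper)."""
    try:
        pickle.dumps(obj)
    except (TypeError, pickle.PicklingError):
        return False
    return True


# queue.Queue holds thread locks internally, and locks cannot be pickled
print(is_picklable(queue.Queue()))  # False
# a plain list of values pickles fine
print(is_picklable([55]))  # True
```

This is why a plain `queue.Queue` cannot simply be captured by functions that are shipped to worker processes.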
Replicated. Thanks for the minimal bug report Chris. I'll take a look.
I'm also curious, what is your intended application here?
Thanks!
Ultimately, the goal is to control how many jobs (of differing types) run simultaneously, ideally without lots of custom logic for how the jobs are executed (synchronously, multiprocessing, distributed, etc.).
Queues seemed like the simplest non-custom interface for this (I can rely on q.get(timeout=x) blocking, set a maxsize, etc.).
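The idea of using a bounded queue to cap concurrency can be sketched roughly as follows. This is only an illustration with the standard-library `queue.Queue` and threads, not the distributed setup from the report; `run_jobs`, `max_concurrent`, and the sleep standing in for real work are all hypothetical. The assumption is that the same token pattern would carry over to `dask.distributed.Queue` once it can be shared between tasks.

```python
import queue
import threading
import time


def run_jobs(n_jobs, max_concurrent):
    """Run n_jobs threads, letting at most max_concurrent do work at once.

    Returns the peak number of jobs observed running simultaneously.
    """
    # Pre-fill a bounded queue with tokens; holding a token is permission to run
    tokens = queue.Queue(maxsize=max_concurrent)
    for _ in range(max_concurrent):
        tokens.put(None)

    lock = threading.Lock()
    state = {"running": 0, "peak": 0}

    def job():
        token = tokens.get(timeout=5)  # blocks until a slot frees up
        try:
            with lock:
                state["running"] += 1
                state["peak"] = max(state["peak"], state["running"])
            time.sleep(0.05)  # stand-in for real work
            with lock:
                state["running"] -= 1
        finally:
            tokens.put(token)  # always hand the slot back

    threads = [threading.Thread(target=job) for _ in range(n_jobs)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return state["peak"]


peak = run_jobs(n_jobs=6, max_concurrent=2)
print(peak <= 2)
```

Each job takes a token before working and returns it afterwards, so the number of tokens bounds the number of simultaneously running jobs without any executor-specific logic.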
Possibly resolved in https://github.com/dask/distributed/pull/2221
This is also the sort of thing for which it would be nice to have an explicit Semaphore object. There is https://github.com/dask/distributed/issues/2007 if you ever feel like diving into the guts of the distributed scheduler. It's a decent entry issue.
Awesome, thanks! I will check out that issue and see where it leads me.
Re: #2221, it definitely fixes the problem when processes=False. However, I think it might introduce a new issue when processes=True and cleanup occurs:
from dask.distributed import Client, Queue
client = Client(processes=True)
q = Queue()
def put():
    q.put(55)
def get():
    return q.get()
fut = client.submit(put)
res = client.submit(get)
x = res.result() # correctly returns 55
q.close()
client.close() # <--- the offending line
results in
Exception ignored in: <generator object add_client at 0x1192fcd00>
RuntimeError: generator ignored GeneratorExit
Future exception was never retrieved
future: <Future finished exception=CommClosedError('in <closed TCP>: Stream is closed',)>
Traceback (most recent call last):
  File ".../distributed/distributed/comm/tcp.py", line 177, in read
    n_frames = yield stream.read_bytes(8)
  File ".../python3.6/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
tornado.iostream.StreamClosedError: Stream is closed
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File ".../python3.6/site-packages/tornado/gen.py", line 1141, in run
    yielded = self.gen.throw(*exc_info)
  File ".../distributed/distributed/comm/tcp.py", line 198, in read
    convert_stream_closed_error(self, e)
  File ".../distributed/distributed/comm/tcp.py", line 126, in convert_stream_closed_error
    raise CommClosedError("in %s: %s" % (obj, exc))
distributed.comm.core.CommClosedError: in <closed TCP>: Stream is closed
I verified that with the same environment, this error does not occur when working off 1.22.0 and off master.
I suspect that this was an issue previously, but it happens intermittently, so you didn't always notice it. It's annoying, but innocuous. https://github.com/dask/distributed/issues/1969
Ah interesting - thanks!
I have a test suite which creates / closes lots of small dask.distributed.Clients and I _think_ when distributed.comm.core.CommClosedError: in <closed TCP>: Stream is closed is raised, some processes aren't cleaned up correctly, resulting in the test suite eventually shutting down with OSError: [Errno 24] Too many open files. I'll look into it more and comment on that issue if I find anything.
Are you using our test harness?
https://distributed.readthedocs.io/en/latest/develop.html#writing-tests
Hmmm no I'm not, it didn't seem to support easily switching between processes=True vs. processes=False. I actually created some fixtures and was able to resolve my situation now that I know distributed.comm.core.CommClosedError was a red herring - thank you!