Motivated by a desire for reduced latencies on the workers for Actors (we found that 1ms things were taking 5ms) we added a thread that statistically profiles the event loop. This showed overhead from a couple surprising sources:
psutil and the SystemMonitorwrite_to_fd which apparently isn't entirely non-blocking, see this stack overflow questionadd_callback overhead, see this stack overflow questionI'm not sure how best to address these. There are probably a few approaches:
add_callback, and see if there aren't some occasions where we can reduce our use of TornadoI can reduce the overhead of using add_callback by patching tornado to use asyncio.call_soon rather than asyncio.call_soon_threadsafe (see https://github.com/tornadoweb/tornado/issues/2463) and then annotating some uses of add_callback from within the event loop as being safe.
Waiting on that tornado issue to learn if there is a better way forward.
diff --git a/distributed/worker.py b/distributed/worker.py
index 48e4bb8..4a73b7c 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -338,7 +338,7 @@ class WorkerBase(ServerNode):
self.batched_stream = BatchedSend(interval='2ms', loop=self.loop)
self.batched_stream.start(comm)
self.periodic_callbacks['heartbeat'].start()
- self.loop.add_callback(self.handle_scheduler, comm)
+ self.loop.add_callback(self.handle_scheduler, comm, threadsafe=True)
@gen.coroutine
def handle_scheduler(self, comm):
@@ -351,7 +351,8 @@ class WorkerBase(ServerNode):
finally:
if self.reconnect:
logger.info("Connection to scheduler broken. Reconnecting...")
- self.loop.add_callback(self._register_with_scheduler)
+ self.loop.add_callback(self._register_with_scheduler,
+ threadsafe=True)
else:
yield self._close(report=False)
@@ -1421,7 +1422,8 @@ class Worker(WorkerBase):
if not self.who_has.get(dep):
if dep not in self._missing_dep_flight:
self._missing_dep_flight.add(dep)
- self.loop.add_callback(self.handle_missing_dep, dep)
+ self.loop.add_callback(self.handle_missing_dep, dep,
+ threadsafe=True)
for key in self.dependents.get(dep, ()):
if self.task_state[key] == 'waiting':
if remove: # try a new worker immediately
@@ -1535,7 +1537,7 @@ class Worker(WorkerBase):
assert all(dep in self.data for dep in self.dependencies[key])
self.executing.add(key)
- self.loop.add_callback(self.execute, key)
+ self.loop.add_callback(self.execute, key, threadsafe=True)
except Exception as e:
logger.exception(e)
if LOG_PDB:
@@ -1667,7 +1669,7 @@ class Worker(WorkerBase):
for dep in missing_deps2:
self._missing_dep_flight.add(dep)
self.loop.add_callback(self.handle_missing_dep,
- *missing_deps2)
+ *missing_deps2, threadsafe=True)
deps = [dep for dep in deps if dep not in missing_deps]
@@ -1699,7 +1701,8 @@ class Worker(WorkerBase):
for d in to_gather:
self.transition_dep(d, 'flight', worker=worker)
self.loop.add_callback(self.gather_dep, worker, dep,
- to_gather, total_nbytes, cause=key)
+ to_gather, total_nbytes, cause=key,
+ threadsafe=True)
changed = True
I suspect that for the comm overhead a good first step would be to try to implement a comm for asyncio (see https://github.com/dask/distributed/issues/2162) and then use uvloop
OK, there is a workable uvloop compatible Comm implementation in #2165
The picture doesn't change a whole lot yet. Some conversation in https://stackoverflow.com/questions/51731690/why-does-asyncio-spend-time-in-socket-senddata
However, adding up all of these small things has reduced our worker overhead substantially. I need a new and harder case study :)
The tornado add_callback issue was resolved. Scheduler and Worker CPU overhead seems much lower these days as a result. This will be out in Tornado 6.
For psutil, it's possible to use https://psutil.readthedocs.io/en/latest/#psutil.Process.oneshot to collect cpu and memory in one round.
The requirement is psutil >= 5.0.
@emaror mentioned ~9% CPU for each worker in https://github.com/dask/distributed/issues/2079#issue-336628150 with python 2.7.
I had similar observations.
I just switched to python 3.6, and the workers are down to 2-4% each.
@NotSqrt if you're inclined to try using psutil.Process.oneshot to accelerate things here that would be a welcome contribution
Yes, I'm working on it, while also tracking memory and CPU of worker children, when tasks create subprocesses.
With Python 2.7 we also experienced each idle worker process using about 10% CPU. For us, changing the following in distributed/distributed.yaml brought the usage down to about 0.5% (the choice to try and edit these came from debugging the worker):
distributed.admin.tick.interval: 1000ms
distributed.worker.profile.interval: 1000ms
Most helpful comment
With Python 2.7 we also experienced each idle worker process using about 10% CPU. For us, changing the following in
distributed/distributed.yamlbrought the usage down to about 0.5% (the choice to try and edit these came from debugging the worker):