Distributed: Investigate worker overhead

Created on 2 Aug 2018  路  9Comments  路  Source: dask/distributed

Motivated by a desire for reduced latencies on the workers for Actors (we found that 1ms things were taking 5ms) we added a thread that statistically profiles the event loop. This showed overhead from a couple surprising sources:

  1. psutil and the SystemMonitor
  2. Tornado's write_to_fd which apparently isn't entirely non-blocking, see this stack overflow question
  3. Tornado's add_callback overhead, see this stack overflow question

I'm not sure how best to address these. There are probably a few approaches:

  1. Check that we're using psutil appropriately, and that there isn't some better way to regularly poll system use at high-ish frequency (currently we poll every 500ms)
  2. Quantify the cause of add_callback, and see if there aren't some occasions where we can reduce our use of Tornado
  3. Investigate other concurrency frameworks, like asyncio + uvloop. This sounds neat, but is likely expensive for many reasons. I did try using uvloop + asyncio + tornado but it wasn't very effective. The overhead appears to be higher in this stack so that uvloop doesn't seem to do much good.

Most helpful comment

With Python 2.7 we also experienced each idle worker process using about 10% CPU. For us, changing the following in distributed/distributed.yaml brought the usage down to about 0.5% (the choice to try and edit these came from debugging the worker):

distributed.admin.tick.interval: 1000ms 
distributed.worker.profile.interval: 1000ms

All 9 comments

I can reduce the overhead of using add_callback by patching tornado to use asyncio.call_soon rather than asyncio.call_soon_threadsafe (see https://github.com/tornadoweb/tornado/issues/2463) and then annotating some uses of add_callback from within the event loop as being safe.

Waiting on that tornado issue to learn if there is a better way forward.

diff --git a/distributed/worker.py b/distributed/worker.py
index 48e4bb8..4a73b7c 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -338,7 +338,7 @@ class WorkerBase(ServerNode):
         self.batched_stream = BatchedSend(interval='2ms', loop=self.loop)
         self.batched_stream.start(comm)
         self.periodic_callbacks['heartbeat'].start()
-        self.loop.add_callback(self.handle_scheduler, comm)
+        self.loop.add_callback(self.handle_scheduler, comm, threadsafe=True)

     @gen.coroutine
     def handle_scheduler(self, comm):
@@ -351,7 +351,8 @@ class WorkerBase(ServerNode):
         finally:
             if self.reconnect:
                 logger.info("Connection to scheduler broken.  Reconnecting...")
-                self.loop.add_callback(self._register_with_scheduler)
+                self.loop.add_callback(self._register_with_scheduler,
+                                       threadsafe=True)
             else:
                 yield self._close(report=False)

@@ -1421,7 +1422,8 @@ class Worker(WorkerBase):
             if not self.who_has.get(dep):
                 if dep not in self._missing_dep_flight:
                     self._missing_dep_flight.add(dep)
-                    self.loop.add_callback(self.handle_missing_dep, dep)
+                    self.loop.add_callback(self.handle_missing_dep, dep,
+                                           threadsafe=True)
             for key in self.dependents.get(dep, ()):
                 if self.task_state[key] == 'waiting':
                     if remove:  # try a new worker immediately
@@ -1535,7 +1537,7 @@ class Worker(WorkerBase):
                 assert all(dep in self.data for dep in self.dependencies[key])

             self.executing.add(key)
-            self.loop.add_callback(self.execute, key)
+            self.loop.add_callback(self.execute, key, threadsafe=True)
         except Exception as e:
             logger.exception(e)
             if LOG_PDB:
@@ -1667,7 +1669,7 @@ class Worker(WorkerBase):
                     for dep in missing_deps2:
                         self._missing_dep_flight.add(dep)
                     self.loop.add_callback(self.handle_missing_dep,
-                                           *missing_deps2)
+                                           *missing_deps2, threadsafe=True)

                     deps = [dep for dep in deps if dep not in missing_deps]

@@ -1699,7 +1701,8 @@ class Worker(WorkerBase):
                     for d in to_gather:
                         self.transition_dep(d, 'flight', worker=worker)
                     self.loop.add_callback(self.gather_dep, worker, dep,
-                                           to_gather, total_nbytes, cause=key)
+                                           to_gather, total_nbytes, cause=key,
+                                           threadsafe=True)
                     changed = True

I suspect that for the comm overhead a good first step would be to try to implement a comm for asyncio (see https://github.com/dask/distributed/issues/2162) and then use uvloop

OK, there is a workable uvloop compatible Comm implementation in #2165

The picture doesn't change a whole lot yet. Some conversation in https://stackoverflow.com/questions/51731690/why-does-asyncio-spend-time-in-socket-senddata

However, adding up all of these small things has reduced our worker overhead substantially. I need a new and harder case study :)

The tornado add_callback issue was resolved. Scheduler and Worker CPU overhead seems much lower these days as a result. This will be out in Tornado 6.

For psutil, it's possible to use https://psutil.readthedocs.io/en/latest/#psutil.Process.oneshot to collect cpu and memory in one round.
The requirement is psutil >= 5.0.

@emaror mentioned ~9% CPU for each worker in https://github.com/dask/distributed/issues/2079#issue-336628150 with python 2.7.

I had similar observations.

I just switched to python 3.6, and the workers are down to 2-4% each.

@NotSqrt if you're inclined to try using psutil.Process.oneshot to accelerate things here that would be a welcome contribution

Yes, I'm working on it, while also tracking memory and CPU of worker children, when tasks create subprocesses.

With Python 2.7 we also experienced each idle worker process using about 10% CPU. For us, changing the following in distributed/distributed.yaml brought the usage down to about 0.5% (the choice to try and edit these came from debugging the worker):

distributed.admin.tick.interval: 1000ms 
distributed.worker.profile.interval: 1000ms
Was this page helpful?
0 / 5 - 0 ratings