Distributed: Resource Manager Interface

Created on 11 Feb 2016 · 17 Comments · Source: dask/distributed


_The first step is hard, but standing up is harder._

Deploying distributed on a remote cluster is a large hurdle between the
curious user and serious adoption. Making deployment easier could
significantly increase community uptake and value.

Unfortunately, there are many deployment solutions, so no single piece of
code can satisfy most potential users. We have direct relationships with
people who use the following deployment technologies:

  1. YARN
  2. Mesos
  3. SLURM
  4. Torque
  5. SGE
  6. SSH
  7. Local processes

Supporting each of these in isolation is doable, but supporting all of them
together calls for a common framework.

Additionally, near-future versions of the distributed scheduler may want to
dynamically request and release worker resources depending on load. Providing
a single resource management interface to the scheduler would allow integration
with several technologies in a sane and maintainable manner.

Question: _What does such a resource management interface look like?_

Example Interface

Let us consider the following signals from scheduler to resource manager (RM)
and back.

  • Scheduler -> RM: Please give me more workers
  • Scheduler -> RM: I no longer need these workers
  • RM -> Scheduler: I plan to take back this worker very soon
  • RM -> Scheduler: This worker has unexpectedly died

We could implement this as a Python ResourceManager object with methods for
"Please give me more workers" and "I no longer need these workers" that the
scheduler can call, plus callbacks that the scheduler provides when it creates
the ResourceManager for "I plan to take back this worker soon" and "This
worker has unexpectedly died".

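class ResourceManager(object):
    """Base interface; subclass per system (processes, SSH, Mesos, YARN)."""

    def __init__(self, on_reclaiming_worker, on_worker_died, **kwargs):
        # Callbacks for the RM -> Scheduler signals, supplied by the scheduler
        self.on_reclaiming_worker = on_reclaiming_worker
        self.on_worker_died = on_worker_died

    def request_workers(self, nworkers, **kwargs):
        """Scheduler -> RM: please give me more workers."""
        raise NotImplementedError

    def release_workers(self, worker_ids):
        """Scheduler -> RM: I no longer need these workers."""
        raise NotImplementedError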

We might then subclass this interface for processes, SSH, Mesos, YARN, etc.
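As a rough illustration of such a subclass, here is a minimal sketch for local processes. It assumes each worker is a plain subprocess started from an argument list, that `request_workers` returns the new worker ids (the interface above leaves the return value unspecified), and it adds a hypothetical `check_workers` polling method to emit the "worker has unexpectedly died" callback. The base class is repeated in compact form so the snippet runs on its own.

```python
import subprocess
import sys


class ResourceManager(object):
    """Compact copy of the interface above, so this snippet is standalone."""

    def __init__(self, on_reclaiming_worker, on_worker_died, **kwargs):
        self.on_reclaiming_worker = on_reclaiming_worker
        self.on_worker_died = on_worker_died

    def request_workers(self, nworkers, **kwargs):
        raise NotImplementedError

    def release_workers(self, worker_ids):
        raise NotImplementedError


class LocalProcessResourceManager(ResourceManager):
    """Hypothetical subclass that runs each 'worker' as a local subprocess."""

    def __init__(self, command, on_reclaiming_worker, on_worker_died, **kwargs):
        super().__init__(on_reclaiming_worker, on_worker_died, **kwargs)
        self.command = list(command)  # argument list, no shell involved
        self.procs = {}               # worker_id -> subprocess.Popen
        self._next_id = 0

    def request_workers(self, nworkers, **kwargs):
        new_ids = []
        for _ in range(nworkers):
            worker_id = "local-%d" % self._next_id
            self._next_id += 1
            self.procs[worker_id] = subprocess.Popen(self.command)
            new_ids.append(worker_id)
        return new_ids  # assumption: callers want the ids back

    def release_workers(self, worker_ids):
        for worker_id in worker_ids:
            proc = self.procs.pop(worker_id)
            proc.terminate()
            proc.wait()

    def check_workers(self):
        """Hypothetical poll: report children that exited on their own."""
        for worker_id, proc in list(self.procs.items()):
            if proc.poll() is not None:
                del self.procs[worker_id]
                self.on_worker_died(worker_id)
```

A real deployment would substitute the actual worker command, and something (for example the scheduler's event loop) would have to call `check_workers` periodically, since unexpected exits are only noticed when polled.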

Questions

Different resource management systems work with different information. YARN
allows specifying CPUs and memory and uses containers, HPC job schedulers
impose fixed job durations, and Mesos operates a bit differently from all of them.

The example operations above may not fit all resource managers: some may
provide only a subset of this functionality, while others may offer a superset.
How do we balance specializing to a particular system against maintaining
compatibility with many systems and keeping the code simple?

cc @hussainsultan @broxtronix @quasiben @danielfrg

All 17 comments

So, questions for everyone: are the four signals sufficient? Are we missing anything? What inputs do they need to take? How do individual systems differ?

  • @quasiben what information can we supply to Yarn?
  • @hussainsultan @danielfrg does this kind of interface even make sense for Mesos? Is there some way we can make it fit?
  • @broxtronix is this of any use for custom deployments with ssh and custom scripts?
  • @koverholt any concerns with traditional job schedulers like SLURM?

Subscribing to the discussion...

What about Docker swarm?

Additionally we probably also want signals like

  • Anything -> RM: Give me logs for this worker
  • Anything -> RM: What is the status of this worker
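One way these two query signals could extend the interface is sketched below. `WorkerStatus`, `Introspectable`, and `FakeRM` are hypothetical names; the in-memory `FakeRM` only illustrates the expected shapes (a status value and log text per worker), not how any real system stores or retrieves logs.

```python
import abc
import enum


class WorkerStatus(enum.Enum):
    """Hypothetical status values; real systems report richer states."""
    PENDING = "pending"    # requested but not yet running (e.g. a queued job)
    RUNNING = "running"
    FINISHED = "finished"  # exited or was released
    UNKNOWN = "unknown"    # the RM has no record of this worker


class Introspectable(abc.ABC):
    """Hypothetical mixin adding the 'Anything -> RM' query signals."""

    @abc.abstractmethod
    def worker_status(self, worker_id):
        """Anything -> RM: what is the status of this worker?"""

    @abc.abstractmethod
    def worker_logs(self, worker_id):
        """Anything -> RM: give me logs for this worker (as text)."""


class FakeRM(Introspectable):
    """In-memory stand-in, e.g. for testing code that consumes these queries."""

    def __init__(self):
        self.status = {}  # worker_id -> WorkerStatus
        self.logs = {}    # worker_id -> list of log lines

    def worker_status(self, worker_id):
        return self.status.get(worker_id, WorkerStatus.UNKNOWN)

    def worker_logs(self, worker_id):
        return "\n".join(self.logs.get(worker_id, []))
```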

+1 for Docker Swarm and Kubernetes. Both, along with Amazon ECS, can be supported with the same code via the Apache Libcloud Container API.

Support for SLURM and SGE can be implemented at once by reusing the lightweight abstraction provided by https://github.com/clusterlib/clusterlib.
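To illustrate the idea of one abstraction over both schedulers, here is a hypothetical helper that builds an `sbatch` (SLURM) or `qsub` (SGE) command line for a single worker job. It does not reproduce clusterlib's API; `submit_command`, its defaults, and the `dworker scheduler-host:8786` worker command are placeholders chosen for this sketch.

```python
import shlex


def submit_command(worker_cmd, backend="slurm", job_name="dask-worker",
                   walltime="01:00:00", memory_mb=4000):
    """Return an argv list that submits `worker_cmd` as one batch job.

    Hypothetical helper for illustration; it is not clusterlib's API.
    """
    if backend == "slurm":
        # --wrap turns a plain command string into a one-line batch script.
        wrapped = " ".join(shlex.quote(arg) for arg in worker_cmd)
        return ["sbatch", "--job-name=" + job_name, "--time=" + walltime,
                "--mem=%d" % memory_mb, "--wrap=" + wrapped]
    if backend == "sge":
        # -b y submits the command directly rather than a script file.
        # The memory resource name (h_vmem here) varies between SGE sites.
        return ["qsub", "-N", job_name, "-l", "h_rt=" + walltime,
                "-l", "h_vmem=%dM" % memory_mb, "-b", "y"] + list(worker_cmd)
    raise ValueError("unknown backend: %r" % (backend,))
```

The returned list could then be passed to `subprocess.run`, with the job id parsed from the scheduler's output; that parsing step differs per system and is left out here.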

I'm slowly experimenting to find a good API and implementations for launching and monitoring jobs over local processes, SSH, and YARN. I should clean up this work and share it. I would also be quite happy for anyone else to take this on.

Once this RM API is implemented, it could be leveraged for dynamic/elastic scheduling: heuristics based on the length of the scheduler queue and the mean task completion time could automatically provision new workers when there is a lot of pending work and release workers that have been idle for some time.
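A minimal sketch of such a heuristic, assuming the scheduler can report the queue length, the mean task duration, and how long each worker has been idle. The function name, the thresholds, and the rule of never shrinking while work is queued are all hypothetical choices, not part of any existing API.

```python
import math


def scaling_decision(queued_tasks, mean_task_seconds, n_workers,
                     idle_seconds, target_seconds=10.0, idle_timeout=60.0,
                     max_workers=100):
    """Return (n_to_request, workers_to_release) for one scaling step.

    Hypothetical heuristic: request enough workers for the queued work to
    drain in about `target_seconds`; when nothing is queued, release workers
    idle for longer than `idle_timeout` seconds.
    """
    if queued_tasks:
        pending_seconds = queued_tasks * mean_task_seconds
        desired = min(max_workers, math.ceil(pending_seconds / target_seconds))
        # Never release workers while work is still waiting.
        return max(0, desired - n_workers), []
    idle = sorted(w for w, s in idle_seconds.items() if s > idle_timeout)
    return 0, idle
```

The two outputs map directly onto `request_workers(n)` and `release_workers(ids)`, so the policy stays separate from whichever backend actually starts and stops workers.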

@ogrisel, @quasiben here is a rough and trivial implementation of local process spawning for high-level API review.

https://github.com/mrocklin/spawner/blob/master/spawner/subprocess.py
https://github.com/mrocklin/spawner/blob/master/spawner/tests/test_subprocess.py

I recommend starting with the LocalProcessSpawner docstring, followed by tests. Please ignore other files in this repository. They're from older experiments.

I hope none of the Jupyter folks mind me stealing the term spawner. Happy to change it if there is concern about a conflict.

Alright, thanks.

So this spawner API would be called by the Resource Manager to launch new dworker processes (which in turn can launch their own child processes), is that right?

You would like one generic ResourceManager class that would delegate the spawning of the workers to platform specific Spawner implementations (for local process, ssh process, YARN, docker and SLURM/SGE) via composition?
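The composition design being asked about could look roughly like this: a single manager class keeps the worker bookkeeping and delegates the platform-specific start/stop work to a `Spawner`. `Spawner`, `spawn`, `kill`, and `SpawnerResourceManager` are hypothetical names for this sketch and are not taken from the spawner repository linked above.

```python
import abc
import itertools


class Spawner(abc.ABC):
    """Hypothetical backend interface: only knows how to start/stop one worker."""

    @abc.abstractmethod
    def spawn(self):
        """Start one worker and return an opaque backend handle."""

    @abc.abstractmethod
    def kill(self, handle):
        """Stop the worker identified by `handle`."""


class SpawnerResourceManager(object):
    """Generic manager: bookkeeping here, platform details in the Spawner."""

    def __init__(self, spawner, on_reclaiming_worker, on_worker_died):
        self.spawner = spawner
        self.on_reclaiming_worker = on_reclaiming_worker
        self.on_worker_died = on_worker_died
        self.handles = {}              # worker_id -> backend handle
        self._ids = itertools.count()  # scheduler-facing ids, independent of backend

    def request_workers(self, nworkers, **kwargs):
        new_ids = []
        for _ in range(nworkers):
            worker_id = next(self._ids)
            self.handles[worker_id] = self.spawner.spawn()
            new_ids.append(worker_id)
        return new_ids

    def release_workers(self, worker_ids):
        for worker_id in worker_ids:
            self.spawner.kill(self.handles.pop(worker_id))
```

With this split, adding a backend (SSH, YARN, SGE, etc.) would mean writing only `spawn` and `kill`, while the scheduler-facing interface stays the same.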

BTW, the shell=True usage is Unix-specific. I think this part should be refactored to work on Windows too.
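A portable alternative to `shell=True` is to pass an argument list to `subprocess.Popen`, so no shell parses the command. This small sketch (the `launch` helper is hypothetical) starts a child Python process via `sys.executable`; it should behave the same on Unix and Windows.

```python
import subprocess
import sys


def launch(args):
    """Start a child process from an argument list, without a shell.

    Passing a list instead of a string with shell=True avoids depending on
    /bin/sh quoting rules; sys.executable makes the child use the same
    interpreter as the parent.
    """
    return subprocess.Popen(args, stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT, universal_newlines=True)


proc = launch([sys.executable, "-c", "print('worker started')"])
out, _ = proc.communicate()  # wait for exit and collect the combined output
```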

Yes. The scheduler (or something closely attached to the scheduler) will contain logic to start and stop workers. I think that before this logic can exist we need a unified way of stopping/starting workers on various systems. If we can agree on an API that we can implement over processes/ssh/sge/dockerspawner, then we can start using that API within the scheduler (or something closely attached) and start thinking about when we should start new jobs.

Alternatively, maybe this isn't the right place to generalize? Maybe the logic of when to start/stop new jobs also needs to be specific to the job-starting system in use. Thoughts?

Maybe the logic of when to start/stop new jobs also needs to be specific to the job-starting system in use.

I don't know yet. We have to try. Let's start by implementing the spawning API for a couple of backends and see what the constraints are. I will start with SGE / SLURM.

May I suggest using drmaa for interacting with things such as SGE/UGE? My employer is a large HPC shop and everyone uses drmaa to submit compute jobs.

It works with:

  • Grid Engine (the original Sun version or the commercial Univa version)
  • HT Condor
  • Torque
  • Gridway
  • Unicore
  • Tivoli's LoadLeveler scheduler
  • Slurm

For big HPC environments, drmaa (and its excellent Python bindings) is an industry standard.
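With the drmaa-python package, a spawner for any DRMAA-capable scheduler could be sketched as below. It relies on the package's `createJobTemplate`, `runJob`, `deleteJobTemplate`, and `control` session methods; the `DrmaaSpawner` class and its parameters are hypothetical. The session is injected rather than created inside the class, so the logic can be exercised without a cluster.

```python
class DrmaaSpawner(object):
    """Hypothetical spawner that submits each worker as one DRMAA job.

    `session` is expected to be an initialized drmaa.Session (drmaa-python);
    `terminate_action` is the control action used to stop a job, normally
    drmaa.JobControlAction.TERMINATE.
    """

    def __init__(self, session, terminate_action, command, args=()):
        self.session = session
        self.terminate_action = terminate_action
        self.command = command  # executable path as seen by the compute nodes
        self.args = list(args)

    def spawn(self):
        """Submit one worker job and return its DRMAA job id."""
        jt = self.session.createJobTemplate()
        try:
            jt.remoteCommand = self.command
            jt.args = self.args
            return self.session.runJob(jt)
        finally:
            # Release the template once the job has been submitted.
            self.session.deleteJobTemplate(jt)

    def kill(self, job_id):
        """Ask the cluster scheduler to terminate the job."""
        self.session.control(job_id, self.terminate_action)
```

In real use one would create the spawner inside `with drmaa.Session() as s:` and pass `drmaa.JobControlAction.TERMINATE` as `terminate_action`. This requires the DRMAA C library for your scheduler to be installed; drmaa-python locates it through the `DRMAA_LIBRARY_PATH` environment variable.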

@ogrisel ^^

Because this issue comes up a lot for me, it is worth noting that the state of play currently is that a distributed-drmaa interface exists at https://github.com/dask/dask-drmaa.

I would be curious to hear about your experience with dask-drmaa, if any.

