When creating a cluster object, we currently scale by number of workers:
cluster = KubeCluster()
cluster.scale(10)
Here 10 is the number of workers we want to have. However, it is common for users to think about clusters in terms of number of cores or amount of memory, rather than in terms of number of Dask workers:
cluster.scale(cores=100)
cluster.scale(memory='1 TB')
What is the best way to achieve this uniformly across the dask deployment projects? I currently see two approaches, though there are probably more that others might see.
Establish a convention where clusters define information about the workers they will produce, something like the following:
>>> cluster.worker_info
{'cores': 4, 'memory': '16 GB'}
Then the core Cluster.scale method would translate the requested cores or memory into a number of workers and call the subclass's scale method with that number.
Let the downstream classes handle this themselves, but ask them all to handle it uniformly. This places more burden onto downstream implementations, but also gives them more freedom to select worker types as they see fit based on their capabilities.
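To make the first approach concrete, here is a minimal sketch of how a base class could turn a cores or memory request into a worker count. Everything beyond `scale` and `worker_info` is an assumption made for illustration: the helper names `parse_memory`, `workers_needed`, and `_scale_workers`, the `ExampleCluster` class, and the choice of decimal units and rounding up are all hypothetical, not an existing API.

```python
import math

# Decimal byte units; an assumption, a real implementation may prefer binary units.
UNITS = {"B": 1, "KB": 10**3, "MB": 10**6, "GB": 10**9, "TB": 10**12}


def parse_memory(text):
    """Convert a string such as '16 GB' into a number of bytes."""
    text = text.strip().upper()
    # Try two-letter suffixes before the bare 'B' suffix.
    for suffix in sorted(UNITS, key=len, reverse=True):
        if text.endswith(suffix):
            return int(float(text[: -len(suffix)]) * UNITS[suffix])
    return int(text)


def workers_needed(worker_info, n=None, cores=None, memory=None):
    """Smallest worker count that satisfies every constraint given."""
    candidates = []
    if n is not None:
        candidates.append(n)
    if cores is not None:
        candidates.append(math.ceil(cores / worker_info["cores"]))
    if memory is not None:
        per_worker = parse_memory(worker_info["memory"])
        candidates.append(math.ceil(parse_memory(memory) / per_worker))
    if not candidates:
        raise ValueError("specify n, cores, or memory")
    return max(candidates)


class Cluster:
    """Hypothetical base class: translates the request, then delegates."""

    worker_info = {}

    def scale(self, n=None, cores=None, memory=None):
        self._scale_workers(workers_needed(self.worker_info, n, cores, memory))

    def _scale_workers(self, n):
        raise NotImplementedError


class ExampleCluster(Cluster):
    """Stand-in for a downstream cluster; it only records the request."""

    worker_info = {"cores": 4, "memory": "16 GB"}

    def __init__(self):
        self.requested = 0

    def _scale_workers(self, n):
        self.requested = n
```

With 4-core, 16 GB workers, `scale(cores=100)` would ask for 25 workers and `scale(memory='1 TB')` for 63, since partial workers are rounded up. For the string parsing, `dask.utils.parse_bytes` could likely replace the hand-written `parse_memory`.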
cc dask-jobqueue, dask-yarn, dask-kubernetes
I'm working on a first example for the first proposed solution.
A couple of thoughts on this.
First, there tend to be physical limits on clusters that will at some point make scaling this way impractical. For instance, a request for 100 cores is (unless I'm mistaken) not going to be satisfied on a single CPU. Depending on the application, this could be problematic. How do we handle cases where users intended to have the cores on the same CPU? This is more likely to come up with memory, but the same question stands there.
Second, some clusters have different options in terms of nodes that could be chosen. How does the job scheduler decide which nodes are sufficient to add up to the user's requirements? Relatedly, how does a user specify a constraint such as preferring larger portions of single nodes (e.g. more CPU and more memory on each node that has workers)?
> First there tend to be physical limits on clusters that will at some point make scaling this way impractical
> Second some clusters have different options in terms of nodes that could be chosen
I'm not sure I understand what you mean. Are you talking about scaling with the cores and memory kwargs, or scaling in general? The problem you describe seems more generic to me, and could occur even when scaling by number of workers.
As this functionality would be common across the downstream projects it would make more sense to me to put this in the upstream project.
Nice proposal though!
> As this functionality would be common across the downstream projects it would make more sense to me to put this in the upstream project.
If I'm not mistaken, we are in the upstream project, aren't we?
@guillaumeeb yes but Matt asked if it should remain here or move downstream.
@jacobtomlinson sorry about my remark, I did not understand what you meant! Totally agree with you.
Just wondering if we need the concept of different pools of workers, where each pool can have different worker specs.
For this to work well, there would probably have to be a way to specify both "run this job/task only on the specified pools" and "prefer to run this job/task on these pools, but use other pools if the specified ones are busy".
Each pool type could then have its own min/max workers and the scheduler could scale up/down the workers associated with each pool depending on the number of jobs associated with each.
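As a rough sketch of the pool idea described above, the following shows one way the "only" versus "prefer" distinction and per-pool min/max limits might fit together. All names here (`Pool`, `choose_pool`, `target_workers`, the `busy` flag) are hypothetical, and the one-worker-per-pending-task rule is a simplifying assumption rather than how any scheduler actually behaves.

```python
from dataclasses import dataclass


@dataclass
class Pool:
    """Hypothetical description of a pool of identically specified workers."""

    name: str
    cores: int
    memory: str
    min_workers: int = 0
    max_workers: int = 10
    busy: bool = False  # assumption: a single flag stands in for real load tracking


def choose_pool(pools, wanted, strict):
    """Pick a pool name for a task, or None if the task has to wait.

    strict=True  means "run only on the specified pools".
    strict=False means "prefer these pools, but fall back to any free pool".
    """
    for name in wanted:
        pool = pools.get(name)
        if pool is not None and not pool.busy:
            return name
    if strict:
        return None
    for name, pool in pools.items():
        if not pool.busy:
            return name
    return None


def target_workers(pool, pending_tasks):
    """Clamp a one-worker-per-pending-task target to the pool's limits."""
    return max(pool.min_workers, min(pool.max_workers, pending_tasks))
```

For example, with a busy GPU pool and a free default pool, `choose_pool(pools, ["gpu"], strict=True)` returns `None` (the task waits), while `strict=False` falls back to the default pool.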
Just to mention that a first (simple) PR for this issue is available in #2209, in case people here missed it!
@dhirschfeld the concept of different pools seems interesting for scaling with specific worker profiles (GPU, big memory nodes...)
We could have something like this for dask-jobqueue:
cluster.add_pool(processes=1, cores=1, memory='16GB', queue='qgpu', pool_name='GPU', walltime='02:00:00')
cluster.scale(10, pool='GPU')
cluster.scale(100) # default pool
Maybe this belongs more to #2118? But both issues are linked.