Drake: Remove all non-clustermq parallel backends?

Created on 27 Oct 2018 · 29 Comments · Source: ropensci/drake

Background

Currently, drake offers a multitude of parallel computing backends.

parallelism_choices()
##  [1] "clustermq"            "clustermq_staged"     "future"              
##  [4] "future_lapply"        "future_lapply_staged" "hasty"               
##  [7] "Makefile"             "mclapply"             "mclapply_staged"     
## [10] "parLapply"            "parLapply_staged"

Over the last two years, the number of backends grew and grew because I had so much to learn about high-performance computing. And I still do. But now, users have unnecessarily complicated choices to make, and it is difficult to develop and maintain so many backends.

The more I learn, the more "clustermq" seems like the best of all worlds for drake.

  1. The clustermq package can deploy locally with the "multicore" option and remotely to most common schedulers.
  2. Overhead is very low, even comparable to drake's non-staged multicore backends. Thanks to clustermq, initialization, interprocess communication, and load balancing appear very fast.
  3. We may not need future after all. Yes, the future ecosystem is amazingly powerful and flexible, and https://github.com/HenrikBengtsson/future/issues/204 could potentially even provide staged clustermq-based parallelism. However, I do wonder about a couple things.

    1. How much value do future's non-clustermq backends still provide here? Is there still a need for batchtools-based HPC?

    2. For directed acyclic graphs (DAGs) of non-embarrassingly-parallel jobs, it is important to have full access to a pool of automatically load-balanced persistent clustermq workers so drake can submit new jobs as soon as their dependencies finish and a worker becomes available. (Relevant: https://github.com/mschubert/clustermq/issues/86#issuecomment-401358553 and https://github.com/mschubert/clustermq/issues/86#issuecomment-401461495.) Does future allow this?
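To make the scheduling point concrete, here is a toy simulation in base R. It is only a sketch, not drake's or clustermq's actual scheduler: the target names, costs, and both `simulate_*` helpers are made up for illustration. It compares a staged backend, where every stage waits for its slowest target, with a pool of persistent workers that start a target as soon as its dependencies are done and a worker is free.

```r
# Toy dependency graph: each target lists the targets it needs first.
# Names and costs (in arbitrary time units) are hypothetical.
deps <- list(a = character(0), b = character(0), c = "a", d = "c")
cost <- c(a = 1, b = 5, c = 1, d = 1)

# Staged scheduling: run every currently ready target as one stage and
# wait for the whole stage to finish before starting the next one.
simulate_staged <- function(deps, cost, n_workers) {
  done <- character(0)
  clock <- 0
  while (length(done) < length(deps)) {
    pending <- setdiff(names(deps), done)
    is_ready <- vapply(pending, function(x) all(deps[[x]] %in% done), logical(1))
    stage <- pending[is_ready]
    worker_free <- rep(clock, n_workers)
    for (target in stage) {
      w <- which.min(worker_free)
      worker_free[w] <- worker_free[w] + cost[[target]]
    }
    clock <- max(worker_free)  # the stage ends when its slowest worker ends
    done <- c(done, stage)
  }
  clock
}

# Dynamic scheduling: a target starts as soon as its dependencies have
# finished and some persistent worker is free.
simulate_dynamic <- function(deps, cost, n_workers) {
  finish <- setNames(rep(NA_real_, length(deps)), names(deps))
  worker_free <- rep(0, n_workers)
  while (anyNA(finish)) {
    pending <- names(finish)[is.na(finish)]
    is_ready <- vapply(pending, function(x) !anyNA(finish[deps[[x]]]), logical(1))
    ready <- pending[is_ready]
    # Earliest possible start of each ready target: latest finish among its deps.
    avail <- vapply(ready, function(x) max(c(0, finish[deps[[x]]])), numeric(1))
    target <- ready[which.min(avail)]
    w <- which.min(worker_free)
    start <- max(avail[[target]], worker_free[w])
    finish[target] <- start + cost[[target]]
    worker_free[w] <- finish[target]
  }
  max(finish)
}

staged_time <- simulate_staged(deps, cost, n_workers = 2)
dynamic_time <- simulate_dynamic(deps, cost, n_workers = 2)
print(c(staged = staged_time, dynamic = dynamic_time))
```

In this toy graph the staged run takes 7 time units and the dynamic run takes 5, because `c` and `d` no longer wait for the slow target `b`.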

Proposal

For drake version 7.0.0 – which I hope to release in the first half of 2019 – let's think about removing all parallelism choices except "clustermq". (And let's keep "hasty" mode too, which is just an oversimplified clone of "clustermq". It's not much code, and it's a great sandbox for benchmarking.)

Benefits

Removing superfluous parallel backends will simplify the user experience. Users will no longer be overwhelmed by all the parallel computing choices and having to figure out which one is right for them. In addition, the code base and test suite will be considerably smaller, simpler, leaner, cleaner, faster, easier to maintain, more reliable, and more attractive to potential collaborators.

Your help

My goals for late 2018 are to

  1. Assess the feasibility of this change.
  2. If the change is a good idea, ensure the prerequisites for development are in place.

I would sincerely appreciate any input, advice, help, and participation you are willing to lend.

How will this affect you as a user?

Do you rely on other backends? Having problems with make(parallelism = "clustermq")? Let's talk. I will personally help you transition.

What am I missing?

Are there use cases that are inherently more difficult for clustermq than the other backends? The existing backends have different strengths and weaknesses, and I want to leave time for discussion before assuming clustermq is a one-size-fits-all solution.

Related issues

From a development perspective, the chief obstacles seem to be

Here is a more exhaustive list of relevant issues.

cc

These are some of the folks involved in earlier discussions about drake's high-performance computing. Please feel free to add more. I probably forgot many.

  • @bart1
  • @bmchorse
  • @dapperjapper
  • @gmbecker
  • @guilhermealles
  • @mschubert
  • @HenrikBengtsson
  • @idavydov
  • @kendonB
  • @krlmlr
  • @nulloa
help or input advanced priority dependencies performance

All 29 comments

Rather than using the current clustermq backend, does it make sense to come up with a solution for efficient distribution/caching of data shared between multiple calls and move drake in that direction? (I actually thought about contacting you for a possible R Consortium application, but had too much other work to fit it in this round.)

Happy to discuss this in more detail when I'm back from vacation/traveling in 3 weeks.

I would love to discuss the details when you are back from vacation. I will actually be on a vacation myself from November 15 through 26, so maybe late November or early December would be a good time to follow up.

My own preference at the moment is to pursue both options simultaneously. I know, this is the sort of thinking that gave drake too many parallel backends, but I see value in both, and I am not sure they are mutually exclusive. What you describe seems like a higher-risk, higher-reward effort that could replace clustermq in drake further down the road. In the nearer future, it is trivially easy on the implementation side to get rid of superfluous non-clustermq backends, and I believe this would solve immediate problems in development, testing, collaboration, and general ease of use.

Also, #384, #498, and this article could be relevant to your idea to the extent that storage is related to caching.

I started using drake with batchtools mainly because zeromq is an extra dependency. It turned out that we have it installed on the cluster. So eventually I switched to clustermq, and it seems to work great.

In my opinion, the only problem with having clustermq only is the fact that some clusters might not have zeromq installed. On top of that, even if you manage to compile zeromq from source, if you have an organization-wide installation of RStudio, it might be difficult to have all the environment variables set up correctly to use a locally installed version.

Another consideration is the following: persistent workers are great but for large clusters, it is often considered a good practice to have shorter jobs rather than longer ones. So in case of large drake plans and clusters with a high load, it is probably good to keep an option of having transient workers. Maybe, even exploiting cluster dependency management.

Perhaps, the simplest way to do this is through Makefiles?

A quick note from my side, I just gave it a try. I needed to install zeromq from source which was surprisingly easy due to having no dependency issues. Afterwards I got the example running without any trouble (on slurm).

Not knowing much about how clustermq works, I do not know how it deals with longer jobs. Jobs on the cluster have a limited run time, and it seems clustermq sometimes reuses existing instances, so I would probably want to limit this behaviour and start new jobs instead, to avoid jobs being killed by the cluster. I guess the reuse argument to clustermq::workers is meant for that.

Another question I have is whether there is an option to run multiple jobs inside one or more larger cluster jobs. Previously I have used one 64-core task to run many different tasks.

I guess both these concerns can be addressed by updating the template file to call workers with reuse=FALSE and make multiple parallel calls to workers using something like mclapply(1:64, function(x){clustermq("{master}")}) in the slurm template (untested).

A quick note from my side, I just gave it a try. I needed to install zeromq from source which was surprisingly easy due to having no dependency issues.

I can second this. I've verified that installing ZeroMQ as a _non-privileged_ user from source on a _RHEL 6.6_(!) cluster went smoothly following the (traditional) installation instructions:

curl -L -O https://github.com/zeromq/libzmq/releases/download/v4.2.5/zeromq-4.2.5.tar.gz
tar xvf zeromq-4.2.5.tar.gz
cd zeromq-4.2.5
./configure --prefix=/path/to/zeromq-4.2.5
make
make install

After this, 'rzmq' and 'clustermq' installed like a charm on a fresh R 3.5.1 setup.

Lots of HPC clusters run these older versions of RHEL/CentOS. I think there are folks out there stuck with RHEL 5 as well. Being able to support those is important. ("You need to update your OS" is not useful feedback for users in such environments.)

Thanks for chiming in. From your perspective, it sounds like the main issues are (1) persistent workers and (2) compatibility edge cases with ZeroMQ. https://github.com/mschubert/clustermq/issues/101 is solvable, but to Henrik's point, OS portability is key. All this makes me think we might keep the "future" backend as well as "clustermq" and "hasty". Those three backends are fewer than the existing 11, and together they cover a lot.

@idavydov

Perhaps, the simplest way to do this is through Makefiles?

make(parallelism = "Makefile") was actually drake's first parallel backend. In fact, my original intention was to offload to GNU Make as much as possible. Now, however, I think drake has grown independent of Make and we no longer need the "Makefile" backend.

I haven't been using drake much recently but as I said in #126, I used Makefile parallelism because I really liked the ability to kill jobs (from htop) without affecting my main R session. This worked with my workflow, where I rapidly iterate on my scripts. I'm not sure if this is a reasonable request, but I'm wondering if clustermq has a means for easily viewing the status of various jobs and killing them.

With clustermq and its persistent workers, you unfortunately would not be able to kill individual jobs. But with the "future" backend (which is back on the table) you might, especially if there is some way to expose and broadcast suggestive job names in batchtools template files.

Related: https://github.com/HenrikBengtsson/future/issues/93

I love the idea of fewer choices and a "universally good" solution, haven't had the chance to try _clustermq_ yet.

@dapperjapper, in 1ae4528ce9d006d258b8eaa547f13b690cfce5f1, I supplied the target name to the label argument of future(). So for those of us who use computing clusters and future.batchtools, as long as we use job.name in our batchtools template files, we see the names of our targets in job-monitoring utilities like qstat. As for local multisession/multicore parallelism, I do not know if it is possible to post informative job names that get picked up by htop.

@dapperjapper, using https://github.com/HenrikBengtsson/future.callr/commit/a8774795f33204fdf44e957843890ac283a62571 and https://github.com/ropensci/drake/commit/1ae4528ce9d006d258b8eaa547f13b690cfce5f1, you can now see the targets corresponding to processes in htop. Example: https://github.com/HenrikBengtsson/future/issues/96#issuecomment-437708937. We are one step closer to the safe removal of Makefile parallelism.

Fantastic work! Thank you!

Okay, unless someone convinces me otherwise in the meantime, I will plan to downsize to just "clustermq", "future", and "hasty" for version 7.0.0. I will likely start work on this after a couple of conferences in January (one of which is rstudio::conf(2019)).

I have now tried the latest versions of both future and clustermq for some large embarrassingly parallel jobs (a large spatial intersection operation of two sets of around 500000 polygons).

future with batchtools or multicore takes far too long to get going to be useful for large sets of quick jobs. future seems to be a fine option if the user puts in the work to match the number of futures to the number of workers (i.e. each call to future contains a target that runs a loop). A great feature is the ability to match resources to targets.

clustermq seems to be working great, except that there is still no way to match resources to targets. One could imagine having teams of different types of workers which would appear once they're needed and disappear when they're no longer needed.

I think that there are masses of wasted resources on clusters today, when people are overallocating resources to perform operations that don't need them. Having no option (other than manually staging) to allocate resources to targets will only result in more wastage on clusters.

Thanks for giving these backends a try.

future with batchtools or multicore takes far too long to get going to be useful for large sets of quick jobs.

Fortunately, clustermq has a multicore option too: options(clustermq.scheduler = "multicore").
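For anyone who wants to try this locally, a minimal sketch follows. It assumes a Unix-like machine with drake, clustermq, and ZeroMQ installed; the plan, the target names `x` and `y`, and `jobs = 2` are arbitrary choices for illustration.

```r
# Run clustermq workers as local processes instead of scheduler jobs.
options(clustermq.scheduler = "multicore")

# Guarded so the snippet does nothing where the packages are missing.
if (requireNamespace("drake", quietly = TRUE) &&
    requireNamespace("clustermq", quietly = TRUE)) {
  library(drake)
  # A tiny, made-up plan: y depends on x.
  plan <- drake_plan(
    x = rnorm(100),
    y = mean(x)
  )
  # Two persistent local workers build the targets.
  make(plan, parallelism = "clustermq", jobs = 2)
  print(readd(y))
}
```

Switching to a real scheduler should then mostly be a matter of changing the `clustermq.scheduler` option and supplying a template file, without touching the plan.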

Job groups, https://github.com/ropensci/drake/issues/561#issuecomment-433642782, and/or #304 are possible long-term solutions. drake has always struggled with large numbers of small targets, and I am hoping to solve this problem at some point. First, though, drake needs a smaller/cleaner code base, and that means downsizing the number of parallel backends.

clustermq seems to be working great, except that there is still no way to match resources to targets. One could imagine having teams of different types of workers which would appear once they're needed and disappear when they're no longer needed.

Or one team of persistent heterogeneous workers and appropriate load balancing to accommodate. I think this may be a good clustermq issue. https://github.com/mschubert/clustermq/issues/81 might also be related.

future with batchtools or multicore takes far too long to get going to be useful for large sets of quick jobs.

Admitting I've been out of the loop for a while, but didn't drake have a "future.apply" backend for the purpose of achieving load balancing (on top of the core Future API provided by future)?

Yes. It's the "future_lapply_staged" option, which has worked pretty well for embarrassingly parallel jobs. I may still lobby Will to support it as an option.

Or one team of persistent heterogeneous workers and appropriate load balancing to accommodate. I think this may be a good clustermq issue.

This would need the whole infrastructure of assigning each call a memory and time cost, constructing dependency graphs, and then distributing tasks to maximize resource usage.

This, to me, sounds rather like a job for a workflow package than a scheduler interface package, so I'm unlikely to support this directly.

But I'm happy to adapt the worker API for e.g. drake to use it that way, or come up with a good glue like https://github.com/ropensci/drake/issues/561#issuecomment-433642782 suggests (I haven't forgotten about this).

Admitting I've been out of the loop for a while, but didn't drake have a "future.apply" backend for the purpose of achieving load balancing (on top of the core Future API provided by future)?

I also have to read up on how the different future approaches work in detail, and consider putting https://github.com/HenrikBengtsson/future/issues/204 a bit higher up on the priority list (or is anyone interested in taking a stab at this?).

It may be time for drake to allow custom/external parallel backends. I am thinking about make(parallelism = your_scheduler_function), where your_scheduler_function() takes a drake_config() object and checks/builds all the targets. We could take the current internal run_future_lapply_staged() and put it in its own package, similarly to how packages future.batchtools and future.callr extend future. We could also make extension easier by standardizing and exposing drake functions for building, checking, and storing targets. With all the changes planned for 7.0.0, I think we are coming up on the right time. Implementation may not start immediately, though. My January will be packed with other stuff, including 2 conferences.

https://github.com/ropensci/drake/issues/561#issuecomment-450052249 could also pave the way for #575.

Re https://github.com/ropensci/drake/issues/561#issuecomment-449975270, point well taken. drake needs to keep track of which targets need which resources, and of course, the timing of how the targets are deployed. However, I think drake may need help from clustermq in order to spawn a pool of heterogeneous workers, and for each target, exclude workers that do not meet the resource requirements.
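The filtering step described above can be sketched in a few lines of base R. This is only an illustration of the idea, not an existing drake or clustermq feature: the worker pool, the target names, and the `memory_gb` column are all hypothetical.

```r
# Hypothetical pool of heterogeneous workers with different memory sizes.
workers <- data.frame(
  id = c("w1", "w2", "w3"),
  memory_gb = c(4, 16, 64),
  stringsAsFactors = FALSE
)

# Hypothetical targets with their memory requirements.
targets <- data.frame(
  name = c("small_summary", "medium_fit", "large_join"),
  memory_gb = c(2, 8, 32),
  stringsAsFactors = FALSE
)

# For each target, keep only the workers that meet its requirement.
eligible <- lapply(
  setNames(targets$memory_gb, targets$name),
  function(need) workers$id[workers$memory_gb >= need]
)
print(eligible)
```

A scheduler would then hand each target to the first free worker in its eligible set, which is where the load balancing inside clustermq would have to cooperate.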

At the interface level, it would be great to combine https://github.com/HenrikBengtsson/future/issues/204 and https://ropenscilabs.github.io/drake-manual/hpc.html#the-resources-column-for-transient-workers. make(parallelism = "future") already recruits the optional resources argument of future.batchtools evaluators. And depending on efficiency, https://github.com/HenrikBengtsson/future/issues/204 might allow drake to fold make(parallelism = "clustermq") right into make(parallelism = "future"). The simplicity would be really nice.

Given the way things are going, I think the present issue will be solved with the following. Checked items are implemented in the 561 branch.

  • [x] Accept a user-defined function supplied to the parallelism argument of make(), along with a warning that the supplied backend is not officially supported.
  • [x] Externalize the unofficial backends to their own packages on GitHub (not for CRAN).

    • [x] hasty mode

    • [x] "future_lapply_staged" parallelism

  • [x] Use the code from "mclapply_staged" parallelism to process the imports.
  • [x] Remove the following backends from core drake:

    • [x] "clustermq_staged"

    • [x] "future_lapply"

    • [x] "Makefile"

    • [x] "mclapply"

    • [x] "mclapply_staged"

    • [x] "parLapply"

    • [x] "parLapply_staged"

  • [x] Deprecate the parallelism_choices() function.
  • [x] Update all the tests/checks.

After another look at the code base, I no longer think it is a good idea to officially support custom backends in external packages because it would require exposing too many sensitive internals. That said, I will still open a back door for experimentation: make(parallelism = your_scheduler_function). Caveats:

  1. This is really a sandbox similar to hasty mode, so drake will always throw a warning.
  2. To get at the necessary internals, you will need to use :::.

I think this approach could

  1. Make it easier for others to help with #575.
  2. Help advanced HPC users aggressively optimize scheduling for their computing resources.

Let's externally offload the unofficial backends through this backdoor: hasty mode and "future_lapply_staged" parallelism.
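The shape of such a back-door backend might look like the skeleton below. It is a sketch of the interface only: the body builds nothing, the return value is an assumption, and the stand-in `fake_config` list replaces a real drake_config() object so the snippet runs without drake.

```r
# Skeleton of a user-supplied backend. make() would call it with the
# drake_config() object for the current plan.
your_scheduler_function <- function(config) {
  # A real backend would check and build every outdated target here,
  # which currently means reaching into drake internals with `:::`.
  message("custom backend called; no targets were built")
  invisible(config)  # returning the config is an assumption, not a documented contract
}

# Stand-in for a real config object, used only to exercise the skeleton.
fake_config <- list(jobs = 1)
result <- your_scheduler_function(fake_config)

# With drake installed, the intended call would be:
# make(plan, parallelism = your_scheduler_function)
```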

I believe all the immediate issues are fixed. @kendonB, "future_lapply_staged" parallelism is now available through https://github.com/wlandau/drake.future.lapply.staged. Likewise, hasty mode is offloaded to https://github.com/wlandau/drake.hasty. @mschubert, let's follow up separately about https://github.com/ropensci/drake/issues/561#issuecomment-433642782 and https://github.com/HenrikBengtsson/future/issues/204.

With the risk of being "fluffy", here are some quick comments and clarifications related to the future framework and what I think is the gist of this thread:

  • The idea of the Future Core API is to provide a unified framework:

    • that provides a minimal ("atomic") set of building blocks - no more, no less - for evaluating R code "anywhere" and asynchronously
    • where the Future class provides a "container" holding an R expression and its dependencies
    • where Future objects can be passed on to R processes running "anywhere"
    • when I look at our existing parallel/distributed backends, at their very core, they are all implementing their own version of a future()/resolved()/value() API; conceptually they could implement the same lightweight API at this level. Not claiming it will ever happen, but the design goal of the future framework is such that packages such as 'parallel', 'foreach', 'batchtools', 'clustermq' etc could implement their own future backends and then their higher-level functions will build on top of these common core parallelization building blocks
    • Higher-level use cases from tools like drake greatly help to identify/narrow in on what the atomic set of building blocks should be
  • I'm hoping to get to a future.clustermq backend "soon-ish";

    • I wanted to get a working draft of future.tests first to help the validation (it's in a decent shape now)
    • Note that a future.clustermq backend will actually "peel off" the workload balancing that clustermq has built in, which means a vanilla future.clustermq backend might not do what you need. Why? This is because the Future Core API (as defined/implemented in the 'future' package) does not have a concept of workload balancing (but see next).
  • Specifying and requesting computational resources needed to evaluate a particular computational container ("future") is not obvious:

    • We need some type of standard and I don't think it exists, i.e. it needs to be identified and developed carefully
    • HPC schedulers provide some framework and terminology for this
    • Different schedulers do not fully agree on how to specify this, e.g. for some you specify memory needed for the whole job whereas for others per slot in a job. Some of these can be encapsulated in a unifying API, but some may be unique to certain schedulers and needs
  • It's on my long-term roadmap to make it possible to merge multiple Future objects:

    • the hope is to identify (and eventually provide) fundamental building blocks for doing workload balancing ("chunking")
    • for example, imagine a for loop over 100 iterations creating 100 separate futures. If we could merge these into 10 new futures (sharing common globals and dependencies etc), we have effectively achieved the most basic type of workload balancing ("chunking") as we see in mclapply(), parLapply(), future_lapply(), etc.
    • in order to do this, I need to decouple the Future class further from the future backends (they're currently a _bit_ intertwined)
    • the goal is to provide the _minimal_ set of building blocks
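The merging idea in the last point can be illustrated without the future package at all. In the sketch below, plain R closures stand in for Future objects (an assumption made purely for illustration), and parallel::splitIndices() groups 100 of them into 10 chunks, which is the same kind of chunking that parLapply() performs.

```r
# 100 deferred computations; each closure stands in for one future.
tasks <- lapply(1:100, function(i) {
  force(i)
  function() i^2
})

# Split the task indices into 10 contiguous chunks.
chunks <- parallel::splitIndices(length(tasks), 10)

# Merge: one new deferred computation per chunk that runs its tasks in turn.
merged <- lapply(chunks, function(idx) {
  force(idx)
  function() lapply(tasks[idx], function(task) task())
})

# Each merged task could now go to one worker; here they run sequentially.
results <- unlist(lapply(merged, function(chunk) chunk()))
print(length(merged))
print(head(results))
```

Ten dispatches instead of one hundred is where the overhead saving comes from; the results are unchanged.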

A quick note from my side, I just gave it a try. I needed to install zeromq from source which was surprisingly easy due to having no dependency issues.

I can second this. I've verified that installing ZeroMQ as a _non-privileged_ user from source on a _RHEL 6.6_(!) cluster went smoothly following the (traditional) installation instructions:

curl -L -O https://github.com/zeromq/libzmq/releases/download/v4.2.5/zeromq-4.2.5.tar.gz
tar xvf zeromq-4.2.5.tar.gz
cd zeromq-4.2.5
./configure --prefix=/path/to/zeromq-4.2.5
make
make install

After this, 'rzmq' and 'clustermq' installed like a charm on a fresh R 3.5.1 setup.

Lots of HPC clusters run these older versions of RHEL/CentOS. I think there are folks out there stuck with RHEL 5 as well. Being able to support those is important. ("You need to update your OS" is not useful feedback for users in such environments.)

@HenrikBengtsson I managed to install zeromq on my university cluster (running CentOS 7.6.1810) using the commands you listed, but I'm having trouble getting rzmq to load properly; can you spell out in more detail where the zeromq libraries should be installed and/or what environment variables need to be set for this to work?
