Drake: Additional modes of parallelism

Created on 7 Jun 2017 · 41 Comments · Source: ropensci/drake

advanced priority performance new feature

All 41 comments

Also Hadoop and sparklyr.

From @mrchypark: see Airflow and Luigi.

others in this repo

For Spark, it seems like sparklyr::spark_apply() would be ideal. We could wrap it into a worker for parallel_stage() like worker_parLapply() or worker_mclapply().

I just talked to a colleague, and Spark may not be the best option here. sparklyr::spark_apply() might come into play for #77, but it does not need to. Spark is apparently not designed for general parallelism tasks; it is more about parallelism when you're splitting up a large data file. Drake is far more general.

On the other hand, Docker may be like Makefile parallelism, but more reproducible and robust. It may even help solve #76 with deployment hooks. @mrchypark, what do you think?

@wlandau-lilly Good point. I agree Docker is a good choice, because you can monitor and kill processes. How can I help?

It would be great to strategize with you because there are multiple ways to approach this. Today, a colleague suggested that we could use a Dockerfile like the current Makefile, constructing it based on the DAG and using it to spawn a separate R session for each target. Alternatively, we might define a Docker-specific parallel worker like I did for the parallel::mclapply() and parallel::parLapply() backends.

I am probably missing other options and caveats here because I have never actually used docker before. Do you know of any tutorials specific to this situation?

batchtools has a Docker parallel mode, and dockerflow controls Docker with Dataflow on Google Cloud. I'll keep you posted if I find any others.

I would suggest making docker an option, but not the default one. An issue that I've run into is that it requires admin rights to run (not just install) on Windows. So in a heavily controlled IT environment, docker is a non-starter for some users 😢 Make comes with Rtools however, and is more likely to be installed.

@AlexAxthelm I totally agree. Even if everyone had docker, I would probably still keep the current default: parLapply for Windows and mclapply for all other platforms. These are the most R-focused options, and drake is trying to be the most R-focused pipeline toolkit.

May I suggest using futures from the future package (I'm the author) for parallelism / distributed processing? It might provide a generic solution to what you're shooting at here.

The idea is that you write code once and then the user can choose whatever backend they'd like, e.g. any backend parallel provides (built-in in future), or HPC-scheduler backends via batchtools (in future.batchtools). Implementing the Future API for additional backends is not that hard.

Regarding Docker-based workers: there are some examples of how to launch those in ?parallel:makeClusterPSOCK ?future:makeClusterPSOCK.

Absolutely! We can connect future (and by extension, batchtools) and Docker, all in an R-focused way!

I heard that future is taking the R world by storm. I have not yet used it myself, but it sounds ideal here. Do you think any existing parallel backends should be overwritten, or could new ones just be added?

I would love to launch Docker jobs with parallel::makePSOCKcluster() because the code is already set up for this sort of approach.

The changes may not happen immediately, but this is a high priority for the future development of drake.

Cool!

Do you think any existing parallel backends should be overwritten, or could new ones just be added?

I would add support for a "future" option first, and only later, once it's shown to work, make it the default. With that in place, one could deprecate existing implementations that are already covered by "future", i.e. lapply covered by sequential, mclapply by multicore, parLapply by multisession (the latter two by multiprocess). I'm conservative when it comes to API changes, so I wouldn't do that in a single release. If that was your question.
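The correspondence above can be written down as a plain lookup table. This is a minimal sketch; `future_equivalents` and `future_strategy_for()` are hypothetical names, not part of drake or future, and they only record which future strategy would cover each existing backend.

```r
# Hypothetical lookup: which future strategy would cover each existing
# drake backend, following the correspondence suggested above.
future_equivalents <- c(
  lapply    = "sequential",
  mclapply  = "multicore",    # forked processes (not available on Windows)
  parLapply = "multisession"  # background R sessions
)

# Hypothetical helper: return the strategy name, or stop for unknown backends.
future_strategy_for <- function(parallelism) {
  if (!parallelism %in% names(future_equivalents)) {
    stop("No future equivalent recorded for backend: ", parallelism)
  }
  unname(future_equivalents[[parallelism]])
}

future_strategy_for("mclapply")
## [1] "multicore"
```

Backends without an entry (for example "Makefile") raise an error rather than silently falling back.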

I should admit I haven't run drake yet, so I'm shooting from the hip here, but it could be that the following will work out of the box (quick tweak of worker_lapply()):

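worker_future_lapply <- function(targets, hash_list, config){
  # Prepare the environment for these targets (drake internal).
  prune_envir(targets = targets, config = config)
  # Build each target; the active future::plan() decides where this runs.
  values <- future::future_lapply(X = targets, FUN = build,
    hash_list = hash_list, config = config)
  # Store the built values back in the workflow environment.
  assign_to_envir(target = targets, value = values, config = config)
}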

This of course assumes that build() doesn't modify global variables etc. As long as it's a true functional call it should work.
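To see why the build function needs to behave like a true function, here is a small base-R sketch (not drake code) using parallel::mclapply(). It assumes a Unix-alike, since forking with mc.cores > 1 is unavailable on Windows; the names `counter` and `impure_build()` are illustrative. Assignments to a global variable made inside forked workers never reach the parent session.

```r
library(parallel)

counter <- 0
# A "build" function with a side effect: it bumps a global counter.
impure_build <- function(target) {
  counter <<- counter + 1
  target
}

# Sequentially, the side effect lands in the calling session.
invisible(lapply(1:4, impure_build))
counter_after_lapply <- counter    # 4

# Reset, then run the same function in forked worker processes.
counter <- 0
results <- mclapply(1:4, impure_build, mc.cores = 2)
counter_after_mclapply <- counter  # still 0: each fork changed its own copy
```

The returned values are identical either way; only the side effect is lost, which is exactly the kind of silent difference a pure build step avoids.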

I agree. For a first step, it seems wise to append a future backend without modifying the other backends. I was just thinking that it would be really slick to totally replace lapply.R, mclapply.R, and parLapply.R with something like worker_future_lapply() with an additional argument to select the backend within future. If future covers the current non-Makefile functionality, the API will stay totally back-compatible: "mclapply" and "parLapply" will still be elements of parallelism_choices() (i.e. possible values of the parallelism argument to make()) but they will accompany synonyms "future", "sequential", "multicore", "multisession", etc. No deprecation required.
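One way to picture a single worker replacing lapply.R, mclapply.R, and parLapply.R is to pass the apply function in as an argument. This is a simplified sketch with a hypothetical `worker_apply()` and a stand-in `build_stub()`; drake's real workers also prune and assign environments, as in the worker_future_lapply() sketch above.

```r
library(parallel)

# Stand-in for drake's build(): returns a value for each target name.
build_stub <- function(target) paste("built", target)

# Hypothetical generic worker: the backend is any lapply-like function
# taking X and FUN, so lapply, mclapply, and future_lapply all fit.
worker_apply <- function(targets, apply_fun = lapply) {
  values <- apply_fun(targets, build_stub)
  setNames(values, targets)
}

targets <- c("small", "large")
seq_values  <- worker_apply(targets)
fork_values <- worker_apply(targets, apply_fun = function(X, FUN) {
  mclapply(X, FUN, mc.cores = 1)  # mc.cores = 1 also runs on Windows
})
identical(seq_values, fork_values)
## [1] TRUE
```

Under this design, backend names such as "mclapply" or "multisession" would only need to select an apply function, which is what keeps the old names back-compatible.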

In case you are interested and get to this before I do (see #21), I will gladly review pull requests to add a new future.R with run_*() and worker_*() functions, along with additions to parallelism_choices() and possibly documentation.

For anyone else reading this thread who is new to drake's internals, the run_*() functions are called near the end of make(). Since I use get(), appearances of run_lapply(), for example, may be difficult to spot.
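A minimal sketch of that kind of get()-based dispatch, using hypothetical stand-in functions rather than drake's actual internals: the backend name is pasted into a function name and looked up at run time, so a literal call to run_lapply() never appears in the source.

```r
# Hypothetical stand-ins for drake's run_*() functions.
run_lapply   <- function(config) "ran with lapply"
run_mclapply <- function(config) "ran with mclapply"

# Look up run_<parallelism>() by name and call it.
run_parallel_backend <- function(parallelism, config = list()) {
  runner <- get(paste0("run_", parallelism), mode = "function")
  runner(config)
}

run_parallel_backend("mclapply")
## [1] "ran with mclapply"
```

Searching the code for the string "run_" rather than a specific function name is the practical way to find these call sites.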

Also, global variables are generally protected from build(), but the user could theoretically write workflow commands that insist on modifying globalenv(). I routinely test typical use cases for make(..., envir = globalenv()) vs make(..., envir = some_custom_environment) for both parLapply and Makefile parallelism, and it will be straightforward to see what happens for future.

Also: @emittman and @nachalca are having trouble with the advertised SLURM functionality. I think we could get a Docker image of a SLURM setup and use it to reproduce the setbacks and add an example, maybe a vignette too.

Relevant: #99. I am hoping Docker parallelism turns out to be just Makefile parallelism with the right prepend and recipe_command arguments to make(). That would be super easy and slick.

Historically in Makefile parallelism, targets have always been made with Rscript -e 'drake::mk(...)'. But as of 9904c03ce95f596d2b33b6b9b7d7c182cfb86c8d, you can supply a custom recipe_command argument: for example, make(..., recipe_command = "R -q -e"). Example:

library(drake)
load_basic_example()
# Find "Rscript -e" in the Makefile generated below.
make(my_plan, parallelism = "Makefile", jobs = 4)
# Find "R -q -e" in the Makefile generated below.
make(my_plan, parallelism = "Makefile", jobs = 4,
  recipe_command = "R -q -e")

@emittman and @nachalca, do you think recipe_command could also help solve the difficulties you have been having with SLURM? Maybe you could try something like make(..., parallelism = "Makefile", jobs = 2, recipe_command = "sbatch ...").

That sounds like it might work. I am keeping the GPUs I have available busy for now, but I'll let you know when/if I get a chance to try it.



Update: there is an optional new R_RECIPE wildcard that tells drake where to put drake::mk(...) in the Makefile recipe. This should allow for more customizability and flexibility. There is also a new Makefile_recipe() function that lets users play around with recipes in advance.

Makefile_recipe()
## Rscript -e 'drake::mk(target = "your_target", cache_path = "/home/landau/.drake")'

## R -e 'drake::mk(target = "this_target", cache_path = "custom_cache")' -q

## R -q -e 'drake::mk(target = "your_target", cache_path = "/home/landau/.drake")'
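To make the wildcard's role concrete, here is a sketch of how a recipe line could be assembled. The function `build_recipe()` is hypothetical and only mimics the behavior described above (substitute R_RECIPE if present, otherwise append the quoted call); it is not drake's implementation, and the default cache path is illustrative.

```r
# Hypothetical re-creation of the recipe logic described above.
build_recipe <- function(recipe_command = "Rscript -e",
                         target = "your_target",
                         cache_path = ".drake") {
  r_call <- sprintf('drake::mk(target = "%s", cache_path = "%s")',
                    target, cache_path)
  if (grepl("R_RECIPE", recipe_command, fixed = TRUE)) {
    # Wildcard present: substitute the call in place (quotes come from the template).
    sub("R_RECIPE", r_call, recipe_command, fixed = TRUE)
  } else {
    # No wildcard: append the call, single-quoted for the shell.
    paste(recipe_command, shQuote(r_call, type = "sh"))
  }
}

build_recipe("R -q -e")
## [1] "R -q -e 'drake::mk(target = \"your_target\", cache_path = \".drake\")'"
build_recipe("R -e 'R_RECIPE' -q", "this_target", "custom_cache")
## [1] "R -e 'drake::mk(target = \"this_target\", cache_path = \"custom_cache\")' -q"
```

The wildcard matters when flags must come after the R code, as in the `-q` at the end of the second recipe.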

After some more investigation, I think it would be unwise to go through Makefile parallelism to leverage Docker as a job scheduler.

My next actions on this issue will be to learn @HenrikBengtsson's future and @mllg's batchtools. Depending on what I learn, I will decide whether to leverage one or both of these packages directly. The eventual solution should completely unlock batchtools, address the retry/timeout functionality that @mrchypark brought up in #76, and hopefully leverage Docker somehow. At a glance, I see that batchtools supports SLURM, which will help stat grad students at Iowa State. @emittman, my day job is taking a turn for the strange, so I may not have time to solve this before you defend and graduate.

I'm here if you need guidance. Since I'm not familiar with the inner details of drake, I cannot give exact suggestions, but if you already have an mclapply() backend, just try to cut'n'paste that setup, replacing mclapply() with future_lapply(). Then all futures "should" work out of the box.

A first test would be to run with plan(multicore), which uses forked processes. A second test would be to run with plan(multisession), which uses background R sessions on the same machine. This adds a bit of a challenge when it comes to globals, but the future framework automatically takes care of most common use cases, so you don't have to worry about manually exporting globals (although that's possible). If you've already implemented a parLapply() backend, you can borrow the list of globals you export there. Once future_lapply() (or future() calls) works with the above backends, plan(future.batchtools::batchtools_local) should work out of the box. After that, testing with one of the HPC schedulers, e.g. plan(future.batchtools::batchtools_torque), should also work once you have a working batchtools template.
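The globals challenge is easy to reproduce with base parallel alone, which is roughly what plan(multisession) has to handle behind the scenes. This sketch uses plain parallel, not future: it runs the same function sequentially and on a two-worker PSOCK cluster, exporting the global explicitly with clusterExport(). The names `offset` and `add_offset()` are illustrative.

```r
library(parallel)

offset <- 10
add_offset <- function(x) x + offset  # depends on the global `offset`

sequential_result <- lapply(1:3, add_offset)

cl <- makeCluster(2)  # background R sessions, like plan(multisession)
# When run at top level, the workers start empty and cannot see `offset`,
# so this call fails and the error message is captured instead.
missing_global <- tryCatch(parLapply(cl, 1:3, add_offset),
                           error = function(e) conditionMessage(e))
# Export the global explicitly, then the same call succeeds.
clusterExport(cl, "offset", envir = environment())
cluster_result <- parLapply(cl, 1:3, add_offset)
stopCluster(cl)

identical(sequential_result, cluster_result)
## [1] TRUE
```

The future framework performs the equivalent of that clusterExport() step automatically by scanning the expression for globals.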

FYI, the Future API does not have a concept of restarting/retrying, but it does forward exceptions of various error classes meaning it provides the basic needs for a "restart" wrapper. It might be that the concept of restart will be added to the Future API at some point, but I'm quite conservative when it comes to adding features to the API since those must work for all backends etc. It could also be that a restart feature better belongs to a high-level API on top of the Future API.

About Docker

In one of my previous comments I had a typo in my reference to ?future::makeClusterPSOCK. Its example contains:

library(future)  # provides makeClusterPSOCK()

## Setup of Docker worker running rocker/r-base
## (requires installation of future package)
cl <- makeClusterPSOCK(
  "localhost",
  ## Launch Rscript inside Docker container
  rscript = c(
    "docker", "run", "--net=host", "rocker/r-base",
    "Rscript"
  ),
  ## Install future package
  rscript_args = c(
    "-e", shQuote("install.packages('future')")
  ),
  dryrun = TRUE
)

Note how that sets up a regular PSOCK cluster, so there is nothing specific to futures there (except the illustration of how to install a package on the fly); you can use it with parLapply() if you'd like.

Thank you @HenrikBengtsson, this is exactly what I need to start tinkering! I did not realize that future::plan() could set the backend for future_lapply(). That's a really cool design choice. (I now regret choosing plan() as the name of a prominent drake function.) Would I use future::plan() to tell future_lapply() to use the PSOCK cluster like the cl you defined?

Yes, you can use plan(cluster, workers = cl) where cl is any cluster object. With "cluster" futures, the backend is effectively utilizing the same framework as parallel::parLapply() would do.
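A minimal sketch of that suggestion, assuming the future package is installed: build an ordinary PSOCK cluster with parallel::makeCluster(), hand it to plan(cluster, workers = cl), and resolve a future on it. Any cluster object, including a Docker-backed one from makeClusterPSOCK(), could be passed the same way.

```r
library(parallel)
library(future)

cl <- makeCluster(2)            # any cluster object works here
plan(cluster, workers = cl)     # futures now run on those workers

f <- future(Sys.getpid())       # evaluated on one of the cluster workers
worker_pid <- value(f)

plan(sequential)                # switch back to the default strategy
try(stopCluster(cl), silent = TRUE)  # in case the plan change already closed them

worker_pid != Sys.getpid()
## [1] TRUE
```

Because the cluster is created once, the same plan can serve many make() calls without paying the PSOCK start-up cost each time.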

So much power for such a small change in the code base! When I get the chance, I will definitely implement future_lapply() as the very next parallel backend extension. If enough future/future.batchtools backends work, I will consider this issue solved. The other backends will stay as they are.

By the way: in prose, how does future::plan() work? Does it set global options? Where does it store the workers?

Yes, it keeps track of it in a "global" part of its local environment, cf. https://github.com/HenrikBengtsson/future/blob/master/R/zzz.plan.R#L78-L85. PS. Note that you can have nested "plans" (strategies), e.g. plan(list(batchjobs_sge, multiprocess)), so that if you "drake" via an SGE scheduler, each job launched will be configured to use plan(multiprocess). If nested strategies are not specified, the default is always plan(sequential).
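A small sketch of nested strategies, assuming future is installed. Here sequential stands in for a scheduler backend such as batchjobs_sge so the example runs anywhere; code evaluated inside an outer future should see the next strategy in the list as its plan().

```r
library(future)

# Outer level: sequential (stand-in for an HPC scheduler backend).
# Inner level: multisession, used by any futures created inside a job.
plan(list(sequential, multisession))

outer <- future({
  inherits(plan(), "multisession")  # which strategy does nested code see?
})
inner_is_multisession <- value(outer)

plan(sequential)  # reset to the default
```

Querying plan() does not start any multisession workers; they would only be launched if the inner code actually created futures.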

It just occurred to me: each make(..., parallelism = "parLapply") makes a new PSOCK cluster every time, but a single future::plan(cluster, workers = cl) should work for multiple make()s. This could easily quadruple the speed of the package checks. I cannot tell you how much easier this will make development!

I really like the sound of nested parallelism. #28 is no longer a moonshot!

I started to sketch some code on the 'future' branch of a separate fork. plan(multicore) seems to work mostly okay. Many console messages disappear, but the targets seem to build just fine. On the other hand, plan(multisession) fails. I could fix it easily with future equivalents of clusterCall() and clusterExport(), but they need to generalize beyond just PSOCK clusters. Is there such functionality?

As you anticipated, the trouble is all around global exports. It is the exact same scenario as parLapply parallelism, so I think I understand what to do. I need to export the user's globals to all the workers, and I need to run do_prework(), prune_envir(), and assign_to_envir() on all the workers before starting any real jobs. For efficiency's sake, I have a strong preference for running do_prework() once per make() and running prune_envir() and assign_to_envir() once per future_lapply().
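With a plain PSOCK cluster (base parallel, not future), the once-per-make() preparation described above maps onto clusterCall(): run setup once on every worker, then dispatch many tasks to the same persistent workers. This is a sketch; `setup_worker()` and the `prework_done` flag are hypothetical stand-ins for do_prework() and friends.

```r
library(parallel)

cl <- makeCluster(2)

# Hypothetical stand-in for do_prework(): runs once on each worker and
# leaves state behind in that worker's global environment.
setup_worker <- function() {
  assign("prework_done", TRUE, envir = globalenv())
  invisible(NULL)
}
clusterCall(cl, setup_worker)

# Later tasks on the same (persistent) workers can rely on that state.
ready <- parLapply(cl, 1:4, function(i) {
  exists("prework_done", envir = globalenv())
})
stopCluster(cl)

all(unlist(ready))
## [1] TRUE
```

This only works because PSOCK workers outlive individual tasks; a backend that starts a fresh process per task would lose the state.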

By the way, is there a way to set the maximum number of parallel workers independently of the parallel backend? I am hoping to protect the less computationally-inclined users from accidentally spawning too many jobs and crashing computer systems.

For the record: just like Makefile parallelism, we also should process all the imports locally with the default backend before touching any of the targets. No sense in submitting a separate SLURM or TORQUE job to import every single relevant user-defined function.

I started to sketch some code on the 'future' branch of a separate fork. plan(multicore) seems to work mostly okay. Many console messages disappear, but the targets seem to build just fine. On the other hand, plan(multisession) fails.

If you know what the globals are you can always explicitly export them by specifying globals argument (or future.globals if you use future_lapply()). Ideally the future + globals framework should be able to automatically identify what these globals are, but there are cases where this is very hard or very costly/time consuming. I would need to spend time with drake to give a better answer. Do you have a sense on what globals are missing? If you run with options(future.debug = TRUE) you'll be able to see what globals the future framework finds. Runtime error messages will of course show the first global that is missing.
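A minimal sketch of the explicit route, assuming future is installed: pass the globals as a named list so nothing depends on automatic detection. The names `a` and `b` are illustrative; with future_lapply() the corresponding argument is future.globals.

```r
library(future)

plan(multisession, workers = 2)  # background R sessions

# Globals supplied explicitly as a named list instead of being discovered.
f <- future(a + b, globals = list(a = 1, b = 2))
total <- value(f)

plan(sequential)  # switch back, which also releases the background workers
```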

I could fix it easily with future equivalents of clusterCall() and clusterExport(), but they need to generalize beyond just PSOCK clusters. Is there such functionality?

What you're getting to here is the concept of "persistent" workers. The basics of a _future_ (in programming terms) do not support the concept of "persistence". Futures don't have a memory of the past, and when resolved they conceptually turn into a fixed value. Having said this, cluster/multisession futures have experimental support for persistency, but please note that it is indeed experimental (https://github.com/HenrikBengtsson/future/issues/60) because it only makes sense for certain types of backends. Much more work and thought need to go into this concept first.

As you anticipated, the trouble is all around global exports. It is the exact same scenario as parLapply parallelism, so I think I understand what to do. I need to export the user's globals to all the workers, and I need to run do_prework(), prune_envir(), and assign_to_envir() on all the workers before starting any real jobs. For efficiency's sake, I have a strong preference for running do_prework() once per make() and running prune_envir() and assign_to_envir() once per future_lapply().

Putting aside the current drake framework/implementation, do you really need persistent workers? Is it because you pass very large globals, which will be too expensive to pass multiple times? Or is it that it is such an essential part of the drake API/concept? Note that it does not really make sense to think of a persistent worker when you submit tasks as individual jobs on an HPC scheduler.

By the way, is there a way to set the maximum number of parallel workers independently of the parallel backend? I am hoping to protect the less computationally-inclined users from accidentally spawning too many jobs and crashing computer systems.

I believe you're referring to the default number of workers used when specifying plan(multicore), plan(multiprocess), etc. The default is future::availableCores(). It acknowledges several known environment variables and R options. For instance, if you set options(mc.cores = 2L), it will not use more than 2 cores. See also ?future.options; e.g., there are ways for sysadmins to protect against overload/misuse via site-wide environment variables.
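Assuming a current version of future is installed, the cap can be checked directly: with options(mc.cores = 2L) set, future::availableCores() should report at most 2, and strategies such as multisession use that value as their default number of workers.

```r
library(future)

options(mc.cores = 2L)       # user-level cap, as described above
n_cores <- availableCores()  # takes mc.cores into account, among other settings
n_cores <= 2
## [1] TRUE
```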

Passing globals and loading packages takes a lot of overhead. I did some tinkering this weekend and confirmed that this is the reason why drake's parLapply backend is unavoidably so much slower than the mclapply one. However, this is a small price to pay to fully embrace the future paradigm and unlock its full power. Now that I know that non-persistence is so important, I will take the time to prepare each worker for each target individually (and process the imports beforehand in a separate step). The implementation will be almost identical to Makefile parallelism, and the behavior will be similar: high-overhead, but able to deploy on a variety of distributed systems. With #76 already solved to the extent that R.utils::withTimeout() allows, I think our bases will be covered.
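The non-persistent style, where each target prepares its own worker, resembles Makefile parallelism: every target gets a fresh R process that loads only what it needs. A base-R sketch, assuming a Unix-alike with Rscript in R.home("bin") (true for standard installations); `build_in_fresh_session()` is a hypothetical helper, not drake's code.

```r
# Hypothetical helper: evaluate one target's command in a brand-new
# Rscript process and capture what it prints to stdout.
build_in_fresh_session <- function(command) {
  rscript <- file.path(R.home("bin"), "Rscript")
  system2(rscript, c("-e", shQuote(command)), stdout = TRUE)
}

# Each call pays the start-up cost of a new R session, but shares no state.
out <- build_in_fresh_session("writeLines(as.character(sqrt(16)))")
```

The start-up cost paid on every call is the overhead mentioned above; the benefit is that the same pattern carries over to schedulers that launch one job per target.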

Good news: I just implemented future_lapply as a drake backend, and it works great on multisession, multicore, and batchtools_local! I already did all the heavy lifting with Makefile parallelism. Will merge soon.

Just one sticking point: For plan(multisession) and plan(batchtools_local), I lose the console output that tells me when I'm building a target. @HenrikBengtsson, is there any way to preserve stdout()?

Remaining things to do before I consider this issue solved:

  • [x] Write a tutorial for the "future_lapply" backend in the parallelism vignette.
  • [x] Demo "future_lapply" backend in the basic example.
  • [x] Add future_lapply to the long testing scenarios with each of the multicore, multisession, and batchtools_local backends, making sure to hit both the local and global environments for each.
  • [x] Deprecate drake::plan() in favor of workflow() due to the name conflict with future::plan().
  • [x] Add .onLoad() tip about the future backends.

Now implemented! @HenrikBengtsson, thank you so much for your generous help!

I documented the new "future_lapply" backend in the parallelism vignette. @jarad, @emittman, @nachalca, and @raymondkww, you may be interested in a new way to run things on SLURM. We may want to update @nachalca's SLURM array example if this works.

library(future)
library(future.batchtools)
library(drake)

backend(batchtools_slurm) # same as future::plan()

# Alternatively, plan to send targets to SLURM and then give each target multiple cores.
# This should work if you use `future_lapply()` somewhere within a target's command.
# backend(list(batchtools_slurm, multicore))

# Cap the max jobs with options(mc.cores = 2) or something similar from ?future.options
load_basic_example()
make(my_plan, parallelism = "future_lapply")

Note: for job schedulers, we will need template files like the one for TORQUE. See HenrikBengtsson/future.batchtools#9.

Update: I found a trove of template files at https://github.com/mllg/batchtools/tree/master/inst/templates. I put together built-in quickstart examples for SLURM and the Sun/Univa Grid Engine, but neither is working. I do not have any real experience with batchtools, so any help would be appreciated.

Awesome. Great to hear you made such progress so quickly. Hopefully the road will be smooth for users testing this out. From the perspective of globals, future, future.batchtools, this will add valuable real-world testing to those frameworks.

Another thing: @HenrikBengtsson, in your example of a PSOCK cluster for Docker, what does the dryrun argument do?

You're referring to example("makeClusterPSOCK", package = "future"), where all example snippets use makeClusterPSOCK(..., dryrun = TRUE) (not just the one for Docker). It's there so that the example is actually runnable: dryrun = TRUE does everything but launch the cluster/workers. It's actually most informative if you also use verbose = TRUE at the same time. (Maybe verbose = dryrun should be the default.) UPDATE: dryrun=TRUE does indeed output a message.

My last sentence was incorrect; dryrun=TRUE outputs an informative message, e.g.

> cl <- makeClusterPSOCK(2L, dryrun = TRUE)
----------------------------------------------------------------------
Manually start worker #1 on 'localhost' with:
  '/usr/lib/R/bin/Rscript' --default-packages=datasets,utils,grDevices,graphics,stats,methods -e 'parallel:::.slaveRSOCK()' MASTER=localhost PORT=11235 OUT=/dev/null TIMEOUT=2592000 XDR=TRUE
----------------------------------------------------------------------
Manually start worker #2 on 'localhost' with:
  '/usr/lib/R/bin/Rscript' --default-packages=datasets,utils,grDevices,graphics,stats,methods -e 'parallel:::.slaveRSOCK()' MASTER=localhost PORT=11235 OUT=/dev/null TIMEOUT=2592000 XDR=TRUE
> cl
socket cluster with 2 nodes on host 'NULL'

(But OTOH, I see that the returned "cluster" object contains NULL nodes.)

Thanks, I see. I just realized I had mistaken future::makeClusterPSOCK() for parallel::makePSOCKcluster(), which is why I did not find dryrun to begin with.

FYI: at some point, future::future_lapply() will move to an upcoming future.apply package (HenrikBengtsson/future#159). When future.apply is on CRAN and all the builds complete, I will make the adjustment to drake.
