Drake: Manual scheduling

Created on 4 Feb 2018 · 32 comments · Source: ropensci/drake

  • use future for all parallelism

    • get rid of parLapply() and mclapply()

    • the parallelism argument is kept and forwards to the corresponding future implementation with a warning

  • for now, only one job per future; we will provide persistence later by running multiple targets in the same future
  • communicating changes to the storr:

    1. job writes directly to the storr

    2. job returns result through the future, master stores result into the storr

  • monitoring progress:

    • watch the storr and

    • check resolved(f)

  • failing workers

    • resolved(f) is TRUE, value(f) throws an error

  • scheduling with a priority queue (https://github.com/DataWookie/liqueueR/issues/2)

    • keys in the pq: number of unresolved upstream targets; this needs a "decrease key" operation

    • as soon as a job finishes, "decrease key" is called for all of its downstream targets

    • a job can be run if its key in the pq is zero

    • job priorities help resolve ties if the pq contains multiple jobs with key 0

      • one option is to use the position in the data frame as a proxy for priority if we don't have explicit priorities
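The scheduling rules above can be simulated sequentially in base R. This is a minimal sketch, not drake's implementation: the `deps` list and the `schedule()` helper are hypothetical, a plain named integer vector stands in for the priority queue, and "running" a job only records its name. Keys count unresolved upstream targets, finishing a job decrements the keys of its downstream targets, and ties among zero-key targets go to the earliest row in the plan.

```r
# Hypothetical plan: each target maps to the upstream targets it depends on.
# Names are listed in plan (row) order, which doubles as the priority.
deps <- list(
  a = character(0),
  b = character(0),
  c = c("a", "b"),
  d = "c"
)

schedule <- function(deps) {
  targets <- names(deps)
  # Key of each target: number of unresolved upstream targets.
  keys <- vapply(deps, length, integer(1))
  # Priority: position in the plan; lower runs first among ties.
  priority <- setNames(seq_along(targets), targets)
  remaining <- targets
  run_order <- character(0)
  while (length(remaining) > 0) {
    ready <- remaining[keys[remaining] == 0]
    if (length(ready) == 0) stop("circular dependency among: ", toString(remaining))
    # Break ties among zero-key targets by plan position.
    job <- ready[which.min(priority[ready])]
    run_order <- c(run_order, job)  # stand-in for deploying the job
    remaining <- setdiff(remaining, job)
    # "Decrease key" for every target downstream of the finished job.
    downstream <- targets[vapply(deps, function(up) job %in% up, logical(1))]
    keys[downstream] <- keys[downstream] - 1L
  }
  run_order
}

schedule(deps)
#> [1] "a" "b" "c" "d"
```

In a real scheduler, every zero-key target could be deployed at once, and the decrements would happen whenever a worker reports back rather than immediately.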



Nomenclature

  • target: the object to compute
  • command/job: the R code that computes a target
  • future: the vehicle that computes one or more commands/jobs on a worker
  • worker: the compute unit that computes futures

Labels: advanced, priority, performance, new feature

All 32 comments

Also: let's use lightly_parallelize() as the engine behind make_imports(). For imports, the order of processing does not matter.

If the keys are integers (like in our case), "insert" and "decrease key" can be implemented in O(1).
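One standard structure with that property is a bucket queue: an array of buckets indexed by key, where decreasing a key just moves an element to the neighboring bucket. The sketch below is a hypothetical illustration in base R (the `bq_*` helpers are made-up names), using hashed environments as buckets so each insert or decrease-key is an expected constant-time operation.

```r
# Hypothetical bucket queue for small non-negative integer keys.
# Bucket k lives at index k + 1; each bucket is a hashed environment used as a set.
new_bucket_queue <- function(max_key) {
  q <- new.env()
  q$buckets <- lapply(seq_len(max_key + 1), function(i) new.env(hash = TRUE))
  q$keys <- new.env(hash = TRUE)  # element name -> current key
  q
}

bq_insert <- function(q, element, key) {
  assign(element, TRUE, envir = q$buckets[[key + 1]])
  assign(element, as.integer(key), envir = q$keys)
  invisible(q)
}

bq_decrease_key <- function(q, element) {
  key <- get(element, envir = q$keys)
  stopifnot(key > 0)
  rm(list = element, envir = q$buckets[[key + 1]])   # leave bucket k
  assign(element, TRUE, envir = q$buckets[[key]])    # join bucket k - 1
  assign(element, key - 1L, envir = q$keys)
  invisible(q)
}

# Elements whose key is zero, i.e. targets ready to build.
bq_ready <- function(q) ls(q$buckets[[1]])

q <- new_bucket_queue(max_key = 2)
bq_insert(q, "a", 0)
bq_insert(q, "c", 2)
bq_decrease_key(q, "c")
bq_decrease_key(q, "c")
bq_ready(q)
#> [1] "a" "c"
```

Since keys start at each target's number of dependencies and only go down, `max_key` can be the largest dependency count in the plan. Listing the ready bucket is linear in its size, but that is just the set of jobs waiting to be deployed.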

My main focus right now is this issue. After some sketching (which I will push to a branch as soon as I finish traveling), I think the infrastructure should emerge as an independent “future” backend separate from “future_lapply”. The new scheduler will turn out great in the end, but until we test it over several months, I do not think it should replace any of the existing backends. I think this will be okay because there will not be much code to add, and I will make sure it is merge-safe.

Key data structures I am planning:

  • R6_worker: a persistent R6 object that can iterate over multiple transitory futures and targets, etc.
  • R6_queue: a trivial queue for targets that we will later turn into a priority queue.

I was wrong about this comment. I forgot that we cannot process the imports in embarrassingly parallel fashion. We need to process them in topological order so that changes to nested functions propagate through any outer functions that depend on them. For this, we should stick to something low-overhead. The current mclapply-powered staged parallelism has worked well for my own projects, and the staging should not cause a bottleneck here.

We need to process them in topological order so that changes to nested functions propagate through any outer functions that depend on them.

Can you give an example where the order is important in make_imports()?

library(drake)

f1 <- function(x){
  f2(x)
}

f2 <- function(x){
  f3(x)
}

f3 <- function(x){
  f4(x)
}

f4 <- function(x){
  x
}

plan <- drake_plan(a = f1(1))
config <- drake_config(plan)
vis_drake_graph(config)


make(plan)

## target a

outdated(config)

## character(0)

f4 <- function(x){
  x + 1
}

outdated(config)

## [1] "a"

Some terms:

  • kernel: the reproducibly-tracked, fingerprinted representation of a target or import.
  • dependency hash: an overarching hash of all the hashes of the kernels of an item's dependencies.

Drake accounts for nesting because the kernel of a function includes its own dependency hash.

For completeness: f1() cannot be processed until f2() through f4() are processed, so drake can tell when to update a.
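A toy version of the kernel idea shows why the order matters. This is only an illustration under stated assumptions, not drake's actual hashing: `hash_text()` and `kernels()` are hypothetical helpers, MD5 via `tools::md5sum()` on a temporary file stands in for drake's real hash, and a function's kernel is taken to be the hash of its deparsed code plus the hash of the kernels it calls. Because each kernel reads its dependencies' kernels, the imports have to be visited in topological order (f4, f3, f2, f1).

```r
# Toy hash of a character vector (MD5 of a temporary file).
hash_text <- function(text) {
  tmp <- tempfile()
  on.exit(unlink(tmp))
  writeLines(text, tmp)
  unname(tools::md5sum(tmp))
}

# Kernel of each function: hash of its code plus the hash of its
# dependencies' kernels. `order` must be topological (dependencies first).
kernels <- function(fns, deps, order) {
  k <- character(0)
  for (name in order) {
    upstream <- deps[[name]]
    stopifnot(all(upstream %in% names(k)))  # fails if processed out of order
    dependency_hash <- hash_text(k[upstream])
    k[name] <- hash_text(c(deparse(fns[[name]]), dependency_hash))
  }
  k
}

deps <- list(f1 = "f2", f2 = "f3", f3 = "f4", f4 = character(0))
topo <- c("f4", "f3", "f2", "f1")
fns <- list(
  f1 = function(x) f2(x),
  f2 = function(x) f3(x),
  f3 = function(x) f4(x),
  f4 = function(x) x
)
before <- kernels(fns, deps, topo)
fns$f4 <- function(x) x + 1  # change only the innermost function
after <- kernels(fns, deps, topo)
before != after  # every kernel changes, including f1's
```

Running the same loop in reverse order fails at the first step, because f1's dependency hash would need a kernel for f2 that does not exist yet.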

It just occurred to me: as a first step, what if we used GNU Make to schedule futures? We just need to change the parallelism = "Makefile" backend:

  • Deploy and monitor a single future from the mk() function. Makefile rules are usually of the form Rscript -e drake::mk(...), and the recipe is somewhat customizable.
  • add future::plan("next") to the drake::config() object so the separate R sessions see it. We could eventually extend this to solve #169, but it does not have to happen right away.

This way, we could debug and test the R6_worker class for monitoring individual futures without having to worry about the rest of the manual scheduler. We could also benchmark drake's upcoming manual scheduler against Make and try to outperform it.

The advantage of Make as a scheduler for futures is that parallel workers could simultaneously write to the cache even when the remote jobs themselves have no cache access.

To clarify that last point: requiring the master process to do all the caching is a potential bottleneck. For large computations with small data, this isn't so bad. But a lot of big data work involves small computations on large data, so I think it is worth the time to make this alternative available.

We could also check whether and how GNU Make builds a priority queue.

FYI: to do this correctly, I think there is more I need to learn about future. It will take additional time.

Splitting up this issue further would help, I like that idea. Let's focus on the Makefile backend first.

Sounds great. Let's discuss the future-powered Makefile backend in #237.

Some tools with schedulers I might learn from:

  • yarn
  • spark
  • callr
  • processx
  • pkgman
  • pkginstall
  • async

I think I have a promising start on this issue here (i227-attempt2 branch). It still needs a ton of debugging and testing, but it's a start. There are two experimental future-based backends in that branch, each of which deploys one future per target.

  • "future_commands" - Do the caching on the master process. The future should not need to access the master process's file system except to read/write user-defined input/output files.
  • "future_total" - Tell each future to cache its own target.
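The difference between the two backends comes down to who writes to the cache. The sketch below is hypothetical: it uses a local `parallel` PSOCK cluster in place of futures and plain `.rds` files in a temporary directory in place of drake's storr cache. With master-side caching, workers only evaluate commands and ship values back; with worker-side caching, each worker writes its own result, which only works if the workers can reach the cache.

```r
library(parallel)

# Stand-in cache: one .rds file per target in a temporary directory.
cache_dir <- file.path(tempdir(), "toy_cache")
dir.create(cache_dir, showWarnings = FALSE)
commands <- list(a = quote(1 + 1), b = quote(sqrt(16)))

cl <- makeCluster(2)

# Master-side caching (analogous to "future_commands"): workers return
# values, and the master process does all the writing.
values <- parLapply(cl, commands, eval)
for (name in names(values)) {
  saveRDS(values[[name]], file.path(cache_dir, paste0(name, "_master.rds")))
}

# Worker-side caching (analogous to "future_total"): each worker writes
# its own target, so the workers need access to cache_dir.
invisible(parLapply(cl, names(commands), function(name, commands, cache_dir) {
  saveRDS(eval(commands[[name]]), file.path(cache_dir, paste0(name, "_worker.rds")))
}, commands = commands, cache_dir = cache_dir))

stopCluster(cl)
```

The first pattern funnels every value through one process; the second spreads the writes out but assumes a shared file system.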

The rest of the backends are unaffected. We'll see how all the backends compete against each other. Eventually, it may come time to deprecate and remove backends.

One note on monitoring futures in case they crash: I personally think this should be the responsibility of the future package. I have added a comment to https://github.com/HenrikBengtsson/future/issues/172.

I am noticing that future introduces some additional overhead for each target. Maybe with additional callr or processx workers based on the scheduler, we could get rid of the *lapply backends completely. The Makefile backend is still its own special case, unfortunately.

Update: I have been working hard on the i227-attempt2 branch, and I am almost ready to involve more people in alpha testing. I am starting to test it on some of my own old work projects, and things appear to be going rather well so far. Main changes:

  • Add a new parallelism = "future" backend with a manual job scheduler.
  • Throw in a cheap imitation of a priority queue, just as a temporary placeholder. Once https://github.com/dirmeier/datastructures/issues/4 is solved, we can easily swap it out for the Fibonacci heap from Boost. (I recently refreshed my memory of Cormen et al., chapter 6. The algorithms are easy, but R has a default maximum of 5000 nested expressions, set by the expressions option, which limits recursion depth.)
  • Add a new caching argument to make() and drake_config() to allow the user to select whether the caching happens on the master process or on each of the workers. cc @kendonB.

I implemented the "future"-based scheduler as a separate parallel backend because:

  1. I don't want to risk damaging the other backends.
  2. I want to compare it head-to-head with the staged parallelism backends we already have.
  3. I want to be able to merge it into master for easier alpha testing.

I will submit a PR early next week. At that point, I hope we can review the code together.

Note for reference: we might be able to get non-staged mclapply-like functionality by combining mcfork() with the manual scheduler. This might remove enough overhead to make it practical to process imports this way.

Looking back at the summary of our discussion from the RStudio conference:

monitoring progress:

  • watch the storr and
  • check resolved(f)

As I understand it, the purpose of watching the storr is to see if a worker crashed even if resolved(f) is TRUE. Is that right? If so, using the cache would only work for caching = "worker", so I hesitate to implement it right now. For caching = "master", we should probably not assume that workers have cache access.

failing workers

  • resolved(f) is TRUE, value(f) throws an error

Unfortunately, this does not work for multicore futures at the moment: https://github.com/HenrikBengtsson/future/issues/199. But it does work for multisession parallelism, and I just implemented it in #275.

scheduling with a priority queue ...

As I see it, that part hinges on @dirmeier's response to https://github.com/dirmeier/datastructures/issues/4#issuecomment-367548398. If we can use his decrease-key, I will upgrade drake's current priority queue. If not, we might just leave the existing queue as it is for now. Either way, I will close this issue, and we can work on specific scheduling-related problems in separate, more focused issues.

Yes, watching the storr seems necessary only for caching = "worker". Checking resolved(f) may be expensive on some architectures.

FWIW, I'm still using staged parallelism in my current experiments because of the overhead associated with futures. I like your idea of a pure callr worker, because even future.callr has some overhead that mainly comes from finding the environment in which the globals live (according to a very coarse profiling). Ideally, we'd pass commands to persistent workers that would run them and communicate back, but that feels like a long way to go.

Yeah, if we keep persistent workers, it seems like we either need to have some kind of message passing or make all the workers fight over the next available target. I don't think drake is ready for either yet.

If we had a pure callr worker to process the imports (and optionally targets), we could totally get rid of the mclapply and parLapply backends, and we would no longer need to process targets and imports in two separate stages. (I still think we should keep the Makefile backend even though it complicates the code base.) It seems very doable, but I need to get better at callr.

I do not know how to interact with a callr job like a future, but once I learn and we have that part figured out, I think we should define analogous S3 methods resolved.callr_worker() and value.callr_worker(). That way, we could just add on a callr worker and not need many modifications to R/future.R.
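A minimal sketch of such a worker, assuming it wraps a background process object like the one `callr::r_bg()` returns, which has `$is_alive()` and `$get_result()` methods. To keep the block runnable without callr or future, it defines stand-in `resolved()` and `value()` generics (future itself exports generics with these names, so real code would register methods for those instead) and a fake process. `new_callr_worker()` and `fake_process()` are hypothetical names. S3 methods follow the `generic.class` naming pattern, hence `resolved.callr_worker()` and `value.callr_worker()`.

```r
# Stand-in generics; real code would add methods to future's resolved() and value().
resolved <- function(x, ...) UseMethod("resolved")
value <- function(x, ...) UseMethod("value")

# Hypothetical worker class wrapping a background R process.
new_callr_worker <- function(process) {
  structure(list(process = process), class = "callr_worker")
}

# A worker is resolved once its process has exited.
resolved.callr_worker <- function(x, ...) !x$process$is_alive()

# Retrieve the result from the finished process.
value.callr_worker <- function(x, ...) x$process$get_result()

# Fake process with the same two methods, so the sketch runs without callr.
fake_process <- function(result) {
  list(is_alive = function() FALSE, get_result = function() result)
}

w <- new_callr_worker(fake_process(42))
if (resolved(w)) value(w)
#> [1] 42
```

With callr installed, something like `new_callr_worker(callr::r_bg(function() 1 + 1))` would wrap a real background R session; polling `resolved()` and then calling `value()` mirrors how the scheduler already treats futures.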

Message passing via file I/O doesn't necessarily cause concurrency problems if each worker is limited to its own namespace (directory) in the file system. But it seems safer to abstract away the message passing part.
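A minimal sketch of that kind of file-based message passing, assuming each worker owns one subdirectory and only the master drops job files into it. The helpers `send_job()` and `worker_step()` are hypothetical, and a single R session plays both roles here. Files are written under a temporary name and then renamed, so a reader should not see a half-written file; rename is atomic within one local POSIX file system, which may not hold on every network file system.

```r
# Hypothetical layout: <root>/<worker>/ holds that worker's job and result files.
root <- file.path(tempdir(), "workers")

# Master side: write a job into one worker's directory.
send_job <- function(root, worker, target, command) {
  dir <- file.path(root, worker)
  dir.create(dir, recursive = TRUE, showWarnings = FALSE)
  tmp <- file.path(dir, paste0(target, ".job.tmp"))
  saveRDS(command, tmp)
  # Publish the finished file under its final name.
  invisible(file.rename(tmp, file.path(dir, paste0(target, ".job"))))
}

# Worker side: run every pending job in its own directory and write results.
worker_step <- function(root, worker) {
  dir <- file.path(root, worker)
  for (job in list.files(dir, pattern = "\\.job$", full.names = TRUE)) {
    result <- sub("\\.job$", ".result", job)
    saveRDS(eval(readRDS(job)), paste0(result, ".tmp"))
    file.rename(paste0(result, ".tmp"), result)
    unlink(job)
  }
}

send_job(root, "worker1", "a", quote(1 + 1))
worker_step(root, "worker1")
readRDS(file.path(root, "worker1", "a.result"))
#> [1] 2
```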

I can take care of implementing the callr backend. Would that backend bundle several targets into one callr process?

I would love it if you wrote a callr worker, particularly one that behaves like the existing future worker. Maybe in new_worker(), we could optionally launch callr where a future is launched here?

At the conference we talked about allowing multiple targets per worker, but I think we decided to table that for now. I do not think drake is ready for that yet.

Once we have a callr worker, here is what I am thinking for the next steps:

  1. Allow an optional worker column in the workflow plan data frame, where users can choose between "future" and "callr". There is no sense in deploying an HPC job for a knitr report just because targets leading up to it need SLURM.
  2. Allow targets (via future or callr) and imports (via callr) to be scheduled all together instead of in separate stages.
  3. Remove all parallel backends except "future", "callr", and "Makefile". The "mclapply" and "parLapply" backends will redirect to "callr", and the "future_lapply" backend will redirect to "future".
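Step 1 might look like the sketch below. It is hypothetical throughout: the `worker` column, its default of `"future"`, and the `launch_future()` and `launch_callr()` stand-ins (which return a label instead of starting anything) are assumptions for illustration. The choice of worker is read per row, so a light target such as a knitr report can run locally while upstream targets go to the cluster.

```r
# Hypothetical plan with an optional per-target worker column.
plan <- data.frame(
  target = c("model", "report"),
  command = c("fit_model(data)", "knit('report.Rmd')"),
  worker = c("future", "callr"),
  stringsAsFactors = FALSE
)

# Stand-in launchers: real versions would deploy a future or a callr process.
launch_future <- function(target) paste("future job for", target)
launch_callr <- function(target) paste("local callr process for", target)

launch_target <- function(plan, i) {
  # Fall back to "future" when the column is missing or empty for this row.
  worker <- if (is.null(plan$worker) || is.na(plan$worker[i])) "future" else plan$worker[i]
  switch(worker,
    future = launch_future(plan$target[i]),
    callr = launch_callr(plan$target[i]),
    stop("unknown worker type: ", worker)
  )
}

vapply(seq_len(nrow(plan)), launch_target, character(1), plan = plan)
```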

We'll need to think about how to minimize the time needed to spin up a worker, otherwise the master process may become the bottleneck. I wouldn't deprecate staged parallelism too soon, unless it really impedes future development.

Agreed. I was thinking callr might be lower-overhead than future.

I think it's a good time to close this issue: we already have a good start on the manual scheduler, and further work is starting to split up into a bunch of smaller issues.

Do you want to add these issues now, or maybe as comments to #278?

I think continuing to plan from this thread here is fine. I think the priority queue still needs to wait for dirmeier/datastructures#4, and transitioning imports from mclapply/parLapply to callr depends on how fast the new backend is in #278. There are also other issues like grouping multiple targets into a given worker, but I hesitate to open issues that drake is not ready for.

We'll need to think about how to minimize the time needed to spin up a worker, otherwise the master process may become the bottleneck. I wouldn't deprecate staged parallelism too soon, unless it really impedes future development.

Agreed:

library(callr)
library(parallel)
library(future.callr)
#> Loading required package: future
#> 
#> Attaching package: 'future'
#> The following object is masked from 'package:callr':
#> 
#>     run
library(microbenchmark)

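# mclapply() with the default mc.cores, i.e. getOption("mc.cores", 2L)
w <- microbenchmark(mclapply(1:2, function(x) {
  print("done")
}), times = 1000)

# mclapply() with two cores requested explicitly
x <- microbenchmark(mclapply(1:2, function(x) {
  print("done")
}, mc.cores = 2), times = 1000)

# callr::r(): start a fresh R session, run the function, and wait for the result
y <- microbenchmark(r(function() {
  print("done")
}), times = 1000)

# future.callr backend: times only the call to future(); value() is never called
plan(future.callr::callr)

z <- microbenchmark(future(print("done")), times = 1000)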

out <- rbind(w, x, y, z)
out
#> Unit: milliseconds
#>                                                            expr        min
#>                mclapply(1:2, function(x) {     print("done") })   2.512824
#>  mclapply(1:2, function(x) {     print("done") }, mc.cores = 2)   2.619827
#>                             r(function() {     print("done") }) 101.140287
#>                                           future(print("done"))   7.744327
#>          lq       mean     median         uq       max neval
#>    2.979621   3.490104   3.094162   3.680197  11.02008  1000
#>    3.049609   3.403416   3.120213   3.509649   9.79317  1000
#>  107.533589 108.844388 108.231943 109.175948 183.09687  1000
#>  208.584417 209.172502 208.961368 209.789184 257.03640  1000