parLapply() and mclapply()
resolved(f)
resolved(f) is TRUE, value(f) throws an error
Also: let's use lightly_parallelize() as the engine behind make_imports(). For imports, the order of processing does not matter.
If the keys are integers (like in our case), "insert" and "decrease key" can be implemented in O(1).
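To make the O(1) claim concrete, here is a minimal sketch of a bucket queue, assuming the keys are small non-negative integers (for example, the number of unbuilt dependencies of each target). This is not drake's queue; the names new_bucket_queue(), bq_insert(), bq_decrease_key(), and bq_pop_min() are hypothetical. Environments act as hash maps, so "insert" and "decrease key" take constant time on average, while popping the minimum scans the buckets from the smallest key upward.

```r
# One bucket (a hashed environment) per possible integer key, 0..max_key.
new_bucket_queue <- function(max_key) {
  q <- new.env()
  q$buckets <- lapply(seq_len(max_key + 1), function(i) new.env(hash = TRUE))
  q$key_of <- new.env(hash = TRUE) # item name -> current key
  q
}

# Insert: two hash-table writes.
bq_insert <- function(q, item, key) {
  assign(item, TRUE, envir = q$buckets[[key + 1]])
  assign(item, key, envir = q$key_of)
  invisible(q)
}

# Decrease key: remove from the old bucket, add to the new one.
bq_decrease_key <- function(q, item, new_key) {
  old_key <- get(item, envir = q$key_of, inherits = FALSE)
  stopifnot(new_key <= old_key)
  rm(list = item, envir = q$buckets[[old_key + 1]])
  bq_insert(q, item, new_key)
}

# Pop an item with the smallest key (ties are broken arbitrarily).
bq_pop_min <- function(q) {
  for (bucket in q$buckets) {
    items <- ls(bucket, all.names = TRUE, sorted = FALSE)
    if (length(items) > 0) {
      item <- items[1]
      rm(list = item, envir = bucket)
      rm(list = item, envir = q$key_of)
      return(item)
    }
  }
  NULL # the queue is empty
}

q <- new_bucket_queue(max_key = 3)
bq_insert(q, "f1", 2)
bq_insert(q, "f2", 1)
bq_decrease_key(q, "f1", 0) # f1 now outranks f2
first <- bq_pop_min(q)
second <- bq_pop_min(q)
```

In a scheduler that only ever pops targets whose key has reached 0, the scan in bq_pop_min() stops at the first bucket.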
My main focus right now is this issue. After some sketching (which I will push to a branch as soon as I finish traveling), I think the infrastructure should emerge as an independent “future” backend separate from “future_lapply”. The new scheduler will turn out great in the end, but until we test it over several months, I do not think it should replace any of the existing backends. I think this will be okay because there will not be much code to add, and I will make sure it is merge-safe.
Key data structures I am planning:
I was wrong about this comment. I forgot that we cannot process the imports in embarrassingly parallel fashion. We need to process them in topological order so that changes to nested functions propagate through any outer functions that depend on them. For this, we should stick to something low-overhead. The current mclapply-powered staged parallelism has worked well for my own projects, and the staging should not cause a bottleneck here.
We need to process them in topological order so that changes to nested functions propagate through any outer functions that depend on them.
Can you give an example where the order is important in make_imports()?
library(drake)
f1 <- function(x){
  f2(x)
}
f2 <- function(x){
  f3(x)
}
f3 <- function(x){
  f4(x)
}
f4 <- function(x){
  x
}
plan <- drake_plan(a = f1(1))
config <- drake_config(plan)
vis_drake_graph(config)

make(plan)
## target a
outdated(config)
## character(0)
f4 <- function(x){
  x + 1
}
outdated(config)
## [1] "a"
Some terms:
Drake accounts for nesting because the kernel of a function includes its own dependency hash.
For completeness: f1() cannot be processed until f2() through f4() are processed so drake can tell when to update a.
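The ordering constraint can be sketched with a plain topological sort. The code below is a base R illustration (Kahn's algorithm), not drake's internals; the deps list and the topo_order() helper are hypothetical and simply encode the f1() through f4() example above.

```r
# Each element lists the functions that the named function calls directly.
deps <- list(
  f1 = "f2",
  f2 = "f3",
  f3 = "f4",
  f4 = character(0)
)

topo_order <- function(deps) {
  # Number of not-yet-processed dependencies for each function.
  remaining <- vapply(deps, length, integer(1))
  sorted <- character(0)
  ready <- names(remaining)[remaining == 0]
  while (length(ready) > 0) {
    node <- ready[1]
    ready <- ready[-1]
    sorted <- c(sorted, node)
    # Processing `node` unblocks every function that depends on it.
    for (other in names(deps)) {
      if (node %in% deps[[other]]) {
        remaining[other] <- remaining[other] - 1L
        if (remaining[other] == 0L) ready <- c(ready, other)
      }
    }
  }
  if (length(sorted) < length(deps)) stop("dependency cycle detected")
  sorted
}

import_order <- topo_order(deps)
import_order
```

The innermost function f4() comes first and f1() last, which is the order in which a change to f4() can propagate outward.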
It just occurred to me: just as a first step, what if we used GNU Make to schedule futures? We just need to change the parallelism = "Makefile" backend:
- Makefile rules are usually of the form Rscript -e drake::mk(...), and there is some customizability.
- future::plan("next") to the drake::config() object so the separate R sessions see it. We could eventually extend this to solve #169, but it does not have to happen right away.
This way, we could debug and test the R6_worker class for monitoring individual futures without having to worry about the rest of the manual scheduler. We could also benchmark drake's upcoming manual scheduler against Make and try to outperform it.
The advantage of Make as a scheduler for futures is that parallel workers could simultaneously write to the cache even when remote jobs have no access.
To clarify that last point: requiring the master process to do all the caching is a potential bottleneck. For large computations with small data, this isn't so bad. But a lot of big data work involves small computations on large data, so I think it is worth the time to make this alternative available.
We could also check if or how GNU Make builds a priority queue.
FYI: to do this correctly, I think there is more I need to learn about future. It will take additional time.
Splitting up this issue further would help, I like that idea. Let's focus on the Makefile backend first.
Sounds great. Let's discuss the future-powered Makefile backend in #237.
Some tools with schedulers I might learn from:
- yarn
- spark
- callr
- processx
- pkgman
- pkginstall
- async
I think I have a promising start on this issue here (i227-attempt2 branch). It still needs a ton of debugging and testing, but it's a start. There are two experimental future-based backends in that branch, each of which deploys one future per target.
- "future_commands" - Do the caching on the master process. The future should not need to access the master process's file system except to read/write user-defined input/output files.
- "future_total" - Tell each future to cache its own target.
The rest of the backends are unaffected. We'll see how all the backends compete against each other. Eventually, it may come time to deprecate and remove backends.
One note on monitoring futures in case they crash: I personally think this should be the responsibility of the future package. I have added a comment to https://github.com/HenrikBengtsson/future/issues/172.
Looks more promising: https://github.com/dirmeier/datastructures/blob/master/vignettes/datastructures.Rmd
I am noticing that future introduces some additional overhead for each target. Maybe with additional callr or processx workers based on the scheduler, we could get rid of the *lapply backends completely. The Makefile backend is still its own special case, unfortunately.
Update: I have been working hard on the i227-attempt2 branch, and I am almost ready to involve more people in alpha testing. I am starting to test it on some of my own old work projects, and things appear to be going rather well so far. Main changes:
- parallelism = "future" backend with a manual job scheduler.
- caching argument to make() and drake_config() to allow the user to select whether the caching happens on the master process or on each of the workers. cc @kendonB.
I implemented the "future"-based scheduler as a separate parallel backend because:
- master for easier alpha testing.
I will submit a PR early next week. At that point, I hope we can review the code together.
Note for reference: we might be able to get non-staged mclapply-like functionality by combining mcfork() with the manual scheduler. This might remove enough overhead to make it practical to process imports this way.
Looking back at the summary of our discussion from the RStudio conference:
monitoring progress:
- watch the storr and
- check resolved(f)
As I understand it, the purpose of watching the storr is to see if a worker crashed even if resolved(f) is TRUE. Is that right? If so, using the cache would only work for caching = "worker", so I hesitate to implement it right now. For caching = "master", we should probably not assume that workers have cache access.
failing workers
- resolved(f) is TRUE, value(f) throws an error
Unfortunately, this does not work for multicore futures at the moment: https://github.com/HenrikBengtsson/future/issues/199. But it does work for multisession parallelism. I just implemented it in #275.
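For reference, here is a minimal sketch of that failure check using the future package's future(), resolved(), and value(). It assumes a sequential plan purely to keep the example self-contained; the thread above reports the same pattern working under multisession. The "failed_target" class is a hypothetical marker, not part of future or drake.

```r
library(future)
plan(sequential) # assumption: sequential plan, chosen only for simplicity

# The expression fails inside the future, not in the master session.
f <- future(stop("worker failed"))

# The future counts as resolved even though its expression errored.
is_resolved <- resolved(f)

# value() re-signals the captured error, so wrap it to detect the failure.
result <- tryCatch(
  value(f),
  error = function(e) {
    structure(conditionMessage(e), class = "failed_target") # hypothetical marker
  }
)
failed <- inherits(result, "failed_target")
```

A scheduler could run this check for each worker it polls and mark the corresponding target as failed instead of letting the error stop the master process.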
scheduling with a priority queue ...
As I see it, that part hinges on @dirmeier's response to https://github.com/dirmeier/datastructures/issues/4#issuecomment-367548398. If we can use his decrease-key, I will upgrade drake's current priority queue. If not, we might just leave the existing queue be for now. After either eventuality, I will close this issue, and we can work on specific scheduling-related problems in more specific issues.
Yes, watching the storr seems necessary only for caching = "worker". Checking resolved(f) may be expensive on some architectures.
FWIW, I'm still using staged parallelism in my current experiments because of the overhead associated with futures. I like your idea of a pure callr worker, because even future.callr has some overhead that mainly comes from finding the environment in which the globals live (according to a very coarse profiling). Ideally, we'd pass commands to persistent workers that would run them and communicate back, but that feels like a long way to go.
Yeah, if we keep persistent workers, it seems like we either need to have some kind of message passing or make all the workers fight over the next available target. I don't think drake is ready for either yet.
If we had a pure callr worker to process the imports (and optionally targets), we could totally get rid of the mclapply and parLapply backends, and we would no longer need to process targets and imports in two separate stages. (I still think we should keep the Makefile backend even though it complicates the code base.) It seems very doable, but I need to get better at callr.
I do not know how to interact with a callr job like a future, but once I learn and we have that part figured out, I think we should define analogous S3 callr_worker.resolved() and callr_worker.value() methods. That way, we could just add on a callr worker and not need many modifications to R/future.R.
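One possible shape for this, as a rough sketch only: wrap a background process from callr::r_bg() in an S3 class and give it resolved() and value() methods. The constructor new_callr_worker() and the class name "callr_worker" are hypothetical. The generics are defined locally as stand-ins so the example runs without future; in drake they would presumably be the generics from the future package. Note that S3 methods are named generic.class, hence resolved.callr_worker().

```r
library(callr)

# Stand-in generics (assumption: real code would reuse future's generics).
resolved <- function(x, ...) UseMethod("resolved")
value <- function(x, ...) UseMethod("value")

# Hypothetical constructor: launch `func` in a background R session.
new_callr_worker <- function(func, args = list()) {
  structure(list(process = r_bg(func, args = args)), class = "callr_worker")
}

# A worker is resolved once its background process has exited.
resolved.callr_worker <- function(x, ...) {
  !x$process$is_alive()
}

# Block until the process finishes, then fetch the return value.
# get_result() throws an error if the function failed in the child session.
value.callr_worker <- function(x, ...) {
  x$process$wait()
  x$process$get_result()
}

w <- new_callr_worker(function(x) x + 1, args = list(x = 1))
out <- value(w)
done <- resolved(w)
```

Because the interface matches what the scheduler already expects from a future, a worker like this could be swapped in without touching the polling loop.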
Message passing via file I/O doesn't necessarily mean concurrency if each worker is limited to a namespace (directory) in the file system. But it seems safer to abstract away the message passing part.
I can take care of implementing the callr backend. Would that backend bundle several targets into one callr process?
I would love it if you wrote a callr worker, particularly one that behaves like the existing future worker. Maybe in new_worker(), we could optionally launch callr where a future is launched here?
At the conference we talked about allowing multiple targets per worker, but I think we decided to table that for now. I do not think drake is ready for that yet.
Once we have a callr worker, here is what I am thinking for the next steps:
- worker column in the workflow plan data frame, where users can choose between "future" and "callr". There is no sense in deploying an HPC job for a knitr report just because targets leading up to it need SLURM.
- future or callr) and imports (via callr) to be scheduled all together instead of in separate stages.
- "future", "callr", and "Makefile". The "mclapply" and "parLapply" backends will redirect to "callr", and the "future_lapply" backend will redirect to "future".
We'll need to think about how to minimize the time needed to spin up a worker, otherwise the master process may become the bottleneck. I wouldn't deprecate staged parallelism too soon, unless it really impedes future development.
Agreed. I was thinking callr might be lower-overhead than future.
I think it's a good time to close this issue: we already have a good start on the manual scheduler, and further work is starting to split up into a bunch of smaller issues.
Do you want to add these issues now, or maybe as comments to #278?
I think continuing to plan from this thread here is fine. I think the priority queue still needs to wait for dirmeier/datastructures#4, and transitioning imports from mclapply/parLapply to callr depends on how fast the new backend is in #278. There are also other issues like grouping multiple targets into a given worker, but I hesitate to open issues that drake is not ready for.
We'll need to think about how to minimize the time needed to spin up a worker, otherwise the master process may become the bottleneck. I wouldn't deprecate staged parallelism too soon, unless it really impedes future development.
Agreed:
library(callr)
library(parallel)
library(future.callr)
#> Loading required package: future
#>
#> Attaching package: 'future'
#> The following object is masked from 'package:callr':
#>
#> run
library(microbenchmark)
w <- microbenchmark(mclapply(1:2, function(x) {
  print("done")
}), times = 1000)
x <- microbenchmark(mclapply(1:2, function(x) {
  print("done")
}, mc.cores = 2), times = 1000)
y <- microbenchmark(r(function() {
  print("done")
}), times = 1000)
plan(future.callr::callr)
z <- microbenchmark(future(print("done")), times = 1000)
out <- rbind(w, x, y, z)
out
#> Unit: milliseconds
#> expr min
#> mclapply(1:2, function(x) { print("done") }) 2.512824
#> mclapply(1:2, function(x) { print("done") }, mc.cores = 2) 2.619827
#> r(function() { print("done") }) 101.140287
#> future(print("done")) 7.744327
#> lq mean median uq max neval
#> 2.979621 3.490104 3.094162 3.680197 11.02008 1000
#> 3.049609 3.403416 3.120213 3.509649 9.79317 1000
#> 107.533589 108.844388 108.231943 109.175948 183.09687 1000
#> 208.584417 209.172502 208.961368 209.789184 257.03640 1000