(drake installed from GitHub a few days ago.)
I ran a SLURM job with 2000 out-of-date targets, 100 workers, and clustermq parallelism. The workers were small and appeared very quickly today (great!), but I'm worried that the master process somehow got overwhelmed with so many workers sitting there. It took a full 7 minutes (i.e. 700 worker-minutes) for the first target to get assigned.
This one is obviously tough to reproduce. I will try to investigate.
It looks like the bottleneck is in the 101st call to config$workers$receive_data() here:
https://github.com/mschubert/clustermq/blob/dd101c1ee6ca92c29a9e87894ed3876bc753e0fd/R/qsys.r#L103-L104
This polling takes a long time with a lot of workers, which may be unavoidable and doesn't seem to be drake's problem.
This is useful to know @kendonB, thanks for the report. cc @jeroen, @armstrtw, @mschubert.
I'm afraid I can't say anything without more information about the jobs, and ideally showing the same behaviour without drake involved.
The line you reference just means "waiting for results", i.e. the master is not doing anything. This should scale to 1000s of peers, and in my tests I could use 300+ workers without issues.
If the master were getting overwhelmed by too many workers, I'd also expect most of the time to be spent in serialize/unserialize/other processing (R code; I do reach this limit sometimes, with the master using a full core at 100% CPU), not just waiting (polling itself is high performance).
@wlandau, am I correct that the 101st call would be the first real computation (after distributing the drake config object)?
@kendonB, do you have the worker log for the 101st call?
@wlandau, am I correct that the 101st call would be the first real computation (after distributing the drake config object)?
Hmm... it seems like that 101st call could potentially be just prior to sending the first computation. Here is what I get when I put a browser() statement just after receiving the message and use only one worker.
> options(clustermq.scheduler = "multicore", drake_make_menu = FALSE)
> plan <- drake_plan(a = 1, b = 2, c = 3)
> make(plan, parallelism = "clustermq")
Submitting 1 worker jobs (ID: 6145) ...
Called from: cmq_master(config)
Browse[1]> str(msg)
List of 4
$ id : chr "WORKER_READY"
$ auth : chr ""
$ pkgver:Classes 'package_version', 'numeric_version' hidden list of 1
..$ : int [1:3] 0 8 6
$ token : chr "not set"
Browse[1]> c
Called from: cmq_master(config)
Browse[1]> str(msg)
List of 3
$ id : chr "WORKER_READY"
$ auth : chr ""
$ token: chr "set_common_data_token"
Browse[1]> c
target a
Called from: cmq_master(config)
Browse[1]> str(msg)
List of 5
$ id : chr "WORKER_READY"
$ auth : chr ""
$ token : chr "set_common_data_token"
$ ref : language drake::cmq_build(target = target, meta = meta, deps = deps, config = config)
$ result:List of 4
..$ target : chr "a"
..$ meta :List of 14
.. ..$ name : chr "a"
.. ..$ target : chr "a"
.. ..$ imported : logi FALSE
.. ..$ missing : logi TRUE
.. ..$ seed : num 1.67e+09
.. ..$ time_start : 'proc_time' Named num [1:5] 2.461 0.435 132.559 0.158 0.114
.. .. ..- attr(*, "names")= chr [1:5] "user.self" "sys.self" "elapsed" "user.child" ...
.. ..$ file_out : NULL
.. ..$ isfile : logi FALSE
.. ..$ trigger :List of 6
.. .. ..$ command : logi TRUE
.. .. ..$ depend : logi TRUE
.. .. ..$ file : logi TRUE
.. .. ..$ condition: logi FALSE
.. .. ..$ change : NULL
.. .. ..$ mode : chr "whitelist"
.. ..$ command : chr "1"
.. ..$ dependency_hash : chr ""
.. ..$ input_file_hash : chr ""
.. ..$ output_file_hash: chr ""
.. ..$ time_command : 'proc_time' Named num [1:5] 0 0 0 0 0
.. .. ..- attr(*, "names")= chr [1:5] "user.self" "sys.self" "elapsed" "user.child" ...
..$ value : num 1
..$ checksum: chr "ef46db3751d8e999"
Browse[1]>
But I am not sure I can really know what happened in that 101st $receive_data() call. Isn't it possible that some workers will be ready to accept targets before others can even send a "not set" token? Maybe msg$result$value was just large and difficult to send over the socket (admittedly unlikely if no targets had even started building at that point). But @kendonB, on the off chance that this explains the slowdown, have you tried make(parallelism = "clustermq", caching = "worker")? I seem to recall that this was a more appealing option for you anyway.
Isn't it possible that some workers will be ready to accept targets before others can even send a "not set" token?
If I remember correctly, you sent drake's config object to each worker as a first step (between your first and your second message)?
If all workers start up near-simultaneously (which I gathered was the case from @kendonB's initial report), that would (likely) cause the 101st to be the first config object answer (your 2nd message in your example).
edit: first config answer, not computation
If I remember correctly, you sent drake's config object to each worker as a first step (between your first and your second message)?
Yes, plus everything in the global environment if config$envir is globalenv().
If all workers start up near-simultaneously (which I gathered was the case from @kendonB's initial report) that would (likely) cause the 101st to be the first config object answer (your 2nd message in your example).
Makes sense, I just was not sure what to assume about the order in which the messages come back.
Yes, plus everything in the global environment
Would be good to know the size of this object. This may be the issue here.
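One rough way to estimate how much data would go over the socket is to measure the serialized length of each element. This is only a sketch in base R: `config_like` is a hypothetical stand-in list, not drake's real config object, and serialized size is only an approximation of what clustermq actually transmits.

```r
# Hypothetical stand-in for a config-like list (not drake's real structure).
config_like <- list(
  plan = data.frame(target = paste0("t", 1:1000), stringsAsFactors = FALSE),
  jobs = 100L,
  payload = rnorm(1e5)
)

# Number of bytes R produces when serializing an object in memory.
serialized_bytes <- function(x) {
  as.numeric(length(serialize(x, connection = NULL)))
}

# Per-element sizes, largest first, plus the size of the whole list.
sizes <- sort(vapply(config_like, serialized_bytes, numeric(1)), decreasing = TRUE)
total_bytes <- serialized_bytes(config_like)
print(sizes)
print(total_bytes)
```

Unlike object.size(), this also counts the contents of any non-global environments attached to closures, which matters for function-valued elements.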
But @kendonB, on the off chance that this explains the slowdown, have you tried make(parallelism = "clustermq", caching = "worker")
Apologies for not sending my full call. I already had caching = "worker" on. This was my full call to make:
make(
  reading_splitting_intersection,
  verbose = 4,
  jobs = 100,
  jobs_preprocess = 32,
  packages = c("sf", "tidyverse"),
  parallelism = "clustermq",
  caching = "worker",
  memory_strategy = "lookahead",
  template = list(
    memory = "3G",
    # minutes
    walltime = 15,
    log_file = "make.log"
  ),
  console_log_file = NULL
)
@kendonB, do you have the worker log for the 101st call?
If they're all up at once, is this guaranteed to be worker "1"? If so, this is the top of the log for that one with jobs = 25. There was still a similar delay, but scaled down by approx. 1/4. The delay thankfully appears to be linear in the number of workers, though that means the CPU time wasted grows quadratically (delay times number of idle workers). The delay in the log is indicated.
Master: tcp://mahuika01:6497
WORKER_UP to: tcp://mahuika01:6497
#####################
### Delay is here ###
#####################
> DO_SETUP (1.229s wait)
token from msg: set_common_data_token
> DO_CALL (0.001s wait)
Linking to GEOS 3.7.1, GDAL 2.4.0, PROJ 5.2.0
── Attaching packages ─────────────────────────────────────── tidyverse 1.2.1 ──
✔ ggplot2 3.1.0          ✔ purrr   0.3.0
✔ tibble  2.0.1          ✔ dplyr   0.7.99.9000
✔ tidyr   0.8.2          ✔ stringr 1.4.0
✔ readr   1.3.0          ✔ forcats 0.4.0
── Conflicts ────────────────────────────────────────── tidyverse_conflicts() ──
✖ tidyr::expand() masks drake::expand()
✖ dplyr::filter() masks stats::filter()
✖ tidyr::gather() masks drake::gather()
✖ dplyr::lag()    masks stats::lag()
eval'd: drake::cmq_buildtargetmetadepsconfig
> DO_CALL (0.000s wait)
unload 1 item: intersected2_765
eval'd: drake::cmq_buildtargetmetadepsconfig
> DO_CALL (0.000s wait)
unload 1 item: intersected2_774
This looks very much like it was caused by sending a large config object.
I have added a log entry for the data size now to make it easier to check for this.
@kendonB, if you already have a trimmed-down global environment, maybe there is something drake can do to reduce the size of config. It would be nice to see the output of
purrr::map_dbl(config, object.size) %>%
sort(decreasing = TRUE) %>%
tibble::tibble(id = names(.), size = .)
> purrr::map_dbl(config, pryr::object_size) %>%
+   sort(decreasing = TRUE) %>%
+   tibble::tibble(id = names(.), size = .)
# A tibble: 46 x 2
   id           size
   <chr>       <dbl>
 1 sleep    68733632
 2 cache    54741056
 3 layout   52997680
 4 plan     14136368
 5 graph     1983632
 6 schedule  1918728
 7 targets   1036016
 8 imports      2784
 9 trigger       992
10 template      744
and yes, @wlandau, the plan itself is by far the biggest item in the environment.
Perhaps a solution is to always also send a target with the config in that first round?
Why would sleep be so large? Could you reassign its closure to a smaller environment?
drake should call config$cache$flush_cache() before sending data (see ae6410d5425558d2aa1e1d25f696ae721745f93f).
I wonder where else the plan is duplicated in memory. If config$envir is globalenv(), perhaps you could rm(plan) and then call make(config = config). Also, what are the object sizes of the various columns in the plan?
Could you reassign its closure to a smaller environment?
How do I do this?
I wonder where else the plan is duplicated in memory. If config$envir is globalenv(), perhaps you could rm(plan) and then call make(config = config). Also, what are the object sizes of the various columns in the plan?
I measured the size of config by just using debug(make) and measuring it while debugging. I also tried generating config in the global environment then rm(plan) then make(config = config). No major change in behavior.
How do I do this?
Try environment(config$sleep) <- baseenv().
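To illustrate why this can help: a closure carries its enclosing environment with it when it is serialized, so a tiny function defined next to a large object can become a large payload. The sketch below uses base R only; `make_sleep` and `big` are hypothetical names, and whether this is what actually inflated config$sleep in this case is an assumption.

```r
# Hypothetical factory: the returned function never uses `big`,
# but `big` lives in the closure's enclosing environment.
make_sleep <- function() {
  big <- rnorm(1e6)
  function(i) 0.01
}

sleep <- make_sleep()

# Serializing the closure drags the enclosing environment (and `big`) along.
size_before <- length(serialize(sleep, connection = NULL))

# Point the closure at the base environment instead. This is safe here
# because the function body does not refer to anything in its old environment.
environment(sleep) <- baseenv()
size_after <- length(serialize(sleep, connection = NULL))

print(c(before = size_before, after = size_after))
```

The reassignment is only appropriate when the function does not need any variables from its original environment; otherwise calls to it would fail after the change.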
I measured the size of config by just using debug(make) and measuring it while debugging. I also tried generating config in the global environment then rm(make) then make(config = config). No major change in behavior.
Assuming you mean rm(plan), we still have options. If you use a non-global environment for config$envir, it might help.
envir <- new.env(parent = globalenv())
source("R/packages.R", local = envir)
source("R/functions.R", local = envir)
source("R/plan.R", local = envir)
options(clustermq.scheduler = "slurm", clustermq.template = "your.tmpl")
make(plan, envir = envir, parallelism = "clustermq", jobs = 100)
In the meantime, I will see if we can get away with sending a downsized config to the workers instead of the whole thing. In particular, I do not think we need config$plan if we already have config$layout.
We probably do not need to send config$graph, config$schedule, or config$targets either.
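The idea of sending a downsized config can be sketched as dropping the fields the workers do not need before anything is serialized. This is a toy illustration, not drake's actual implementation: `full_config` is a hypothetical list, and the set of removable fields is taken from the two comments above.

```r
# Hypothetical stand-in for the full config held by the master process.
full_config <- list(
  plan = data.frame(target = paste0("t", 1:2000), stringsAsFactors = FALSE),
  layout = list(a = list(command = "1")),
  graph = matrix(0, nrow = 200, ncol = 200),
  schedule = matrix(0, nrow = 200, ncol = 200),
  targets = paste0("t", 1:2000),
  jobs = 100L
)

# Fields assumed to be unnecessary on the workers (per the discussion above).
not_needed <- c("plan", "graph", "schedule", "targets")

# Keep everything else, preserving the original order of the fields.
worker_config <- full_config[setdiff(names(full_config), not_needed)]

bytes_full <- length(serialize(full_config, connection = NULL))
bytes_worker <- length(serialize(worker_config, connection = NULL))
print(c(full = bytes_full, worker = bytes_worker))
```

With 100 workers, any bytes saved here are saved once per worker, which is why trimming the common data matters more as the number of workers grows.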
@kendonB, please try #804. I think it reduces a lot of the data that gets sent to HPC workers.
Big improvement in the version with 100 workers. Started sending targets within ~3 minutes instead of 7. With 25 workers it was 64 seconds instead of ~3 mins.
Is the above still relevant?
Big improvement in the version with 100 workers. Started sending targets within ~3 minutes instead of 7. With 25 workers it was 64 seconds instead of ~3 mins.
Fantastic! What does the memory profile of drake:::cmq_config(config) look like for you?
Is the above still relevant?
I believe so. If you avoid the global environment, less data will be duplicated.
By the way: in https://github.com/ropensci/drake/issues/800#issuecomment-477626251, I do not think we should construct the plan in the local environment since it appears to be taking up so much memory:
envir <- new.env(parent = globalenv())
source("R/packages.R", local = envir)
source("R/functions.R", local = envir)
source("R/plan.R") # Use the global env here assuming `plan.R` only constructs the plan (no new pkgs or functions).
options(clustermq.scheduler = "slurm", clustermq.template = "your.tmpl")
make(plan, envir = envir, parallelism = "clustermq", jobs = 100)
If your plan needs some of the functions etc. from other scripts, I guess we could source("R/plan.R", local = envir) as long as you delete the plan and other large superfluous objects from envir before make().
Another approach is to convert the commands in the plan to character strings: plan$command <- purrr::map_chr(plan$command, drake:::safe_deparse). drake will still understand the plan. When I started using language objects instead of character strings, it did not even occur to me that memory was going to blow up.
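A minimal base R sketch of the same idea, using deparse() in place of drake's internal drake:::safe_deparse (an assumed substitution for illustration only). The `commands` list is hypothetical. The sketch prints both sizes rather than assuming which is smaller, and checks that the text form can be parsed back into the original calls.

```r
# Hypothetical commands stored as language objects.
commands <- list(
  quote(mean(x)),
  quote(lm(y ~ x, data = d))
)

# Convert each command to a single character string.
as_text <- vapply(
  commands,
  function(cmd) paste(deparse(cmd), collapse = "\n"),
  character(1)
)

# Compare in-memory sizes of the two representations.
sizes <- c(
  language = as.numeric(object.size(commands)),
  text = as.numeric(object.size(as_text))
)
print(sizes)

# The text form can be turned back into the original calls when needed.
restored <- lapply(
  as_text,
  function(txt) parse(text = txt, keep.source = FALSE)[[1]]
)
```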
> pryr::object_size(drake:::cmq_config(config))
96.8 MB
Hmm... that's still quite large. What are the sizes of the individual elements? Is config$envir still the global environment?
@kendonB, after 6d28b4945424a5c498ef780ac72f7e2c29644e9f, is pryr::object_size(cmq_config(config)$cache) still large?
I think I have reduced this issue down to a bunch of smaller ones: #812, #815, #816, #817. Closing.