Drake: Customizing cluster templates

Created on 29 Nov 2019 · 17 Comments · Source: ropensci/drake

Prework

  • [x] Read and abide by drake's code of conduct.
  • [x] Search for duplicates among the existing issues, both open and closed.
  • [x] If you think your question has a quick and definite answer, consider posting to Stack Overflow under the drake-r-package tag. (If you anticipate extended follow-up and discussion, you are already in the right place!)

Question

Hello, first of all, thanks for the awesome package! I've recently started using it and find it really encourages good practices (limiting run time, creating functions, enhancing reproducibility and just general organization). I am excited to implement it into my regular workflow. I use a SLURM cluster and was wondering whether there is a way to customize the templates provided by clustermq, via the template argument, beyond the placeholders already in {{ }}. For example, could I change the ntasks parameter? Perhaps this is a question for the clustermq developers.

Thanks in advance for your help,

All 17 comments

Thanks for the encouragement, @jennysjaarda.

I recommend creating new placeholders in your clustermq template file as you see fit. Here is an example where I define a custom n_slots placeholder and then fill it in with the template argument of make(). Special placeholders like n_jobs and job_name are probably reserved by clustermq itself. Follow-up about special reserved placeholders is, as you say, probably a topic for the clustermq issue tracker.

Where do you see ntasks specifically? If you mean n_jobs as it appears in this template file, we are talking about the number of clustermq workers, which is controlled with the jobs argument of make().

By the way, drake has a bunch of example template files too, and drake_hpc_template_file("slurm_clustermq.tmpl") will write this one to your file system.
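As a rough illustration of the placeholder idea described above, the base-R sketch below fills {{ name }} fields from a named list, much as the template argument of make() supplies values. The helper fill_placeholders() is hypothetical and is not clustermq's implementation; the {{ name | default }} fallback form is assumed here to mirror the defaults found in clustermq's shipped templates.

```r
# Hypothetical helper: a simplified stand-in for template filling.
# This is NOT clustermq's implementation, only an illustration.
fill_placeholders <- function(lines, values) {
  for (name in names(values)) {
    # Match "{{ name }}" or "{{ name | default }}" for this name.
    pattern <- paste0("\\{\\{\\s*", name, "\\s*(\\|[^}]*)?\\}\\}")
    lines <- gsub(pattern, as.character(values[[name]]), lines, perl = TRUE)
  }
  # Anything still unfilled falls back to its "| default" value, if it has one.
  gsub("\\{\\{\\s*\\w+\\s*\\|\\s*([^}]*?)\\s*\\}\\}", "\\1", lines, perl = TRUE)
}

# Template lines with two custom placeholders (cpus, partition)
# and one placeholder (memory) that is left to its default.
template <- c(
  "#SBATCH --cpus-per-task={{ cpus | 1 }}",
  "#SBATCH --partition={{ partition }}",
  "#SBATCH --mem-per-cpu={{ memory | 4096 }}"
)

filled <- fill_placeholders(
  template,
  values = list(cpus = 16, partition = "cluster_i_want")
)
print(filled)
```

In a real workflow the list passed as values would be the one given to make(..., template = list(...)), and clustermq itself would do the substitution.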

Thanks for the quick response. I've seen the template files in drake that you mentioned, and they are what I've started experimenting with. I guess I am a bit confused about why the default template parallelizes with the --array flag rather than --ntasks (from here). I used the array flag often (before my experience with drake), and my SBATCH script looked a bit different from the template. Apologies if I'm missing something obvious, but this is how I envision a script with an array flag:

#!/bin/sh
#SBATCH --job-name=myjob
#SBATCH --output=myoutput
... ### other flags as necessary 
#SBATCH --array=1-{{ n_jobs }}

Rscript myscript.r $SLURM_ARRAY_TASK_ID 

However, what I'm trying to do is launch one job with many threads. If I were to set this up manually on SLURM, I would have thought to use the ntasks flag. Specifically, I'm using the run command from processx to launch a command-line program that takes a flags parameter (where I want to specify the number of threads). Do you think it's possible? I have a workflow similar to the one described in the FAQ (issue #277), but with the majority of the scripts I write being in R (while using some non-R programs that are developed by other groups and launched from the command line). I am most comfortable in R, so I really like the idea of keeping everything within the R language.

By design, clustermq always uses job arrays so that all the workers get submitted at the same time (and thus start sooner, I think). So all clustermq template files are required to use job arrays instead of --ntasks.

Since threads exist within jobs/workers, it should absolutely be possible to invoke a multithreaded command inside a target. But if those threads require multiple cores or CPUs to be truly effective, those extra resource requests are going to apply to all the tasks in the job array. So instead of homogeneous persistent workers, which could waste resources in some workflows, it might be worth considering transient workers so you can request different resources for different targets. By the way, for targets that do not need SLURM, you can set them to run locally with drake_plan(quick_target = target(your_command, hpc = FALSE)), which ends up being quicker (low overhead) for small targets.

I think I answered your questions, but let me know if anything else comes up.

Hello,
Thanks a lot and sorry for the delay! I managed to assign a certain number of cores using clustermq by adding the line #SBATCH --cpus-per-task={{ cpus }} to my template file and then running as follows:

make(my_plan, parallelism = "clustermq", jobs = 2, console_log_file = "file.out", template = list(cpus = 16, partition = "cluster_i_want"))

Note that I also added the line #SBATCH --partition={{ partition }} to select a particular partition.

But there is a caveat, exactly as you said before: this wastes resources if not all jobs require this many CPUs. So after a bit more reading of the manual, I think the solution involving transient workers with batchtools is much nicer. For this I guess I can do something similar, but editing the batchtools template created by:

drake_hpc_template_file("slurm_batchtools.tmpl") 

It looks like this template already has a placeholder for cpus-per-task, so there is nothing to edit there. I do, however, want to modify the template so that I can specify a partition. To do this I guess I can add the following line:
#SBATCH --partition=<%= resources$partition %>

Following on from your example in 12.7.5 (except with slurm instead of sge), could I then specify some of these resources per target and some for the whole plan? As follows:

## Define the plan with different cores/target
plan <- drake_plan(
  data = target(
    download_data(),
    resources = list(ncpus = 1)
  ),
  model = target(
    big_machine_learning_model(data),
    resources = list(ncpus = 4)
  )
)
## register the template
library(future.batchtools) ## provides batchtools_slurm
future::plan(batchtools_slurm, template = "slurm_batchtools.tmpl") ## edited as described
## make with certain partition
make(plan, parallelism = "future", jobs = 2, resources = list(partition = "partition_i_want"))

So basically I want to specify a partition that applies everywhere, but cores to be target dependent. I could include the partition argument for target but that seems unnecessary.

I imagine something like this will work and I'll try it now and provide an update - leaving this all here in case it's useful for someone else!

Excellent! Yes, this is exactly how I recommend using the template. Please let me know how it goes.

Just one thing to watch out for: parallelism = "future" has more overhead because the workers are transient and the backend is batchtools instead of clustermq. (future itself is very fast on its own.)
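The goal stated above (one partition everywhere, cores that vary by target) can also be expressed by merging a list of defaults into each target's resources before they reach the plan. The sketch below uses only base R; the names defaults, per_target and merged are hypothetical, and how the merged lists are wired into drake is left to the plan itself.

```r
# Plan-wide defaults that every target should inherit (hypothetical values).
defaults <- list(partition = "partition_i_want")

# Target-specific requests, keyed by target name.
per_target <- list(
  data  = list(ncpus = 1),
  model = list(ncpus = 4)
)

# modifyList() keeps the defaults and lets each target add to or override them.
merged <- lapply(per_target, function(res) modifyList(defaults, res))

str(merged)
```

Each element of merged is then a complete resources list for one target, so the template never sees a missing partition.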

Thanks for the tip, admittedly I don't understand all the background processes, but if you get a clustermq backend working that accommodates transient workers, I will happily switch!

I could not get it running as I stated above, using the drake template file. I think there may be a typo in the slurm_batchtools.tmpl? Surprising to me as I know a lot of work has been put into the example, but this is what I get when I run the example from drake_example("slurm")

library(drake)
library(future.batchtools)

## write the example template to the working directory
drake_hpc_template_file("slurm_batchtools.tmpl")

plan <- drake_plan(
  data = target(
    download_data(),
    resources = list(cores = 1, gpus = 0)
  ),
  model = target(
    big_machine_learning_model(data),
    resources = list(cores = 4, gpus = 1)
  )
)

future::plan(batchtools_slurm, template = "slurm_batchtools.tmpl")

make(plan, parallelism = "future", jobs = 2)

output:


target data
Error: Error brewing template: Error in parse(text = code, srcfile = NULL) : 18:42: unexpected ')'
17: .brew.cat(20,21)
18: cat( if (!is.null(resources$walltime)) { )
                                             ^

I really don't know anything about the brew syntax so I wasn't too successful at debugging, but I was able to get a modified version of the template here working.
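The parse error above hints at the cause: the failing line is cat( if (...) { ), which suggests the template placed an opening if block inside a <%= ... %> tag. In brew, <%= ... %> prints the value of a complete expression, while <% ... %> runs arbitrary code, so a dangling { cannot live in the printing form. The base-R sketch below reproduces only the parsing behaviour; the helper parses() is hypothetical, and the second snippet is one possible complete expression, not the actual fix applied to the template.

```r
# Hypothetical helper: TRUE if the code string is syntactically valid R.
parses <- function(code) {
  result <- try(parse(text = code), silent = TRUE)
  !inherits(result, "try-error")
}

# What the error message shows brew generated: an unfinished block inside cat().
broken <- "cat( if (!is.null(resources$walltime)) { )"

# A complete if/else expression is something cat() can wrap.
complete <- "cat( if (!is.null(resources$walltime)) resources$walltime else \"\" )"

broken_ok <- parses(broken)
complete_ok <- parses(complete)
print(c(broken = broken_ok, complete = complete_ok))
```

The same distinction appears in the hand-written template later in this thread, where the array line uses a full if ... else "" expression inside <%= ... %>.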

Glad you found something that worked. I do not have access to a SLURM cluster, so I cannot test those template files myself. Would you be willing to submit a pull request to repair https://github.com/ropensci/drake/blob/master/inst/templates/hpc/slurm_batchtools.tmpl?

Yes I will do that. But now I'm wondering if I may have an issue that is specific to drake, I'm really not sure. I've created a template file very similar to the way the sge one was in the manual, but for slurm, as follows:

#!/bin/bash

#SBATCH --job-name=<%= job.name %>
#SBATCH --output=<%= log.file %>
#SBATCH --error=<%= log.file %>
#SBATCH --ntasks=1
#SBATCH --partition=<%= resources[["partition"]] %>
#SBATCH --cpus-per-task=<%= resources[["cores"]] %>
#SBATCH --account=<%= resources[["account"]] %>

<%= if (array.jobs) sprintf("#SBATCH --array=1-%i", nrow(jobs)) else "" %>
Rscript -e 'batchtools::doJobCollection("<%= uri %>")'

Previously, I had no experience with brew, but it seems right from that perspective. However, I get the error:

Error: Fatal error occurred: 101. Command 'sbatch' produced exit code 1. Output: 'sbatch: error: Invalid numeric value "" for cpus-per-task.'

When I check the out file I can see that the arguments from resources are not being passed into the sbatch script by brew at all, i.e. nothing is listed for the --cpus-per-task flag. Any ideas?

My plan and make() call are as follows:

init_analysis <- drake_plan(
  # run  ---------------------------
  linear_out = target(
    {
      run_final_processing
      run_gwas(pfile = ("analysis/QC/15_final_processing/FULL/PSYMETAB_GWAS.FULL"), pheno_file = file_in("data/processed/phenotype_data/GWAS_input/pheno_input.txt"),
               covar_file = file_in("data/processed/phenotype_data/GWAS_input/covar_input.txt"),
               threads = 16, pheno_name = pheno, covar_names = covars, eths = !!eths,
               eth_sample_file = "analysis/QC/12_ethnicity_admixture/pca/PSYMETAB_GWAS_ETH_samples.txt", ## this is not a real file - "ETH" gets replaced by proper "ETH" in `run_gwas`
               remove_sample_file = "analysis/QC/11_relatedness/PSYMETAB_GWAS_related_ids.txt",
               output_dir = file_in("analysis/GWAS"), output = "PSYMETAB_GWAS")},
    transform = map(.data = !!baseline_gwas_info, .id = pheno),resources = list(cores = 16,partition="cluster2")),

  interaction_out = target({
    run_final_processing
    run_gwas(pfile = "analysis/QC/15_final_processing/FULL/PSYMETAB_GWAS.FULL", pheno_file = file_in("data/processed/phenotype_data/GWAS_input/pheno_input.txt"),
                covar_file = file_in("data/processed/phenotype_data/GWAS_input/covar_input.txt"),
                threads = 16, pheno_name = pheno, covar_names = covars, parameters = parameters, interaction = TRUE,
                eths = !!eths, eth_sample_file = "analysis/QC/12_ethnicity_admixture/pca/PSYMETAB_GWAS_ETH_samples.txt",  ## this is not a real file - "ETH" gets replaced by proper "ETH" in `run_gwas`
                remove_sample_file = "analysis/QC/11_relatedness/PSYMETAB_GWAS_related_ids.txt",
                output_dir = file_out("analysis/GWAS"), output = "PSYMETAB_GWAS")},
    transform = map(.data = !!interaction_gwas_info, .id = pheno),resources = list(cores = 16,partition="cluster2")),
  subgroup_out = target({
    run_final_processing
    run_gwas(pfile = "analysis/QC/15_final_processing/FULL/PSYMETAB_GWAS.FULL", pheno_file = file_in("data/processed/phenotype_data/GWAS_input/pheno_input.txt"),
                covar_file = file_in("data/processed/phenotype_data/GWAS_input/covar_input.txt"),
                threads = 16, pheno_name = pheno, covar_names = covars, subgroup_var = subgroup, group = "subgroup",
                eths = !!eths, eth_sample_file = "analysis/QC/12_ethnicity_admixture/pca/PSYMETAB_GWAS_ETH_samples.txt",  ## this is not a real file - "ETH" gets replaced by proper "ETH" in `run_gwas`
                remove_sample_file = "analysis/QC/11_relatedness/PSYMETAB_GWAS_related_ids.txt",
                output_dir = file_out("analysis/GWAS"), output = "PSYMETAB_GWAS")},
    transform = map(.data = !!subgroup_gwas_info, .id = pheno),resources = list(cores = 16,partition="cluster2"))
)

make(init_analysis,parallelism = "future",jobs = 4, console_log_file = "init.out")

By the way, the link in your previous comment is broken.
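The sbatch error quoted above (an empty value for cpus-per-task) is what a template produces when resources[["cores"]] is NULL at render time. The base-R sketch below mimics that with paste0() and shows a NULL-safe fallback; both render_* functions are hypothetical stand-ins for the template line, and the default of 1 core is an assumption, not something drake or batchtools sets.

```r
# Unguarded: mirrors "--cpus-per-task=<%= resources[["cores"]] %>".
render_cpus_unguarded <- function(resources) {
  paste0("#SBATCH --cpus-per-task=", resources[["cores"]])
}

# Guarded: falls back to a default when the resource is missing.
# An assumed template equivalent would be:
#   <%= if (is.null(resources[["cores"]])) 1 else resources[["cores"]] %>
render_cpus_guarded <- function(resources, default = 1L) {
  cores <- resources[["cores"]]
  if (is.null(cores)) cores <- default
  paste0("#SBATCH --cpus-per-task=", cores)
}

empty <- list()  # what the template sees if no resources arrive

unguarded_line <- render_cpus_unguarded(empty)
guarded_line <- render_cpus_guarded(empty)
explicit_line <- render_cpus_guarded(list(cores = 16))

print(c(unguarded_line, guarded_line, explicit_line))
```

A guard like this only hides the symptom: it keeps sbatch from rejecting the script, but the target would still run with the default rather than the 16 cores requested in the plan.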

What happens if you run future.batchtools with that template file without drake? Do you get the same error?

library(future)
library(future.batchtools)
future::plan(batchtools_slurm, template = "slurm_batchtools.tmpl")
f <- future(1 + 1)
value(f)

Also, I fixed the link from https://github.com/ropensci/drake/issues/1083#issuecomment-565436680.

The future.batchtools works fine... and I tried changing the resources and it seems to be getting passed to brew when I look at the generated slurm command (sbatch) file. I.e. these both work:

library(future)
library(future.batchtools)
future::plan(batchtools_slurm, template = "slurm_batchtools.tmpl")
f <- future(1 + 1)
value(f)

future::plan(batchtools_slurm, template = "slurm_batchtools.tmpl", resources= list(ncpus=2))
f <- future(1 + 1)
value(f)

Where the first will take the default ncpus and the second will take ncpus as provided by resources. I also tried with the file I listed explicitly above with cores defined and that also works. So perhaps this is a drake problem?

Hmm... I actually asked you to test the wrong thing. I remember now that drake supplies the resources directly to future().

https://github.com/ropensci/drake/blob/b21fb90fda910ac64e41352e7231f4d9a15d33c6/R/backend_future.R#L107

Would you try this?

library(future)
library(future.batchtools)
future::plan(batchtools_slurm, template = "slurm_batchtools.tmpl")
f <- future(1 + 1, resources = list(cores = 2))
value(f)

where slurm_batchtools.tmpl is as before.

#!/bin/bash

#SBATCH --job-name=<%= job.name %>
#SBATCH --output=<%= log.file %>
#SBATCH --error=<%= log.file %>
#SBATCH --ntasks=1
#SBATCH --partition=<%= resources[["partition"]] %>
#SBATCH --cpus-per-task=<%= resources[["cores"]] %>
#SBATCH --account=<%= resources[["account"]] %>

<%= if (array.jobs) sprintf("#SBATCH --array=1-%i", nrow(jobs)) else "" %>
Rscript -e 'batchtools::doJobCollection("<%= uri %>")'

That also seems to work (just modified according to my partition/account):
f <- future(1 + 1, resources = list(cores = 2, partition = "mypartition", account = "myaccount"))

Turns out it was a problem in drake. I reproduced it on an SGE cluster and fixed it in aebbaf250a5615c4f4efbaa9f066e2d1834bfd05.
