I have a Windows work computer and I rely rather heavily on parallel processing in my workflow. It usually involves sampling and then applying a function.
``` r
library(doParallel)
#> Loading required package: foreach
#> Loading required package: iterators
#> Loading required package: parallel
library(purrr)
#>
#> Attaching package: 'purrr'
#> The following objects are masked from 'package:foreach':
#>
#>     accumulate, when
library(dplyr)
#>
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#>
#>     filter, lag
#> The following objects are masked from 'package:base':
#>
#>     intersect, setdiff, setequal, union

my_n <- 10 # We're gonna be sampling 10 rows each time

cars_list <- mtcars %>%
  as_tibble() %>%
  list() %>%
  rep(16)

cl <- makeCluster(2)
registerDoParallel(cl)

result_list <- foreach (i = 1:20, .packages = c("dplyr", "purrr")) %dopar% {
  map2(.x = cars_list, .y = my_n,
       .f = ~sample_n(tbl = .x, size = .y, replace = TRUE, weight = wt)) %>%
    map(~summarize_at(.,
                      .vars = vars(mpg, drat),
                      .funs = median))
}

stopCluster(cl)
```
Created on 2019-12-09 by the reprex package (v0.3.0)
In reality I'd be doing 15,000 to 100,000 iterations, which can take anywhere from 20 minutes to 3 days on 8 cores, depending on the task at hand and the complexity of the function we'd be applying (we're doing tax benefit microsimulation, modeling differing policy rules). For my current drake plans I've been doing this outside of the drake plan, and then using a caching function where I input the dependencies and read the cached result file, basically just so that it shows up in the dependency graph.
I've only been using drake for 5 weeks, but seeing how incredibly useful and powerful it is, I'm basically never going back. So now I'm trying to figure out how to implement the above code in a drake plan.
From reading questions on drake parallelism on Windows it seems that something like parLapply would be the way to go, but I haven't gotten that to work.
``` r
my_n = 10 # We're gonna be sampling 10 rows each time

cars_list = mtcars %>%
  as_tibble() %>%
  list() %>%
  rep(16)

my_func <- function(x) {
  print(x)
  map2(.x = cars_list, .y = my_n,
       .f = ~sample_n(tbl = .x, size = .y, replace = TRUE, weight = wt)) %>%
    map(~summarize_at(.,
                      .vars = vars(mpg, drat),
                      .funs = median))
}

cl <- makeCluster(2)
result_list_2 = parLapply(cl, 1:20, fun = my_func(1))
```
I'll spare you the crazy, giant error message, and I'll just assume that the above code is very silly on my part.
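For reference, `parLapply()` expects the function object itself in `fun`, whereas `my_func(1)` evaluates the function once in the main session and hands over its return value. The socket workers created by `makeCluster(2)` also start as fresh R sessions, so any globals the function uses have to be sent to them explicitly. A minimal base-R sketch of that pattern, under the assumption that a simplified stand-in is enough to show the idea (`draw_median` and `n_draws` are illustrative names, not part of the code above):

``` r
library(parallel)

n_draws <- 10 # hypothetical global, plays the role of my_n

# Simplified stand-in for my_func: one weighted bootstrap draw of mpg.
draw_median <- function(i) {
  stats::median(
    sample(datasets::mtcars$mpg, size = n_draws, replace = TRUE,
           prob = datasets::mtcars$wt)
  )
}

cl <- makeCluster(2)
# Workers do not see the main session's variables, so export what is needed.
clusterExport(cl, varlist = "n_draws", envir = environment())
# Give each worker its own reproducible random number stream.
clusterSetRNGStream(cl, iseed = 123)
# Pass the function itself, not the result of calling it.
res <- parLapply(cl, 1:20, draw_median)
stopCluster(cl)

length(res)
```

If the worker function needed `dplyr` or `purrr`, something like `clusterEvalQ(cl, library(dplyr))` before the `parLapply()` call would load the package on each worker.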
Ideally I'd love to figure out how I can do this with a future backend and furrr, but I have no idea how I'd go about doing that. Silly non-working example:
``` r
library(furrr)
#> Loading required package: future
library(dplyr)
#>
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#>
#>     filter, lag
#> The following objects are masked from 'package:base':
#>
#>     intersect, setdiff, setequal, union

my_n <- 10 # We're gonna be sampling 10 rows each time

cars_list <- mtcars %>%
  as_tibble() %>%
  list() %>%
  rep(16)

# What I feel like would be a nice approach (in some backwards world where pmap accepted elements of differing lengths)
my_function <- function(x, y, z) {
  set.seed(z)
  sample_n(tbl = x, size = y, replace = TRUE, weight = wt) %>%
    summarize_at(.vars = vars(mpg, drat),
                 .funs = median)
}

list = future_pmap(.l = list(cars_list, my_n, 1:20), .f = ~my_function(..1, ..2, ..3)) # Essentially, I want 20 iterations of this
#> Error: Element 1 has length 16, not 1 or 20.
```
Created on 2019-12-09 by the reprex package (v0.3.0)
I realise that this is a rather weird question, as it doesn't actually have to do with drake, but rather how I can do other things in a manner that fits inside a drake plan. If this is outside of the scope of questions here within the drake repo, please feel free to delete.
Thanks in advance and thanks for an incredible package.
I'm glad you are starting to use `furrr`. A couple notes:
* I used the `future.callr` backend with `furrr`.
* `pmap()` and `future_pmap()` do require all the list elements to be the same length (or length 1, as the error message says). See below.
``` r
library(furrr)
#> Loading required package: future
library(dplyr)
#>
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#>
#>     filter, lag
#> The following objects are masked from 'package:base':
#>
#>     intersect, setdiff, setequal, union
library(tidyr)

future::plan(future.callr::callr, workers = 2)

cars_list <- mtcars %>%
  as_tibble() %>%
  list() %>%
  rep(16)

my_function <- function(data, size, seed) {
  set.seed(seed)
  sample_n(tbl = data, size = size, replace = TRUE, weight = wt) %>%
    summarize_at(
      .vars = vars(mpg, drat),
      .funs = median
    )
}

wrapper_function <- function(index, size, seed) {
  data <- cars_list[[index]]
  my_function(data = data, size = size, seed = seed)
}

grid <- expand_grid(
  index = seq_along(cars_list),
  size = 10,
  seed = seq_len(20)
)

list <- future_pmap(.l = grid, .f = wrapper_function)
#> Warning in saveRDS(list(options$func, options$args), file = tmp):
#> 'package:tidyr' may not be available when loading
#> Warning in saveRDS(list(options$func, options$args), file = tmp):
#> 'package:tidyr' may not be available when loading

list[seq_len(2)]
#> [[1]]
#> # A tibble: 1 x 2
#>     mpg  drat
#>   <dbl> <dbl>
#> 1  20.3  3.46
#>
#> [[2]]
#> # A tibble: 1 x 2
#>     mpg  drat
#>   <dbl> <dbl>
#> 1  18.4  3.15
```
Created on 2019-12-09 by the reprex package (v0.3.0)
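To make the role of `expand_grid()` concrete: it builds one row per combination of its inputs, so every column ends up the same length and `pmap()` (or `future_pmap()`) can walk the rows one at a time. With 16 data sets and 20 seeds that is 320 calls. A small sequential sketch of the same idea, where `toy_list` and `draw` are hypothetical stand-ins rather than code from this thread:

``` r
library(tidyr)
library(purrr)

# Three hypothetical data sets standing in for the 16 entries of cars_list.
toy_list <- list(1:10, 11:20, 21:30)

# One row per (index, seed) combination; the length-1 size is repeated.
grid <- expand_grid(
  index = seq_along(toy_list),
  size = 4,
  seed = seq_len(2)
)
nrow(grid) # 3 data sets x 2 seeds = 6 rows

# Column names of the grid are matched to the argument names of the function.
draw <- function(index, size, seed) {
  set.seed(seed)
  sample(toy_list[[index]], size = size, replace = TRUE)
}

draws <- pmap(grid, draw)
length(draws)
```

Swapping `pmap()` for `future_pmap()` after setting a `future::plan()` should run the same rows in parallel, as in the reprex above.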
Thanks so much for the help @wlandau. I'll admit that I'm a bit out of my element but I'm trying to wrap my head around this level of abstraction and I'm super appreciative of your help. What I'm running into is that the code you supplied fits the bill and executes outside of a drake plan, but for me it's not working inside a call to drake_plan.
```r
my_list failed. Call diagnose(my_list) for details. Error message:
  object 'cars_list' not found
```
What I've tried doing (flailing in the dark essentially):
* `!!cars_list`
* `!!!cars_list`
* adding the `cars_list` list of data frames to the function call by repeating it in `expand_grid` first, but then realizing the crazy amount of overhead of that approach.
Here's the reprex of running it once outside of a drake plan, and once inside.
``` r
library(furrr)
#> Loading required package: future
library(dplyr)
#>
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#>
#>     filter, lag
#> The following objects are masked from 'package:base':
#>
#>     intersect, setdiff, setequal, union
library(tidyr)
library(drake)
#>
#> Attaching package: 'drake'
#> The following objects are masked from 'package:tidyr':
#>
#>     expand, gather
#> The following object is masked from 'package:future':
#>
#>     plan

my_function <- function(data, size, seed) {
  set.seed(seed)
  sample_n(tbl = data, size = size, replace = TRUE, weight = wt) %>%
    summarize_at(
      .vars = vars(mpg, drat),
      .funs = median
    )
}

wrapper_function <- function(index, size, seed) {
  data <- cars_list[[index]]
  my_function(data = data, size = size, seed = seed)
}

# Outside of a drake plan this works fine ---------------------------------
cars_list = mtcars %>%
  as_tibble() %>%
  list() %>%
  rep(16)

grid = expand_grid(
  index = seq_along(cars_list),
  size = 10,
  seed = seq_len(20)
)

my_list = future_pmap(.l = grid, .f = wrapper_function)

# But within a drake plan it fails ----------------------------------------
my_plan = drake_plan(
  cars_list = mtcars %>%
    as_tibble() %>%
    list() %>%
    rep(16),
  grid = expand_grid(
    index = seq_along(cars_list),
    size = 10,
    seed = seq_len(20)
  ),
  my_list = future_pmap(.l = grid, .f = wrapper_function)
)

make(my_plan)
#> unload targets from environment:
#>    cars_list
#>    grid
#>    my_list
#> target cars_list
#> target wrapper_function
#> target grid
#> target my_list
#> fail my_list
#> Error: Target `my_list` failed. Call `diagnose(my_list)` for details. Error message:
#>   object 'cars_list' not found
```
Created on 2019-12-12 by the reprex package (v0.3.0)
Ah, my mistake. cars_list is a target so it should be an argument to wrapper_function() rather than a global variable dependency. In the latter case, we have an imported function that depends on a target, which is bad. Notice how cars_list points to wrapper_function in the graph. We want to avoid that.
``` r
config <- drake_config(my_plan)
vis_drake_graph(config)
```

This works:
``` r
library(furrr)
#> Loading required package: future
library(dplyr)
#>
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#>
#>     filter, lag
#> The following objects are masked from 'package:base':
#>
#>     intersect, setdiff, setequal, union
library(tidyr)
library(drake)
#>
#> Attaching package: 'drake'
#> The following objects are masked from 'package:tidyr':
#>
#>     expand, gather
#> The following object is masked from 'package:future':
#>
#>     plan

my_function <- function(data, size, seed) {
  set.seed(seed)
  sample_n(tbl = data, size = size, replace = TRUE, weight = wt) %>%
    summarize_at(
      .vars = vars(mpg, drat),
      .funs = median
    )
}

# add a cars_list argument
wrapper_function <- function(index, size, seed, cars_list) {
  data <- cars_list[[index]]
  my_function(data = data, size = size, seed = seed)
}

my_plan = drake_plan(
  cars_list = mtcars %>%
    as_tibble() %>%
    list() %>%
    rep(16),
  grid = expand_grid(
    index = seq_along(cars_list),
    size = 10,
    seed = seq_len(20)
  ),
  my_list = future_pmap(
    .l = grid,
    .f = wrapper_function,
    cars_list = cars_list # supply the cars_list target as an argument.
  )
)

make(my_plan)
#> target cars_list
#> target grid
#> target my_list
```
Created on 2019-12-12 by the reprex package (v0.3.0)
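The fix relies on `pmap()`-style functions forwarding extra named arguments to `.f` on every call, so the constant list travels alongside the per-row values instead of being looked up as a global. A small sequential sketch of just that mechanism, using `purrr::pmap()` and made-up names (`data_list`, `pick_and_sum`) that are not part of the plan above:

``` r
library(purrr)

# Hypothetical constant input, playing the role of the cars_list target.
data_list <- list(a = 1:5, b = 6:10)

# One row per call; column names match the first two arguments of the function.
grid <- data.frame(index = c(1, 2), power = c(2, 3))

pick_and_sum <- function(index, power, data_list) {
  sum(data_list[[index]]^power)
}

# Extra named arguments after .f are passed unchanged to every call.
res <- pmap(grid, pick_and_sum, data_list = data_list)

# Equivalent form with an explicit anonymous function.
res_explicit <- pmap(
  grid,
  function(index, power) pick_and_sum(index, power, data_list = data_list)
)

identical(res, res_explicit)
```

Either form keeps `wrapper_function()` free of references to targets, which is what lets drake treat `cars_list` purely as an input of `my_list`.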
It works! Thanks so much for the speedy reply. You're amazing.