In response to the prior discussion on Stack Overflow, I decided to test the impact of parallel processing on aggregation, which occurs frequently in my work.
I've run some tests following @mattdowle's prior mantra of Rprof, Rprof, Rprof.
What I find is that the decision to parallelize is context dependent, but the gains are likely significant. Depending on the test operations (e.g. foo below, which can be customized) and the number of cores used (I try both 8 and 24), I get different results.
I also looked at some real-world (non-shareable) data / operations, which showed larger improvements (33% and 25%, in two different tests) from parallelizing with 24 cores. Edit May 2018: a new set of real-world example cases is showing closer to 85% improvement from parallel operations with 1000 groups.
R> sessionInfo() # 24 core machine:
R version 3.3.2 (2016-10-31)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: CentOS Linux 7 (Core)
attached base packages:
[1] parallel stats graphics grDevices utils datasets methods
[8] base
other attached packages:
[1] microbenchmark_1.4-2.1 stringi_1.1.2 data.table_1.10.4
R> sessionInfo() # 8 core machine:
R version 3.3.2 (2016-10-31)
Platform: x86_64-apple-darwin13.4.0 (64-bit)
Running under: macOS Sierra 10.12.4
attached base packages:
[1] parallel stats graphics grDevices utils datasets methods base
other attached packages:
[1] microbenchmark_1.4-2.1 stringi_1.1.5 data.table_1.10.4
library(data.table)
library(stringi)
library(microbenchmark)

set.seed(7623452L)
my_grps <- stringi::stri_rand_strings(n= 5000, length= 10) # 5000 random group labels
my_mat <- matrix(rnorm(1e5), ncol= 20)
dt <- data.table(grps= rep(my_grps, each= 20), my_mat) # 100,000 rows; 20 rows per group
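A quick sanity check on the constructed table (my addition, not part of the original post):
dt[, .N, by= "grps"][, unique(N)] # 20: each of the 5000 groups has exactly 20 rows
nrow(dt)                          # 100000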
foo <- function(dt) {
  dt2 <- dt ## needed for .SD lock
  nr <- nrow(dt2)
  idx <- sample.int(nr, 1) # pick one row of the group at random
  # capture the chained result so the new columns aren't discarded
  res <- dt2[idx,][, `:=` (
    new_var1= V1 / V2,
    new_var2= V4 * V3 / V10,
    new_var3= sum(V12),
    new_var4= ifelse(V10 > 0, V11 / V13, 1),
    new_var5= ifelse(V9 < 0, V8 / V18, 1)
  )]
  return(res)
}
split_df <- function(d, var) {
  base::split(d, get(var, as.environment(d)))
}
foo2 <- function(dt) {
  dt2 <- split_df(dt, "grps") # one data.table per group
  require(parallel)
  cl <- parallel::makeCluster(min(nrow(dt), parallel::detectCores()))
  clusterExport(cl, varlist= "foo")
  clusterExport(cl, varlist= "dt2", envir = environment())
  clusterEvalQ(cl, library("data.table"))
  dt2 <- parallel::parLapply(cl, X= dt2, fun= foo) # apply foo to each group in parallel
  parallel::stopCluster(cl)
  return(rbindlist(dt2)) # stack the per-group results
}
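One small robustness tweak worth noting, as a sketch (foo2_safe is my own hypothetical variant, not part of the benchmark above): registering stopCluster with on.exit guarantees the workers are cleaned up even if foo errors, the worker count can be capped at the number of groups rather than rows, and the clusterExport of dt2 can be dropped since parLapply ships the chunks of X to the workers itself.
foo2_safe <- function(dt) {
  dt2 <- split_df(dt, "grps")
  # cap workers at the number of groups rather than the number of rows
  cl <- parallel::makeCluster(min(length(dt2), parallel::detectCores()))
  on.exit(parallel::stopCluster(cl), add = TRUE) # clean up even if a worker errors
  parallel::clusterExport(cl, varlist = "foo")
  parallel::clusterEvalQ(cl, library("data.table"))
  # parLapply sends the chunks of dt2 itself, so no clusterExport of dt2 is needed
  rbindlist(parallel::parLapply(cl, X = dt2, fun = foo))
}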
print(parallel::detectCores()) # 8
microbenchmark(
  serial= dt[,foo(.SD), by= "grps"],
  parallel= foo2(dt),
  times= 10L
)
Unit: seconds
expr min lq mean median uq max neval cld
serial 6.962188 7.312666 8.433159 8.758493 9.287294 9.605387 10 b
parallel 6.563674 6.648749 6.976669 6.937556 7.102689 7.654257 10 a
print(parallel::detectCores()) # 24
## same microbenchmark call as above, run on the 24-core machine:
Unit: seconds
expr min lq mean median uq max neval cld
serial 9.014247 9.804112 12.17843 13.17508 13.56914 14.13133 10 a
parallel 10.732106 10.957608 11.17652 11.06654 11.30386 12.28353 10 a
We can use this answer to give a more direct response to @mattdowle's original comment about profiling.
Doing so, we see that the majority of compute time is spent in base, not in data.table. data.table operations themselves are, as expected, exceptionally fast. While some might argue that this is evidence that there is no need for parallelism within data.table, I posit that this workflow/operation-set is not atypical. That is, my strong suspicion is that most large data.table aggregations involve a substantial amount of non-data.table code, and that this is correlated with interactive use vs development / production use. I therefore conclude that parallelism would be valuable within data.table for large aggregations.
library(profr)
prof_list <- replicate(100, profr::profr(dt[,foo(.SD), by= "grps"], interval = 0.002),
                       simplify = FALSE)
pkg_timing <- fun_timing <- vector("list", length= 100)
for (i in 1:100) {
  fun_timing[[i]] <- tapply(prof_list[[i]]$time, paste(prof_list[[i]]$source, prof_list[[i]]$f, sep= "::"), sum)
  pkg_timing[[i]] <- tapply(prof_list[[i]]$time, prof_list[[i]]$source, sum)
}
sort(sapply(fun_timing, sum)) # no large outliers
fun_timing2 <- rbindlist(lapply(fun_timing, function(x) {
  ret <- data.table(fun= names(x), time= x)
  ret[, pct_time := time / sum(time)]
  return(ret)
}))
pkg_timing2 <- rbindlist(lapply(pkg_timing, function(x) {
  ret <- data.table(pkg= names(x), time= x)
  ret[, pct_time := time / sum(time)]
  return(ret)
}))
fun_timing2[, .(total_time= sum(time),
                avg_time= mean(time),
                avg_pct= round(mean(pct_time), 4)), by= "fun"][
  order(avg_time, decreasing = TRUE),][1:10,]
pkg_timing2[, .(total_time= sum(time),
                avg_time= mean(time),
                avg_pct= round(mean(pct_time), 4)), by= "pkg"][
  order(avg_time, decreasing = TRUE),]
Results:
fun total_time avg_time avg_pct
1: base::[ 670.362 6.70362 0.2694
2: NA::[.data.table 667.350 6.67350 0.2682
3: .GlobalEnv::foo 335.784 3.35784 0.1349
4: base::[[ 163.044 1.63044 0.0655
5: base::[[.data.frame 133.790 1.33790 0.0537
6: base::%in% 120.512 1.20512 0.0484
7: base::sys.call 86.846 0.86846 0.0348
8: NA::replace_dot_alias 27.824 0.27824 0.0112
9: base::which 23.536 0.23536 0.0095
10: base::sapply 22.080 0.22080 0.0089
pkg total_time avg_time avg_pct
1: base 1397.770 13.97770 0.7938
2: .GlobalEnv 335.784 3.35784 0.1908
3: data.table 27.262 0.27262 0.0155
Any reason to use base::split this way?
data.table has a split method, so your split_df can be defined as:
split_df <- function(d, var) {
  split(d, by = var)
}
It seems to speed up the parallel approach a little bit.
Mostly that I was unaware of the split method in data.table, which is because it isn't exported. That said, I'm not seeing the speed improvements you mention:
library(data.table)
library(microbenchmark)
data(iris)
setDT(iris)
microbenchmark(a= base::split(iris, get("Species", as.environment(iris))),
               b= data.table:::split.data.table(iris, by= "Species"))
Unit: microseconds
expr min lq mean median uq max neval cld
a 727.097 777.306 926.0856 830.4085 1000.298 4626.810 100 a
b 3410.960 3618.120 4285.8502 3879.8845 4785.291 9148.402 100 b
> sessionInfo()
R version 3.5.0 (2018-04-23)
Platform: x86_64-apple-darwin15.6.0 (64-bit)
Running under: macOS High Sierra 10.13.4
This realllly surprises me! OK. Then I think maybe the iris data is too small. So I tried the dt from your original post.
microbenchmark(a= base::split(dt, get("grps", as.environment(dt))),
               b= data.table:::split.data.table(dt, by= "grps"),
               times= 10L)
Unit: milliseconds
expr min lq mean median uq max neval
a 2508.4950 2586.1460 2639.1534 2595.5424 2689.4437 2804.7388 10
b 463.7465 495.1194 614.1077 622.3955 716.6977 775.2486 10
Thanks for the catch! I have this helper function in a few personal projects / packages. Good to know there's a quick and easy speed gain to be captured.
Not sure whether this change will have a meaningful impact on the results of the original post. I found this page via https://stackoverflow.com/questions/14759905/data-table-and-parallel-computing
Glad to see that folks are catching benchmarks run on too-small data. Yes, the data size must be large, unless you really do want to measure low latency on small data, say for a real-time application.
I agree parallelism is useful, but not at the R level in the way you've demonstrated. split, for example, copies all of the data, which not only wastes time but wastes RAM too. There are other ways to share memory directly between multiple R sessions. However, so far we've been parallelizing data.table internally using OpenMP at the C level. You've seen the fwrite, and now fread, speeds, right? Both are now multi-threaded internally. You could benchmark their performance vs the number of threads, as in the sketch below. Mark Klik did that here: http://www.fstpackage.org/. Maybe we'll get to grouping one day.
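For illustration, a minimal sketch of such a thread-scaling benchmark, using data.table's exported setDTthreads() to vary the internal OpenMP thread count; the data size, thread counts, and names (big, tmp) are my own assumptions, not from the comment above:
library(data.table)
library(microbenchmark)

big <- as.data.table(matrix(rnorm(2e6), ncol = 20)) # ~100k rows x 20 cols
tmp <- tempfile(fileext = ".csv")

for (n in c(1L, 2L, 4L)) {
  setDTthreads(n) # limit data.table's internal OpenMP threads
  cat("threads:", n, "\n")
  print(microbenchmark(
    write = fwrite(big, tmp),
    read  = fread(tmp),
    times = 5L
  ))
}
setDTthreads(0) # 0 = use all logical CPUs again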
Since we are parallelizing data.table internally, can this issue be closed now? I just noticed the original issue is from June 2017 and the recent comments are picking this old issue back up.
Currently the split method for data.table is just a wrapper over, AFAIR, other exported data.table R functions, so there is no magic or proper (internal) optimization done.
Also, the operation you are benchmarking is best simplified into atomic processes; then comparisons make more sense. It is better to compare a few atomic processes than one bigger workflow (see the sketch below). I understand you might be interested in the speedup of the whole workflow, but it is difficult to draw conclusions from such combined processes. I filed a new issue for parallel aggregations, not related to the parallel package: #2919.
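As a minimal sketch of that advice, one could time the stages of the workflow in isolation rather than the combined pipeline (dt and foo come from the original post; the step names are mine):
grp_list <- split(dt, by = "grps") # dispatches to data.table's split method
res_list <- lapply(grp_list, foo)  # per-group work, computed once up front

microbenchmark(
  split_step = split(dt, by = "grps"),
  apply_step = lapply(grp_list, foo),
  rbind_step = rbindlist(res_list),
  times = 5L
)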
@jangorecki The data.table documentation says: "Split method for data.table. Faster and more flexible. Be aware that processing list of data.tables will be generally much slower than manipulation in single data.table by group using by argument, read more on data.table."
See: https://jangorecki.gitlab.io/data.table/library/data.table/html/split.html
Also can be found in: https://cran.r-project.org/web/packages/data.table/data.table.pdf
@jflycn yes, I am aware of it, but the code of the function does not use any C, and AFAIR not even non-exported data.table functions. Using verbose=TRUE you can see the regular data.table query that was built to perform the split, for example:
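A minimal illustration (my own, reusing the iris data from earlier in the thread):
library(data.table)
iris_dt <- as.data.table(iris)
# verbose=TRUE prints the data.table query constructed to perform the split
parts <- split(iris_dt, by = "Species", verbose = TRUE)
length(parts) # 3: one data.table per species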
Also, links to the docs now work on the Rdatatable namespace: https://rdatatable.gitlab.io/data.table/library/data.table/html/split.html
and those will be up to date. I am not pushing the master branch to my namespace very often.
@jangorecki do you know any way to stop Google sending people to your documentation links? I've seen them come up a few times in searches myself.