Data.table: parallelise CsubsetDT

Created on 19 Apr 2016  路  14Comments  路  Source: Rdatatable/data.table

  • [x] parallelise CsubsetDT
  • [x] resolve _thread-safe_ issues (on SET_STRING_ELT / SET_VECTOR_ELT)
  • [x] add provision for nthreads
  • [x] cleaner implementation (to maintain easily)
internals openmp

Most helpful comment

Most users of data.table will just want to use it, liking that they don't need to write any explicit parallelism and it'll just use their cores for them. They won't need to need to learn or remember to use options or call functions before they benefit from parallelism. What should these users set the number of threads to anyway: 4 or 8 on a 4 core box? It raises more questions for them. Better let OpenMP do its thing and let the OS handle scheduling.

@HenrikBengtsson For advanced users who already use explicit parallelism they need a way to limit to one cpu: yes absolutely. But I don't see why the default should be 1.

All 14 comments

A 4.5GB benchmark comparison. Including edits from Matt.

data

require(data.table)
set.seed(1L)
cols = 4L
rows = 2e8L
x = list(a = sample(letters, rows, TRUE), 
         b = sample(1e5, rows, TRUE),
         c = round(runif(rows),6),
         d = sample(c(TRUE,FALSE), rows, TRUE))
setDT(x)
gc()
print(object.size(x), units="GB")   # 4.5 GB   i.e. small
# generate half of nrow(x) indices
res = rows/2
ix = sample(rows, res, FALSE)
# randomly replace 1000 indices with 0, so that they won't be in the result
bla = sample(length(ix), 1000L)
ix[bla] = 0L
# result should be res-1000 rows

v1.9.6 single threaded

# this is the code that's run when one does x[ix] or x[ix, <cols>].
system.time(ans <- .Call("CsubsetDT", x, ix, 1:cols))
#   user  system elapsed
#  5.828   0.128   5.956
#   user  system elapsed   # Previous result from Arun likely included swapping
# 18.990   0.957  20.007   # Neither of us could reproduce this 20s
nrow(ans) == res-1000L     # [1] TRUE
write.table(ans, "~/tmp1.tsv", row.names=FALSE, quote=FALSE, sep="\t")
#   user  system elapsed 
# 201.156   2.136 203.490

v1.9.7 within-column parallel subsetting ( 8th, 4cpu)

# fresh R session
system.time(ans <- .Call("CsubsetDT", x, ix, 1:cols))
#   user  system elapsed 
# 11.960   1.092   3.970    # 4s here vs 6s above 
nrow(ans) == res-1000L   # [1] TRUE
system.time(fwrite(ans, "~/tmp2.tsv", quote=FALSE, sep="\t"))
#   user  system elapsed                                                                                                              
# 22.112   2.348   4.930

double check the results

system("diff ~/tmp1.tsv ~/tmp2.tsv")  # two 2.1GB files identical

v1.9.7 across-column parallel subsetting ( 8th, 4cpu)

Reimplemented by Matt to solve #1883 hopefully

# fresh R session
system.time(ans <- .Call("CsubsetDT", x, ix, 1:cols))
   user  system elapsed 
  9.232   0.216   3.167
nrow(ans) == res-1000L   # [1] TRUE
fwrite(ans, "~/tmp2.tsv", quote=FALSE, sep="\t")
system("diff ~/tmp1.tsv ~/tmp2.tsv")   # same
dim(ans)
[1] 99999000        4


# without the ix[bla] = 0L is now faster for no 0 no NA case 
res = rows/2
ix = sample(rows, res, FALSE)
system.time(ans <- .Call("CsubsetDT", x, ix, 1:cols))
   user  system elapsed 
  8.288   0.180   2.896
dim(ans)
[1] 100000000         4

@arunsrinivasan This is absolutely great! However, maybe we also need an option to force the use of only one core (or better a specific number of cores). This could be helpful when users have existing code using i.e. doSNOW or doParallel and a parallelised data.table operation would 'fight' with them for the same processing resources.

@ChristK yes, that'll be done eventually (before release). Right now, I'm looking at issues related to _thread-safety_.. will come back to this point. Thanks for pointing out.

Just happens to "drive by": Sorry if I'm missing something already discussed, but don't you think the default should be to use a single thread/core, unless user explicitly specified otherwise? Maybe you could set the default to getOption(" mc.cores", 1L).

I'm worried about cases where user, but also package developer, is not aware that your package is being used. If such a person in turn runs things in parallel and your package by default user more than one thread, then it can quickly blow up. Then, imagine another layer of parallelism, and so on.

I see what you mean. Will note it down for internal discussion. Thanks @HenrikBengtsson.

Good to hear. Here's a comment by Roger Koenker where implicit and explicit parallelism competing to use the cores caused a major slow down: https://stat.ethz.ch/pipermail/r-sig-hpc/2014-August/001914.html.

Definitely useful. Thanks again.

The current parts that are OpenMP-ised are subset of a data.table and reordering a data.table by reference. These operations wouldn't be coupled usually with other embarrassingly parallel problems normally, I think . However I'm leaning more towards setting the default to 1 (at least for now) as it'd have no consequences for current users using data.table in parallel (explicitly) already.

Most users of data.table will just want to use it, liking that they don't need to write any explicit parallelism and it'll just use their cores for them. They won't need to need to learn or remember to use options or call functions before they benefit from parallelism. What should these users set the number of threads to anyway: 4 or 8 on a 4 core box? It raises more questions for them. Better let OpenMP do its thing and let the OS handle scheduling.

@HenrikBengtsson For advanced users who already use explicit parallelism they need a way to limit to one cpu: yes absolutely. But I don't see why the default should be 1.

Agree; what the default should/could be is not obvious. The R world as we know it is currently mostly a single threaded. This is clearly changing and more and more packages will start supporting multi-process/core/thread processing. Scientific software like R is multi-purpose and differs from other type of "regular" software that may multi-process processing by default. For the latter it's easier to control the parallelism, whereas in R it will be a mix of packages that are impossible to predict.

My interest in this is mostly through my work on the future package, where I most recently implemented support for nested futures (=nested parallelism), which user can control, e.g. plan(list(batchjobs, multicore)) or plan(list(multicore, multicore)). I think we'll see more needs for being able to control nestedness and the amount of parallelism and I believe we all would gain if there could be some core R options/functionality for controlling this in a unified fashion. Maybe R could make threads/cores is a limited resource that packages requests and releases helping R not to overload the machine. On the other hand, that might end up reinventing the OS kernels. I really don't know and my OS kernel knowledge is rather old school.

Having said all that, when I suggested getOption(" mc.cores", 1L), my point was less on 1L being the default and more on leveraging the already existing mc.cores option (of the parallel package). Except from option Ncpus, which is used by install.packages(), mc.cores is the only option I'm aware of in R that the user can use to control amount of parallelism. You could also default to getOption(" mc.cores", detectCores()) to maximum CPU utilization. ... or possibly getOption("mc.cores", detectCores()-1L) due to the definition of mc.cores, cf. https://github.com/HenrikBengtsson/Wishlist-for-R/issues/7.

BTW and slightly related to this is the CRAN Repository Policy requiring "Checking the package [...] If running a package uses multiple threads/cores it must never use more than two simultaneously". In other words, you do need an option to control this also because of this.

Great work!

Great discussion in this issue! Another small question is when I use OpenMP on small data, the overhead of multi-threading can make it slower than not using it (if I remember correctly). Maybe parallelism should only be enabled for large enough data?

@renkun-ken with Matt's commit, rows <= 1000 won't be parallelised. This should be updated based on more experiments/tests in the future.

@mattdowle I've now simplified the logic (we discussed sometime around useR'16 that it might be harder to maintain). Logic is heavily commented for now.

There's acceptable speedups even on smaller data:

set.seed(1L)
require(data.table)
dt = setDT(lapply(1:10, function(x) sample(5e6)))
print(object.size(dt), units="Mb") # 190.7 Mb

setDTthreads(0L) # uses all threads
system.time(ans1 <- .Call("CsubsetDT", dt, 1:3e6, 1:10))
#    user  system elapsed 
#   0.149   0.071   0.064 
setDTthreads(1L)
system.time(ans2 <- .Call("CsubsetDT", dt, 1:3e6, 1:10))
#    user  system elapsed 
#   0.092   0.027   0.120 
identical(ans1, ans2) # [1] TRUE
setDTthreads(0L) # set it back to use all threads

edit by jangorecki: when calling not exported CsubsetDT function user needs to be aware of #1762.

People Like me who do not understand programming COME to R. Please Don't implement single core as a default settings. Let it be at it's fastest possible speed without giving any further arguements... Please

Was this page helpful?
0 / 5 - 0 ratings