Data.table: Performance drop with 1.12.0 (Selection + assignment)

Created on 12 Feb 2019  ·  16 Comments  ·  Source: Rdatatable/data.table

I'm not sure whether this is the same issue as #3330, because I haven't set the DTthreads parameter to 1.
But I could imagine that the default value is 1.
This list of operations takes less than 1 second to execute with data.table v1.11.8.
With data.table v1.12.0 it takes more than 4 seconds!

>   myDataTable[colJ != "production", colK:=na.omit(colK), by=.(colA)]
>   myDataTable[colJ != "production", colH:=na.omit(colH), by=.(colA)]
>   myDataTable[colJ != "production", colL:=na.omit(colL), by=.(colA)]
>   myDataTable[colJ == "code",colM:=colE, by=.(colA)]
>   myDataTable[,colM:=na.omit(colM), by=.(colA)]
>   myDataTable[is.na(colM), colM:=""]
>   myDataTable[colJ == "test",colN:=colE, by=.(colA)]
>   myDataTable[,colN:=na.omit(colN), by=.(colA)]

And there are only 4 rows in my data table:

print(myDataTable)

     colA colB colC colD colE colF colG colH colI colJ colK colL colM colN
1    text text text text text text text text text text text text text text
2    text text text text text text text text text text text text text text
3    text text text text text text text text text text text text text text
4    text text text text text text text text text text text text text text

(You can replace "text" by whatever)

We see this problem only in our production environment and not in other environments. This production server is really busy, which could explain why it takes so long. But the test with v1.11.8 was done in the same environment without any performance issue (we ran this test many times with both versions).

We run R in a Docker container with the system Ubuntu 16.04.

> sessionInfo()
R version 3.4.4 (2018-03-15)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: Ubuntu 16.04.5 LTS

Matrix products: default
BLAS: /usr/lib/libblas/libblas.so.3.6.0
LAPACK: /usr/lib/lapack/liblapack.so.3.6.0

locale:
 [1] LC_CTYPE=en_US.UTF-8       LC_NUMERIC=C
 [3] LC_TIME=en_US.UTF-8        LC_COLLATE=en_US.UTF-8
 [5] LC_MONETARY=en_US.UTF-8    LC_MESSAGES=en_US.UTF-8
 [7] LC_PAPER=en_US.UTF-8       LC_NAME=C
 [9] LC_ADDRESS=C               LC_TELEPHONE=C
[11] LC_MEASUREMENT=en_US.UTF-8 LC_IDENTIFICATION=C

attached base packages:
[1] stats     graphics  grDevices utils     datasets  methods   base

loaded via a namespace (and not attached):
[1] compiler_3.4.4

I hope this will be fixed in a future data.table release, because we will be stuck on v1.11.8 until this issue is resolved.
Thank you for your support.

openmp

Most helpful comment

Also facing the issue, using:

  • 224 cores / 448 threads machine (8x Xeon Platinum 8180) with NUMA enabled, slowdown is massive
  • 36 cores / 72 threads (2x Xeon Gold 6154) with NUMA disabled, 6x slowdown
  • 18 cores / 36 threads (i9-9980XE), 4x slowdown

Here are more details using @akersting's reproducible example. The issue is linked to the presence (or absence) of uniqueN with a high-cardinality group by. min and max are negligible.

Does uniqueN use extra parallelism or cause any side effects? It starts from the following line:

https://github.com/Rdatatable/data.table/blob/ca43a475fdcb4abf40e5915c5611270e89898160/R/duplicated.R#L147

With Intel VTune, time "seems" mostly spent creating threads (might not be true, I don't have debugging symbols):

image

As I don't have the debugging symbols for OpenMP, I cannot find out exactly where it happens:

image

Some examples:

library(data.table)

N_X <- 1e6
n_day <- 60
n_clientid <- 1e5
n_Platform <- 7
X <- data.table(day = sample(1:n_day, N_X, TRUE),
                clientid = as.character(sample(1:n_clientid, N_X, TRUE)),
                Platform = as.character(sample(1:n_Platform, N_X, TRUE)))

n_cores <- parallel::detectCores()

# Reported slowdown: uniqueN
results <- list()
for (i in seq_len(n_cores)) {
  cat(sprintf(paste0("%0", floor(log10(i) + 1), "d"), i), ": ", sep = "")
  setDTthreads(i)
  results[[i]] <- system.time(X[, .(x = uniqueN(day) - 1L,
                                    first_active_day = min(day),
                                    last_active_day = max(day)),
                                by = .(Platform, clientid)])
  cat("[user=", sprintf("%06.03f", results[[i]][1]),
      ", system=", sprintf("%06.03f", results[[i]][2]),
      ", elapsed=", sprintf("%06.03f", results[[i]][3]), "]\n", sep = "")
}

# No slowdown: length(unique()) workaround
results2 <- list()
for (i in seq_len(n_cores)) {
  cat(sprintf(paste0("%0", floor(log10(i) + 1), "d"), i), ": ", sep = "")
  setDTthreads(i)
  results2[[i]] <- system.time(X[, .(x = length(unique(day)) - 1L,
                                     first_active_day = min(day),
                                    last_active_day = max(day)),
                                by = .(Platform, clientid)])
  cat("[user=", sprintf("%06.03f", results2[[i]][1]),
      ", system=", sprintf("%06.03f", results2[[i]][2]),
      ", elapsed=", sprintf("%06.03f", results2[[i]][3]), "]\n", sep = "")
}

# Slowdown: uniqueN only
results3 <- list()
for (i in seq_len(n_cores)) {
  cat(sprintf(paste0("%0", floor(log10(i) + 1), "d"), i), ": ", sep = "")
  setDTthreads(i)
  results3[[i]] <- system.time(X[, .(x = uniqueN(day) - 1L),
                                 by = .(Platform, clientid)])
  cat("[user=", sprintf("%06.03f", results3[[i]][1]),
      ", system=", sprintf("%06.03f", results3[[i]][2]),
      ", elapsed=", sprintf("%06.03f", results3[[i]][3]), "]\n", sep = "")
}

# No slowdown: no uniqueN
results4 <- list()
for (i in seq_len(n_cores)) {
  cat(sprintf(paste0("%0", floor(log10(i) + 1), "d"), i), ": ", sep = "")
  setDTthreads(i)
  results4[[i]] <- system.time(X[, .(first_active_day = min(day),
                                    last_active_day = max(day)),
                                by = .(Platform, clientid)])
  cat("[user=", sprintf("%06.03f", results4[[i]][1]),
      ", system=", sprintf("%06.03f", results4[[i]][2]),
      ", elapsed=", sprintf("%06.03f", results4[[i]][3]), "]\n", sep = "")
}

# No slowdown: uniqueN, low cardinality
results5 <- list()
for (i in seq_len(n_cores)) {
  cat(sprintf(paste0("%0", floor(log10(i) + 1), "d"), i), ": ", sep = "")
  setDTthreads(i)
  results5[[i]] <- system.time(X[, .(x = uniqueN(day) - 1L),
                                 by = .(Platform)])
  cat("[user=", sprintf("%06.03f", results5[[i]][1]),
      ", system=", sprintf("%06.03f", results5[[i]][2]),
      ", elapsed=", sprintf("%06.03f", results5[[i]][3]), "]\n", sep = "")
}

# Slowdown: uniqueN, high cardinality
results6 <- list()
for (i in seq_len(n_cores)) {
  cat(sprintf(paste0("%0", floor(log10(i) + 1), "d"), i), ": ", sep = "")
  setDTthreads(i)
  results6[[i]] <- system.time(X[, .(x = uniqueN(day) - 1L),
                                 by = .(clientid)])
  cat("[user=", sprintf("%06.03f", results6[[i]][1]),
      ", system=", sprintf("%06.03f", results6[[i]][2]),
      ", elapsed=", sprintf("%06.03f", results6[[i]][3]), "]\n", sep = "")
}

Debugger output for branching:

> debugonce(uniqueN)
> X[, .(x = uniqueN(day),
+       first_active_day = min(day),
+       last_active_day = max(day))]
debugging in: uniqueN(day)
debug: {
    if (missing(by) && is.data.table(x) && isTRUE(getOption("datatable.old.unique.by.key"))) {
        by = key(x)
        warning(warning_oldUniqueByKey)
    }
    if (is.null(x)) 
        return(0L)
    if (!is.atomic(x) && !is.data.frame(x)) 
        stop("x must be an atomic vector or data.frames/data.tables")
    if (is.atomic(x)) {
        if (is.logical(x)) 
            return(.Call(CuniqueNlogical, x, na.rm = na.rm))
        x = as_list(x)
    }
    if (is.null(by)) 
        by = seq_along(x)
    o = forderv(x, by = by, retGrp = TRUE, na.last = if (!na.rm) 
        FALSE
    else NA)
    starts = attr(o, "starts")
    if (!na.rm) {
        length(starts)
    }
    else {
        sum((if (length(o)) o[starts] else starts) != 0L)
    }
}
Browse[2]> n
debug: if (missing(by) && is.data.table(x) && isTRUE(getOption("datatable.old.unique.by.key"))) {
    by = key(x)
    warning(warning_oldUniqueByKey)
}
Browse[2]> n
debug: if (is.null(x)) return(0L)
Browse[2]> n
debug: if (!is.atomic(x) && !is.data.frame(x)) stop("x must be an atomic vector or data.frames/data.tables")
Browse[2]> n
debug: if (is.atomic(x)) {
    if (is.logical(x)) 
        return(.Call(CuniqueNlogical, x, na.rm = na.rm))
    x = as_list(x)
}
Browse[2]> n
debug: if (is.logical(x)) return(.Call(CuniqueNlogical, x, na.rm = na.rm))
Browse[2]> n
debug: x = as_list(x)
Browse[2]> as_list
function (x) 
{
    lx = vector("list", 1L)
    .Call(Csetlistelt, lx, 1L, x)
    lx
}
<bytecode: 0x55555bc452a8>
<environment: namespace:data.table>
Browse[2]> n
debug: if (is.null(by)) by = seq_along(x)
Browse[2]> n
debug: seq_along(x)
Browse[2]> n
debug: o = forderv(x, by = by, retGrp = TRUE, na.last = if (!na.rm) FALSE else NA)
Browse[2]> n
debug: [1] FALSE
Browse[2]> n
debug: starts = attr(o, "starts")
Browse[2]> n
debug: if (!na.rm) {
    length(starts)
} else {
    sum((if (length(o)) o[starts] else starts) != 0L)
}
Browse[2]> n
debug: length(starts)
Browse[2]> n
exiting from: uniqueN(day)

Test function for Intel VTune:

library(data.table)

N_X <- 1e6
n_day <- 60
n_clientid <- 1e5
n_Platform <- 7
X <- data.table(day = sample(1:n_day, N_X, TRUE),
                clientid = as.character(sample(1:n_clientid, N_X, TRUE)),
                Platform = as.character(sample(1:n_Platform, N_X, TRUE)))

setDTthreads(parallel::detectCores())

results <- list()
for (i in seq_len(5)) {
  cat(sprintf(paste0("%0", floor(log10(i) + 1), "d"), i), ": ", sep = "")
  results[[i]] <- system.time(X[, .(x = uniqueN(day) - 1L,
                                    first_active_day = min(day),
                                    last_active_day = max(day)),
                                by = .(clientid)])
  cat("[user=", sprintf("%06.03f", results[[i]][1]),
      ", system=", sprintf("%06.03f", results[[i]][2]),
      ", elapsed=", sprintf("%06.03f", results[[i]][3]), "]\n", sep = "")
}

All 16 comments

Thanks for reporting.
Be aware that by default data.table will use all available cores, so parallel processing overhead might be more visible on small data sets.
I turned your example into reproducible code

library(data.table)
fread("id colA colB colC colD colE colF colG colH colI colJ colK colL colM colN
1 text text text text text text text text text text text text text text
2 text text text text text text text text text text text text text text
3 text text text text text text text text text text text text text text
4 text text text text text text text text text text text text text text", sep=" ") -> myDataTable
quote({
  myDataTable[colJ != "production", colK:=na.omit(colK), by=.(colA)]
  myDataTable[colJ != "production", colH:=na.omit(colH), by=.(colA)]
  myDataTable[colJ != "production", colL:=na.omit(colL), by=.(colA)]
  myDataTable[colJ == "code",colM:=colE, by=.(colA)]
  myDataTable[,colM:=na.omit(colM), by=.(colA)]
  myDataTable[is.na(colM), colM:=""]
  myDataTable[colJ == "test",colN:=colE, by=.(colA)]
  myDataTable[,colN:=na.omit(colN), by=.(colA)]
}) -> q
setNames(as.list(q[-1L]), paste0("q", seq.int(length(q)-1L))) -> l
packageVersion("data.table")
rbindlist(lapply(l, function(expr) as.list(system.time(eval(expr)))), idcol="q")

and the timings I am getting are as follows
latest master

        q user.self sys.self elapsed user.child sys.child
   <char>     <num>    <num>   <num>      <num>     <num>
1:     q1     0.001        0   0.001          0         0
2:     q2     0.001        0   0.001          0         0
3:     q3     0.001        0   0.001          0         0
4:     q4     0.002        0   0.002          0         0
5:     q5     0.000        0   0.001          0         0
6:     q6     0.000        0   0.000          0         0
7:     q7     0.001        0   0.002          0         0
8:     q8     0.001        0   0.000          0         0

1.11.8

        q user.self sys.self elapsed user.child sys.child
   <char>     <num>    <num>   <num>      <num>     <num>
1:     q1     0.008        0   0.007          0         0
2:     q2     0.001        0   0.002          0         0
3:     q3     0.000        0   0.001          0         0
4:     q4     0.004        0   0.003          0         0
5:     q5     0.001        0   0.000          0         0
6:     q6     0.001        0   0.000          0         0
7:     q7     0.002        0   0.001          0         0
8:     q8     0.001        0   0.000          0         0

Timings are so tiny that it is hard to draw any conclusions, but at first glance latest master appears to be faster. Could you please run the above code on your 1.11.8 and 1.12.0/master and provide the results? Also provide the output of getDTthreads(verbose=TRUE).

Thank you for raising this issue. It identified a massive performance drop in a package when being tested on CircleCI. The automatic setting of threads resulted in a tenfold speed decrease.

Perhaps multithreading should be disabled either by default or in the presence of an environment variable. The problem with setDTthreads, as I see it, is that it may cause performance problems in packages that merely import data.table.

We need good reproducible examples which highlight the speed decrease, so that we have something to debug. There are already some, but none of them shows as much as a 10x slowdown. As of now the default is to use all cores. We could try to run revdeps using all threads vs OMP_NUM_THREADS=1 and compare timings.

Here you go: https://circleci.com/gh/HughParsonage/grattan/tree/population-forecast

Turned out to be a 300x increase in runtime(!!):

With multithreading:

image

and without:

image

Looks like what I observed in my systems (#3298).

@HughParsonage, would you mind taking a look at how setDTthreads(2L) performs? What about using the number of physical cores?

Using setDTthreads(2L) appears to give identical timings to setDTthreads(1L) (i.e. fast)

https://circleci.com/gh/HughParsonage/grattan/259#tests/containers/3

@HughParsonage are you able to run the test script from the console? Something like Rscript tests/testthat/script.R

I am getting a poorly described error

devtools::test(filter="pension")
Loading grattan
Error: Command failed (1)

I think it is because other dependencies of grattan are missing.

Can you run devtools::test(filter = "age.pension.age")? That should have minimal dependencies (but does depend on data.table).

@HughParsonage

> library(data.table); setDTthreads(1L); devtools::test(filter="age.pension")
data.table 1.12.1 IN DEVELOPMENT built 2019-02-12 16:09:43 UTC; jan  Latest news: r-datatable.com
Loading grattan
Loading required package: testthat
Testing grattan
✔ | OK F W S | Context
✔ | 11       | Age pension age [0.2 s]
✔ | 27       | Age pension [0.6 s]

══ Results ═════════════════════════════════════════════════════════════════════
Duration: 0.9 s

OK:       38
Failed:   0
Warnings: 0
Skipped:  0
> library(data.table); setDTthreads(8L); devtools::test(filter="age.pension")
Loading grattan
Testing grattan
✔ | OK F W S | Context
✔ | 11       | Age pension age
✔ | 27       | Age pension [0.5 s]

══ Results ═════════════════════════════════════════════════════════════════════
Duration: 0.6 s

OK:       38
Failed:   0
Warnings: 0
Skipped:  0

I set 8 threads in the second try, but it actually uses 4.

CPU(s):              4
On-line CPU(s) list: 0-3
Thread(s) per core:  2
Core(s) per socket:  2

Have you tried reversing the order of your commands? That is, test with many threads first, and then with a single thread.

Thank you for this quick answer.
Yes, I think this is linked to the multi-threading default value, because this server is really busy.
This server also has many logical CPUs, and @renkun-ken mentioned the relation between this and the loss of performance.

I have run your test @jangorecki

> packageVersion("data.table")
[1] ‘1.11.8’
> rbindlist(lapply(l, function(expr) as.list(system.time(eval(expr)))), idcol="q")
    q user.self sys.self elapsed user.child sys.child
1: q1     0.008        0   0.008          0         0
2: q2     0.000        0   0.001          0         0
3: q3     0.004        0   0.001          0         0
4: q4     0.004        0   0.005          0         0
5: q5     0.000        0   0.001          0         0
6: q6     0.000        0   0.000          0         0
7: q7     0.004        0   0.002          0         0
8: q8     0.000        0   0.000          0         0
> packageVersion("data.table")
[1] ‘1.12.0’
> rbindlist(lapply(l, function(expr) as.list(system.time(eval(expr)))), idcol="q")
    q user.self sys.self elapsed user.child sys.child
1: q1     1.228    0.008   0.305          0         0
2: q2     1.112    0.004   0.325          0         0
3: q3     0.816    0.020   0.195          0         0
4: q4     1.420    0.016   0.282          0         0
5: q5     0.796    0.008   0.164          0         0
6: q6     0.000    0.000   0.001          0         0
7: q7     0.984    0.012   0.222          0         0
8: q8     0.696    0.012   0.164          0         0

Number of core/threads:

> print(RhpcBLASctl::get_num_cores())
[1] 24
> print(RhpcBLASctl::get_num_procs())
[1] 48

@JeremyBesson thanks, could you try setDTthreads(24) to confirm that limiting to the number of physical cores helps?
Ideally check when the server is not busy. Dealing with resource sharing between R and other processes is not something that we can handle automatically.

Sorry jangorecki I'm not sure if I've answered your question, but reversing the commands did not seem to make any difference.

@HughParsonage OK, the problem seems to be environment-specific and manifests when other services are running alongside R. Could you also please check whether limiting to the number of physical cores (not just 2, unless you test on a 2-core machine) resolves the speed issue? If it does, then probably the safest option will be to change the default, as suggested by @renkun-ken

With setDTthreads(24)

> packageVersion("data.table")
[1] ‘1.12.0’
> rbindlist(lapply(l, function(expr) as.list(system.time(eval(expr)))), idcol="q")
    q user.self sys.self elapsed user.child sys.child
1: q1     0.260        0   0.054          0         0
2: q2     0.000        0   0.001          0         0
3: q3     0.000        0   0.001          0         0
4: q4     0.076        0   0.006          0         0
5: q5     0.000        0   0.001          0         0
6: q6     0.000        0   0.001          0         0
7: q7     0.448        0   0.031          0         0
8: q8     0.212        0   0.012          0         0
>
> packageVersion("data.table")
[1] ‘1.11.8’
> rbindlist(lapply(l, function(expr) as.list(system.time(eval(expr)))), idcol="q")
    q user.self sys.self elapsed user.child sys.child
1: q1     0.008        0   0.008          0         0
2: q2     0.000        0   0.001          0         0
3: q3     0.000        0   0.001          0         0
4: q4     0.004        0   0.005          0         0
5: q5     0.000        0   0.000          0         0
6: q6     0.004        0   0.000          0         0
7: q7     0.000        0   0.002          0         0
8: q8     0.000        0   0.001          0         0

With setDTthreads(1)

> packageVersion("data.table")
[1] ‘1.12.0’
> rbindlist(lapply(l, function(expr) as.list(system.time(eval(expr)))), idcol="q")
    q user.self sys.self elapsed user.child sys.child
1: q1     0.008        0   0.008          0         0
2: q2     0.000        0   0.001          0         0
3: q3     0.000        0   0.001          0         0
4: q4     0.004        0   0.005          0         0
5: q5     0.000        0   0.001          0         0
6: q6     0.004        0   0.000          0         0
7: q7     0.000        0   0.001          0         0
8: q8     0.000        0   0.001          0         0
>
> packageVersion("data.table")
[1] ‘1.11.8’
> rbindlist(lapply(l, function(expr) as.list(system.time(eval(expr)))), idcol="q")
    q user.self sys.self elapsed user.child sys.child
1: q1     0.004    0.004   0.009          0         0
2: q2     0.000    0.000   0.001          0         0
3: q3     0.004    0.000   0.001          0         0
4: q4     0.004    0.000   0.004          0         0
5: q5     0.000    0.000   0.000          0         0
6: q6     0.000    0.000   0.000          0         0
7: q7     0.000    0.000   0.001          0         0
8: q8     0.004    0.000   0.001          0         0
>

Here is another example for this issue: On an otherwise idle AWS r4.16xlarge ec2 instance (64 vCPU) I get:

N_X <- 1e6
n_day <- 60
n_clientid <- 1e5
n_Platform <- 7
X <- data.table(
  day = sample(1:n_day, N_X, TRUE),
  clientid = as.character(sample(1:n_clientid, N_X, TRUE)),
  Platform = as.character(sample(1:n_Platform, N_X, TRUE))
)

setDTthreads()
getDTthreads(verbose = TRUE)
#omp_get_max_threads() = 64
#omp_get_thread_limit() = 2147483647
#DTthreads = 0
#RestoreAfterFork = true
#[1] 64
system.time(
  X[, .(x = uniqueN(day) - 1L,
        first_active_day = min(day),
        last_active_day = max(day)),
    by = .(Platform, clientid)]
)
#    user   system  elapsed 
#3958.998   10.023   64.454 

setDTthreads(1)
system.time(
  X[, .(x = uniqueN(day) - 1L,
        first_active_day = min(day),
        last_active_day = max(day)),
    by = .(Platform, clientid)]
)
#   user  system elapsed 
#  7.903   0.277   8.184 

setDTthreads(32)
system.time(
  X[, .(x = uniqueN(day) - 1L,
        first_active_day = min(day),
        last_active_day = max(day)),
    by = .(Platform, clientid)]
)
#   user  system elapsed 
#611.456   1.192  19.571

For the real data where X is about 1e8 rows the difference in run-time is even larger. Overall, one larger part of code which (with data.table version < 1.12.0) took about 30 minutes now takes about 10 hours.

Also facing the issue, using:

  • 224 cores / 448 threads machine (8x Xeon Platinum 8180) with NUMA enabled, slowdown is massive
  • 36 cores / 72 threads (2x Xeon Gold 6154) with NUMA disabled, 6x slowdown
  • 18 cores / 36 threads (i9-9980XE), 4x slowdown

Here are more details using @akersting's reproducible example. The issue is linked to the presence (or absence) of uniqueN with a high-cardinality group by. min and max are negligible.

Does uniqueN use extra parallelism or cause any side effects? It starts from the following line:

https://github.com/Rdatatable/data.table/blob/ca43a475fdcb4abf40e5915c5611270e89898160/R/duplicated.R#L147

With Intel VTune, time "seems" mostly spent creating threads (might not be true, I don't have debugging symbols):

image

As I don't have the debugging symbols for OpenMP, I cannot find out exactly where it happens:

image

Some examples:

library(data.table)

N_X <- 1e6
n_day <- 60
n_clientid <- 1e5
n_Platform <- 7
X <- data.table(day = sample(1:n_day, N_X, TRUE),
                clientid = as.character(sample(1:n_clientid, N_X, TRUE)),
                Platform = as.character(sample(1:n_Platform, N_X, TRUE)))

n_cores <- parallel::detectCores()

# Reported slowdown: uniqueN
results <- list()
for (i in seq_len(n_cores)) {
  cat(sprintf(paste0("%0", floor(log10(i) + 1), "d"), i), ": ", sep = "")
  setDTthreads(i)
  results[[i]] <- system.time(X[, .(x = uniqueN(day) - 1L,
                                    first_active_day = min(day),
                                    last_active_day = max(day)),
                                by = .(Platform, clientid)])
  cat("[user=", sprintf("%06.03f", results[[i]][1]),
      ", system=", sprintf("%06.03f", results[[i]][2]),
      ", elapsed=", sprintf("%06.03f", results[[i]][3]), "]\n", sep = "")
}

# No slowdown: length(unique()) workaround
results2 <- list()
for (i in seq_len(n_cores)) {
  cat(sprintf(paste0("%0", floor(log10(i) + 1), "d"), i), ": ", sep = "")
  setDTthreads(i)
  results2[[i]] <- system.time(X[, .(x = length(unique(day)) - 1L,
                                     first_active_day = min(day),
                                    last_active_day = max(day)),
                                by = .(Platform, clientid)])
  cat("[user=", sprintf("%06.03f", results2[[i]][1]),
      ", system=", sprintf("%06.03f", results2[[i]][2]),
      ", elapsed=", sprintf("%06.03f", results2[[i]][3]), "]\n", sep = "")
}

# Slowdown: uniqueN only
results3 <- list()
for (i in seq_len(n_cores)) {
  cat(sprintf(paste0("%0", floor(log10(i) + 1), "d"), i), ": ", sep = "")
  setDTthreads(i)
  results3[[i]] <- system.time(X[, .(x = uniqueN(day) - 1L),
                                 by = .(Platform, clientid)])
  cat("[user=", sprintf("%06.03f", results3[[i]][1]),
      ", system=", sprintf("%06.03f", results3[[i]][2]),
      ", elapsed=", sprintf("%06.03f", results3[[i]][3]), "]\n", sep = "")
}

# No slowdown: no uniqueN
results4 <- list()
for (i in seq_len(n_cores)) {
  cat(sprintf(paste0("%0", floor(log10(i) + 1), "d"), i), ": ", sep = "")
  setDTthreads(i)
  results4[[i]] <- system.time(X[, .(first_active_day = min(day),
                                    last_active_day = max(day)),
                                by = .(Platform, clientid)])
  cat("[user=", sprintf("%06.03f", results4[[i]][1]),
      ", system=", sprintf("%06.03f", results4[[i]][2]),
      ", elapsed=", sprintf("%06.03f", results4[[i]][3]), "]\n", sep = "")
}

# No slowdown: uniqueN, low cardinality
results5 <- list()
for (i in seq_len(n_cores)) {
  cat(sprintf(paste0("%0", floor(log10(i) + 1), "d"), i), ": ", sep = "")
  setDTthreads(i)
  results5[[i]] <- system.time(X[, .(x = uniqueN(day) - 1L),
                                 by = .(Platform)])
  cat("[user=", sprintf("%06.03f", results5[[i]][1]),
      ", system=", sprintf("%06.03f", results5[[i]][2]),
      ", elapsed=", sprintf("%06.03f", results5[[i]][3]), "]\n", sep = "")
}

# Slowdown: uniqueN, high cardinality
results6 <- list()
for (i in seq_len(n_cores)) {
  cat(sprintf(paste0("%0", floor(log10(i) + 1), "d"), i), ": ", sep = "")
  setDTthreads(i)
  results6[[i]] <- system.time(X[, .(x = uniqueN(day) - 1L),
                                 by = .(clientid)])
  cat("[user=", sprintf("%06.03f", results6[[i]][1]),
      ", system=", sprintf("%06.03f", results6[[i]][2]),
      ", elapsed=", sprintf("%06.03f", results6[[i]][3]), "]\n", sep = "")
}

Debugger output for branching:

> debugonce(uniqueN)
> X[, .(x = uniqueN(day),
+       first_active_day = min(day),
+       last_active_day = max(day))]
debugging in: uniqueN(day)
debug: {
    if (missing(by) && is.data.table(x) && isTRUE(getOption("datatable.old.unique.by.key"))) {
        by = key(x)
        warning(warning_oldUniqueByKey)
    }
    if (is.null(x)) 
        return(0L)
    if (!is.atomic(x) && !is.data.frame(x)) 
        stop("x must be an atomic vector or data.frames/data.tables")
    if (is.atomic(x)) {
        if (is.logical(x)) 
            return(.Call(CuniqueNlogical, x, na.rm = na.rm))
        x = as_list(x)
    }
    if (is.null(by)) 
        by = seq_along(x)
    o = forderv(x, by = by, retGrp = TRUE, na.last = if (!na.rm) 
        FALSE
    else NA)
    starts = attr(o, "starts")
    if (!na.rm) {
        length(starts)
    }
    else {
        sum((if (length(o)) o[starts] else starts) != 0L)
    }
}
Browse[2]> n
debug: if (missing(by) && is.data.table(x) && isTRUE(getOption("datatable.old.unique.by.key"))) {
    by = key(x)
    warning(warning_oldUniqueByKey)
}
Browse[2]> n
debug: if (is.null(x)) return(0L)
Browse[2]> n
debug: if (!is.atomic(x) && !is.data.frame(x)) stop("x must be an atomic vector or data.frames/data.tables")
Browse[2]> n
debug: if (is.atomic(x)) {
    if (is.logical(x)) 
        return(.Call(CuniqueNlogical, x, na.rm = na.rm))
    x = as_list(x)
}
Browse[2]> n
debug: if (is.logical(x)) return(.Call(CuniqueNlogical, x, na.rm = na.rm))
Browse[2]> n
debug: x = as_list(x)
Browse[2]> as_list
function (x) 
{
    lx = vector("list", 1L)
    .Call(Csetlistelt, lx, 1L, x)
    lx
}
<bytecode: 0x55555bc452a8>
<environment: namespace:data.table>
Browse[2]> n
debug: if (is.null(by)) by = seq_along(x)
Browse[2]> n
debug: seq_along(x)
Browse[2]> n
debug: o = forderv(x, by = by, retGrp = TRUE, na.last = if (!na.rm) FALSE else NA)
Browse[2]> n
debug: [1] FALSE
Browse[2]> n
debug: starts = attr(o, "starts")
Browse[2]> n
debug: if (!na.rm) {
    length(starts)
} else {
    sum((if (length(o)) o[starts] else starts) != 0L)
}
Browse[2]> n
debug: length(starts)
Browse[2]> n
exiting from: uniqueN(day)

Test function for Intel VTune:

library(data.table)

N_X <- 1e6
n_day <- 60
n_clientid <- 1e5
n_Platform <- 7
X <- data.table(day = sample(1:n_day, N_X, TRUE),
                clientid = as.character(sample(1:n_clientid, N_X, TRUE)),
                Platform = as.character(sample(1:n_Platform, N_X, TRUE)))

setDTthreads(parallel::detectCores())

results <- list()
for (i in seq_len(5)) {
  cat(sprintf(paste0("%0", floor(log10(i) + 1), "d"), i), ": ", sep = "")
  results[[i]] <- system.time(X[, .(x = uniqueN(day) - 1L,
                                    first_active_day = min(day),
                                    last_active_day = max(day)),
                                by = .(clientid)])
  cat("[user=", sprintf("%06.03f", results[[i]][1]),
      ", system=", sprintf("%06.03f", results[[i]][2]),
      ", elapsed=", sprintf("%06.03f", results[[i]][3]), "]\n", sep = "")
}
Was this page helpful?
0 / 5 - 0 ratings