Hi
I'm trying to debug some code I'm trying to run using spark_apply. When it runs, I get an error, and I think I've managed to isolate the behaviour that causes it.
In short, the code contains an evaluation of the number of unique values in a data frame column. If there are fewer than 2 unique values, it should return an error. Otherwise, it should return a value. The evaluation happens OK, and the result of the evaluation can be returned fine. But if the result of the evaluation is used in an if() statement, there's an error, even if the condition is not met. This only seems to happen if the first 10 values in the column are the same - any kind of mixing of the first 10 values makes it work OK. Specifying a column type doesn't seem to help. Assigning the outcome of the evaluation to an object and then using that object in the if() statement doesn't seem to help. The error is reproducible in Spark 2.2.0 and Spark 2.4.3, and with R 3.5.1 and R 3.5.3.
It wouldn't be possible to shuffle data to ensure there weren't 10 of the same cases in a row in the real function, so I'm stuck now.
Any ideas about how to deal with this, or any idea of what is going on, would be really appreciated!
Reproducible example as follows:
library(sparklyr)
Sys.setenv(SPARK_HOME = "/usr/hdp/current/spark2-client")
sc <- spark_connect(master = "local")
## define functions
# function 1 counts n unique, evalutes whether < 2, returns result
countfunc_works <- function(x){
n_unique <- length(unique(x[,1]))
return(n_unique<2)
}
# function 2 counts n unique, returns n_unique-1 if < 2, else returns n unique
# note in full code, would have a stop() statement instead of return(n_unique-1)
countfunc_fails <- function(x){
n_unique <- length(unique(x[,1]))
if(n_unique<2){
return(n_unique-1)
}
return(n_unique)
}
## define data
# data with 10 zeros and 1 one
x1 <- c(rep(0, 10), 1)
exdat1 <- data.frame("x"=x1)
exdat1_spark <- copy_to(sc, exdat1)
# data with 9 zeros, 1 one, and 1 zero
x2 <- c(rep(0, 9), 1, 0)
exdat2 <- data.frame("x"=x2)
exdat2_spark <-copy_to(sc, exdat2)
## run functions
# both work, condition evaluates as FALSE, returns value
spark_apply(exdat1_spark, countfunc_works)
spark_apply(exdat2_spark, countfunc_works)
# exdat1 fails, exdat2 works
spark_apply(exdat1_spark, countfunc_fails)
spark_apply(exdat2_spark, countfunc_fails)
Session info:
R version 3.5.1 (2018-07-02)
Platform: x86_64-redhat-linux-gnu (64-bit)
Running under: Red Hat Enterprise Linux Server 7.7 (Maipo)
Matrix products: default
BLAS/LAPACK: /usr/lib64/R/lib/libRblas.so
locale:
[1] LC_CTYPE=en_GB.UTF-8 LC_NUMERIC=C
[3] LC_TIME=en_GB.UTF-8 LC_COLLATE=en_GB.UTF-8
[5] LC_MONETARY=en_GB.UTF-8 LC_MESSAGES=en_GB.UTF-8
[7] LC_PAPER=en_GB.UTF-8 LC_NAME=C
[9] LC_ADDRESS=C LC_TELEPHONE=C
[11] LC_MEASUREMENT=en_GB.UTF-8 LC_IDENTIFICATION=C
attached base packages:
[1] stats graphics grDevices utils datasets methods base
other attached packages:
[1] sparklyr_1.1.0
loaded via a namespace (and not attached):
[1] Rcpp_1.0.3 rstudioapi_0.10 magrittr_1.5 tidyselect_0.2.5
[5] R6_2.4.0 rlang_0.4.2 fansi_0.4.0 httr_1.4.0
[9] dplyr_0.8.3 tools_3.5.1 parallel_3.5.1 config_0.3
[13] utf8_1.1.4 cli_1.0.1 DBI_1.0.0 withr_2.1.2
[17] dbplyr_1.3.0 askpass_1.1 htmltools_0.3.6 ellipsis_0.3.0
[21] openssl_1.2.1 yaml_2.2.0 assertthat_0.2.0 rprojroot_1.3-2
[25] digest_0.6.18 tibble_2.0.1 forge_0.1.0 crayon_1.3.4
[29] purrr_0.3.0 base64enc_0.1-3 htmlwidgets_1.5.1 glue_1.3.0
[33] compiler_3.5.1 pillar_1.3.1 r2d3_0.2.3 generics_0.0.2
[37] backports_1.1.3 jsonlite_1.6 pkgconfig_2.0.2
Hi mjcarroll,
I found your error interesting and decided to look into it. Indeed as you say, the second function fails (countfunc_fails):
> spark_apply(exdat1_spark, countfunc_fails)
Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 48.0 failed 1 times, most recent failure: Lost task 0.0 in stage 48.0 (TID 46, localhost, executor driver): java.lang.RuntimeException: java.lang.Integer is not a valid external type for schema of double
The error message indicates a schema mismatch issue between Java and R. So I rewrote countfunc_fails to countfunc_new, as shown below. And it runs.
> # define modified countfunc_fails as countfunc_new
> countfunc_new <- function(x) {
+ n_unique <- length(unique(x[,1]))
+ if(n_unique<2){
+ return(as.numeric(n_unique-1))
+ }
+ return(as.numeric(n_unique))
+ }
> spark_apply(exdat1_spark, countfunc_new)
# Source: spark<?> [?? x 1]
result
<dbl>
1 2
> spark_apply(exdat2_spark, countfunc_new)
# Source: spark<?> [?? x 1]
result
<dbl>
1 2
This is strange, as the only difference between the two functions is variable type:
> class(countfunc_fails(exdat1))
[1] "integer"
> class(countfunc_new(exdat1))
[1] "numeric"
This seems like a type conversion problem between the JVM and R. A superficial look into the Spark API guide indicates that R integers map to Spark integers, while R numerics map to Spark doubles. So R integers should be supported. Unfortunately this is where my capacity to investigate ends. I am not a Java dev.
Best Regards,
Ryan
Thanks rychien-official
That explanation is very helpful - thank you for your time on it!
With a bit of experimentation, we found that increasing the config$sparklyr.apply.schema.infer
value to more than the default 10 also made countfunc_fails()
work, although that feels like a workaround whereas you've got closer to the root of the problem!
On top of what @rychien-official pointed out already, I'd also like to mention the best practice (aside from modifying countfunc_fails
to make both branches return the same data type) is to explicitly specify the expected schema of your result via the column
parameter (e.g., spark_apply(exdat1_spark, countfunc_fails, columns = list(count = "integer"))
would have also worked) if you foresee any potential ambiguity from schema inference based on a small subset of your result.
Most helpful comment
Hi mjcarroll,
I found your error interesting and decided to look into it. Indeed as you say, the second function fails (countfunc_fails):
The error message indicates a schema mismatch issue between Java and R. So I rewrote countfunc_fails to countfunc_new, as shown below. And it runs.
This is strange, as the only difference between the two functions is variable type:
This seems like a type conversion problem between the JVM and R. A superficial look into the Spark API guide indicates that R integers map to Spark integers, while R numerics map to Spark doubles. So R integers should be supported. Unfortunately this is where my capacity to investigate ends. I am not a Java dev.
Best Regards,
Ryan