I have a question: why doesn't the same function work?
```r
dplyr::ntile
function (x, n)
{
    len <- sum(!is.na(x))
    if (len == 0L) {
        rep(NA_integer_, length(x))
    }
    else {
        as.integer(floor(n * (row_number(x) - 1)/len + 1))
    }
}
<environment: namespace:dplyr>
```
and I create the same function:
```r
myFunction <- function(x, n) {
  len <- sum(!is.na(x))
  if (len == 0L) {
    rep(NA_integer_, length(x))
  } else {
    as.integer(floor(n * (row_number(x) - 1) / len + 1))
  }
}
```
This works:

```r
df %>% select(vars) %>% mutate_at(vars, funs(ntile(., n = 5)))
```

but this doesn't work:

```r
df %>% select(vars) %>% mutate_at(vars, funs(myFunction(., n = 5)))
```
Could someone explain this? How can I create functions that work? Do I need to use spark_apply()? I have some examples, but I suspect they are not very fast with spark_apply().
`ntile` is implemented in Spark itself, so `sparklyr` does not make use of the `dplyr` `ntile` function definition.

Additionally, I wasn't able to reproduce this issue; see below.
```r
library(sparklyr)
library(dplyr)
sc <- spark_connect(master = "local")
cars <- copy_to(sc, mtcars)
cars %>% mutate_at(c("mpg", "hp"), funs(ntile(., n = 5)))
```
```
# Source: spark<?> [?? x 11]
     mpg   cyl  disp    hp  drat    wt  qsec    vs    am  gear  carb
 * <int> <dbl> <dbl> <int> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
 1     1     8  472      4  2.93  5.25  18.0     0     0     3     4
 2     1     8  460      5  3     5.42  17.8     0     0     3     4
 3     1     8  350      5  3.73  3.84  15.4     0     0     3     4
 4     1     8  360      5  3.21  3.57  15.8     0     0     3     4
 5     1     8  440      5  3.23  5.34  17.4     0     0     3     4
 6     1     8  301      5  3.54  3.57  14.6     0     1     5     8
 7     1     8  304      3  3.15  3.44  17.3     0     0     3     2
 8     2     8  276.     4  3.07  3.78  18       0     0     3     3
 9     2     8  318      3  2.76  3.52  16.9     0     0     3     2
10     2     8  351      5  4.22  3.17  14.5     0     1     5     4
# ... with more rows
```
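If you want to see why the R body of `ntile()` never runs here, you can ask for the SQL that gets generated for the Spark table. This is just a sketch for illustration; the exact SQL text will vary with your Spark and dbplyr versions:

```r
# show_query() prints the SQL that sparklyr/dbplyr generate for the lazy table;
# ntile() is translated to Spark's NTILE() window function rather than calling
# the R function definition.
cars %>%
  mutate(mpg_bin = ntile(mpg, 5)) %>%
  show_query()
```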
Notice that one can compose functions that make use of data frames without having to use `spark_apply()`.
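For instance, here is a minimal sketch (the helper name `bucket5` is made up for this example): an ordinary R function whose body is built only from `dplyr` verbs and translatable expressions works on a `tbl_spark` exactly as it does on a local data frame, because the verbs are translated to Spark SQL at the point of use:

```r
# A sketch: a helper composed of dplyr verbs needs no spark_apply(),
# because everything in its body can be translated to Spark SQL.
bucket5 <- function(tbl, cols) {
  tbl %>% mutate_at(cols, funs(ntile(., n = 5)))
}

cars   %>% bucket5(c("mpg", "hp"))   # runs in Spark
mtcars %>% bucket5(c("mpg", "hp"))   # runs locally
```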
I understand now! Thank you for your explanation!
I have questions about creating functions without using spark_apply.
Can you show me some examples?
That's a little vague, but in general if you're trying to write functions for working with Spark data frames without going through `spark_apply()` (or trying to do anything with `sparklyr`, really), you need to be aware of the differences between `tbl_spark` and `data.frame` objects. Primarily, the R object you're saving as a `tbl_spark` is really an object linking to your Spark context rather than a data set on its own.

```r
library(dplyr)
library(sparklyr)
sc <- spark_connect(master = "local")

# create a Spark version of the iris data set
iris_spark <- copy_to(dest = sc, df = iris, name = "iris_tbl")
```
If we just type `iris_spark` and hit enter/return, then we see the first couple of rows of the data:
```
# Source: spark<iris_tbl> [?? x 5]
   Sepal_Length Sepal_Width Petal_Length Petal_Width Species
 *        <dbl>       <dbl>        <dbl>       <dbl> <chr>
 1          5.1         3.5          1.4         0.2 setosa
 2          4.9         3            1.4         0.2 setosa
 3          4.7         3.2          1.3         0.2 setosa
 4          4.6         3.1          1.5         0.2 setosa
 5          5           3.6          1.4         0.2 setosa
 6          5.4         3.9          1.7         0.4 setosa
 7          4.6         3.4          1.4         0.3 setosa
 8          5           3.4          1.5         0.2 setosa
 9          4.4         2.9          1.4         0.2 setosa
10          4.9         3.1          1.5         0.1 setosa
# ... with more rows
```
But we really need to look at the `str()` outputs here.

Output of `str(iris)`:
```
'data.frame':	150 obs. of  5 variables:
 $ Sepal.Length: num  5.1 4.9 4.7 4.6 5 5.4 4.6 5 4.4 4.9 ...
 $ Sepal.Width : num  3.5 3 3.2 3.1 3.6 3.9 3.4 3.4 2.9 3.1 ...
 $ Petal.Length: num  1.4 1.4 1.3 1.5 1.4 1.7 1.4 1.5 1.4 1.5 ...
 $ Petal.Width : num  0.2 0.2 0.2 0.2 0.2 0.4 0.3 0.2 0.2 0.1 ...
 $ Species     : Factor w/ 3 levels "setosa","versicolor",..: 1 1 1 1 1 1 1 1 1 1 ...
```
Compare this with the output of `str(iris_spark)`:
```
List of 2
 $ src:List of 1
  ..$ con:List of 11
  .. ..$ master     : chr "local[4]"
  .. ..$ method     : chr "shell"
  .. ..$ app_name   : chr "sparklyr"
  .. ..$ config     :List of 5
  .. .. ..$ spark.env.SPARK_LOCAL_IP.local    : chr "127.0.0.1"
  .. .. ..$ sparklyr.connect.csv.embedded     : chr "^1.*"
  .. .. ..$ spark.sql.catalogImplementation   : chr "hive"
  .. .. ..$ sparklyr.connect.cores.local      : int 4
  .. .. ..$ spark.sql.shuffle.partitions.local: int 4
  .. .. ..- attr(*, "config")= chr "default"
  .. .. ..- attr(*, "file")= chr "C:\\Users\\benjamin.white\\Documents\\R\\win-library\\3.5\\sparklyr\\conf\\config-template.yml"
  .. ..$ state      :<environment: 0x0000000017fd46f0>
  .. ..$ spark_home : chr "C:\\Users\\benjamin.white\\AppData\\Local\\spark\\spark-2.3.0-bin-hadoop2.7"
  .. ..$ backend    : 'sockconn' int 4
  .. .. ..- attr(*, "conn_id")=<externalptr>
  .. ..$ monitoring : 'sockconn' int 5
  .. .. ..- attr(*, "conn_id")=<externalptr>
  .. ..$ gateway    : 'sockconn' int 3
  .. .. ..- attr(*, "conn_id")=<externalptr>
  .. ..$ output_file: chr "C:\\Users\\BENJAM~1.WHI\\AppData\\Local\\Temp\\RtmpsjFRR4\\file20943237586a_spark.log"
  .. ..$ sessionId  : num 18013
  .. ..- attr(*, "class")= chr [1:3] "spark_connection" "spark_shell_connection" "DBIConnection"
  ..- attr(*, "class")= chr [1:3] "src_spark" "src_sql" "src"
 $ ops:List of 2
  ..$ x   : 'ident' chr "iris_tbl"
  ..$ vars: chr [1:5] "Sepal_Length" "Sepal_Width" "Petal_Length" "Petal_Width" ...
  ..- attr(*, "class")= chr [1:3] "op_base_remote" "op_base" "op"
 - attr(*, "class")= chr [1:4] "tbl_spark" "tbl_sql" "tbl_lazy" "tbl"
```
R "sees" this object as a list of length 2. So if we try to pull out rows, columns, or elements with syntax like iris_spark[2]
or something similar then R will think we're trying perform those operations on this nested list, if we try to use names()
then it will return "src" and "ops" rather than the column names from iris
, and so on.
You need to be using functions and methods meant to work specifically with objects of class `tbl_spark`. For instance, `names()` thinks you want the names of the list object, but `colnames()` has a method for this object and will return what you're really after. The `dplyr` verbs actually do different things under the hood depending on what kind of object you feed them, but they're written so that the data manipulation abstractions work the same way regardless of the storage structure or format of your data. So `iris %>% filter(Species == "setosa")` and `iris_spark %>% filter(Species == "setosa")` will return the same information, albeit as two different types of R objects.
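To make that concrete, here is a small sketch (the output shown in the comments is abridged and depends on your session):

```r
names(iris_spark)      # "src" "ops"         -- pieces of the lazy-table object
colnames(iris_spark)   # "Sepal_Length" ...  -- the actual column names

# The same dplyr code runs against both objects, but returns different types:
iris %>% filter(Species == "setosa")                       # a local data.frame
iris_spark %>% filter(Species == "setosa")                 # still a lazy tbl_spark
iris_spark %>% filter(Species == "setosa") %>% collect()   # pull back as a local tibble
```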
In general you should try to use the `sdf_*` family of functions from `sparklyr` rather than the usual go-to base R functions for working with local data frames, keep your `tbl_spark`s and `data.frame`s straight in your environment, and read through the Spark SQL function documentation. The `sparklyr` versions of the `dplyr` verbs can use every function in that list, and this has been a lifesaver for me on several occasions.
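As a sketch of what that looks like in practice (`percentile_approx()` is a Spark SQL aggregate, not an R function; the call is passed straight through to Spark, so no local equivalent is needed):

```r
# sdf_* helpers instead of the base-R equivalents:
sdf_nrow(iris_spark)   # rather than nrow()

# Function calls that dplyr doesn't recognise are passed through to Spark SQL,
# so anything in the Spark SQL function list can be used inside the verbs:
iris_spark %>%
  group_by(Species) %>%
  summarise(median_petal = percentile_approx(Petal_Length, 0.5))
```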
For some "inspiration" you can check out this SO post with some examples of quick and dirty implementations of tidyr::gather()
written for Spark tables.
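In that spirit, here is a rough sketch of what such a helper can look like (the function and argument names are my own, not from that post); it is built entirely from `dplyr` verbs plus `sdf_bind_rows()`, so no `spark_apply()` is involved:

```r
library(rlang)

# Quick-and-dirty gather(): take each measure column in turn, tag it with its
# name, then stack the pieces back together with sdf_bind_rows().
sdf_gather <- function(tbl, gather_cols, key = "key", value = "value") {
  id_cols <- setdiff(colnames(tbl), gather_cols)
  pieces <- lapply(gather_cols, function(col_nm) {
    tbl %>%
      select(one_of(id_cols), one_of(col_nm)) %>%
      mutate(!!key := col_nm) %>%
      rename(!!value := !!sym(col_nm))
  })
  Reduce(sdf_bind_rows, pieces)
}

iris_spark %>% sdf_gather(c("Sepal_Length", "Sepal_Width"))
```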