Sparklyr: Problem with functions

Created on 25 Dec 2018 · 3 Comments · Source: sparklyr/sparklyr

I have a question: why does a copy of the same function not work?

dplyr::ntile

function (x, n) 
{
    len <- sum(!is.na(x))
    if (len == 0L) {
        rep(NA_integer_, length(x))
    }
    else {
        as.integer(floor(n * (row_number(x) - 1)/len + 1))
    }
}
<environment: namespace:dplyr>

and I created the same function myself:

myFunction = function (x, n) 
{
  len <- sum(!is.na(x))
  if (len == 0L) {
    rep(NA_integer_, length(x))
  }
  else {
    as.integer(floor(n * (row_number(x) - 1)/len + 1))
  }
}

This works:

df %>% select(vars) %>% mutate_at(vars, funs(ntile(., n = 5)))

This doesn't work:

df %>% select(vars) %>% mutate_at(vars, funs(myFunction (., n = 5)))

Could someone answer me? How can I create functions that work? By using spark_apply? I have some examples, and I guess they are not very fast using spark_apply.

dplyr question

All 3 comments

ntile is implemented in Spark itself, so sparklyr does not make use of the dplyr ntile function definition.
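The translation step can be inspected without a cluster. The sketch below assumes the dbplyr package is installed and uses its simulated connection (`simulate_dbi()`) as a stand-in for Spark, so the exact SQL text may differ from what sparklyr generates. It suggests why the copied function fails: a known function such as ntile is rewritten into a SQL window function, while an unknown name such as myFunction is passed through as a SQL call, and its R body is never sent to Spark.

```r
library(dbplyr)

# Simulated connection: an assumption for illustration, not a real Spark session.
con <- simulate_dbi()

# ntile() is known to the translator and becomes a SQL window function.
ntile_sql <- translate_sql(ntile(x, 5), con = con)

# myFunction() is unknown to the translator, so only its name is emitted as SQL.
custom_sql <- translate_sql(myFunction(x, 5), con = con)

print(ntile_sql)
print(custom_sql)
```

Since Spark SQL has no function named myFunction, the second query would be expected to fail when it runs on the cluster.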

Additionally, I wasn't able to reproduce this issue; see below.

library(sparklyr)
library(dplyr)

sc <- spark_connect(master = "local")
cars <- copy_to(sc, mtcars)
cars %>% mutate_at(c("mpg", "hp"), funs(ntile(., n = 5)))
# Source: spark<?> [?? x 11]
     mpg   cyl  disp    hp  drat    wt  qsec    vs    am  gear  carb
 * <int> <dbl> <dbl> <int> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
 1     1     8  472      4  2.93  5.25  18.0     0     0     3     4
 2     1     8  460      5  3     5.42  17.8     0     0     3     4
 3     1     8  350      5  3.73  3.84  15.4     0     0     3     4
 4     1     8  360      5  3.21  3.57  15.8     0     0     3     4
 5     1     8  440      5  3.23  5.34  17.4     0     0     3     4
 6     1     8  301      5  3.54  3.57  14.6     0     1     5     8
 7     1     8  304      3  3.15  3.44  17.3     0     0     3     2
 8     2     8  276.     4  3.07  3.78  18       0     0     3     3
 9     2     8  318      3  2.76  3.52  16.9     0     0     3     2
10     2     8  351      5  4.22  3.17  14.5     0     1     5     4
# ... with more rows

Notice that one can compose functions that make use of dataframes without having to use spark_apply().
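One way to read that advice is to write functions that take the whole table and build on dplyr verbs, instead of functions that take a column vector. The helper below is a minimal sketch: the name add_ntile and the "_tile" suffix are made up for illustration and are not part of sparklyr. It is shown on the local mtcars data frame so it runs without Spark; the same call should also work on a tbl_spark such as cars above, because mutate() and ntile() are translated to SQL there.

```r
library(dplyr)

# Hypothetical helper: adds an n-tile column for one input column.
# Works on the table as a whole, so dplyr decides how to execute it.
add_ntile <- function(tbl, col, n = 5) {
  col <- enquo(col)                            # capture the column name unevaluated
  new_name <- paste0(as_label(col), "_tile")   # e.g. "mpg_tile"
  tbl %>% mutate(!!new_name := ntile(!!col, !!n))
}

# Local data frame here; with Spark, pass the tbl_spark instead.
result <- add_ntile(mtcars, mpg, n = 5)
head(result)
```

Unquoting n with `!!` inlines its value into the expression, so the translated query contains the literal 5 and does not depend on a local variable.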

I understood! Thank you for your explanation!

I have questions about creating functions without using spark_apply.

Can you show me some examples?

That's a little vague, but in general if you're trying to write functions for working with Spark data frames without going through spark_apply() (or trying to do anything with sparklyr really) you need to be aware of the differences between tbl_spark and data.frame objects. Primarily, the R object you're saving as a tbl_spark is really an object linking to your Spark context rather than a data set on its own.

library(dplyr)
library(sparklyr)
sc <- spark_connect(master = "local")

#creating spark version of iris data set
iris_spark <- copy_to(dest = sc, df = iris, name = "iris_tbl")

If we just type iris_spark and hit enter/return then we see the first couple rows of the data:

# Source: spark<iris_tbl> [?? x 5]
   Sepal_Length Sepal_Width Petal_Length Petal_Width Species
 *        <dbl>       <dbl>        <dbl>       <dbl> <chr>  
 1          5.1         3.5          1.4         0.2 setosa 
 2          4.9         3            1.4         0.2 setosa 
 3          4.7         3.2          1.3         0.2 setosa 
 4          4.6         3.1          1.5         0.2 setosa 
 5          5           3.6          1.4         0.2 setosa 
 6          5.4         3.9          1.7         0.4 setosa 
 7          4.6         3.4          1.4         0.3 setosa 
 8          5           3.4          1.5         0.2 setosa 
 9          4.4         2.9          1.4         0.2 setosa 
10          4.9         3.1          1.5         0.1 setosa 
# ... with more rows

But we really need to look at str() outputs here.

Output of str(iris):

'data.frame':   150 obs. of  5 variables:
 $ Sepal.Length: num  5.1 4.9 4.7 4.6 5 5.4 4.6 5 4.4 4.9 ...
 $ Sepal.Width : num  3.5 3 3.2 3.1 3.6 3.9 3.4 3.4 2.9 3.1 ...
 $ Petal.Length: num  1.4 1.4 1.3 1.5 1.4 1.7 1.4 1.5 1.4 1.5 ...
 $ Petal.Width : num  0.2 0.2 0.2 0.2 0.2 0.4 0.3 0.2 0.2 0.1 ...
 $ Species     : Factor w/ 3 levels "setosa","versicolor",..: 1 1 1 1 1 1 1 1 1 1 ...

Compare this with the output of str(iris_spark):

List of 2
 $ src:List of 1
  ..$ con:List of 11
  .. ..$ master     : chr "local[4]"
  .. ..$ method     : chr "shell"
  .. ..$ app_name   : chr "sparklyr"
  .. ..$ config     :List of 5
  .. .. ..$ spark.env.SPARK_LOCAL_IP.local    : chr "127.0.0.1"
  .. .. ..$ sparklyr.connect.csv.embedded     : chr "^1.*"
  .. .. ..$ spark.sql.catalogImplementation   : chr "hive"
  .. .. ..$ sparklyr.connect.cores.local      : int 4
  .. .. ..$ spark.sql.shuffle.partitions.local: int 4
  .. .. ..- attr(*, "config")= chr "default"
  .. .. ..- attr(*, "file")= chr "C:\\Users\\benjamin.white\\Documents\\R\\win-library\\3.5\\sparklyr\\conf\\config-template.yml"
  .. ..$ state      :<environment: 0x0000000017fd46f0> 
  .. ..$ spark_home : chr "C:\\Users\\benjamin.white\\AppData\\Local\\spark\\spark-2.3.0-bin-hadoop2.7"
  .. ..$ backend    : 'sockconn' int 4
  .. .. ..- attr(*, "conn_id")=<externalptr> 
  .. ..$ monitoring : 'sockconn' int 5
  .. .. ..- attr(*, "conn_id")=<externalptr> 
  .. ..$ gateway    : 'sockconn' int 3
  .. .. ..- attr(*, "conn_id")=<externalptr> 
  .. ..$ output_file: chr "C:\\Users\\BENJAM~1.WHI\\AppData\\Local\\Temp\\RtmpsjFRR4\\file20943237586a_spark.log"
  .. ..$ sessionId  : num 18013
  .. ..- attr(*, "class")= chr [1:3] "spark_connection" "spark_shell_connection" "DBIConnection"
  ..- attr(*, "class")= chr [1:3] "src_spark" "src_sql" "src"
 $ ops:List of 2
  ..$ x   : 'ident' chr "iris_tbl"
  ..$ vars: chr [1:5] "Sepal_Length" "Sepal_Width" "Petal_Length" "Petal_Width" ...
  ..- attr(*, "class")= chr [1:3] "op_base_remote" "op_base" "op"
 - attr(*, "class")= chr [1:4] "tbl_spark" "tbl_sql" "tbl_lazy" "tbl"

R "sees" this object as a list of length 2. So if we try to pull out rows, columns, or elements with syntax like iris_spark[2] or something similar, R will think we're trying to perform those operations on this nested list. Likewise, if we try to use names(), it will return "src" and "ops" rather than the column names from iris, and so on.

You need to be using functions and methods meant to work specifically with objects with the class tbl_spark. For instance names() thinks you want the names of the list object, but colnames() has a method for this object and will return what you're really after. The dplyr verbs actually do different things under the hood depending on what kind of object you feed them, but they're written so that the data manipulation abstractions work the same way regardless of the storage structure or format of your data. So iris %>% filter(Species == "setosa") and iris_spark %>% filter(Species == "setosa") will return the same information, albeit as two different types of R objects.
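As a rough illustration of that point, the sketch below assumes a local Spark installation is available to sparklyr and reuses the iris_tbl name from earlier. It reads the column names with colnames(), applies the same filter() to the Spark table, and brings the result into R with collect().

```r
library(dplyr)
library(sparklyr)

# Assumes Spark is installed locally for sparklyr to connect to.
sc <- spark_connect(master = "local")
iris_spark <- copy_to(sc, iris, name = "iris_tbl", overwrite = TRUE)

# colnames() has a method for remote tables and returns the Spark column names.
spark_cols <- colnames(iris_spark)

# filter() on a tbl_spark builds a query; nothing is pulled into R yet.
setosa_spark <- iris_spark %>% filter(Species == "setosa")

# collect() runs the query and returns an ordinary local tibble.
setosa_local <- collect(setosa_spark)

class(setosa_spark)
class(setosa_local)

spark_disconnect(sc)
```

Keeping the result as a tbl_spark until the last step lets Spark do the filtering, and only the rows that remain are copied into the R session.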

In general, you should prefer the sdf_* family of functions from sparklyr over the usual go-to base R functions for working with local data frames, keep your tbl_sparks and data.frames straight in your environment, and read through the Spark SQL function documentation. The sparklyr versions of the dplyr verbs can use every function in that list, and this has been a lifesaver for me on several occasions.
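A short sketch of both ideas, again assuming a local Spark installation: sdf_nrow() and sdf_schema() are sparklyr functions that inspect the Spark table directly, and percentile_approx() is a Spark SQL function with no R definition that can still be called inside summarise(), because unknown function names are passed through to Spark.

```r
library(dplyr)
library(sparklyr)

# Assumes Spark is installed locally for sparklyr to connect to.
sc <- spark_connect(master = "local")
iris_spark <- copy_to(sc, iris, name = "iris_tbl", overwrite = TRUE)

# sdf_* functions query the Spark table itself, not the R list wrapping it.
n_rows <- sdf_nrow(iris_spark)
schema <- sdf_schema(iris_spark)

# percentile_approx() exists only in Spark SQL; dplyr forwards the call as is.
medians <- iris_spark %>%
  group_by(Species) %>%
  summarise(median_sepal_length = percentile_approx(Sepal_Length, 0.5)) %>%
  collect()

print(medians)

spark_disconnect(sc)
```

The same pass-through that breaks a local-only R function is what makes Spark's built-in functions usable from dplyr code.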

For some "inspiration" you can check out this SO post with some examples of quick and dirty implementations of tidyr::gather() written for Spark tables.
