Sparklyr: Collapse text by group in sparklyr data frame

Created on 7 Jun 2017 · 4 Comments · Source: sparklyr/sparklyr

I am trying to use the group_by() and mutate() functions in sparklyr to concatenate rows in a group.
Here is a simple example that I think should work but doesn't:

library(sparklyr)
library(dplyr)
# sc is an existing Spark connection created with spark_connect()
d <- data.frame(id = c("1", "1", "2", "2", "1", "2"),
                x = c("200", "200", "200", "201", "201", "201"),
                y = c("This", "That", "The", "Other", "End", "End"))
d_sdf <- copy_to(sc, d, "d")
d_sdf %>% group_by(id, x) %>% mutate(y = paste(y, collapse = " "))

What I'd like it to produce is:

Source: local data frame [6 x 3]
Groups: id, x [4]

# A tibble: 6 x 3
      id      x         y
  <fctr> <fctr>     <chr>
1      1    200 This That
2      1    200 This That
3      2    200       The
4      2    201 Other End
5      1    201       End
6      2    201 Other End
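
For reference, here is a minimal sketch of the same pipeline on the local data frame, where dplyr evaluates paste(y, collapse = " ") in R instead of translating it to Spark SQL. It assumes only that dplyr is installed. The stringsAsFactors = FALSE argument and the name local_result are additions for this sketch, so the columns print as <chr> rather than <fctr>.

```r
library(dplyr)

# Same data as above; stringsAsFactors = FALSE keeps the columns as character
d <- data.frame(
  id = c("1", "1", "2", "2", "1", "2"),
  x  = c("200", "200", "200", "201", "201", "201"),
  y  = c("This", "That", "The", "Other", "End", "End"),
  stringsAsFactors = FALSE
)

# Locally, paste(collapse = ) runs in R once per group, and mutate()
# repeats the collapsed string on every row of that group
local_result <- d %>%
  group_by(id, x) %>%
  mutate(y = paste(y, collapse = " ")) %>%
  ungroup()

print(local_result)
```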

I get the following error:

Error: org.apache.spark.sql.AnalysisException: missing ) at 'AS' near '' '' in selection target; line 1 pos 42

utils::sessionInfo()
R version 3.2.3 (2015-12-10)
Platform: x86_64-redhat-linux-gnu (64-bit)
Running under: Red Hat Enterprise Linux Server release 6.8 (Santiago)

locale:
 [1] LC_CTYPE=en_US.UTF-8       LC_NUMERIC=C               LC_TIME=en_US.UTF-8        LC_COLLATE=en_US.UTF-8     LC_MONETARY=en_US.UTF-8   
 [6] LC_MESSAGES=en_US.UTF-8    LC_PAPER=en_US.UTF-8       LC_NAME=C                  LC_ADDRESS=C               LC_TELEPHONE=C            
[11] LC_MEASUREMENT=en_US.UTF-8 LC_IDENTIFICATION=C       

attached base packages:
[1] stats     graphics  grDevices utils     datasets  methods   base     

other attached packages:
[1] dplyr_0.5.0    sparklyr_0.5.4

loaded via a namespace (and not attached):
 [1] Rcpp_0.12.10     withr_1.0.2      digest_0.6.12    rprojroot_1.2    assertthat_0.2.0 R6_2.2.1         jsonlite_1.4     DBI_0.6-1        backports_1.0.5 
[10] magrittr_1.5     httr_1.2.1       rlang_0.1.1      lazyeval_0.2.0   config_0.2       tools_3.2.3      readr_1.1.1      hms_0.3          parallel_3.2.3  
[19] yaml_2.1.14      base64enc_0.1-3  tibble_1.3.1    
distributed r dplyr featurerequest

All 4 comments

Given this code:

library(dplyr)
library(dbplyr)
library(sparklyr)

sc <- spark_connect(master = "local", version = "2.1.0")
d <- data.frame(
   id = c("1", "1", "2", "2", "1", "2"),
   x = c("200", "200", "200", "201", "201", "201"),
   y = c("This", "That", "The", "Other", "End", "End")
)
d_sdf <- copy_to(sc, d, "d")

d_sdf %>%
   group_by(id, x) %>%
   mutate(y = paste(y, collapse = " ")) %>%
   sql_render()

I see the following SQL generated:

> d_sdf %>%
+    group_by(id, x) %>%
+    mutate(y = paste(y, collapse = " ")) %>%
+    sql_render()
<SQL> SELECT `id`, `x`, CONCAT_WS(" ", `y`, " " AS "collapse") AS `y`
FROM `d`

Perhaps an issue with our paste() implementation here?

https://github.com/rstudio/sparklyr/blob/d63a97b4269d7cc1a26a101611522141df7488ba/R/dplyr_spark_connection.R#L65

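Here's a way to make it work: collapse each group with summarize(), then join the result back onto the original rows so that every row keeps its group's string: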

d_sdf %>%
  group_by(id, x) %>%
  summarize(y = paste(collect_list(y), sep = " ")) %>%
  right_join(select(d_sdf, -y), by = c("id", "x"))
# Source:   lazy query [?? x 3]
# Database: spark_connection
# Groups:   id
id     x         y
<chr> <chr>     <chr>
1     1   200 This That
2     1   200 This That
3     2   200       The
4     2   201 Other End
5     1   201       End
6     2   201 Other End

If you try group_by() followed by mutate() instead, Spark SQL complains with something like this:

d_sdf %>%
  group_by(id, x) %>%
  mutate(y = paste(collect_list(y), sep = " "))
Error: org.apache.spark.sql.AnalysisException: grouping expressions sequence is empty, and 'd.`id`' is not an aggregate function. Wrap '(concat_ws(' ', collect_list(d.`y`)) AS `y`)' in windowing function(s) or wrap 'd.`id`' in first() (or first_value) if you don't care which value you get.;;
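
The error message itself hints at another route: wrapping the aggregate in a window function. Below is a minimal sketch of that idea, assuming a local Spark installation and using DBI::dbGetQuery() to send hand-written Spark SQL. The query is an illustration, not something sparklyr generates, and the name windowed is hypothetical. The order of the words inside each collapsed string is not guaranteed.

```r
library(sparklyr)
library(dplyr)
library(DBI)

# Assumption: a local Spark installation is available
sc <- spark_connect(master = "local")

d <- data.frame(
  id = c("1", "1", "2", "2", "1", "2"),
  x  = c("200", "200", "200", "201", "201", "201"),
  y  = c("This", "That", "The", "Other", "End", "End"),
  stringsAsFactors = FALSE
)
d_sdf <- copy_to(sc, d, "d", overwrite = TRUE)

# collect_list() used as a window function keeps all six rows and
# attaches the collapsed string of the (id, x) partition to each one
windowed <- dbGetQuery(sc, "
  SELECT id, x,
         concat_ws(' ', collect_list(y) OVER (PARTITION BY id, x)) AS y
  FROM d
")

print(windowed)
```

Compared with the summarize() plus right_join() approach, this avoids the join, at the cost of dropping out of the dplyr pipeline into raw SQL.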

That works fine. Thanks.

How can I sort the output of each collect_list()?
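
One possible approach is to wrap collect_list() in Spark SQL's sort_array(), which sorts each collected array in ascending order before it is concatenated. The sketch below assumes a local Spark installation and that sparklyr passes functions it does not recognize, such as sort_array(), straight through to Spark SQL. The name sorted is hypothetical. Note that this sorts the words alphabetically; it does not restore the original row order.

```r
library(sparklyr)
library(dplyr)

# Assumption: a local Spark installation is available
sc <- spark_connect(master = "local")

d <- data.frame(
  id = c("1", "1", "2", "2", "1", "2"),
  x  = c("200", "200", "200", "201", "201", "201"),
  y  = c("This", "That", "The", "Other", "End", "End"),
  stringsAsFactors = FALSE
)
d_sdf <- copy_to(sc, d, "d", overwrite = TRUE)

# sort_array() is a Spark SQL function; it orders each group's array
# before paste() (translated to CONCAT_WS) joins the elements
sorted <- d_sdf %>%
  group_by(id, x) %>%
  summarize(y = paste(sort_array(collect_list(y)), sep = " ")) %>%
  collect()

print(sorted)
```

If every original row should carry the sorted string, the same right_join() step used earlier in the thread can be appended before collect().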
