Repository: spark
Updated Branches:
  refs/heads/master 57e1da394 -> df58a95a3


[SPARK-20437][R] R wrappers for rollup and cube

## What changes were proposed in this pull request?

- Add `rollup` and `cube` methods and corresponding generics (see the usage sketch below).
- Add a short description to the vignette.
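For a quick look at the new API, a minimal sketch distilled from the examples added in this patch (assumes an active SparkR session, e.g. via `sparkR.session()`):

```r
df <- createDataFrame(mtcars)

# rollup: hierarchical subtotals over (cyl, gear), (cyl), and the grand total
head(agg(rollup(df, "cyl", "gear"), avg(df$mpg)))

# cube: subtotals for every combination of the grouping columns
head(agg(cube(df, "cyl", "gear"), avg(df$mpg)))

# With no grouping columns, both collapse to a single global aggregate,
# equivalent to calling agg on the frame directly
agg(cube(df), avg(df$mpg))
```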

## How was this patch tested?

- Existing unit tests.
- Additional unit tests covering new features (see the condensed sketch below).
- `check-cran.sh`.
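For flavor, a condensed sketch of the style of assertion the new tests add (the full test is in test_sparkSQL.R below):

```r
library(testthat)

df <- createDataFrame(mtcars)

# cube (and likewise rollup) with no grouping columns yields one global
# aggregate, so the collected result matches plain R's sum over mtcars
expect_equal(
  collect(agg(cube(df), expr("sum(mpg) AS total"))),
  data.frame(total = sum(mtcars$mpg))
)
```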

Author: zero323 <zero...@users.noreply.github.com>

Closes #17728 from zero323/SPARK-20437.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/df58a95a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/df58a95a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/df58a95a

Branch: refs/heads/master
Commit: df58a95a33b739462dbe84e098839af2a8643d45
Parents: 57e1da3
Author: zero323 <zero...@users.noreply.github.com>
Authored: Tue Apr 25 22:00:45 2017 -0700
Committer: Felix Cheung <felixche...@apache.org>
Committed: Tue Apr 25 22:00:45 2017 -0700

----------------------------------------------------------------------
 R/pkg/NAMESPACE                           |   2 +
 R/pkg/R/DataFrame.R                       |  73 +++++++++++++++++-
 R/pkg/R/generics.R                        |   8 ++
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 102 +++++++++++++++++++++++++
 R/pkg/vignettes/sparkr-vignettes.Rmd      |  15 ++++
 docs/sparkr.md                            |  30 ++++++++
 6 files changed, 229 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/df58a95a/R/pkg/NAMESPACE
----------------------------------------------------------------------
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 95d5cc6..2800461 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -101,6 +101,7 @@ exportMethods("arrange",
               "createOrReplaceTempView",
               "crossJoin",
               "crosstab",
+              "cube",
               "dapply",
               "dapplyCollect",
               "describe",
@@ -143,6 +144,7 @@ exportMethods("arrange",
               "registerTempTable",
               "rename",
               "repartition",
+              "rollup",
               "sample",
               "sample_frac",
               "sampleBy",

http://git-wip-us.apache.org/repos/asf/spark/blob/df58a95a/R/pkg/R/DataFrame.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 88a138f..cd6f03a 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -1321,7 +1321,7 @@ setMethod("toRDD",
#' Groups the SparkDataFrame using the specified columns, so we can run aggregation on them.
 #'
 #' @param x a SparkDataFrame.
-#' @param ... variable(s) (character names(s) or Column(s)) to group on.
+#' @param ... character name(s) or Column(s) to group on.
 #' @return A GroupedData.
 #' @family SparkDataFrame functions
 #' @aliases groupBy,SparkDataFrame-method
@@ -1337,6 +1337,7 @@ setMethod("toRDD",
 #'   agg(groupBy(df, "department", "gender"), salary="avg", "age" -> "max")
 #' }
 #' @note groupBy since 1.4.0
+#' @seealso \link{agg}, \link{cube}, \link{rollup}
 setMethod("groupBy",
            signature(x = "SparkDataFrame"),
            function(x, ...) {
@@ -3642,3 +3643,73 @@ setMethod("checkpoint",
             df <- callJMethod(x@sdf, "checkpoint", as.logical(eager))
             dataFrame(df)
           })
+
+#' cube
+#'
+#' Create a multi-dimensional cube for the SparkDataFrame using the specified columns.
+#'
+#' If grouping expression is missing \code{cube} creates a single global aggregate and is equivalent to
+#' direct application of \link{agg}.
+#'
+#' @param x a SparkDataFrame.
+#' @param ... character name(s) or Column(s) to group on.
+#' @return A GroupedData.
+#' @family SparkDataFrame functions
+#' @aliases cube,SparkDataFrame-method
+#' @rdname cube
+#' @name cube
+#' @export
+#' @examples
+#' \dontrun{
+#' df <- createDataFrame(mtcars)
+#' mean(cube(df, "cyl", "gear", "am"), "mpg")
+#'
+#' # Following calls are equivalent
+#' agg(cube(carsDF), mean(carsDF$mpg))
+#' agg(carsDF, mean(carsDF$mpg))
+#' }
+#' @note cube since 2.3.0
+#' @seealso \link{agg}, \link{groupBy}, \link{rollup}
+setMethod("cube",
+          signature(x = "SparkDataFrame"),
+          function(x, ...) {
+            cols <- list(...)
+            jcol <- lapply(cols, function(x) if (class(x) == "Column") x@jc else column(x)@jc)
+            sgd <- callJMethod(x@sdf, "cube", jcol)
+            groupedData(sgd)
+          })
+
+#' rollup
+#'
+#' Create a multi-dimensional rollup for the SparkDataFrame using the specified columns.
+#'
+#' If grouping expression is missing \code{rollup} creates a single global aggregate and is equivalent to
+#' direct application of \link{agg}.
+#'
+#' @param x a SparkDataFrame.
+#' @param ... character name(s) or Column(s) to group on.
+#' @return A GroupedData.
+#' @family SparkDataFrame functions
+#' @aliases rollup,SparkDataFrame-method
+#' @rdname rollup
+#' @name rollup
+#' @export
+#' @examples
+#' \dontrun{
+#' df <- createDataFrame(mtcars)
+#' mean(rollup(df, "cyl", "gear", "am"), "mpg")
+#'
+#' # Following calls are equivalent
+#' agg(rollup(carsDF), mean(carsDF$mpg))
+#' agg(carsDF, mean(carsDF$mpg))
+#' }
+#' @note rollup since 2.3.0
+#' @seealso \link{agg}, \link{cube}, \link{groupBy}
+setMethod("rollup",
+          signature(x = "SparkDataFrame"),
+          function(x, ...) {
+            cols <- list(...)
+            jcol <- lapply(cols, function(x) if (class(x) == "Column") x@jc else column(x)@jc)
+            sgd <- callJMethod(x@sdf, "rollup", jcol)
+            groupedData(sgd)
+          })

http://git-wip-us.apache.org/repos/asf/spark/blob/df58a95a/R/pkg/R/generics.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 5e7a1c6..749ee9b 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -483,6 +483,10 @@ setGeneric("createOrReplaceTempView",
 # @export
 setGeneric("crossJoin", function(x, y) { standardGeneric("crossJoin") })
 
+#' @rdname cube
+#' @export
+setGeneric("cube", function(x, ...) { standardGeneric("cube") })
+
 #' @rdname dapply
 #' @export
 setGeneric("dapply", function(x, func, schema) { standardGeneric("dapply") })
@@ -631,6 +635,10 @@ setGeneric("sample",
              standardGeneric("sample")
            })
 
+#' @rdname rollup
+#' @export
+setGeneric("rollup", function(x, ...) { standardGeneric("rollup") })
+
 #' @rdname sample
 #' @export
 setGeneric("sample_frac",

http://git-wip-us.apache.org/repos/asf/spark/blob/df58a95a/R/pkg/inst/tests/testthat/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index c21ba2f..2cef719 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -1816,6 +1816,108 @@ test_that("pivot GroupedData column", {
  expect_error(collect(sum(pivot(groupBy(df, "year"), "course", list("R", "R")), "earnings")))
 })
 
+test_that("test multi-dimensional aggregations with cube and rollup", {
+  df <- createDataFrame(data.frame(
+    id = 1:6,
+    year = c(2016, 2016, 2016, 2017, 2017, 2017),
+    salary = c(10000, 15000, 20000, 22000, 32000, 21000),
+    department = c("management", "rnd", "sales", "management", "rnd", "sales")
+  ))
+
+  actual_cube <- collect(
+    orderBy(
+      agg(
+        cube(df, "year", "department"),
+        expr("sum(salary) AS total_salary"), expr("avg(salary) AS 
average_salary")
+      ),
+      "year", "department"
+    )
+  )
+
+  expected_cube <- data.frame(
+    year = c(rep(NA, 4), rep(2016, 4), rep(2017, 4)),
+    department = rep(c(NA, "management", "rnd", "sales"), times = 3),
+    total_salary = c(
+      120000, # Total
+      10000 + 22000, 15000 + 32000, 20000 + 21000, # Department only
+      20000 + 15000 + 10000, # 2016
+      10000, 15000, 20000, # 2016 each department
+      21000 + 32000 + 22000, # 2017
+      22000, 32000, 21000 # 2017 each department
+    ),
+    average_salary = c(
+      # Total
+      mean(c(20000, 15000, 10000, 21000, 32000, 22000)),
+      # Mean by department
+      mean(c(10000, 22000)), mean(c(15000, 32000)), mean(c(20000, 21000)),
+      mean(c(10000, 15000, 20000)), # 2016
+      10000, 15000, 20000, # 2016 each department
+      mean(c(21000, 32000, 22000)), # 2017
+      22000, 32000, 21000 # 2017 each department
+    ),
+    stringsAsFactors = FALSE
+  )
+
+  expect_equal(actual_cube, expected_cube)
+
+  # cube should accept column objects
+  expect_equal(
+    count(sum(cube(df, df$year, df$department), "salary")),
+    12
+  )
+
+  # cube without columns should result in a single aggregate
+  expect_equal(
+    collect(agg(cube(df), expr("sum(salary) as total_salary"))),
+    data.frame(total_salary = 120000)
+  )
+
+  actual_rollup <- collect(
+    orderBy(
+      agg(
+        rollup(df, "year", "department"),
+        expr("sum(salary) AS total_salary"), expr("avg(salary) AS 
average_salary")
+      ),
+      "year", "department"
+    )
+  )
+
+  expected_rollup <- data.frame(
+    year = c(NA, rep(2016, 4), rep(2017, 4)),
+    department = c(NA, rep(c(NA, "management", "rnd", "sales"), times = 2)),
+    total_salary = c(
+      120000, # Total
+      20000 + 15000 + 10000, # 2016
+      10000, 15000, 20000, # 2016 each department
+      21000 + 32000 + 22000, # 2017
+      22000, 32000, 21000 # 2017 each department
+    ),
+    average_salary = c(
+      # Total
+      mean(c(20000, 15000, 10000, 21000, 32000, 22000)),
+      mean(c(10000, 15000, 20000)), # 2016
+      10000, 15000, 20000, # 2016 each department
+      mean(c(21000, 32000, 22000)), # 2017
+      22000, 32000, 21000 # 2017 each department
+    ),
+    stringsAsFactors = FALSE
+  )
+
+  expect_equal(actual_rollup, expected_rollup)
+
+  # rollup should accept column objects
+  expect_equal(
+    count(sum(rollup(df, df$year, df$department), "salary")),
+    9
+  )
+
+  # rollup without columns should result in a single aggregate
+  expect_equal(
+    collect(agg(rollup(df), expr("sum(salary) as total_salary"))),
+    data.frame(total_salary = 120000)
+  )
+})
+
 test_that("arrange() and orderBy() on a DataFrame", {
   df <- read.json(jsonPath)
   sorted <- arrange(df, df$age)

http://git-wip-us.apache.org/repos/asf/spark/blob/df58a95a/R/pkg/vignettes/sparkr-vignettes.Rmd
----------------------------------------------------------------------
diff --git a/R/pkg/vignettes/sparkr-vignettes.Rmd b/R/pkg/vignettes/sparkr-vignettes.Rmd
index f81dbab..4b9d6c3 100644
--- a/R/pkg/vignettes/sparkr-vignettes.Rmd
+++ b/R/pkg/vignettes/sparkr-vignettes.Rmd
@@ -308,6 +308,21 @@ numCyl <- summarize(groupBy(carsDF, carsDF$cyl), count = n(carsDF$cyl))
 head(numCyl)
 ```
 
+Use `cube` or `rollup` to compute subtotals across multiple dimensions.
+
+```{r}
+mean(cube(carsDF, "cyl", "gear", "am"), "mpg")
+```
+
+generates groupings for all possible combinations of grouping columns, while
+
+```{r}
+mean(rollup(carsDF, "cyl", "gear", "am"), "mpg")
+```
+
+generates groupings for {(`cyl`, `gear`, `am`), (`cyl`, `gear`), (`cyl`), ()}.
+
+
 #### Operating on Columns
 
SparkR also provides a number of functions that can be directly applied to columns for data processing and during aggregation. The example below shows the use of basic arithmetic functions.

http://git-wip-us.apache.org/repos/asf/spark/blob/df58a95a/docs/sparkr.md
----------------------------------------------------------------------
diff --git a/docs/sparkr.md b/docs/sparkr.md
index a1a35a7..e015ab2 100644
--- a/docs/sparkr.md
+++ b/docs/sparkr.md
@@ -264,6 +264,36 @@ head(arrange(waiting_counts, desc(waiting_counts$count)))
 {% endhighlight %}
 </div>
 
+In addition to standard aggregations, SparkR supports [OLAP cube](https://en.wikipedia.org/wiki/OLAP_cube) operators `cube`:
+
+<div data-lang="r"  markdown="1">
+{% highlight r %}
+head(agg(cube(df, "cyl", "disp", "gear"), avg(df$mpg)))
+##  cyl  disp gear avg(mpg)
+##1  NA 140.8    4     22.8
+##2   4  75.7    4     30.4
+##3   8 400.0    3     19.2
+##4   8 318.0    3     15.5
+##5  NA 351.0   NA     15.8
+##6  NA 275.8   NA     16.3
+{% endhighlight %}
+</div>
+
+and `rollup`:
+
+<div data-lang="r"  markdown="1">
+{% highlight r %}
+head(agg(rollup(df, "cyl", "disp", "gear"), avg(df$mpg)))
+##  cyl  disp gear avg(mpg)
+##1   4  75.7    4     30.4
+##2   8 400.0    3     19.2
+##3   8 318.0    3     15.5
+##4   4  78.7   NA     32.4
+##5   8 304.0    3     15.2
+##6   4  79.0   NA     27.3
+{% endhighlight %}
+</div>
+
 ### Operating on Columns
 
SparkR also provides a number of functions that can be directly applied to columns for data processing and during aggregation. The example below shows the use of basic arithmetic functions.

