Repository: spark
Updated Branches:
  refs/heads/master 4607f6e7f -> 769a909d1


[SPARK-7264][ML] Parallel lapply for sparkR

## What changes were proposed in this pull request?

This PR adds a new function to SparkR, `spark.lapply(sc, list, func)`. This 
function implements a distributed version of `lapply`, using Spark as the 
backend.

TODO:
 - [x] check documentation
 - [ ] check tests

A trivial example in SparkR (assuming a SparkContext `sc` obtained from `sparkR.init()`):

```R
spark.lapply(sc, 1:5, function(x) { 2 * x })
```

Output:

```
[[1]]
[1] 2

[[2]]
[1] 4

[[3]]
[1] 6

[[4]]
[1] 8

[[5]]
[1] 10
```
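
As with `lapply`, the result comes back as an ordinary local R list, so it can be 
post-processed with plain R. A small usage sketch (not part of the patch) flattening 
the result back into a vector:

```R
doubled <- spark.lapply(sc, 1:5, function(x) { 2 * x })
# The collected result is a local list; unlist() turns it back into a vector.
unlist(doubled)  # 2 4 6 8 10
```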

Here is a slightly more complex example that performs distributed training of 
multiple models. Under the hood, Spark broadcasts the dataset to the workers.

```R
library("MASS")
data(menarche)
families <- c("gaussian", "poisson")
train <- function(family) { glm(Menarche ~ Age, family = family, data = menarche) }
results <- spark.lapply(sc, families, train)
```
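
The fitted models come back as a local list, one per family, and can be examined 
with ordinary R afterwards. A hypothetical follow-up (assuming the `glm` objects 
deserialize cleanly on the driver) might look like:

```R
# Label each result by the family it was trained with, then compare coefficients.
names(results) <- families
lapply(results, coef)
```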

## How was this patch tested?

A unit test was added in SparkR (`R/pkg/inst/tests/testthat/test_context.R`). I am 
unfamiliar with R and SparkR, so any feedback on style, testing, etc. is much 
appreciated.

cc falaki davies

Author: Timothy Hunter <timhun...@databricks.com>

Closes #12426 from thunterdb/7264.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/769a909d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/769a909d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/769a909d

Branch: refs/heads/master
Commit: 769a909d1357766a441ff69e6e98c22c51b12c93
Parents: 4607f6e
Author: Timothy Hunter <timhun...@databricks.com>
Authored: Thu Apr 28 22:42:48 2016 -0700
Committer: Xiangrui Meng <m...@databricks.com>
Committed: Thu Apr 28 22:42:48 2016 -0700

----------------------------------------------------------------------
 R/pkg/NAMESPACE                          |  1 +
 R/pkg/R/context.R                        | 42 +++++++++++++++++++++++++++
 R/pkg/inst/tests/testthat/test_context.R |  6 ++++
 3 files changed, 49 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/769a909d/R/pkg/NAMESPACE
----------------------------------------------------------------------
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index ea31bae..002e469 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -295,6 +295,7 @@ export("as.DataFrame",
        "read.json",
        "read.parquet",
        "read.text",
+       "spark.lapply",
        "sql",
        "str",
        "tableToDF",

http://git-wip-us.apache.org/repos/asf/spark/blob/769a909d/R/pkg/R/context.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R
index 4105a6e..44bca87 100644
--- a/R/pkg/R/context.R
+++ b/R/pkg/R/context.R
@@ -226,6 +226,48 @@ setCheckpointDir <- function(sc, dirName) {
   invisible(callJMethod(sc, "setCheckpointDir", suppressWarnings(normalizePath(dirName))))
 }
 
+#' @title Run a function over a list of elements, distributing the computations with Spark.
+#'
+#' @description
+#' Applies a function in a manner that is similar to doParallel or lapply to elements of a list.
+#' The computations are distributed using Spark. It is conceptually the same as the following code:
+#'   lapply(list, func)
+#'
+#' Known limitations:
+#'  - variable scoping and capture: compared to R's rich support for variable resolution, the
+#'    distributed nature of SparkR limits how variables are resolved at runtime. All the variables
+#'    that are available through lexical scoping are embedded in the closure of the function and
+#'    available as read-only variables within the function. The environment variables should be
+#'    stored into temporary variables outside the function, and not directly accessed within the
+#'    function.
+#'
+#'  - loading external packages: In order to use a package, you need to load it inside the
+#'    closure. For example, if you rely on the MASS package, here is how you would use it:
+#'\dontrun{
+#' train <- function(hyperparam) {
+#'   library(MASS)
+#'   model <- lm.ridge(y ~ x + z, data, lambda = hyperparam)
+#'   model
+#' }
+#'}
+#'
+#' @rdname spark.lapply
+#' @param sc Spark Context to use
+#' @param list the list of elements
+#' @param func a function that takes one argument.
+#' @return a list of results (the exact type being determined by the function)
+#' @export
+#' @examples
+#'\dontrun{
+#' doubled <- spark.lapply(sc, 1:10, function(x) { 2 * x })
+#'}
+spark.lapply <- function(sc, list, func) {
+  rdd <- parallelize(sc, list, length(list))
+  results <- map(rdd, func)
+  local <- collect(results)
+  local
+}
+
 #' Set new log level
 #'
 #' Set new log level: "ALL", "DEBUG", "ERROR", "FATAL", "INFO", "OFF", "TRACE", "WARN"

http://git-wip-us.apache.org/repos/asf/spark/blob/769a909d/R/pkg/inst/tests/testthat/test_context.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_context.R b/R/pkg/inst/tests/testthat/test_context.R
index ffa067e..ca04342 100644
--- a/R/pkg/inst/tests/testthat/test_context.R
+++ b/R/pkg/inst/tests/testthat/test_context.R
@@ -141,3 +141,9 @@ test_that("sparkJars sparkPackages as comma-separated strings", {
   expect_that(processSparkJars(f), not(gives_warning()))
   expect_match(processSparkJars(f), f)
 })
+
+test_that("spark.lapply should perform simple transforms", {
+  sc <- sparkR.init()
+  doubled <- spark.lapply(sc, 1:10, function(x) { 2 * x })
+  expect_equal(doubled, as.list(2 * 1:10))
+})

