Repository: spark
Updated Branches:
  refs/heads/master ef82bddc1 -> ca9f4ebb8


[SPARK-6991] [SPARKR] Adds support for zipPartitions.

Author: hlin09 <hlin0...@gmail.com>

Closes #5568 from hlin09/zipPartitions and squashes the following commits:

12c08a5 [hlin09] Fix comments
d2d32db [hlin09] Merge branch 'master' into zipPartitions
ec56d2f [hlin09] Fix test.
27655d3 [hlin09] Adds support for zipPartitions.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ca9f4ebb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ca9f4ebb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ca9f4ebb

Branch: refs/heads/master
Commit: ca9f4ebb8e510e521bf4df0331375ddb385fb9d2
Parents: ef82bdd
Author: hlin09 <hlin0...@gmail.com>
Authored: Mon Apr 27 15:04:37 2015 -0700
Committer: Shivaram Venkataraman <shiva...@cs.berkeley.edu>
Committed: Mon Apr 27 15:04:37 2015 -0700

----------------------------------------------------------------------
 R/pkg/NAMESPACE                         |  1 +
 R/pkg/R/RDD.R                           | 46 ++++++++++++++++++++++++++++
 R/pkg/R/generics.R                      |  5 +++
 R/pkg/inst/tests/test_binary_function.R | 33 ++++++++++++++++++++
 4 files changed, 85 insertions(+)
----------------------------------------------------------------------
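For orientation before the per-file diffs, here is the new API in action, lifted from the @examples added to R/pkg/R/RDD.R below (a sketch only; it assumes a working SparkR build and a local context from sparkR.init()):

    sc <- sparkR.init()
    rdd1 <- parallelize(sc, 1:2, 2L)  # partitions: 1 | 2
    rdd2 <- parallelize(sc, 1:4, 2L)  # partitions: 1:2 | 3:4
    rdd3 <- parallelize(sc, 1:6, 2L)  # partitions: 1:3 | 4:6
    collect(zipPartitions(rdd1, rdd2, rdd3,
                          func = function(x, y, z) { list(list(x, y, z)) }))
    # list(list(1, c(1,2), c(1,2,3)), list(2, c(3,4), c(4,5,6)))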


http://git-wip-us.apache.org/repos/asf/spark/blob/ca9f4ebb/R/pkg/NAMESPACE
----------------------------------------------------------------------
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 8028364..e077eac 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -71,6 +71,7 @@ exportMethods(
               "unpersist",
               "value",
               "values",
+              "zipPartitions",
               "zipRDD",
               "zipWithIndex",
               "zipWithUniqueId"

http://git-wip-us.apache.org/repos/asf/spark/blob/ca9f4ebb/R/pkg/R/RDD.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R
index f90c26b..a3a0421 100644
--- a/R/pkg/R/RDD.R
+++ b/R/pkg/R/RDD.R
@@ -1595,3 +1595,49 @@ setMethod("intersection",
 
             keys(filterRDD(cogroup(rdd1, rdd2, numPartitions = numPartitions), filterFunction))
           })
+
+#' Zips an RDD's partitions with one (or more) RDD(s).
+#' Same as zipPartitions in Spark.
+#' 
+#' @param ... RDDs to be zipped.
+#' @param func A function to transform zipped partitions.
+#' @return A new RDD by applying a function to the zipped partitions. 
+#'         Assumes that all the RDDs have the *same number of partitions*, but 
+#'         does *not* require them to have the same number of elements in each partition.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd1 <- parallelize(sc, 1:2, 2L)  # 1, 2
+#' rdd2 <- parallelize(sc, 1:4, 2L)  # 1:2, 3:4
+#' rdd3 <- parallelize(sc, 1:6, 2L)  # 1:3, 4:6
+#' collect(zipPartitions(rdd1, rdd2, rdd3, 
+#'                       func = function(x, y, z) { list(list(x, y, z))} ))
+#' # list(list(1, c(1,2), c(1,2,3)), list(2, c(3,4), c(4,5,6)))
+#'}
+#' @rdname zipRDD
+#' @aliases zipPartitions,RDD
+setMethod("zipPartitions",
+          "RDD",
+          function(..., func) {
+            rrdds <- list(...)
+            if (length(rrdds) == 1) {
+              return(rrdds[[1]])
+            }
+            nPart <- sapply(rrdds, numPartitions)
+            if (length(unique(nPart)) != 1) {
+              stop("Can only zipPartitions RDDs which have the same number of 
partitions.")
+            }
+            
+            rrdds <- lapply(rrdds, function(rdd) {
+              mapPartitionsWithIndex(rdd, function(partIndex, part) {
+                print(length(part))
+                list(list(partIndex, part))
+              })
+            })
+            union.rdd <- Reduce(unionRDD, rrdds)
+            zipped.rdd <- values(groupByKey(union.rdd, numPartitions = nPart[1]))
+            res <- mapPartitions(zipped.rdd, function(plist) {
+              do.call(func, plist[[1]])
+            })
+            res
+          })
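Note on the implementation: the method above builds zipPartitions out of existing primitives rather than a dedicated JVM-side zip. Each input RDD is rewritten into (partition index, partition contents) pairs with mapPartitionsWithIndex, the rewritten RDDs are unioned, groupByKey reunites the partitions that share an index (keeping nPart[1] partitions), values() drops the keys, and func is applied to each reunited group. A local sketch of that grouping idea in plain R, with no SparkR involved (the helper names and the list-of-lists partition model are illustrative only):

    # Model two "RDDs" as lists of partitions.
    rdd1 <- list(list(1), list(2))          # two partitions
    rdd2 <- list(list(1, 2), list(3, 4))    # two partitions

    # Step 1: tag every partition with its index (mapPartitionsWithIndex's role).
    tag <- function(rdd) {
      lapply(seq_along(rdd), function(i) list(key = i, part = rdd[[i]]))
    }

    # Step 2: pool the tagged partitions and regroup them by index
    # (the role played by unionRDD + groupByKey above).
    tagged <- c(tag(rdd1), tag(rdd2))
    grouped <- split(tagged, vapply(tagged, function(r) r$key, integer(1)))

    # Step 3: hand each aligned group of partitions to the user's func.
    zipped <- lapply(grouped, function(g) {
      do.call(function(x, y) list(x, y), lapply(g, function(r) r$part))
    })
    # zipped[["1"]] is list(list(1), list(1, 2))
    # zipped[["2"]] is list(list(2), list(3, 4))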

http://git-wip-us.apache.org/repos/asf/spark/blob/ca9f4ebb/R/pkg/R/generics.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 34dbe84..e887293 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -217,6 +217,11 @@ setGeneric("unpersist", function(x, ...) { 
standardGeneric("unpersist") })
 #' @export
 setGeneric("zipRDD", function(x, other) { standardGeneric("zipRDD") })
 
+#' @rdname zipRDD
+#' @export
+setGeneric("zipPartitions", function(..., func) { 
standardGeneric("zipPartitions") }, 
+           signature = "...")
+
 #' @rdname zipWithIndex
 #' @seealso zipWithUniqueId
 #' @export
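The generic is declared with signature = "..." so that S4 dispatch runs on the arguments matched to ..., while the named func argument takes no part in dispatch; that is what lets the single "RDD" method in RDD.R accept any number of RDDs. A standalone sketch of this dispatch style, with made-up class and generic names:

    library(methods)

    setClass("Box", slots = c(v = "numeric"))
    setGeneric("combineAll", function(..., op) { standardGeneric("combineAll") },
               signature = "...")
    # Chosen when every argument matched to '...' is (or extends) a Box;
    # 'op' is an ordinary named argument and is ignored by dispatch.
    setMethod("combineAll", "Box", function(..., op) {
      Reduce(op, lapply(list(...), function(b) b@v))
    })

    combineAll(new("Box", v = 1), new("Box", v = 2), new("Box", v = 3), op = `+`)
    # [1] 6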

http://git-wip-us.apache.org/repos/asf/spark/blob/ca9f4ebb/R/pkg/inst/tests/test_binary_function.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/test_binary_function.R b/R/pkg/inst/tests/test_binary_function.R
index c15553b..6785a7b 100644
--- a/R/pkg/inst/tests/test_binary_function.R
+++ b/R/pkg/inst/tests/test_binary_function.R
@@ -66,3 +66,36 @@ test_that("cogroup on two RDDs", {
   expect_equal(sortKeyValueList(actual),
                sortKeyValueList(expected))
 })
+
+test_that("zipPartitions() on RDDs", {
+  rdd1 <- parallelize(sc, 1:2, 2L)  # 1, 2
+  rdd2 <- parallelize(sc, 1:4, 2L)  # 1:2, 3:4
+  rdd3 <- parallelize(sc, 1:6, 2L)  # 1:3, 4:6
+  actual <- collect(zipPartitions(rdd1, rdd2, rdd3, 
+                                  func = function(x, y, z) { list(list(x, y, z))} ))
+  expect_equal(actual,
+               list(list(1, c(1,2), c(1,2,3)), list(2, c(3,4), c(4,5,6))))
+  
+  mockFile = c("Spark is pretty.", "Spark is awesome.")
+  fileName <- tempfile(pattern="spark-test", fileext=".tmp")
+  writeLines(mockFile, fileName)
+  
+  rdd <- textFile(sc, fileName, 1)
+  actual <- collect(zipPartitions(rdd, rdd, 
+                                  func = function(x, y) { list(paste(x, y, sep = "\n")) }))
+  expected <- list(paste(mockFile, mockFile, sep = "\n"))
+  expect_equal(actual, expected)
+  
+  rdd1 <- parallelize(sc, 0:1, 1)
+  actual <- collect(zipPartitions(rdd1, rdd, 
+                                  func = function(x, y) { list(x + nchar(y)) }))
+  expected <- list(0:1 + nchar(mockFile))
+  expect_equal(actual, expected)
+  
+  rdd <- map(rdd, function(x) { x })
+  actual <- collect(zipPartitions(rdd, rdd1, 
+                                  func = function(x, y) { list(y + nchar(x)) }))
+  expect_equal(actual, expected)
+  
+  unlink(fileName)
+})
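The last two assertions exercise the documented guarantee that only the partition counts must match, not the element counts: rdd1 carries 0:1 in one partition while rdd carries the two mock-file lines in one partition, so func receives whole partitions and the expected value reduces to plain vectorized R (assuming, as the expected values imply, that partition contents arrive as atomic vectors):

    mockFile <- c("Spark is pretty.", "Spark is awesome.")
    nchar(mockFile)        # 16 17
    0:1 + nchar(mockFile)  # 16 18  -> the single element of 'expected'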

