Repository: spark Updated Branches: refs/heads/master b45059d0d -> 0ba3fdd59
[Minor][SparkR] Minor refactor and removes redundancy related to cleanClosure. 1. Only use `cleanClosure` in creation of RRDDs. Normally, user and developer do not need to call `cleanClosure` in their function definition. 2. Removes redundant code (e.g. unnecessary wrapper functions) related to `cleanClosure`. Author: hlin09 <hlin0...@gmail.com> Closes #5495 from hlin09/cleanClosureFix and squashes the following commits: 74ec303 [hlin09] Minor refactor and removes redundancy. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0ba3fdd5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0ba3fdd5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0ba3fdd5 Branch: refs/heads/master Commit: 0ba3fdd5992cf09bd38303ebff34d2ed19e5e09b Parents: b45059d Author: hlin09 <hlin0...@gmail.com> Authored: Mon Apr 13 20:43:24 2015 -0700 Committer: Shivaram Venkataraman <shiva...@cs.berkeley.edu> Committed: Mon Apr 13 20:43:24 2015 -0700 ---------------------------------------------------------------------- R/pkg/R/RDD.R | 16 ++++------------ R/pkg/R/pairRDD.R | 4 ---- 2 files changed, 4 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/0ba3fdd5/R/pkg/R/RDD.R ---------------------------------------------------------------------- diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R index d6a7500..820027e 100644 --- a/R/pkg/R/RDD.R +++ b/R/pkg/R/RDD.R @@ -85,7 +85,7 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val) if (!inherits(prev, "PipelinedRDD") || !isPipelinable(prev)) { # This transformation is the first in its stage: - .Object@func <- func + .Object@func <- cleanClosure(func) .Object@prev_jrdd <- getJRDD(prev) .Object@env$prev_serializedMode <- prev@env$serializedMode # NOTE: We use prev_serializedMode to track the serialization mode of prev_JRDD @@ -94,7 +94,7 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val) pipelinedFunc <- function(split, iterator) { func(split, prev@func(split, iterator)) } - .Object@func <- pipelinedFunc + .Object@func <- cleanClosure(pipelinedFunc) .Object@prev_jrdd <- prev@prev_jrdd # maintain the pipeline # Get the serialization mode of the parent RDD .Object@env$prev_serializedMode <- prev@env$prev_serializedMode @@ -144,17 +144,13 @@ setMethod("getJRDD", signature(rdd = "PipelinedRDD"), return(rdd@env$jrdd_val) } - computeFunc <- function(split, part) { - rdd@func(split, part) - } - packageNamesArr <- serialize(.sparkREnv[[".packages"]], connection = NULL) broadcastArr <- lapply(ls(.broadcastNames), function(name) { get(name, .broadcastNames) }) - serializedFuncArr <- serialize(computeFunc, connection = NULL) + serializedFuncArr <- serialize(rdd@func, connection = NULL) prev_jrdd <- rdd@prev_jrdd @@ -551,11 +547,7 @@ setMethod("mapPartitions", setMethod("lapplyPartitionsWithIndex", signature(X = "RDD", FUN = "function"), function(X, FUN) { - FUN <- cleanClosure(FUN) - closureCapturingFunc <- function(split, part) { - FUN(split, part) - } - PipelinedRDD(X, closureCapturingFunc) + PipelinedRDD(X, FUN) }) #' @rdname lapplyPartitionsWithIndex http://git-wip-us.apache.org/repos/asf/spark/blob/0ba3fdd5/R/pkg/R/pairRDD.R ---------------------------------------------------------------------- diff --git a/R/pkg/R/pairRDD.R b/R/pkg/R/pairRDD.R index c2396c3..739d399 100644 --- a/R/pkg/R/pairRDD.R +++ b/R/pkg/R/pairRDD.R @@ -694,10 +694,6 @@ setMethod("cogroup", for (i in 1:rddsLen) { rdds[[i]] <- lapply(rdds[[i]], function(x) { list(x[[1]], list(i, x[[2]])) }) - # TODO(hao): As issue [SparkR-142] mentions, the right value of i - # will not be captured into UDF if getJRDD is not invoked. - # It should be resolved together with that issue. - getJRDD(rdds[[i]]) # Capture the closure. } union.rdd <- Reduce(unionRDD, rdds) group.func <- function(vlist) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org