Repository: spark Updated Branches: refs/heads/master ebb77b2af -> caf0136ec
[SPARK-6852] [SPARKR] Accept numeric as numPartitions in SparkR. Author: Sun Rui <rui....@intel.com> Closes #5613 from sun-rui/SPARK-6852 and squashes the following commits: abaf02e [Sun Rui] Change the type of default numPartitions from integer to numeric in generics.R. 29d67c1 [Sun Rui] [SPARK-6852][SPARKR] Accept numeric as numPartitions in SparkR. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/caf0136e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/caf0136e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/caf0136e Branch: refs/heads/master Commit: caf0136ec5838cf5bf61f39a5b3474a505a6ae11 Parents: ebb77b2 Author: Sun Rui <rui....@intel.com> Authored: Fri Apr 24 12:52:07 2015 -0700 Committer: Shivaram Venkataraman <shiva...@cs.berkeley.edu> Committed: Fri Apr 24 12:52:07 2015 -0700 ---------------------------------------------------------------------- R/pkg/R/RDD.R | 2 +- R/pkg/R/generics.R | 12 ++++++------ R/pkg/R/pairRDD.R | 24 ++++++++++++------------ 3 files changed, 19 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/caf0136e/R/pkg/R/RDD.R ---------------------------------------------------------------------- diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R index cc09efb..1662d6b 100644 --- a/R/pkg/R/RDD.R +++ b/R/pkg/R/RDD.R @@ -967,7 +967,7 @@ setMethod("keyBy", setMethod("repartition", signature(x = "RDD", numPartitions = "numeric"), function(x, numPartitions) { - coalesce(x, numToInt(numPartitions), TRUE) + coalesce(x, numPartitions, TRUE) }) #' Return a new RDD that is reduced into numPartitions partitions. http://git-wip-us.apache.org/repos/asf/spark/blob/caf0136e/R/pkg/R/generics.R ---------------------------------------------------------------------- diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index 6c62333..34dbe84 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -60,7 +60,7 @@ setGeneric("countByValue", function(x) { standardGeneric("countByValue") }) #' @rdname distinct #' @export -setGeneric("distinct", function(x, numPartitions = 1L) { standardGeneric("distinct") }) +setGeneric("distinct", function(x, numPartitions = 1) { standardGeneric("distinct") }) #' @rdname filterRDD #' @export @@ -182,7 +182,7 @@ setGeneric("setName", function(x, name) { standardGeneric("setName") }) #' @rdname sortBy #' @export setGeneric("sortBy", - function(x, func, ascending = TRUE, numPartitions = 1L) { + function(x, func, ascending = TRUE, numPartitions = 1) { standardGeneric("sortBy") }) @@ -244,7 +244,7 @@ setGeneric("flatMapValues", function(X, FUN) { standardGeneric("flatMapValues") #' @rdname intersection #' @export -setGeneric("intersection", function(x, other, numPartitions = 1L) { +setGeneric("intersection", function(x, other, numPartitions = 1) { standardGeneric("intersection") }) #' @rdname keys @@ -346,21 +346,21 @@ setGeneric("rightOuterJoin", function(x, y, numPartitions) { standardGeneric("ri #' @rdname sortByKey #' @export setGeneric("sortByKey", - function(x, ascending = TRUE, numPartitions = 1L) { + function(x, ascending = TRUE, numPartitions = 1) { standardGeneric("sortByKey") }) #' @rdname subtract #' @export setGeneric("subtract", - function(x, other, numPartitions = 1L) { + function(x, other, numPartitions = 1) { standardGeneric("subtract") }) #' @rdname subtractByKey #' @export setGeneric("subtractByKey", - function(x, other, numPartitions = 1L) { + function(x, other, numPartitions = 1) { standardGeneric("subtractByKey") }) http://git-wip-us.apache.org/repos/asf/spark/blob/caf0136e/R/pkg/R/pairRDD.R ---------------------------------------------------------------------- diff --git a/R/pkg/R/pairRDD.R b/R/pkg/R/pairRDD.R index f99b474..9791e55 100644 --- a/R/pkg/R/pairRDD.R +++ b/R/pkg/R/pairRDD.R @@ -190,7 +190,7 @@ setMethod("flatMapValues", #' @rdname partitionBy #' @aliases partitionBy,RDD,integer-method setMethod("partitionBy", - signature(x = "RDD", numPartitions = "integer"), + signature(x = "RDD", numPartitions = "numeric"), function(x, numPartitions, partitionFunc = hashCode) { #if (missing(partitionFunc)) { @@ -211,7 +211,7 @@ setMethod("partitionBy", # the content (key-val pairs). pairwiseRRDD <- newJObject("org.apache.spark.api.r.PairwiseRRDD", callJMethod(jrdd, "rdd"), - as.integer(numPartitions), + numToInt(numPartitions), serializedHashFuncBytes, getSerializedMode(x), packageNamesArr, @@ -221,7 +221,7 @@ setMethod("partitionBy", # Create a corresponding partitioner. rPartitioner <- newJObject("org.apache.spark.HashPartitioner", - as.integer(numPartitions)) + numToInt(numPartitions)) # Call partitionBy on the obtained PairwiseRDD. javaPairRDD <- callJMethod(pairwiseRRDD, "asJavaPairRDD") @@ -256,7 +256,7 @@ setMethod("partitionBy", #' @rdname groupByKey #' @aliases groupByKey,RDD,integer-method setMethod("groupByKey", - signature(x = "RDD", numPartitions = "integer"), + signature(x = "RDD", numPartitions = "numeric"), function(x, numPartitions) { shuffled <- partitionBy(x, numPartitions) groupVals <- function(part) { @@ -315,7 +315,7 @@ setMethod("groupByKey", #' @rdname reduceByKey #' @aliases reduceByKey,RDD,integer-method setMethod("reduceByKey", - signature(x = "RDD", combineFunc = "ANY", numPartitions = "integer"), + signature(x = "RDD", combineFunc = "ANY", numPartitions = "numeric"), function(x, combineFunc, numPartitions) { reduceVals <- function(part) { vals <- new.env() @@ -422,7 +422,7 @@ setMethod("reduceByKeyLocally", #' @aliases combineByKey,RDD,ANY,ANY,ANY,integer-method setMethod("combineByKey", signature(x = "RDD", createCombiner = "ANY", mergeValue = "ANY", - mergeCombiners = "ANY", numPartitions = "integer"), + mergeCombiners = "ANY", numPartitions = "numeric"), function(x, createCombiner, mergeValue, mergeCombiners, numPartitions) { combineLocally <- function(part) { combiners <- new.env() @@ -483,7 +483,7 @@ setMethod("combineByKey", #' @aliases aggregateByKey,RDD,ANY,ANY,ANY,integer-method setMethod("aggregateByKey", signature(x = "RDD", zeroValue = "ANY", seqOp = "ANY", - combOp = "ANY", numPartitions = "integer"), + combOp = "ANY", numPartitions = "numeric"), function(x, zeroValue, seqOp, combOp, numPartitions) { createCombiner <- function(v) { do.call(seqOp, list(zeroValue, v)) @@ -514,7 +514,7 @@ setMethod("aggregateByKey", #' @aliases foldByKey,RDD,ANY,ANY,integer-method setMethod("foldByKey", signature(x = "RDD", zeroValue = "ANY", - func = "ANY", numPartitions = "integer"), + func = "ANY", numPartitions = "numeric"), function(x, zeroValue, func, numPartitions) { aggregateByKey(x, zeroValue, func, func, numPartitions) }) @@ -553,7 +553,7 @@ setMethod("join", joinTaggedList(v, list(FALSE, FALSE)) } - joined <- flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numToInt(numPartitions)), + joined <- flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numPartitions), doJoin) }) @@ -582,7 +582,7 @@ setMethod("join", #' @rdname join-methods #' @aliases leftOuterJoin,RDD,RDD-method setMethod("leftOuterJoin", - signature(x = "RDD", y = "RDD", numPartitions = "integer"), + signature(x = "RDD", y = "RDD", numPartitions = "numeric"), function(x, y, numPartitions) { xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) }) yTagged <- lapply(y, function(i) { list(i[[1]], list(2L, i[[2]])) }) @@ -619,7 +619,7 @@ setMethod("leftOuterJoin", #' @rdname join-methods #' @aliases rightOuterJoin,RDD,RDD-method setMethod("rightOuterJoin", - signature(x = "RDD", y = "RDD", numPartitions = "integer"), + signature(x = "RDD", y = "RDD", numPartitions = "numeric"), function(x, y, numPartitions) { xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) }) yTagged <- lapply(y, function(i) { list(i[[1]], list(2L, i[[2]])) }) @@ -659,7 +659,7 @@ setMethod("rightOuterJoin", #' @rdname join-methods #' @aliases fullOuterJoin,RDD,RDD-method setMethod("fullOuterJoin", - signature(x = "RDD", y = "RDD", numPartitions = "integer"), + signature(x = "RDD", y = "RDD", numPartitions = "numeric"), function(x, y, numPartitions) { xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) }) yTagged <- lapply(y, function(i) { list(i[[1]], list(2L, i[[2]])) }) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org