spark git commit: [SPARK-18335][SPARKR] createDataFrame to support numPartitions parameter
Repository: spark Updated Branches: refs/heads/branch-2.1 2c2ca8943 -> ee3642f51 [SPARK-18335][SPARKR] createDataFrame to support numPartitions parameter ## What changes were proposed in this pull request? To allow specifying number of partitions when the DataFrame is created ## How was this patch tested? manual, unit tests Author: Felix Cheung. Closes #16512 from felixcheung/rnumpart. (cherry picked from commit b0e8eb6d3e9e80fa62625a5b9382d93af77250db) Signed-off-by: Shivaram Venkataraman Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ee3642f5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ee3642f5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ee3642f5 Branch: refs/heads/branch-2.1 Commit: ee3642f5182f199aac15b69d1a6a1167f75e5c65 Parents: 2c2ca89 Author: Felix Cheung Authored: Fri Jan 13 10:08:14 2017 -0800 Committer: Shivaram Venkataraman Committed: Fri Jan 13 10:08:25 2017 -0800 -- R/pkg/R/SQLContext.R | 20 + R/pkg/R/context.R | 39 ++ R/pkg/inst/tests/testthat/test_rdd.R | 4 +-- R/pkg/inst/tests/testthat/test_sparkSQL.R | 23 ++- 4 files changed, 72 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ee3642f5/R/pkg/R/SQLContext.R -- diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R index 6f48cd6..e771a05 100644 --- a/R/pkg/R/SQLContext.R +++ b/R/pkg/R/SQLContext.R @@ -184,8 +184,11 @@ getDefaultSqlSource <- function() { #' #' Converts R data.frame or list into SparkDataFrame. #' -#' @param data an RDD or list or data.frame. +#' @param data a list or data.frame. #' @param schema a list of column names or named list (StructType), optional. +#' @param samplingRatio Currently not used. +#' @param numPartitions the number of partitions of the SparkDataFrame. Defaults to 1, this is +#'limited by length of the list or number of rows of the data.frame #' @return A SparkDataFrame. 
#' @rdname createDataFrame #' @export @@ -195,12 +198,14 @@ getDefaultSqlSource <- function() { #' df1 <- as.DataFrame(iris) #' df2 <- as.DataFrame(list(3,4,5,6)) #' df3 <- createDataFrame(iris) +#' df4 <- createDataFrame(cars, numPartitions = 2) #' } #' @name createDataFrame #' @method createDataFrame default #' @note createDataFrame since 1.4.0 # TODO(davies): support sampling and infer type from NA -createDataFrame.default <- function(data, schema = NULL, samplingRatio = 1.0) { +createDataFrame.default <- function(data, schema = NULL, samplingRatio = 1.0, +numPartitions = NULL) { sparkSession <- getSparkSession() if (is.data.frame(data)) { @@ -233,7 +238,11 @@ createDataFrame.default <- function(data, schema = NULL, samplingRatio = 1.0) { if (is.list(data)) { sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession) -rdd <- parallelize(sc, data) +if (!is.null(numPartitions)) { + rdd <- parallelize(sc, data, numSlices = numToInt(numPartitions)) +} else { + rdd <- parallelize(sc, data, numSlices = 1) +} } else if (inherits(data, "RDD")) { rdd <- data } else { @@ -283,14 +292,13 @@ createDataFrame <- function(x, ...) { dispatchFunc("createDataFrame(data, schema = NULL)", x, ...) } -#' @param samplingRatio Currently not used. #' @rdname createDataFrame #' @aliases createDataFrame #' @export #' @method as.DataFrame default #' @note as.DataFrame since 1.6.0 -as.DataFrame.default <- function(data, schema = NULL, samplingRatio = 1.0) { - createDataFrame(data, schema) +as.DataFrame.default <- function(data, schema = NULL, samplingRatio = 1.0, numPartitions = NULL) { + createDataFrame(data, schema, samplingRatio, numPartitions) } #' @param ... additional argument(s). 
http://git-wip-us.apache.org/repos/asf/spark/blob/ee3642f5/R/pkg/R/context.R -- diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R index 1138caf..1a0dd65 100644 --- a/R/pkg/R/context.R +++ b/R/pkg/R/context.R @@ -91,6 +91,16 @@ objectFile <- function(sc, path, minPartitions = NULL) { #' will write it to disk and send the file name to JVM. Also to make sure each slice is not #' larger than that limit, number of slices may be increased. #' +#' In 2.2.0 we are changing how the numSlices are used/computed to handle +#' 1 < (length(coll) / numSlices) << length(coll) better, and to get the exact number of slices. +#' This change affects both createDataFrame and spark.lapply. +#' In the
spark git commit: [SPARK-18335][SPARKR] createDataFrame to support numPartitions parameter
Repository: spark Updated Branches: refs/heads/master 285a7798e -> b0e8eb6d3 [SPARK-18335][SPARKR] createDataFrame to support numPartitions parameter ## What changes were proposed in this pull request? To allow specifying number of partitions when the DataFrame is created ## How was this patch tested? manual, unit tests Author: Felix Cheung. Closes #16512 from felixcheung/rnumpart. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b0e8eb6d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b0e8eb6d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b0e8eb6d Branch: refs/heads/master Commit: b0e8eb6d3e9e80fa62625a5b9382d93af77250db Parents: 285a779 Author: Felix Cheung Authored: Fri Jan 13 10:08:14 2017 -0800 Committer: Shivaram Venkataraman Committed: Fri Jan 13 10:08:14 2017 -0800 -- R/pkg/R/SQLContext.R | 20 + R/pkg/R/context.R | 39 ++ R/pkg/inst/tests/testthat/test_rdd.R | 4 +-- R/pkg/inst/tests/testthat/test_sparkSQL.R | 23 ++- 4 files changed, 72 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b0e8eb6d/R/pkg/R/SQLContext.R -- diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R index 6f48cd6..e771a05 100644 --- a/R/pkg/R/SQLContext.R +++ b/R/pkg/R/SQLContext.R @@ -184,8 +184,11 @@ getDefaultSqlSource <- function() { #' #' Converts R data.frame or list into SparkDataFrame. #' -#' @param data an RDD or list or data.frame. +#' @param data a list or data.frame. #' @param schema a list of column names or named list (StructType), optional. +#' @param samplingRatio Currently not used. +#' @param numPartitions the number of partitions of the SparkDataFrame. Defaults to 1, this is +#'limited by length of the list or number of rows of the data.frame #' @return A SparkDataFrame. 
#' @rdname createDataFrame #' @export @@ -195,12 +198,14 @@ getDefaultSqlSource <- function() { #' df1 <- as.DataFrame(iris) #' df2 <- as.DataFrame(list(3,4,5,6)) #' df3 <- createDataFrame(iris) +#' df4 <- createDataFrame(cars, numPartitions = 2) #' } #' @name createDataFrame #' @method createDataFrame default #' @note createDataFrame since 1.4.0 # TODO(davies): support sampling and infer type from NA -createDataFrame.default <- function(data, schema = NULL, samplingRatio = 1.0) { +createDataFrame.default <- function(data, schema = NULL, samplingRatio = 1.0, +numPartitions = NULL) { sparkSession <- getSparkSession() if (is.data.frame(data)) { @@ -233,7 +238,11 @@ createDataFrame.default <- function(data, schema = NULL, samplingRatio = 1.0) { if (is.list(data)) { sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession) -rdd <- parallelize(sc, data) +if (!is.null(numPartitions)) { + rdd <- parallelize(sc, data, numSlices = numToInt(numPartitions)) +} else { + rdd <- parallelize(sc, data, numSlices = 1) +} } else if (inherits(data, "RDD")) { rdd <- data } else { @@ -283,14 +292,13 @@ createDataFrame <- function(x, ...) { dispatchFunc("createDataFrame(data, schema = NULL)", x, ...) } -#' @param samplingRatio Currently not used. #' @rdname createDataFrame #' @aliases createDataFrame #' @export #' @method as.DataFrame default #' @note as.DataFrame since 1.6.0 -as.DataFrame.default <- function(data, schema = NULL, samplingRatio = 1.0) { - createDataFrame(data, schema) +as.DataFrame.default <- function(data, schema = NULL, samplingRatio = 1.0, numPartitions = NULL) { + createDataFrame(data, schema, samplingRatio, numPartitions) } #' @param ... additional argument(s). 
http://git-wip-us.apache.org/repos/asf/spark/blob/b0e8eb6d/R/pkg/R/context.R -- diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R index 1138caf..1a0dd65 100644 --- a/R/pkg/R/context.R +++ b/R/pkg/R/context.R @@ -91,6 +91,16 @@ objectFile <- function(sc, path, minPartitions = NULL) { #' will write it to disk and send the file name to JVM. Also to make sure each slice is not #' larger than that limit, number of slices may be increased. #' +#' In 2.2.0 we are changing how the numSlices are used/computed to handle +#' 1 < (length(coll) / numSlices) << length(coll) better, and to get the exact number of slices. +#' This change affects both createDataFrame and spark.lapply. +#' In the specific one case that it is used to convert R native object into SparkDataFrame, it has +#' always been kept at the default of 1. In the case the