spark git commit: [SPARK-18335][SPARKR] createDataFrame to support numPartitions parameter

2017-01-13 Thread shivaram
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 2c2ca8943 -> ee3642f51


[SPARK-18335][SPARKR] createDataFrame to support numPartitions parameter

## What changes were proposed in this pull request?

To allow specifying the number of partitions when the DataFrame is created.
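
A minimal usage sketch, assuming an active SparkR session created with
sparkR.session(); the partition counts below are illustrative:

  # Create a SparkDataFrame from a local data.frame, asking for 2 partitions.
  # Without numPartitions, the local data is parallelized into a single slice.
  df <- createDataFrame(cars, numPartitions = 2)

  # as.DataFrame accepts the same parameter and forwards it to createDataFrame.
  df2 <- as.DataFrame(list(1, 2, 3, 4), numPartitions = 2)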

## How was this patch tested?

manual, unit tests

Author: Felix Cheung 

Closes #16512 from felixcheung/rnumpart.

(cherry picked from commit b0e8eb6d3e9e80fa62625a5b9382d93af77250db)
Signed-off-by: Shivaram Venkataraman 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ee3642f5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ee3642f5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ee3642f5

Branch: refs/heads/branch-2.1
Commit: ee3642f5182f199aac15b69d1a6a1167f75e5c65
Parents: 2c2ca89
Author: Felix Cheung 
Authored: Fri Jan 13 10:08:14 2017 -0800
Committer: Shivaram Venkataraman 
Committed: Fri Jan 13 10:08:25 2017 -0800

--
 R/pkg/R/SQLContext.R                       | 20 +
 R/pkg/R/context.R                          | 39 ++
 R/pkg/inst/tests/testthat/test_rdd.R       |  4 +--
 R/pkg/inst/tests/testthat/test_sparkSQL.R  | 23 ++-
 4 files changed, 72 insertions(+), 14 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ee3642f5/R/pkg/R/SQLContext.R
--
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index 6f48cd6..e771a05 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -184,8 +184,11 @@ getDefaultSqlSource <- function() {
 #'
 #' Converts R data.frame or list into SparkDataFrame.
 #'
-#' @param data an RDD or list or data.frame.
+#' @param data a list or data.frame.
 #' @param schema a list of column names or named list (StructType), optional.
+#' @param samplingRatio Currently not used.
+#' @param numPartitions the number of partitions of the SparkDataFrame. Defaults to 1, this is
+#'        limited by length of the list or number of rows of the data.frame
 #' @return A SparkDataFrame.
 #' @rdname createDataFrame
 #' @export
@@ -195,12 +198,14 @@ getDefaultSqlSource <- function() {
 #' df1 <- as.DataFrame(iris)
 #' df2 <- as.DataFrame(list(3,4,5,6))
 #' df3 <- createDataFrame(iris)
+#' df4 <- createDataFrame(cars, numPartitions = 2)
 #' }
 #' @name createDataFrame
 #' @method createDataFrame default
 #' @note createDataFrame since 1.4.0
 # TODO(davies): support sampling and infer type from NA
-createDataFrame.default <- function(data, schema = NULL, samplingRatio = 1.0) {
+createDataFrame.default <- function(data, schema = NULL, samplingRatio = 1.0,
+                                    numPartitions = NULL) {
   sparkSession <- getSparkSession()
 
   if (is.data.frame(data)) {
@@ -233,7 +238,11 @@ createDataFrame.default <- function(data, schema = NULL, samplingRatio = 1.0) {
 
   if (is.list(data)) {
     sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)
-    rdd <- parallelize(sc, data)
+    if (!is.null(numPartitions)) {
+      rdd <- parallelize(sc, data, numSlices = numToInt(numPartitions))
+    } else {
+      rdd <- parallelize(sc, data, numSlices = 1)
+    }
   } else if (inherits(data, "RDD")) {
     rdd <- data
   } else {
@@ -283,14 +292,13 @@ createDataFrame <- function(x, ...) {
   dispatchFunc("createDataFrame(data, schema = NULL)", x, ...)
 }
 
-#' @param samplingRatio Currently not used.
 #' @rdname createDataFrame
 #' @aliases createDataFrame
 #' @export
 #' @method as.DataFrame default
 #' @note as.DataFrame since 1.6.0
-as.DataFrame.default <- function(data, schema = NULL, samplingRatio = 1.0) {
-  createDataFrame(data, schema)
+as.DataFrame.default <- function(data, schema = NULL, samplingRatio = 1.0, numPartitions = NULL) {
+  createDataFrame(data, schema, samplingRatio, numPartitions)
 }
 
 #' @param ... additional argument(s).

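As the new @param entry above notes, numPartitions defaults to 1 and is limited
by the length of the list or the number of rows of the data.frame. A small
sketch of what that limit means in practice (illustrative only; the effective
partition counts follow from the documentation, not from captured output):

  # cars has 50 rows, so 2 partitions are easily satisfied.
  df_small <- createDataFrame(cars, numPartitions = 2)

  # Requesting more partitions than rows cannot produce more non-empty slices
  # than there are rows, so the effective parallelism is capped by nrow(cars).
  df_over <- createDataFrame(cars, numPartitions = 1000)
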
http://git-wip-us.apache.org/repos/asf/spark/blob/ee3642f5/R/pkg/R/context.R
--
diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R
index 1138caf..1a0dd65 100644
--- a/R/pkg/R/context.R
+++ b/R/pkg/R/context.R
@@ -91,6 +91,16 @@ objectFile <- function(sc, path, minPartitions = NULL) {
 #' will write it to disk and send the file name to JVM. Also to make sure each slice is not
 #' larger than that limit, number of slices may be increased.
 #'
+#' In 2.2.0 we are changing how the numSlices are used/computed to handle
+#' 1 < (length(coll) / numSlices) << length(coll) better, and to get the exact number of slices.
+#' This change affects both createDataFrame and spark.lapply.
+#' In the 

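The "exact number of slices" behaviour described in the roxygen block above can
be pictured with a small standalone sketch. This illustrates an even-split
scheme; splitIndices is a hypothetical helper, not code from the commit:

  # Assign each of N elements (slices numbered from 0) to one of numSlices
  # slices so that exactly numSlices slices are produced and their sizes
  # differ by at most one element.
  splitIndices <- function(N, numSlices) {
    unlist(lapply(0:(numSlices - 1), function(i) {
      start <- trunc(i * N / numSlices)        # first element of slice i
      end <- trunc((i + 1) * N / numSlices)    # one past the last element
      rep(i, end - start)                      # slice id, once per element
    }))
  }

  splitIndices(10, 3)
  # 0 0 0 1 1 1 2 2 2 2  -- exactly 3 slices even though 10 / 3 is not integral
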
spark git commit: [SPARK-18335][SPARKR] createDataFrame to support numPartitions parameter

2017-01-13 Thread shivaram
Repository: spark
Updated Branches:
  refs/heads/master 285a7798e -> b0e8eb6d3


[SPARK-18335][SPARKR] createDataFrame to support numPartitions parameter

## What changes were proposed in this pull request?

To allow specifying the number of partitions when the DataFrame is created.

## How was this patch tested?

manual, unit tests

Author: Felix Cheung 

Closes #16512 from felixcheung/rnumpart.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b0e8eb6d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b0e8eb6d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b0e8eb6d

Branch: refs/heads/master
Commit: b0e8eb6d3e9e80fa62625a5b9382d93af77250db
Parents: 285a779
Author: Felix Cheung 
Authored: Fri Jan 13 10:08:14 2017 -0800
Committer: Shivaram Venkataraman 
Committed: Fri Jan 13 10:08:14 2017 -0800

--
 R/pkg/R/SQLContext.R                       | 20 +
 R/pkg/R/context.R                          | 39 ++
 R/pkg/inst/tests/testthat/test_rdd.R       |  4 +--
 R/pkg/inst/tests/testthat/test_sparkSQL.R  | 23 ++-
 4 files changed, 72 insertions(+), 14 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b0e8eb6d/R/pkg/R/SQLContext.R
--
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index 6f48cd6..e771a05 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -184,8 +184,11 @@ getDefaultSqlSource <- function() {
 #'
 #' Converts R data.frame or list into SparkDataFrame.
 #'
-#' @param data an RDD or list or data.frame.
+#' @param data a list or data.frame.
 #' @param schema a list of column names or named list (StructType), optional.
+#' @param samplingRatio Currently not used.
+#' @param numPartitions the number of partitions of the SparkDataFrame. Defaults to 1, this is
+#'        limited by length of the list or number of rows of the data.frame
 #' @return A SparkDataFrame.
 #' @rdname createDataFrame
 #' @export
@@ -195,12 +198,14 @@ getDefaultSqlSource <- function() {
 #' df1 <- as.DataFrame(iris)
 #' df2 <- as.DataFrame(list(3,4,5,6))
 #' df3 <- createDataFrame(iris)
+#' df4 <- createDataFrame(cars, numPartitions = 2)
 #' }
 #' @name createDataFrame
 #' @method createDataFrame default
 #' @note createDataFrame since 1.4.0
 # TODO(davies): support sampling and infer type from NA
-createDataFrame.default <- function(data, schema = NULL, samplingRatio = 1.0) {
+createDataFrame.default <- function(data, schema = NULL, samplingRatio = 1.0,
+                                    numPartitions = NULL) {
   sparkSession <- getSparkSession()
 
   if (is.data.frame(data)) {
@@ -233,7 +238,11 @@ createDataFrame.default <- function(data, schema = NULL, samplingRatio = 1.0) {
 
   if (is.list(data)) {
     sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)
-    rdd <- parallelize(sc, data)
+    if (!is.null(numPartitions)) {
+      rdd <- parallelize(sc, data, numSlices = numToInt(numPartitions))
+    } else {
+      rdd <- parallelize(sc, data, numSlices = 1)
+    }
   } else if (inherits(data, "RDD")) {
     rdd <- data
   } else {
@@ -283,14 +292,13 @@ createDataFrame <- function(x, ...) {
   dispatchFunc("createDataFrame(data, schema = NULL)", x, ...)
 }
 
-#' @param samplingRatio Currently not used.
 #' @rdname createDataFrame
 #' @aliases createDataFrame
 #' @export
 #' @method as.DataFrame default
 #' @note as.DataFrame since 1.6.0
-as.DataFrame.default <- function(data, schema = NULL, samplingRatio = 1.0) {
-  createDataFrame(data, schema)
+as.DataFrame.default <- function(data, schema = NULL, samplingRatio = 1.0, numPartitions = NULL) {
+  createDataFrame(data, schema, samplingRatio, numPartitions)
 }
 
 #' @param ... additional argument(s).

http://git-wip-us.apache.org/repos/asf/spark/blob/b0e8eb6d/R/pkg/R/context.R
--
diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R
index 1138caf..1a0dd65 100644
--- a/R/pkg/R/context.R
+++ b/R/pkg/R/context.R
@@ -91,6 +91,16 @@ objectFile <- function(sc, path, minPartitions = NULL) {
 #' will write it to disk and send the file name to JVM. Also to make sure each slice is not
 #' larger than that limit, number of slices may be increased.
 #'
+#' In 2.2.0 we are changing how the numSlices are used/computed to handle
+#' 1 < (length(coll) / numSlices) << length(coll) better, and to get the exact number of slices.
+#' This change affects both createDataFrame and spark.lapply.
+#' In the specific one case that it is used to convert R native object into SparkDataFrame, it has
+#' always been kept at the default of 1. In the case the