spark git commit: [SPARK-18774][CORE][SQL] Ignore non-existing files when ignoreCorruptFiles is enabled
Repository: spark Updated Branches: refs/heads/master 330fda8aa -> b47b892e4 [SPARK-18774][CORE][SQL] Ignore non-existing files when ignoreCorruptFiles is enabled ## What changes were proposed in this pull request? When `ignoreCorruptFiles` is enabled, it's better to also ignore non-existing files. ## How was this patch tested? Jenkins Author: Shixiong Zhu Closes #16203 from zsxwing/ignore-file-not-found. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b47b892e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b47b892e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b47b892e Branch: refs/heads/master Commit: b47b892e4579b7b06b4b2837ee4b614e517789f9 Parents: 330fda8 Author: Shixiong Zhu Authored: Wed Dec 7 22:37:04 2016 -0800 Committer: Reynold Xin Committed: Wed Dec 7 22:37:04 2016 -0800 -- .../apache/spark/internal/config/package.scala | 3 ++- .../scala/org/apache/spark/rdd/HadoopRDD.scala | 14 +++--- .../org/apache/spark/rdd/NewHadoopRDD.scala | 19 +++ .../sql/execution/datasources/FileScanRDD.scala | 3 +++ .../org/apache/spark/sql/internal/SQLConf.scala | 3 ++- 5 files changed, 33 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b47b892e/core/src/main/scala/org/apache/spark/internal/config/package.scala -- diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index a69a2b5..78aed4f 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -203,7 +203,8 @@ package object config { private[spark] val IGNORE_CORRUPT_FILES = ConfigBuilder("spark.files.ignoreCorruptFiles") .doc("Whether to ignore corrupt files. If true, the Spark jobs will continue to run when " + - "encountering corrupt files and contents that have been read will still be returned.") + "encountering corrupted or non-existing files and contents that have been read will still " + + "be returned.") .booleanConf .createWithDefault(false) http://git-wip-us.apache.org/repos/asf/spark/blob/b47b892e/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 6e87233..a83e139 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -248,12 +248,20 @@ class HadoopRDD[K, V]( HadoopRDD.addLocalConfiguration( new SimpleDateFormat("MMddHHmmss", Locale.US).format(createTime), context.stageId, theSplit.index, context.attemptNumber, jobConf) - reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL) + reader = +try { + inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL) +} catch { + case e: IOException if ignoreCorruptFiles => +logWarning(s"Skipped the rest content in the corrupted file: ${split.inputSplit}", e) +finished = true +null +} // Register an on-task-completion callback to close the input stream. 
context.addTaskCompletionListener{ context => closeIfNeeded() } - private val key: K = reader.createKey() - private val value: V = reader.createValue() + private val key: K = if (reader == null) null.asInstanceOf[K] else reader.createKey() + private val value: V = if (reader == null) null.asInstanceOf[V] else reader.createValue() override def getNext(): (K, V) = { try { http://git-wip-us.apache.org/repos/asf/spark/blob/b47b892e/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index e805192..733e85f 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -174,14 +174,25 @@ class NewHadoopRDD[K, V]( } private val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0) private val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId) - private var reader = format.createRecordReader( -split.serializableHadoopSplit.value, hadoopAttemptContext) - reader.initialize(split.seria
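For reference, a minimal standalone sketch of the pattern the HadoopRDD/NewHadoopRDD hunks above apply: record-reader creation is wrapped in a try/catch so that, when ignoreCorruptFiles is set, a corrupted or non-existing split is logged and skipped instead of failing the task. The helper name `openReader` and the `logWarning` parameter are illustrative only, not part of the actual patch.

    import java.io.IOException
    import org.apache.hadoop.mapred.{InputFormat, InputSplit, JobConf, RecordReader, Reporter}

    // Hypothetical helper mirroring the patched HadoopRDD logic: return None (and let the
    // caller mark the partition as finished) instead of throwing when the flag is enabled.
    // IOException also covers FileNotFoundException, so missing files are skipped as well.
    def openReader[K, V](
        inputFormat: InputFormat[K, V],
        split: InputSplit,
        jobConf: JobConf,
        ignoreCorruptFiles: Boolean,
        logWarning: (String, Throwable) => Unit): Option[RecordReader[K, V]] = {
      try {
        Some(inputFormat.getRecordReader(split, jobConf, Reporter.NULL))
      } catch {
        case e: IOException if ignoreCorruptFiles =>
          logWarning(s"Skipped the rest content in the corrupted file: $split", e)
          None
      }
    }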
spark git commit: Close stale pull requests.
Repository: spark Updated Branches: refs/heads/master 97255497d -> 330fda8aa Close stale pull requests. Closes #15689 Closes #14640 Closes #15917 Closes #16188 Closes #16206 Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/330fda8a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/330fda8a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/330fda8a Branch: refs/heads/master Commit: 330fda8aa289e0142e174ed6f03b8fa28d08470f Parents: 9725549 Author: Reynold Xin Authored: Wed Dec 7 22:29:57 2016 -0800 Committer: Reynold Xin Committed: Wed Dec 7 22:29:57 2016 -0800 -- --
[1/2] spark git commit: Preparing Spark release v2.1.0-rc2
Repository: spark Updated Branches: refs/heads/branch-2.1 1c3f1da82 -> 48aa6775d Preparing Spark release v2.1.0-rc2 Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/08071749 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/08071749 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/08071749 Branch: refs/heads/branch-2.1 Commit: 080717497365b83bc202ab16812ced93eb1ea7bd Parents: 1c3f1da Author: Patrick Wendell Authored: Wed Dec 7 22:29:49 2016 -0800 Committer: Patrick Wendell Committed: Wed Dec 7 22:29:49 2016 -0800 -- R/pkg/DESCRIPTION | 2 +- assembly/pom.xml | 2 +- common/network-common/pom.xml | 2 +- common/network-shuffle/pom.xml| 2 +- common/network-yarn/pom.xml | 2 +- common/sketch/pom.xml | 2 +- common/tags/pom.xml | 2 +- common/unsafe/pom.xml | 2 +- core/pom.xml | 2 +- docs/_config.yml | 4 ++-- examples/pom.xml | 2 +- external/docker-integration-tests/pom.xml | 2 +- external/flume-assembly/pom.xml | 2 +- external/flume-sink/pom.xml | 2 +- external/flume/pom.xml| 2 +- external/java8-tests/pom.xml | 2 +- external/kafka-0-10-assembly/pom.xml | 2 +- external/kafka-0-10-sql/pom.xml | 2 +- external/kafka-0-10/pom.xml | 2 +- external/kafka-0-8-assembly/pom.xml | 2 +- external/kafka-0-8/pom.xml| 2 +- external/kinesis-asl-assembly/pom.xml | 2 +- external/kinesis-asl/pom.xml | 2 +- external/spark-ganglia-lgpl/pom.xml | 2 +- graphx/pom.xml| 2 +- launcher/pom.xml | 2 +- mesos/pom.xml | 2 +- mllib-local/pom.xml | 2 +- mllib/pom.xml | 2 +- pom.xml | 2 +- python/pyspark/version.py | 2 +- repl/pom.xml | 2 +- sql/catalyst/pom.xml | 2 +- sql/core/pom.xml | 2 +- sql/hive-thriftserver/pom.xml | 2 +- sql/hive/pom.xml | 2 +- streaming/pom.xml | 2 +- tools/pom.xml | 2 +- yarn/pom.xml | 2 +- 39 files changed, 40 insertions(+), 40 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/08071749/R/pkg/DESCRIPTION -- diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION index 46fb178..981ae12 100644 --- a/R/pkg/DESCRIPTION +++ b/R/pkg/DESCRIPTION @@ -1,7 +1,7 @@ Package: SparkR Type: Package Title: R Frontend for Apache Spark -Version: 2.1.1 +Version: 2.1.0 Date: 2016-11-06 Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"), email = "shiva...@cs.berkeley.edu"), http://git-wip-us.apache.org/repos/asf/spark/blob/08071749/assembly/pom.xml -- diff --git a/assembly/pom.xml b/assembly/pom.xml index 29522fd..aebfd12 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 -2.1.1-SNAPSHOT +2.1.0 ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/08071749/common/network-common/pom.xml -- diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml index 85644c4..67d78d5 100644 --- a/common/network-common/pom.xml +++ b/common/network-common/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.1.1-SNAPSHOT +2.1.0 ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/08071749/common/network-shuffle/pom.xml -- diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml index e15ede9..9379097 100644 --- a/common/network-shuffle/pom.xml +++ b/common/network-shuffle/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.1.1-SNAPSHOT +2.1.0 ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/08071749/common/network-yarn/pom.xml -- diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml index c93a355..53cb8dd 100644 --- a/common/network-yarn/pom.xml +++ 
b/common/network-yarn/pom.xml @@ -22,7 +22,7 @@ org.apache.spark
[2/2] spark git commit: Preparing development version 2.1.1-SNAPSHOT
Preparing development version 2.1.1-SNAPSHOT Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/48aa6775 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/48aa6775 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/48aa6775 Branch: refs/heads/branch-2.1 Commit: 48aa6775d6b54ccecdbe2287ae75d99c00b02d18 Parents: 0807174 Author: Patrick Wendell Authored: Wed Dec 7 22:29:55 2016 -0800 Committer: Patrick Wendell Committed: Wed Dec 7 22:29:55 2016 -0800 -- R/pkg/DESCRIPTION | 2 +- assembly/pom.xml | 2 +- common/network-common/pom.xml | 2 +- common/network-shuffle/pom.xml| 2 +- common/network-yarn/pom.xml | 2 +- common/sketch/pom.xml | 2 +- common/tags/pom.xml | 2 +- common/unsafe/pom.xml | 2 +- core/pom.xml | 2 +- docs/_config.yml | 4 ++-- examples/pom.xml | 2 +- external/docker-integration-tests/pom.xml | 2 +- external/flume-assembly/pom.xml | 2 +- external/flume-sink/pom.xml | 2 +- external/flume/pom.xml| 2 +- external/java8-tests/pom.xml | 2 +- external/kafka-0-10-assembly/pom.xml | 2 +- external/kafka-0-10-sql/pom.xml | 2 +- external/kafka-0-10/pom.xml | 2 +- external/kafka-0-8-assembly/pom.xml | 2 +- external/kafka-0-8/pom.xml| 2 +- external/kinesis-asl-assembly/pom.xml | 2 +- external/kinesis-asl/pom.xml | 2 +- external/spark-ganglia-lgpl/pom.xml | 2 +- graphx/pom.xml| 2 +- launcher/pom.xml | 2 +- mesos/pom.xml | 2 +- mllib-local/pom.xml | 2 +- mllib/pom.xml | 2 +- pom.xml | 2 +- python/pyspark/version.py | 2 +- repl/pom.xml | 2 +- sql/catalyst/pom.xml | 2 +- sql/core/pom.xml | 2 +- sql/hive-thriftserver/pom.xml | 2 +- sql/hive/pom.xml | 2 +- streaming/pom.xml | 2 +- tools/pom.xml | 2 +- yarn/pom.xml | 2 +- 39 files changed, 40 insertions(+), 40 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/48aa6775/R/pkg/DESCRIPTION -- diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION index 981ae12..46fb178 100644 --- a/R/pkg/DESCRIPTION +++ b/R/pkg/DESCRIPTION @@ -1,7 +1,7 @@ Package: SparkR Type: Package Title: R Frontend for Apache Spark -Version: 2.1.0 +Version: 2.1.1 Date: 2016-11-06 Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"), email = "shiva...@cs.berkeley.edu"), http://git-wip-us.apache.org/repos/asf/spark/blob/48aa6775/assembly/pom.xml -- diff --git a/assembly/pom.xml b/assembly/pom.xml index aebfd12..29522fd 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 -2.1.0 +2.1.1-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/48aa6775/common/network-common/pom.xml -- diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml index 67d78d5..85644c4 100644 --- a/common/network-common/pom.xml +++ b/common/network-common/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.1.0 +2.1.1-SNAPSHOT ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/48aa6775/common/network-shuffle/pom.xml -- diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml index 9379097..e15ede9 100644 --- a/common/network-shuffle/pom.xml +++ b/common/network-shuffle/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.1.0 +2.1.1-SNAPSHOT ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/48aa6775/common/network-yarn/pom.xml -- diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml index 53cb8dd..c93a355 100644 --- a/common/network-yarn/pom.xml +++ b/common/network-yarn/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.1.0 
+2.1.1-SNAPSHOT ../../pom.xml
[spark] Git Push Summary
Repository: spark Updated Tags: refs/tags/v2.1.0-rc2 [created] 080717497
spark git commit: [SPARK-18326][SPARKR][ML] Review SparkR ML wrappers API for 2.1
Repository: spark Updated Branches: refs/heads/master 82253617f -> 97255497d [SPARK-18326][SPARKR][ML] Review SparkR ML wrappers API for 2.1 ## What changes were proposed in this pull request? Reviewing SparkR ML wrappers API for 2.1 release, mainly two issues: * Remove ```probabilityCol``` from the argument list of ```spark.logit``` and ```spark.randomForest```. Since it was used when making prediction and should be an argument of ```predict```, and we will work on this at [SPARK-18618](https://issues.apache.org/jira/browse/SPARK-18618) in the next release cycle. * Fix ```spark.als``` params to make it consistent with MLlib. ## How was this patch tested? Existing tests. Author: Yanbo Liang Closes #16169 from yanboliang/spark-18326. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/97255497 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/97255497 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/97255497 Branch: refs/heads/master Commit: 97255497d885f0f8ccfc808e868bc8aa5e4d1063 Parents: 8225361 Author: Yanbo Liang Authored: Wed Dec 7 20:23:28 2016 -0800 Committer: Yanbo Liang Committed: Wed Dec 7 20:23:28 2016 -0800 -- R/pkg/R/mllib.R | 23 +--- R/pkg/inst/tests/testthat/test_mllib.R | 4 ++-- .../spark/ml/r/LogisticRegressionWrapper.scala | 4 +--- .../r/RandomForestClassificationWrapper.scala | 2 -- 4 files changed, 13 insertions(+), 20 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/97255497/R/pkg/R/mllib.R -- diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R index 074e9cb..632e4ad 100644 --- a/R/pkg/R/mllib.R +++ b/R/pkg/R/mllib.R @@ -733,7 +733,6 @@ setMethod("predict", signature(object = "KMeansModel"), #' excepting that at most one value may be 0. The class with largest value p/t is predicted, where p #' is the original probability of that class and t is the class's threshold. #' @param weightCol The weight column name. -#' @param probabilityCol column name for predicted class conditional probabilities. #' @param ... additional arguments passed to the method. #' @return \code{spark.logit} returns a fitted logistic regression model #' @rdname spark.logit @@ -772,7 +771,7 @@ setMethod("predict", signature(object = "KMeansModel"), setMethod("spark.logit", signature(data = "SparkDataFrame", formula = "formula"), function(data, formula, regParam = 0.0, elasticNetParam = 0.0, maxIter = 100, tol = 1E-6, family = "auto", standardization = TRUE, - thresholds = 0.5, weightCol = NULL, probabilityCol = "probability") { + thresholds = 0.5, weightCol = NULL) { formula <- paste(deparse(formula), collapse = "") if (is.null(weightCol)) { @@ -784,7 +783,7 @@ setMethod("spark.logit", signature(data = "SparkDataFrame", formula = "formula") as.numeric(elasticNetParam), as.integer(maxIter), as.numeric(tol), as.character(family), as.logical(standardization), as.array(thresholds), -as.character(weightCol), as.character(probabilityCol)) +as.character(weightCol)) new("LogisticRegressionModel", jobj = jobj) }) @@ -1425,7 +1424,7 @@ setMethod("predict", signature(object = "GaussianMixtureModel"), #' @param userCol column name for user ids. Ids must be (or can be coerced into) integers. #' @param itemCol column name for item ids. Ids must be (or can be coerced into) integers. #' @param rank rank of the matrix factorization (> 0). -#' @param reg regularization parameter (>= 0). +#' @param regParam regularization parameter (>= 0). #' @param maxIter maximum number of iterations (>= 0). 
#' @param nonnegative logical value indicating whether to apply nonnegativity constraints. #' @param implicitPrefs logical value indicating whether to use implicit preference. @@ -1464,21 +1463,21 @@ setMethod("predict", signature(object = "GaussianMixtureModel"), #' #' # set other arguments #' modelS <- spark.als(df, "rating", "user", "item", rank = 20, -#' reg = 0.1, nonnegative = TRUE) +#' regParam = 0.1, nonnegative = TRUE) #' statsS <- summary(modelS) #' } #' @note spark.als since 2.1.0 setMethod("spark.als", signature(data = "SparkDataFrame"), function(data, ratingCol = "rating", userCol = "user", itemCol = "item", - rank = 10, reg = 0.1, maxIter = 10, nonnegative = FALSE, + rank = 10, regParam = 0.1, maxIter = 10, nonnegative =
spark git commit: [SPARK-18326][SPARKR][ML] Review SparkR ML wrappers API for 2.1
Repository: spark Updated Branches: refs/heads/branch-2.1 ab865cfd9 -> 1c3f1da82 [SPARK-18326][SPARKR][ML] Review SparkR ML wrappers API for 2.1 ## What changes were proposed in this pull request? Reviewing SparkR ML wrappers API for 2.1 release, mainly two issues: * Remove ```probabilityCol``` from the argument list of ```spark.logit``` and ```spark.randomForest```. Since it was used when making prediction and should be an argument of ```predict```, and we will work on this at [SPARK-18618](https://issues.apache.org/jira/browse/SPARK-18618) in the next release cycle. * Fix ```spark.als``` params to make it consistent with MLlib. ## How was this patch tested? Existing tests. Author: Yanbo Liang Closes #16169 from yanboliang/spark-18326. (cherry picked from commit 97255497d885f0f8ccfc808e868bc8aa5e4d1063) Signed-off-by: Yanbo Liang Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1c3f1da8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1c3f1da8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1c3f1da8 Branch: refs/heads/branch-2.1 Commit: 1c3f1da82356426b6b550fee67e66dc82eaf1c85 Parents: ab865cf Author: Yanbo Liang Authored: Wed Dec 7 20:23:28 2016 -0800 Committer: Yanbo Liang Committed: Wed Dec 7 20:23:45 2016 -0800 -- R/pkg/R/mllib.R | 23 +--- R/pkg/inst/tests/testthat/test_mllib.R | 4 ++-- .../spark/ml/r/LogisticRegressionWrapper.scala | 4 +--- .../r/RandomForestClassificationWrapper.scala | 2 -- 4 files changed, 13 insertions(+), 20 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1c3f1da8/R/pkg/R/mllib.R -- diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R index 074e9cb..632e4ad 100644 --- a/R/pkg/R/mllib.R +++ b/R/pkg/R/mllib.R @@ -733,7 +733,6 @@ setMethod("predict", signature(object = "KMeansModel"), #' excepting that at most one value may be 0. The class with largest value p/t is predicted, where p #' is the original probability of that class and t is the class's threshold. #' @param weightCol The weight column name. -#' @param probabilityCol column name for predicted class conditional probabilities. #' @param ... additional arguments passed to the method. #' @return \code{spark.logit} returns a fitted logistic regression model #' @rdname spark.logit @@ -772,7 +771,7 @@ setMethod("predict", signature(object = "KMeansModel"), setMethod("spark.logit", signature(data = "SparkDataFrame", formula = "formula"), function(data, formula, regParam = 0.0, elasticNetParam = 0.0, maxIter = 100, tol = 1E-6, family = "auto", standardization = TRUE, - thresholds = 0.5, weightCol = NULL, probabilityCol = "probability") { + thresholds = 0.5, weightCol = NULL) { formula <- paste(deparse(formula), collapse = "") if (is.null(weightCol)) { @@ -784,7 +783,7 @@ setMethod("spark.logit", signature(data = "SparkDataFrame", formula = "formula") as.numeric(elasticNetParam), as.integer(maxIter), as.numeric(tol), as.character(family), as.logical(standardization), as.array(thresholds), -as.character(weightCol), as.character(probabilityCol)) +as.character(weightCol)) new("LogisticRegressionModel", jobj = jobj) }) @@ -1425,7 +1424,7 @@ setMethod("predict", signature(object = "GaussianMixtureModel"), #' @param userCol column name for user ids. Ids must be (or can be coerced into) integers. #' @param itemCol column name for item ids. Ids must be (or can be coerced into) integers. #' @param rank rank of the matrix factorization (> 0). -#' @param reg regularization parameter (>= 0). 
+#' @param regParam regularization parameter (>= 0). #' @param maxIter maximum number of iterations (>= 0). #' @param nonnegative logical value indicating whether to apply nonnegativity constraints. #' @param implicitPrefs logical value indicating whether to use implicit preference. @@ -1464,21 +1463,21 @@ setMethod("predict", signature(object = "GaussianMixtureModel"), #' #' # set other arguments #' modelS <- spark.als(df, "rating", "user", "item", rank = 20, -#' reg = 0.1, nonnegative = TRUE) +#' regParam = 0.1, nonnegative = TRUE) #' statsS <- summary(modelS) #' } #' @note spark.als since 2.1.0 setMethod("spark.als", signature(data = "SparkDataFrame"), function(data, ratingCol = "rating", userCol = "user", itemCol = "item", - rank = 10, reg = 0.1, maxI
spark git commit: [SPARK-18705][ML][DOC] Update user guide to reflect one pass solver for L1 and elastic-net
Repository: spark Updated Branches: refs/heads/branch-2.1 617ce3ba7 -> ab865cfd9 [SPARK-18705][ML][DOC] Update user guide to reflect one pass solver for L1 and elastic-net ## What changes were proposed in this pull request? WeightedLeastSquares now supports L1 and elastic net penalties and has an additional solver option: QuasiNewton. The docs are updated to reflect this change. ## How was this patch tested? Docs only. Generated documentation to make sure Latex looks ok. Author: sethah Closes #16139 from sethah/SPARK-18705. (cherry picked from commit 82253617f5b3cdbd418c48f94e748651ee80077e) Signed-off-by: Yanbo Liang Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ab865cfd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ab865cfd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ab865cfd Branch: refs/heads/branch-2.1 Commit: ab865cfd9dc87154e7d4fc5d09168868c88db6b0 Parents: 617ce3b Author: sethah Authored: Wed Dec 7 19:41:32 2016 -0800 Committer: Yanbo Liang Committed: Wed Dec 7 19:42:06 2016 -0800 -- docs/ml-advanced.md | 24 1 file changed, 16 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ab865cfd/docs/ml-advanced.md -- diff --git a/docs/ml-advanced.md b/docs/ml-advanced.md index 12a03d3..2747f2d 100644 --- a/docs/ml-advanced.md +++ b/docs/ml-advanced.md @@ -59,17 +59,25 @@ Given $n$ weighted observations $(w_i, a_i, b_i)$: The number of features for each observation is $m$. We use the following weighted least squares formulation: `\[ -minimize_{x}\frac{1}{2} \sum_{i=1}^n \frac{w_i(a_i^T x -b_i)^2}{\sum_{k=1}^n w_k} + \frac{1}{2}\frac{\lambda}{\delta}\sum_{j=1}^m(\sigma_{j} x_{j})^2 +\min_{\mathbf{x}}\frac{1}{2} \sum_{i=1}^n \frac{w_i(\mathbf{a}_i^T \mathbf{x} -b_i)^2}{\sum_{k=1}^n w_k} + \frac{\lambda}{\delta}\left[\frac{1}{2}(1 - \alpha)\sum_{j=1}^m(\sigma_j x_j)^2 + \alpha\sum_{j=1}^m |\sigma_j x_j|\right] \]` -where $\lambda$ is the regularization parameter, $\delta$ is the population standard deviation of the label +where $\lambda$ is the regularization parameter, $\alpha$ is the elastic-net mixing parameter, $\delta$ is the population standard deviation of the label and $\sigma_j$ is the population standard deviation of the j-th feature column. -This objective function has an analytic solution and it requires only one pass over the data to collect necessary statistics to solve. -Unlike the original dataset which can only be stored in a distributed system, -these statistics can be loaded into memory on a single machine if the number of features is relatively small, and then we can solve the objective function through Cholesky factorization on the driver. +This objective function requires only one pass over the data to collect the statistics necessary to solve it. For an +$n \times m$ data matrix, these statistics require only $O(m^2)$ storage and so can be stored on a single machine when $m$ (the number of features) is +relatively small. We can then solve the normal equations on a single machine using local methods like direct Cholesky factorization or iterative optimization programs. -WeightedLeastSquares only supports L2 regularization and provides options to enable or disable regularization and standardization. -In order to make the normal equation approach efficient, WeightedLeastSquares requires that the number of features be no more than 4096. For larger problems, use L-BFGS instead. 
+Spark MLlib currently supports two types of solvers for the normal equations: Cholesky factorization and Quasi-Newton methods (L-BFGS/OWL-QN). Cholesky factorization +depends on a positive definite covariance matrix (i.e. columns of the data matrix must be linearly independent) and will fail if this condition is violated. Quasi-Newton methods +are still capable of providing a reasonable solution even when the covariance matrix is not positive definite, so the normal equation solver can also fall back to +Quasi-Newton methods in this case. This fallback is currently always enabled for the `LinearRegression` and `GeneralizedLinearRegression` estimators. + +`WeightedLeastSquares` supports L1, L2, and elastic-net regularization and provides options to enable or disable regularization and standardization. In the case where no +L1 regularization is applied (i.e. $\alpha = 0$), there exists an analytical solution and either Cholesky or Quasi-Newton solver may be used. When $\alpha > 0$ no analytical +solution exists and we instead use the Quasi-Newton solver to find the coefficients iteratively. + +In order to make the normal equation approach efficient, `WeightedLeastSquares` requires that the number of f
spark git commit: [SPARK-18705][ML][DOC] Update user guide to reflect one pass solver for L1 and elastic-net
Repository: spark Updated Branches: refs/heads/master 9ab725eab -> 82253617f [SPARK-18705][ML][DOC] Update user guide to reflect one pass solver for L1 and elastic-net ## What changes were proposed in this pull request? WeightedLeastSquares now supports L1 and elastic net penalties and has an additional solver option: QuasiNewton. The docs are updated to reflect this change. ## How was this patch tested? Docs only. Generated documentation to make sure Latex looks ok. Author: sethah Closes #16139 from sethah/SPARK-18705. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/82253617 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/82253617 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/82253617 Branch: refs/heads/master Commit: 82253617f5b3cdbd418c48f94e748651ee80077e Parents: 9ab725e Author: sethah Authored: Wed Dec 7 19:41:32 2016 -0800 Committer: Yanbo Liang Committed: Wed Dec 7 19:41:32 2016 -0800 -- docs/ml-advanced.md | 24 1 file changed, 16 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/82253617/docs/ml-advanced.md -- diff --git a/docs/ml-advanced.md b/docs/ml-advanced.md index 12a03d3..2747f2d 100644 --- a/docs/ml-advanced.md +++ b/docs/ml-advanced.md @@ -59,17 +59,25 @@ Given $n$ weighted observations $(w_i, a_i, b_i)$: The number of features for each observation is $m$. We use the following weighted least squares formulation: `\[ -minimize_{x}\frac{1}{2} \sum_{i=1}^n \frac{w_i(a_i^T x -b_i)^2}{\sum_{k=1}^n w_k} + \frac{1}{2}\frac{\lambda}{\delta}\sum_{j=1}^m(\sigma_{j} x_{j})^2 +\min_{\mathbf{x}}\frac{1}{2} \sum_{i=1}^n \frac{w_i(\mathbf{a}_i^T \mathbf{x} -b_i)^2}{\sum_{k=1}^n w_k} + \frac{\lambda}{\delta}\left[\frac{1}{2}(1 - \alpha)\sum_{j=1}^m(\sigma_j x_j)^2 + \alpha\sum_{j=1}^m |\sigma_j x_j|\right] \]` -where $\lambda$ is the regularization parameter, $\delta$ is the population standard deviation of the label +where $\lambda$ is the regularization parameter, $\alpha$ is the elastic-net mixing parameter, $\delta$ is the population standard deviation of the label and $\sigma_j$ is the population standard deviation of the j-th feature column. -This objective function has an analytic solution and it requires only one pass over the data to collect necessary statistics to solve. -Unlike the original dataset which can only be stored in a distributed system, -these statistics can be loaded into memory on a single machine if the number of features is relatively small, and then we can solve the objective function through Cholesky factorization on the driver. +This objective function requires only one pass over the data to collect the statistics necessary to solve it. For an +$n \times m$ data matrix, these statistics require only $O(m^2)$ storage and so can be stored on a single machine when $m$ (the number of features) is +relatively small. We can then solve the normal equations on a single machine using local methods like direct Cholesky factorization or iterative optimization programs. -WeightedLeastSquares only supports L2 regularization and provides options to enable or disable regularization and standardization. -In order to make the normal equation approach efficient, WeightedLeastSquares requires that the number of features be no more than 4096. For larger problems, use L-BFGS instead. +Spark MLlib currently supports two types of solvers for the normal equations: Cholesky factorization and Quasi-Newton methods (L-BFGS/OWL-QN). 
Cholesky factorization +depends on a positive definite covariance matrix (i.e. columns of the data matrix must be linearly independent) and will fail if this condition is violated. Quasi-Newton methods +are still capable of providing a reasonable solution even when the covariance matrix is not positive definite, so the normal equation solver can also fall back to +Quasi-Newton methods in this case. This fallback is currently always enabled for the `LinearRegression` and `GeneralizedLinearRegression` estimators. + +`WeightedLeastSquares` supports L1, L2, and elastic-net regularization and provides options to enable or disable regularization and standardization. In the case where no +L1 regularization is applied (i.e. $\alpha = 0$), there exists an analytical solution and either Cholesky or Quasi-Newton solver may be used. When $\alpha > 0$ no analytical +solution exists and we instead use the Quasi-Newton solver to find the coefficients iteratively. + +In order to make the normal equation approach efficient, `WeightedLeastSquares` requires that the number of features be no more than 4096. For larger problems, use L-BFGS instead. ## Iteratively reweighted least
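To make the documented behaviour concrete, a hedged Scala sketch of selecting the normal-equation (WeightedLeastSquares) path from the LinearRegression estimator; the parameter values are illustrative and the dataset path is the sample file shipped with Spark.

    import org.apache.spark.ml.regression.LinearRegression
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("NormalEquationExample").getOrCreate()
    // Sample dataset distributed with Spark; replace with your own data as needed.
    val training = spark.read.format("libsvm")
      .load("data/mllib/sample_linear_regression_data.txt")

    val lr = new LinearRegression()
      .setSolver("normal")       // route through the normal-equation solver
      .setRegParam(0.1)          // lambda
      .setElasticNetParam(0.5)   // alpha > 0: no analytic solution, so Quasi-Newton (OWL-QN) is used
    val model = lr.fit(training)
    println(s"Coefficients: ${model.coefficients} Intercept: ${model.intercept}")

With alpha = 0 the solver may instead use direct Cholesky factorization, matching the fallback behaviour described in the doc change above.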
spark git commit: [SPARK-18758][SS] StreamingQueryListener events from a StreamingQuery should be sent only to the listeners in the same session as the query
Repository: spark Updated Branches: refs/heads/branch-2.1 839c2eb97 -> 617ce3ba7 [SPARK-18758][SS] StreamingQueryListener events from a StreamingQuery should be sent only to the listeners in the same session as the query ## What changes were proposed in this pull request? Listeners added with `sparkSession.streams.addListener(l)` are added to a SparkSession. So events only from queries in the same session as a listener should be posted to the listener. Currently, all the events get rerouted through Spark's main listener bus, that is, - StreamingQuery posts an event to StreamingQueryListenerBus. Only the queries associated with the same session as the bus post events to it. - StreamingQueryListenerBus posts the event to Spark's main LiveListenerBus as a SparkEvent. - StreamingQueryListenerBus also subscribes to LiveListenerBus events, thus getting back the posted event in a different thread. - The received event is posted to the registered listeners. The problem is that *all StreamingQueryListenerBuses in all sessions* get the events and post them to their listeners. This is wrong. In this PR, I solve it by making StreamingQueryListenerBus track active queries (by their runIds) when a query posts the QueryStarted event to the bus. This allows the rerouted events to be filtered using the tracked queries. Note that this list needs to be maintained separately from `StreamingQueryManager.activeQueries` because a terminated query is cleared from `StreamingQueryManager.activeQueries` as soon as it is stopped, but this ListenerBus must clear a query only after the termination event of that query has been posted lazily, much after the query has been terminated. Credit goes to zsxwing for coming up with the initial idea. ## How was this patch tested? Updated test harness code to use the correct session, and added new unit test. Author: Tathagata Das Closes #16186 from tdas/SPARK-18758. 
(cherry picked from commit 9ab725eabbb4ad515a663b395bd2f91bb5853a23) Signed-off-by: Tathagata Das Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/617ce3ba Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/617ce3ba Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/617ce3ba Branch: refs/heads/branch-2.1 Commit: 617ce3ba765e13e354eaa9b7e13851aef40c9ceb Parents: 839c2eb Author: Tathagata Das Authored: Wed Dec 7 19:23:27 2016 -0800 Committer: Tathagata Das Committed: Wed Dec 7 19:23:41 2016 -0800 -- .../streaming/StreamingQueryListenerBus.scala | 54 +-- .../spark/sql/execution/streaming/memory.scala | 4 +- .../apache/spark/sql/streaming/StreamTest.scala | 15 +++-- .../streaming/StreamingQueryListenerSuite.scala | 69 ++-- 4 files changed, 119 insertions(+), 23 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/617ce3ba/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala index 22e4c63..a2153d2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala @@ -17,6 +17,10 @@ package org.apache.spark.sql.execution.streaming +import java.util.UUID + +import scala.collection.mutable + import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerEvent} import org.apache.spark.sql.streaming.StreamingQueryListener import org.apache.spark.util.ListenerBus @@ -25,7 +29,11 @@ import org.apache.spark.util.ListenerBus * A bus to forward events to [[StreamingQueryListener]]s. This one will send received * [[StreamingQueryListener.Event]]s to the Spark listener bus. It also registers itself with * Spark listener bus, so that it can receive [[StreamingQueryListener.Event]]s and dispatch them - * to StreamingQueryListener. + * to StreamingQueryListeners. + * + * Note that each bus and its registered listeners are associated with a single SparkSession + * and StreamingQueryManager. So this bus will dispatch events to registered listeners for only + * those queries that were started in the associated SparkSession. */ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus) extends SparkListener with ListenerBus[StreamingQueryListener, StreamingQueryListener.Event] { @@ -35,12 +43,30 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus) sparkListenerBus.addListener(this) /** - * Post a StreamingQueryList
spark git commit: [SPARK-18758][SS] StreamingQueryListener events from a StreamingQuery should be sent only to the listeners in the same session as the query
Repository: spark Updated Branches: refs/heads/master aad11209e -> 9ab725eab [SPARK-18758][SS] StreamingQueryListener events from a StreamingQuery should be sent only to the listeners in the same session as the query ## What changes were proposed in this pull request? Listeners added with `sparkSession.streams.addListener(l)` are added to a SparkSession. So events only from queries in the same session as a listener should be posted to the listener. Currently, all the events get rerouted through Spark's main listener bus, that is, - StreamingQuery posts an event to StreamingQueryListenerBus. Only the queries associated with the same session as the bus post events to it. - StreamingQueryListenerBus posts the event to Spark's main LiveListenerBus as a SparkEvent. - StreamingQueryListenerBus also subscribes to LiveListenerBus events, thus getting back the posted event in a different thread. - The received event is posted to the registered listeners. The problem is that *all StreamingQueryListenerBuses in all sessions* get the events and post them to their listeners. This is wrong. In this PR, I solve it by making StreamingQueryListenerBus track active queries (by their runIds) when a query posts the QueryStarted event to the bus. This allows the rerouted events to be filtered using the tracked queries. Note that this list needs to be maintained separately from `StreamingQueryManager.activeQueries` because a terminated query is cleared from `StreamingQueryManager.activeQueries` as soon as it is stopped, but this ListenerBus must clear a query only after the termination event of that query has been posted lazily, much after the query has been terminated. Credit goes to zsxwing for coming up with the initial idea. ## How was this patch tested? Updated test harness code to use the correct session, and added new unit test. Author: Tathagata Das Closes #16186 from tdas/SPARK-18758. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9ab725ea Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9ab725ea Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9ab725ea Branch: refs/heads/master Commit: 9ab725eabbb4ad515a663b395bd2f91bb5853a23 Parents: aad1120 Author: Tathagata Das Authored: Wed Dec 7 19:23:27 2016 -0800 Committer: Tathagata Das Committed: Wed Dec 7 19:23:27 2016 -0800 -- .../streaming/StreamingQueryListenerBus.scala | 54 +-- .../spark/sql/execution/streaming/memory.scala | 4 +- .../apache/spark/sql/streaming/StreamTest.scala | 15 +++-- .../streaming/StreamingQueryListenerSuite.scala | 69 ++-- 4 files changed, 119 insertions(+), 23 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9ab725ea/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala index 22e4c63..a2153d2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala @@ -17,6 +17,10 @@ package org.apache.spark.sql.execution.streaming +import java.util.UUID + +import scala.collection.mutable + import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerEvent} import org.apache.spark.sql.streaming.StreamingQueryListener import org.apache.spark.util.ListenerBus @@ -25,7 +29,11 @@ import org.apache.spark.util.ListenerBus * A bus to forward events to [[StreamingQueryListener]]s. This one will send received * [[StreamingQueryListener.Event]]s to the Spark listener bus. It also registers itself with * Spark listener bus, so that it can receive [[StreamingQueryListener.Event]]s and dispatch them - * to StreamingQueryListener. + * to StreamingQueryListeners. + * + * Note that each bus and its registered listeners are associated with a single SparkSession + * and StreamingQueryManager. So this bus will dispatch events to registered listeners for only + * those queries that were started in the associated SparkSession. */ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus) extends SparkListener with ListenerBus[StreamingQueryListener, StreamingQueryListener.Event] { @@ -35,12 +43,30 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus) sparkListenerBus.addListener(this) /** - * Post a StreamingQueryListener event to the Spark listener bus asynchronously. This event will - * be dispatched to all StreamingQu
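For context, a hedged sketch of the session-scoped listener registration this fix concerns: a listener added through one session's StreamingQueryManager should receive events only for queries started in that same session. The session and application names are illustrative.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.StreamingQueryListener
    import org.apache.spark.sql.streaming.StreamingQueryListener._

    val spark = SparkSession.builder().appName("ListenerExample").getOrCreate()

    // Registered on this session only; after SPARK-18758 it no longer sees events
    // from queries started in other SparkSessions of the same application.
    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(event: QueryStartedEvent): Unit =
        println(s"Query started: ${event.id}")
      override def onQueryProgress(event: QueryProgressEvent): Unit =
        println(s"Progress: ${event.progress.numInputRows} input rows")
      override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
        println(s"Query terminated: ${event.id}")
    })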
spark git commit: [SPARK-18633][ML][EXAMPLE] Add multiclass logistic regression summary python example and document
Repository: spark Updated Branches: refs/heads/branch-2.1 1c6419718 -> 839c2eb97 [SPARK-18633][ML][EXAMPLE] Add multiclass logistic regression summary python example and document ## What changes were proposed in this pull request? Logistic Regression summary is added in Python API. We need to add example and document for summary. The newly added example is consistent with Scala and Java examples. ## How was this patch tested? Manually tests: Run the example with spark-submit; copy & paste code into pyspark; build document and check the document. Author: wm...@hotmail.com Closes #16064 from wangmiao1981/py. (cherry picked from commit aad11209eb4db585f991ba09d08d90576f315bb4) Signed-off-by: Joseph K. Bradley Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/839c2eb9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/839c2eb9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/839c2eb9 Branch: refs/heads/branch-2.1 Commit: 839c2eb9723ba51baf6022fea8c29caecf7c0612 Parents: 1c64197 Author: wm...@hotmail.com Authored: Wed Dec 7 18:12:49 2016 -0800 Committer: Joseph K. Bradley Committed: Wed Dec 7 18:13:04 2016 -0800 -- docs/ml-classification-regression.md| 10 ++- .../ml/logistic_regression_summary_example.py | 68 2 files changed, 76 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/839c2eb9/docs/ml-classification-regression.md -- diff --git a/docs/ml-classification-regression.md b/docs/ml-classification-regression.md index 5148ad0..557a53c 100644 --- a/docs/ml-classification-regression.md +++ b/docs/ml-classification-regression.md @@ -114,9 +114,15 @@ Continuing the earlier example: {% include_example java/org/apache/spark/examples/ml/JavaLogisticRegressionSummaryExample.java %} - -Logistic regression model summary is not yet supported in Python. +[`LogisticRegressionTrainingSummary`](api/python/pyspark.ml.html#pyspark.ml.classification.LogisticRegressionSummary) +provides a summary for a +[`LogisticRegressionModel`](api/python/pyspark.ml.html#pyspark.ml.classification.LogisticRegressionModel). +Currently, only binary classification is supported. Support for multiclass model summaries will be added in the future. + +Continuing the earlier example: + +{% include_example python/ml/logistic_regression_summary_example.py %} http://git-wip-us.apache.org/repos/asf/spark/blob/839c2eb9/examples/src/main/python/ml/logistic_regression_summary_example.py -- diff --git a/examples/src/main/python/ml/logistic_regression_summary_example.py b/examples/src/main/python/ml/logistic_regression_summary_example.py new file mode 100644 index 000..bd440a1 --- /dev/null +++ b/examples/src/main/python/ml/logistic_regression_summary_example.py @@ -0,0 +1,68 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# + +from __future__ import print_function + +# $example on$ +from pyspark.ml.classification import LogisticRegression +# $example off$ +from pyspark.sql import SparkSession + +""" +An example demonstrating Logistic Regression Summary. +Run with: + bin/spark-submit examples/src/main/python/ml/logistic_regression_summary_example.py +""" + +if __name__ == "__main__": +spark = SparkSession \ +.builder \ +.appName("LogisticRegressionSummary") \ +.getOrCreate() + +# Load training data +training = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt") + +lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8) + +# Fit the model +lrModel = lr.fit(training) + +# $example on$ +# Extract the summary from the returned LogisticRegressionModel instance trained +# in the earlier example +trainingSummary = lrModel.summary + +# Obtain the objective per iteration +objectiveHistory = trainingSummary.objectiveHistory +print("objectiveHistory:") +for
spark git commit: [SPARK-18633][ML][EXAMPLE] Add multiclass logistic regression summary python example and document
Repository: spark Updated Branches: refs/heads/master bec0a9217 -> aad11209e [SPARK-18633][ML][EXAMPLE] Add multiclass logistic regression summary python example and document ## What changes were proposed in this pull request? Logistic Regression summary is added in Python API. We need to add example and document for summary. The newly added example is consistent with Scala and Java examples. ## How was this patch tested? Manually tests: Run the example with spark-submit; copy & paste code into pyspark; build document and check the document. Author: wm...@hotmail.com Closes #16064 from wangmiao1981/py. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aad11209 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aad11209 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aad11209 Branch: refs/heads/master Commit: aad11209eb4db585f991ba09d08d90576f315bb4 Parents: bec0a92 Author: wm...@hotmail.com Authored: Wed Dec 7 18:12:49 2016 -0800 Committer: Joseph K. Bradley Committed: Wed Dec 7 18:12:49 2016 -0800 -- docs/ml-classification-regression.md| 10 ++- .../ml/logistic_regression_summary_example.py | 68 2 files changed, 76 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/aad11209/docs/ml-classification-regression.md -- diff --git a/docs/ml-classification-regression.md b/docs/ml-classification-regression.md index 5759593..bb9390f 100644 --- a/docs/ml-classification-regression.md +++ b/docs/ml-classification-regression.md @@ -114,9 +114,15 @@ Continuing the earlier example: {% include_example java/org/apache/spark/examples/ml/JavaLogisticRegressionSummaryExample.java %} - -Logistic regression model summary is not yet supported in Python. +[`LogisticRegressionTrainingSummary`](api/python/pyspark.ml.html#pyspark.ml.classification.LogisticRegressionSummary) +provides a summary for a +[`LogisticRegressionModel`](api/python/pyspark.ml.html#pyspark.ml.classification.LogisticRegressionModel). +Currently, only binary classification is supported. Support for multiclass model summaries will be added in the future. + +Continuing the earlier example: + +{% include_example python/ml/logistic_regression_summary_example.py %} http://git-wip-us.apache.org/repos/asf/spark/blob/aad11209/examples/src/main/python/ml/logistic_regression_summary_example.py -- diff --git a/examples/src/main/python/ml/logistic_regression_summary_example.py b/examples/src/main/python/ml/logistic_regression_summary_example.py new file mode 100644 index 000..bd440a1 --- /dev/null +++ b/examples/src/main/python/ml/logistic_regression_summary_example.py @@ -0,0 +1,68 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +from __future__ import print_function + +# $example on$ +from pyspark.ml.classification import LogisticRegression +# $example off$ +from pyspark.sql import SparkSession + +""" +An example demonstrating Logistic Regression Summary. +Run with: + bin/spark-submit examples/src/main/python/ml/logistic_regression_summary_example.py +""" + +if __name__ == "__main__": +spark = SparkSession \ +.builder \ +.appName("LogisticRegressionSummary") \ +.getOrCreate() + +# Load training data +training = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt") + +lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8) + +# Fit the model +lrModel = lr.fit(training) + +# $example on$ +# Extract the summary from the returned LogisticRegressionModel instance trained +# in the earlier example +trainingSummary = lrModel.summary + +# Obtain the objective per iteration +objectiveHistory = trainingSummary.objectiveHistory +print("objectiveHistory:") +for objective in objectiveHistory: +print(objective) + +# Obtain the receiver-operating characteristic
spark git commit: [SPARK-18654][SQL] Remove unreachable patterns in makeRootConverter
Repository: spark Updated Branches: refs/heads/master 70b2bf717 -> bec0a9217 [SPARK-18654][SQL] Remove unreachable patterns in makeRootConverter ## What changes were proposed in this pull request? `makeRootConverter` is only called with a `StructType` value. By making this method less general we can remove pattern matches, which are never actually hit outside of the test suite. ## How was this patch tested? The existing tests. Author: Nathan Howell Closes #16084 from NathanHowell/SPARK-18654. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bec0a921 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bec0a921 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bec0a921 Branch: refs/heads/master Commit: bec0a9217b896a596c0a0919888da34589fc720d Parents: 70b2bf7 Author: Nathan Howell Authored: Wed Dec 7 16:52:05 2016 -0800 Committer: Reynold Xin Committed: Wed Dec 7 16:52:05 2016 -0800 -- .../spark/sql/catalyst/json/JacksonParser.scala | 56 .../execution/datasources/json/JsonSuite.scala | 2 +- 2 files changed, 22 insertions(+), 36 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/bec0a921/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index e476cb1..03e27ba 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -119,47 +119,33 @@ class JacksonParser( * to a value according to a desired schema. This is a wrapper for the method * `makeConverter()` to handle a row wrapped with an array. */ - def makeRootConverter(dataType: DataType): ValueConverter = dataType match { -case st: StructType => - val elementConverter = makeConverter(st) - val fieldConverters = st.map(_.dataType).map(makeConverter) - (parser: JsonParser) => parseJsonToken(parser, dataType) { -case START_OBJECT => convertObject(parser, st, fieldConverters) - // SPARK-3308: support reading top level JSON arrays and take every element - // in such an array as a row - // - // For example, we support, the JSON data as below: - // - // [{"a":"str_a_1"}] - // [{"a":"str_a_2"}, {"b":"str_b_3"}] - // - // resulting in: - // - // List([str_a_1,null]) - // List([str_a_2,null], [null,str_b_3]) - // -case START_ARRAY => convertArray(parser, elementConverter) - } - -case ArrayType(st: StructType, _) => - val elementConverter = makeConverter(st) - val fieldConverters = st.map(_.dataType).map(makeConverter) - (parser: JsonParser) => parseJsonToken(parser, dataType) { -// the business end of SPARK-3308: -// when an object is found but an array is requested just wrap it in a list. -// This is being wrapped in `JacksonParser.parse`. 
-case START_OBJECT => convertObject(parser, st, fieldConverters) -case START_ARRAY => convertArray(parser, elementConverter) - } - -case _ => makeConverter(dataType) + private def makeRootConverter(st: StructType): ValueConverter = { +val elementConverter = makeConverter(st) +val fieldConverters = st.map(_.dataType).map(makeConverter) +(parser: JsonParser) => parseJsonToken(parser, st) { + case START_OBJECT => convertObject(parser, st, fieldConverters) +// SPARK-3308: support reading top level JSON arrays and take every element +// in such an array as a row +// +// For example, we support, the JSON data as below: +// +// [{"a":"str_a_1"}] +// [{"a":"str_a_2"}, {"b":"str_b_3"}] +// +// resulting in: +// +// List([str_a_1,null]) +// List([str_a_2,null], [null,str_b_3]) +// + case START_ARRAY => convertArray(parser, elementConverter) +} } /** * Create a converter which converts the JSON documents held by the `JsonParser` * to a value according to a desired schema. */ - private def makeConverter(dataType: DataType): ValueConverter = dataType match { + private[sql] def makeConverter(dataType: DataType): ValueConverter = dataType match { case BooleanType => (parser: JsonParser) => parseJsonToken(parser, dataType) { case VALUE_TRUE => true http://git-wip-us.apache.org/repos/asf/spark/blob/bec0a921/sql/core/src/test/scala/org/apache/spark/sql/execu
spark git commit: [SPARK-18754][SS] Rename recentProgresses to recentProgress
Repository: spark Updated Branches: refs/heads/branch-2.1 e9b3afac9 -> 1c6419718 [SPARK-18754][SS] Rename recentProgresses to recentProgress Based on an informal survey, users find this option easier to understand / remember. Author: Michael Armbrust Closes #16182 from marmbrus/renameRecentProgress. (cherry picked from commit 70b2bf717d367d598c5a238d569d62c777e63fde) Signed-off-by: Tathagata Das Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1c641971 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1c641971 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1c641971 Branch: refs/heads/branch-2.1 Commit: 1c6419718aadf0bdc200f9b328242062a07f2277 Parents: e9b3afa Author: Michael Armbrust Authored: Wed Dec 7 15:36:29 2016 -0800 Committer: Tathagata Das Committed: Wed Dec 7 15:36:39 2016 -0800 -- .../spark/sql/kafka010/KafkaSourceSuite.scala | 2 +- project/MimaExcludes.scala| 2 +- python/pyspark/sql/streaming.py | 6 +++--- python/pyspark/sql/tests.py | 4 ++-- .../execution/streaming/ProgressReporter.scala| 2 +- .../org/apache/spark/sql/internal/SQLConf.scala | 2 +- .../spark/sql/streaming/StreamingQuery.scala | 4 ++-- .../execution/streaming/ForeachSinkSuite.scala| 4 ++-- .../sql/streaming/FileStreamSourceSuite.scala | 2 +- .../streaming/StreamingQueryListenerSuite.scala | 4 ++-- .../spark/sql/streaming/StreamingQuerySuite.scala | 18 +- 11 files changed, 25 insertions(+), 25 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1c641971/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala -- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala index 0e40aba..544fbc5 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala @@ -448,7 +448,7 @@ class KafkaSourceSuite extends KafkaSourceTest { AddKafkaData(Set(topic), 1, 2, 3), CheckAnswer(2, 3, 4), AssertOnQuery { query => -val recordsRead = query.recentProgresses.map(_.numInputRows).sum +val recordsRead = query.recentProgress.map(_.numInputRows).sum recordsRead == 3 } ) http://git-wip-us.apache.org/repos/asf/spark/blob/1c641971/project/MimaExcludes.scala -- diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 6650aad..978a328 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -85,7 +85,7 @@ object MimaExcludes { ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.sourceStatuses"), ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQuery.id"), ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.lastProgress"), - ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.recentProgresses"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.recentProgress"), ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.id"), ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryManager.get"), 
http://git-wip-us.apache.org/repos/asf/spark/blob/1c641971/python/pyspark/sql/streaming.py -- diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index ee7a26d..9cfb3fe 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -114,12 +114,12 @@ class StreamingQuery(object): @property @since(2.1) -def recentProgresses(self): +def recentProgress(self): """Returns an array of the most recent [[StreamingQueryProgress]] updates for this query. The number of progress updates retained for each stream is configured by Spark session -configuration `spark.sql.streaming.numRecentProgresses`. +configuration `spark.sql.streaming.numRecentProgressUpdates`. """ -return [json.loads(p.json()) for p in self._jsq.recentProgresses()] +return [json.loads(p.json()) for p in self._jsq.recentProgress()
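For illustration, a minimal Scala sketch of how the renamed accessor is consumed by user code (not part of the commit; assumes a `StreamingQuery` that has already been started elsewhere):

```
import org.apache.spark.sql.streaming.StreamingQuery

// Summarise the retained progress updates of a running query.
// `recentProgress` is the renamed accessor (formerly `recentProgresses`).
def summarize(query: StreamingQuery): String = {
  val totalRows = query.recentProgress.map(_.numInputRows).sum
  // lastProgress can be null before the first progress update is published.
  val last = Option(query.lastProgress).map(_.json).getOrElse("no progress yet")
  s"rows seen across retained updates: $totalRows; last update: $last"
}
```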
spark git commit: [SPARK-18754][SS] Rename recentProgresses to recentProgress
Repository: spark Updated Branches: refs/heads/master edc87e189 -> 70b2bf717 [SPARK-18754][SS] Rename recentProgresses to recentProgress Based on an informal survey, users find this option easier to understand / remember. Author: Michael Armbrust Closes #16182 from marmbrus/renameRecentProgress. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/70b2bf71 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/70b2bf71 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/70b2bf71 Branch: refs/heads/master Commit: 70b2bf717d367d598c5a238d569d62c777e63fde Parents: edc87e1 Author: Michael Armbrust Authored: Wed Dec 7 15:36:29 2016 -0800 Committer: Tathagata Das Committed: Wed Dec 7 15:36:29 2016 -0800 -- .../spark/sql/kafka010/KafkaSourceSuite.scala | 2 +- project/MimaExcludes.scala| 2 +- python/pyspark/sql/streaming.py | 6 +++--- python/pyspark/sql/tests.py | 4 ++-- .../execution/streaming/ProgressReporter.scala| 2 +- .../org/apache/spark/sql/internal/SQLConf.scala | 2 +- .../spark/sql/streaming/StreamingQuery.scala | 4 ++-- .../execution/streaming/ForeachSinkSuite.scala| 4 ++-- .../sql/streaming/FileStreamSourceSuite.scala | 2 +- .../streaming/StreamingQueryListenerSuite.scala | 4 ++-- .../spark/sql/streaming/StreamingQuerySuite.scala | 18 +- 11 files changed, 25 insertions(+), 25 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/70b2bf71/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala -- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala index 0e40aba..544fbc5 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala @@ -448,7 +448,7 @@ class KafkaSourceSuite extends KafkaSourceTest { AddKafkaData(Set(topic), 1, 2, 3), CheckAnswer(2, 3, 4), AssertOnQuery { query => -val recordsRead = query.recentProgresses.map(_.numInputRows).sum +val recordsRead = query.recentProgress.map(_.numInputRows).sum recordsRead == 3 } ) http://git-wip-us.apache.org/repos/asf/spark/blob/70b2bf71/project/MimaExcludes.scala -- diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 82d50f9..b215d88 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -91,7 +91,7 @@ object MimaExcludes { ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.sourceStatuses"), ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQuery.id"), ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.lastProgress"), - ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.recentProgresses"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.recentProgress"), ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.id"), ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryManager.get"), http://git-wip-us.apache.org/repos/asf/spark/blob/70b2bf71/python/pyspark/sql/streaming.py -- diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 
ee7a26d..9cfb3fe 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -114,12 +114,12 @@ class StreamingQuery(object): @property @since(2.1) -def recentProgresses(self): +def recentProgress(self): """Returns an array of the most recent [[StreamingQueryProgress]] updates for this query. The number of progress updates retained for each stream is configured by Spark session -configuration `spark.sql.streaming.numRecentProgresses`. +configuration `spark.sql.streaming.numRecentProgressUpdates`. """ -return [json.loads(p.json()) for p in self._jsq.recentProgresses()] +return [json.loads(p.json()) for p in self._jsq.recentProgress()] @property @since(2.1) http://git-wip-us.apache.org/repos/asf/spark/blob/70b2bf71/python/pyspa
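The docstring above also points at the renamed retention setting. A hedged sketch of setting it (values and app name are illustrative; the default is not shown in this diff):

```
import org.apache.spark.sql.SparkSession

// Raise the number of progress updates retained per stream before starting any queries.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("progress-retention")
  .config("spark.sql.streaming.numRecentProgressUpdates", "200")
  .getOrCreate()

// Queries started from this session keep up to 200 entries in query.recentProgress.
```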
spark git commit: [SPARK-18588][TESTS] Fix flaky test: KafkaSourceStressForDontFailOnDataLossSuite
Repository: spark Updated Branches: refs/heads/master bb94f61a7 -> edc87e189 [SPARK-18588][TESTS] Fix flaky test: KafkaSourceStressForDontFailOnDataLossSuite ## What changes were proposed in this pull request? Fixed the following failures: ``` org.scalatest.exceptions.TestFailedDueToTimeoutException: The code passed to eventually never returned normally. Attempted 3745 times over 1.79085165 minutes. Last failure message: assertion failed: failOnDataLoss-0 not deleted after timeout. ``` ``` sbt.ForkMain$ForkError: org.apache.spark.sql.streaming.StreamingQueryException: Query query-66 terminated with exception: null at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:252) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:146) Caused by: sbt.ForkMain$ForkError: java.lang.NullPointerException: null at java.util.ArrayList.addAll(ArrayList.java:577) at org.apache.kafka.clients.Metadata.getClusterForCurrentTopics(Metadata.java:257) at org.apache.kafka.clients.Metadata.update(Metadata.java:177) at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleResponse(NetworkClient.java:605) at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeHandleCompletedReceive(NetworkClient.java:582) at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:450) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitPendingRequests(ConsumerNetworkClient.java:260) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:222) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:366) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:978) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938) at ... ``` ## How was this patch tested? Tested in #16048 by running many times. Author: Shixiong Zhu Closes #16109 from zsxwing/fix-kafka-flaky-test. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/edc87e18 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/edc87e18 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/edc87e18 Branch: refs/heads/master Commit: edc87e18922b98be47c298cdc3daa2b049a737e9 Parents: bb94f61 Author: Shixiong Zhu Authored: Wed Dec 7 13:47:44 2016 -0800 Committer: Tathagata Das Committed: Wed Dec 7 13:47:44 2016 -0800 -- .../sql/kafka010/CachedKafkaConsumer.scala | 39 -- .../apache/spark/sql/kafka010/KafkaSource.scala | 2 +- .../spark/sql/kafka010/KafkaSourceSuite.scala | 11 ++- .../spark/sql/kafka010/KafkaTestUtils.scala | 75 +--- .../spark/sql/test/SharedSQLContext.scala | 8 ++- 5 files changed, 96 insertions(+), 39 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/edc87e18/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala -- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala index 3f438e9..3f396a7 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala @@ -86,7 +86,7 @@ private[kafka010] case class CachedKafkaConsumer private( var toFetchOffset = offset while (toFetchOffset != UNKNOWN_OFFSET) { try { -return fetchData(toFetchOffset, pollTimeoutMs) +return fetchData(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss) } catch { case e: OffsetOutOfRangeException => // When there is some error thrown, it's better to use a new consumer to drop all cached @@ -159,14 +159,18 @@ private[kafka010] case class CachedKafkaConsumer private( } /** - * Get the record at `offset`. + * Get the record for the
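For context, a sketch of the user-facing option this stress suite exercises (not part of the commit; requires the spark-sql-kafka-0-10 package, and the broker address and topic name are placeholders — the topic name simply echoes the one in the failure message above):

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("kafka-dont-fail").getOrCreate()

// With failOnDataLoss=false the Kafka source logs a warning and moves on when expected
// offsets have been deleted (e.g. the topic was removed), instead of failing the query.
val kafkaStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "failOnDataLoss-0")
  .option("failOnDataLoss", "false")
  .load()
  .selectExpr("CAST(value AS STRING)")
```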
spark git commit: [SPARK-18588][TESTS] Fix flaky test: KafkaSourceStressForDontFailOnDataLossSuite
Repository: spark Updated Branches: refs/heads/branch-2.1 76e1f1651 -> e9b3afac9 [SPARK-18588][TESTS] Fix flaky test: KafkaSourceStressForDontFailOnDataLossSuite ## What changes were proposed in this pull request? Fixed the following failures: ``` org.scalatest.exceptions.TestFailedDueToTimeoutException: The code passed to eventually never returned normally. Attempted 3745 times over 1.79085165 minutes. Last failure message: assertion failed: failOnDataLoss-0 not deleted after timeout. ``` ``` sbt.ForkMain$ForkError: org.apache.spark.sql.streaming.StreamingQueryException: Query query-66 terminated with exception: null at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:252) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:146) Caused by: sbt.ForkMain$ForkError: java.lang.NullPointerException: null at java.util.ArrayList.addAll(ArrayList.java:577) at org.apache.kafka.clients.Metadata.getClusterForCurrentTopics(Metadata.java:257) at org.apache.kafka.clients.Metadata.update(Metadata.java:177) at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleResponse(NetworkClient.java:605) at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeHandleCompletedReceive(NetworkClient.java:582) at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:450) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitPendingRequests(ConsumerNetworkClient.java:260) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:222) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:366) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:978) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938) at ... ``` ## How was this patch tested? Tested in #16048 by running many times. Author: Shixiong Zhu Closes #16109 from zsxwing/fix-kafka-flaky-test. 
(cherry picked from commit edc87e18922b98be47c298cdc3daa2b049a737e9) Signed-off-by: Tathagata Das Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e9b3afac Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e9b3afac Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e9b3afac Branch: refs/heads/branch-2.1 Commit: e9b3afac9ce5ea4bffb8201a58856598c521a3a9 Parents: 76e1f16 Author: Shixiong Zhu Authored: Wed Dec 7 13:47:44 2016 -0800 Committer: Tathagata Das Committed: Wed Dec 7 13:47:54 2016 -0800 -- .../sql/kafka010/CachedKafkaConsumer.scala | 39 -- .../apache/spark/sql/kafka010/KafkaSource.scala | 2 +- .../spark/sql/kafka010/KafkaSourceSuite.scala | 11 ++- .../spark/sql/kafka010/KafkaTestUtils.scala | 75 +--- .../spark/sql/test/SharedSQLContext.scala | 8 ++- 5 files changed, 96 insertions(+), 39 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e9b3afac/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala -- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala index 3f438e9..3f396a7 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala @@ -86,7 +86,7 @@ private[kafka010] case class CachedKafkaConsumer private( var toFetchOffset = offset while (toFetchOffset != UNKNOWN_OFFSET) { try { -return fetchData(toFetchOffset, pollTimeoutMs) +return fetchData(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss) } catch { case e: OffsetOutOfRangeException => // When there is some error thrown, it's better to use a new consumer to drop all cached @@ -159,14 +159,18 @@ private[kafka010] case clas
spark git commit: [SPARK-18762][WEBUI] Web UI should be http:4040 instead of https:4040
Repository: spark Updated Branches: refs/heads/branch-2.0 7fbb07372 -> 44df6d2ce [SPARK-18762][WEBUI] Web UI should be http:4040 instead of https:4040 ## What changes were proposed in this pull request? When SSL is enabled, the Spark shell shows: ``` Spark context Web UI available at https://192.168.99.1:4040 ``` This is wrong because 4040 is http, not https. It redirects to the https port. More importantly, this introduces several broken links in the UI. For example, in the master UI, the worker link is https:8081 instead of http:8081 or https:8481. CC: mengxr liancheng I manually tested accessing by accessing MasterPage, WorkerPage and HistoryServer with SSL enabled. Author: sarutak Closes #16190 from sarutak/SPARK-18761. (cherry picked from commit bb94f61a7ac97bf904ec0e8d5a4ab69a4142443f) Signed-off-by: Marcelo Vanzin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/44df6d2c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/44df6d2c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/44df6d2c Branch: refs/heads/branch-2.0 Commit: 44df6d2ce9b9fee087efd18f9f72b2bed89b4223 Parents: 7fbb073 Author: sarutak Authored: Wed Dec 7 11:41:23 2016 -0800 Committer: Marcelo Vanzin Committed: Wed Dec 7 11:44:20 2016 -0800 -- core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala | 3 +-- core/src/main/scala/org/apache/spark/ui/WebUI.scala | 5 + 2 files changed, 2 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/44df6d2c/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 724206b..5bd32b2 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -187,8 +187,7 @@ private[deploy] class Worker( webUi = new WorkerWebUI(this, workDir, webUiPort) webUi.bind() -val scheme = if (webUi.sslOptions.enabled) "https" else "http" -workerWebUiUrl = s"$scheme://$publicAddress:${webUi.boundPort}" +workerWebUiUrl = s"http://$publicAddress:${webUi.boundPort}"; registerWithMaster() metricsSystem.registerSource(workerSource) http://git-wip-us.apache.org/repos/asf/spark/blob/44df6d2c/core/src/main/scala/org/apache/spark/ui/WebUI.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index adc4a4f..2c40e72 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -146,10 +146,7 @@ private[spark] abstract class WebUI( } /** Return the url of web interface. Only valid after bind(). */ - def webUrl: String = { -val protocol = if (sslOptions.enabled) "https" else "http" -s"$protocol://$publicHostName:$boundPort" - } + def webUrl: String = s"http://$publicHostName:$boundPort"; /** Return the actual port to which this server is bound. Only valid after bind(). */ def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-18762][WEBUI] Web UI should be http:4040 instead of https:4040
Repository: spark Updated Branches: refs/heads/branch-2.1 acb6ac5da -> 76e1f1651 [SPARK-18762][WEBUI] Web UI should be http:4040 instead of https:4040 ## What changes were proposed in this pull request? When SSL is enabled, the Spark shell shows: ``` Spark context Web UI available at https://192.168.99.1:4040 ``` This is wrong because 4040 is http, not https. It redirects to the https port. More importantly, this introduces several broken links in the UI. For example, in the master UI, the worker link is https:8081 instead of http:8081 or https:8481. CC: mengxr liancheng I manually tested accessing by accessing MasterPage, WorkerPage and HistoryServer with SSL enabled. Author: sarutak Closes #16190 from sarutak/SPARK-18761. (cherry picked from commit bb94f61a7ac97bf904ec0e8d5a4ab69a4142443f) Signed-off-by: Marcelo Vanzin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/76e1f165 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/76e1f165 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/76e1f165 Branch: refs/heads/branch-2.1 Commit: 76e1f1651f5a7207c9c66686616709b62b798fa3 Parents: acb6ac5 Author: sarutak Authored: Wed Dec 7 11:41:23 2016 -0800 Committer: Marcelo Vanzin Committed: Wed Dec 7 11:41:46 2016 -0800 -- core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala | 3 +-- core/src/main/scala/org/apache/spark/ui/WebUI.scala | 5 + 2 files changed, 2 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/76e1f165/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 8b1c6bf..0940f3c 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -187,8 +187,7 @@ private[deploy] class Worker( webUi = new WorkerWebUI(this, workDir, webUiPort) webUi.bind() -val scheme = if (webUi.sslOptions.enabled) "https" else "http" -workerWebUiUrl = s"$scheme://$publicAddress:${webUi.boundPort}" +workerWebUiUrl = s"http://$publicAddress:${webUi.boundPort}"; registerWithMaster() metricsSystem.registerSource(workerSource) http://git-wip-us.apache.org/repos/asf/spark/blob/76e1f165/core/src/main/scala/org/apache/spark/ui/WebUI.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index a05e0ef..4118fcf 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -147,10 +147,7 @@ private[spark] abstract class WebUI( } /** Return the url of web interface. Only valid after bind(). */ - def webUrl: String = { -val protocol = if (sslOptions.enabled) "https" else "http" -s"$protocol://$publicHostName:$boundPort" - } + def webUrl: String = s"http://$publicHostName:$boundPort"; /** Return the actual port to which this server is bound. Only valid after bind(). */ def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-18762][WEBUI] Web UI should be http:4040 instead of https:4040
Repository: spark Updated Branches: refs/heads/master dbf3e298a -> bb94f61a7 [SPARK-18762][WEBUI] Web UI should be http:4040 instead of https:4040 ## What changes were proposed in this pull request? When SSL is enabled, the Spark shell shows: ``` Spark context Web UI available at https://192.168.99.1:4040 ``` This is wrong because 4040 is http, not https. It redirects to the https port. More importantly, this introduces several broken links in the UI. For example, in the master UI, the worker link is https:8081 instead of http:8081 or https:8481. CC: mengxr liancheng I manually tested accessing by accessing MasterPage, WorkerPage and HistoryServer with SSL enabled. Author: sarutak Closes #16190 from sarutak/SPARK-18761. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bb94f61a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bb94f61a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bb94f61a Branch: refs/heads/master Commit: bb94f61a7ac97bf904ec0e8d5a4ab69a4142443f Parents: dbf3e29 Author: sarutak Authored: Wed Dec 7 11:41:23 2016 -0800 Committer: Marcelo Vanzin Committed: Wed Dec 7 11:41:23 2016 -0800 -- core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala | 3 +-- core/src/main/scala/org/apache/spark/ui/WebUI.scala | 5 + 2 files changed, 2 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/bb94f61a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 8b1c6bf..0940f3c 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -187,8 +187,7 @@ private[deploy] class Worker( webUi = new WorkerWebUI(this, workDir, webUiPort) webUi.bind() -val scheme = if (webUi.sslOptions.enabled) "https" else "http" -workerWebUiUrl = s"$scheme://$publicAddress:${webUi.boundPort}" +workerWebUiUrl = s"http://$publicAddress:${webUi.boundPort}"; registerWithMaster() metricsSystem.registerSource(workerSource) http://git-wip-us.apache.org/repos/asf/spark/blob/bb94f61a/core/src/main/scala/org/apache/spark/ui/WebUI.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index 8c80155..b8604c5 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -147,10 +147,7 @@ private[spark] abstract class WebUI( } /** Return the url of web interface. Only valid after bind(). */ - def webUrl: String = { -val protocol = if (sslOptions.enabled) "https" else "http" -s"$protocol://$publicHostName:$boundPort" - } + def webUrl: String = s"http://$publicHostName:$boundPort"; /** Return the actual port to which this server is bound. Only valid after bind(). */ def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
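A minimal before/after sketch of the URL logic this patch changes (illustrative only, not the actual Spark internals):

```
// Before: advertise https whenever SSL was enabled, even though boundPort is the
// plain-HTTP port that merely redirects to the separate SSL port.
def webUrlBefore(sslEnabled: Boolean, host: String, boundPort: Int): String = {
  val protocol = if (sslEnabled) "https" else "http"
  s"$protocol://$host:$boundPort"
}

// After: always advertise the http scheme on the bound port; any redirect to the
// SSL port is handled by the server itself.
def webUrlAfter(host: String, boundPort: Int): String =
  s"http://$host:$boundPort"
```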
spark git commit: [SPARK-18764][CORE] Add a warning log when skipping a corrupted file
Repository: spark Updated Branches: refs/heads/branch-2.1 5dbcd4fcf -> acb6ac5da [SPARK-18764][CORE] Add a warning log when skipping a corrupted file ## What changes were proposed in this pull request? It's better to add a warning log when skipping a corrupted file. It will be helpful when we want to finish the job first, then find them in the log and fix these files. ## How was this patch tested? Jenkins Author: Shixiong Zhu Closes #16192 from zsxwing/SPARK-18764. (cherry picked from commit dbf3e298a1a35c0243f087814ddf88034ff96d66) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/acb6ac5d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/acb6ac5d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/acb6ac5d Branch: refs/heads/branch-2.1 Commit: acb6ac5da7a5694cc3270772c6d68933b7d761dc Parents: 5dbcd4f Author: Shixiong Zhu Authored: Wed Dec 7 10:30:05 2016 -0800 Committer: Shixiong Zhu Committed: Wed Dec 7 10:30:15 2016 -0800 -- core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 4 +++- core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala| 6 +- .../apache/spark/sql/execution/datasources/FileScanRDD.scala | 1 + 3 files changed, 9 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/acb6ac5d/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index ae4320d..3133a28 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -257,7 +257,9 @@ class HadoopRDD[K, V]( try { finished = !reader.next(key, value) } catch { - case e: IOException if ignoreCorruptFiles => finished = true + case e: IOException if ignoreCorruptFiles => +logWarning(s"Skipped the rest content in the corrupted file: ${split.inputSplit}", e) +finished = true } if (!finished) { inputMetrics.incRecordsRead(1) http://git-wip-us.apache.org/repos/asf/spark/blob/acb6ac5d/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index c783e13..c6ddb4b 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -186,7 +186,11 @@ class NewHadoopRDD[K, V]( try { finished = !reader.nextKeyValue } catch { -case e: IOException if ignoreCorruptFiles => finished = true +case e: IOException if ignoreCorruptFiles => + logWarning( +s"Skipped the rest content in the corrupted file: ${split.serializableHadoopSplit}", +e) + finished = true } if (finished) { // Close and release the reader here; close() will also be called when the task http://git-wip-us.apache.org/repos/asf/spark/blob/acb6ac5d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala index 8994457..237cdab 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala @@ -138,6 +138,7 @@ class FileScanRDD( } } catch { case e: IOException => + logWarning(s"Skipped the 
rest content in the corrupted file: $currentFile", e) finished = true null } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-18764][CORE] Add a warning log when skipping a corrupted file
Repository: spark Updated Branches: refs/heads/master f1fca81b1 -> dbf3e298a [SPARK-18764][CORE] Add a warning log when skipping a corrupted file ## What changes were proposed in this pull request? It's better to add a warning log when skipping a corrupted file. It will be helpful when we want to finish the job first, then find them in the log and fix these files. ## How was this patch tested? Jenkins Author: Shixiong Zhu Closes #16192 from zsxwing/SPARK-18764. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dbf3e298 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dbf3e298 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dbf3e298 Branch: refs/heads/master Commit: dbf3e298a1a35c0243f087814ddf88034ff96d66 Parents: f1fca81 Author: Shixiong Zhu Authored: Wed Dec 7 10:30:05 2016 -0800 Committer: Shixiong Zhu Committed: Wed Dec 7 10:30:05 2016 -0800 -- core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 4 +++- core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala| 6 +- .../apache/spark/sql/execution/datasources/FileScanRDD.scala | 1 + 3 files changed, 9 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/dbf3e298/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index e3d81a6..6e87233 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -259,7 +259,9 @@ class HadoopRDD[K, V]( try { finished = !reader.next(key, value) } catch { - case e: IOException if ignoreCorruptFiles => finished = true + case e: IOException if ignoreCorruptFiles => +logWarning(s"Skipped the rest content in the corrupted file: ${split.inputSplit}", e) +finished = true } if (!finished) { inputMetrics.incRecordsRead(1) http://git-wip-us.apache.org/repos/asf/spark/blob/dbf3e298/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index e90e84c..e805192 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -189,7 +189,11 @@ class NewHadoopRDD[K, V]( try { finished = !reader.nextKeyValue } catch { -case e: IOException if ignoreCorruptFiles => finished = true +case e: IOException if ignoreCorruptFiles => + logWarning( +s"Skipped the rest content in the corrupted file: ${split.serializableHadoopSplit}", +e) + finished = true } if (finished) { // Close and release the reader here; close() will also be called when the task http://git-wip-us.apache.org/repos/asf/spark/blob/dbf3e298/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala index 306dc65..6d8cd81 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala @@ -139,6 +139,7 @@ class FileScanRDD( } } catch { case e: IOException => + logWarning(s"Skipped the rest content in the corrupted file: $currentFile", e) finished = true null } - To unsubscribe, e-mail: 
commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
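As an illustration of the setting whose skip path gains this warning (a sketch, not from the commit; the input path is hypothetical):

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("ignore-corrupt-files")
  .config("spark.files.ignoreCorruptFiles", "true")       // core RDD paths (HadoopRDD / NewHadoopRDD)
  .config("spark.sql.files.ignoreCorruptFiles", "true")   // SQL file sources (FileScanRDD)
  .getOrCreate()

// With the flags on, a corrupt file in the directory is skipped; after this patch a
// "Skipped the rest content in the corrupted file: ..." warning is logged for it.
val rows = spark.read.parquet("/data/events/")   // hypothetical directory containing one corrupt file
println(rows.count())                             // counts rows from the readable files only
```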
spark git commit: [SPARK-17760][SQL][BACKPORT] AnalysisException with dataframe pivot when groupBy column is not attribute
Repository: spark Updated Branches: refs/heads/branch-2.0 e05ad8830 -> 7fbb07372 [SPARK-17760][SQL][BACKPORT] AnalysisException with dataframe pivot when groupBy column is not attribute ## What changes were proposed in this pull request? Backport of #16177 to branch-2.0 ## How was this patch tested? existing and additional unit tests Author: Andrew Ray Closes #16197 from aray/SPARK-17760-2.0. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7fbb0737 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7fbb0737 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7fbb0737 Branch: refs/heads/branch-2.0 Commit: 7fbb073728e984ce11e8f64f324878a399078e14 Parents: e05ad88 Author: Andrew Ray Authored: Wed Dec 7 18:30:34 2016 +0100 Committer: Herman van Hovell Committed: Wed Dec 7 18:30:34 2016 +0100 -- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 5 +++-- .../scala/org/apache/spark/sql/DataFramePivotSuite.scala| 9 + 2 files changed, 12 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7fbb0737/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 05a2d18..32dc70a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -402,14 +402,15 @@ class Analyzer( .toAggregateExpression() , "__pivot_" + a.sql)() } - val secondAgg = Aggregate(groupByExprs, groupByExprs ++ pivotAggs, firstAgg) + val groupByExprsAttr = groupByExprs.map(_.toAttribute) + val secondAgg = Aggregate(groupByExprsAttr, groupByExprsAttr ++ pivotAggs, firstAgg) val pivotAggAttribute = pivotAggs.map(_.toAttribute) val pivotOutputs = pivotValues.zipWithIndex.flatMap { case (value, i) => aggregates.zip(pivotAggAttribute).map { case (aggregate, pivotAtt) => Alias(ExtractValue(pivotAtt, Literal(i), resolver), outputName(value, aggregate))() } } - Project(groupByExprs ++ pivotOutputs, secondAgg) + Project(groupByExprsAttr ++ pivotOutputs, secondAgg) } else { val pivotAggregates: Seq[NamedExpression] = pivotValues.flatMap { value => def ifExpr(expr: Expression) = { http://git-wip-us.apache.org/repos/asf/spark/blob/7fbb0737/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala index d5cb5e1..41d3525 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala @@ -197,4 +197,13 @@ class DataFramePivotSuite extends QueryTest with SharedSQLContext{ Row(2013, Seq(48000.0, 7.0), Seq(3.0, 7.0)) :: Nil ) } + + test("pivot with column definition in groupby") { +checkAnswer( + courseSales.groupBy(substring(col("course"), 0, 1).as("foo")) +.pivot("year", Seq(2012, 2013)) +.sum("earnings"), + Row("d", 15000.0, 48000.0) :: Row("J", 2.0, 3.0) :: Nil +) + } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
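A sketch of the query shape this fix enables: the groupBy column is an expression (`substring(...)`), not a plain attribute. The pivot call mirrors the added test; the data below is made up to resemble the test's `courseSales` frame.

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, substring}

val spark = SparkSession.builder().master("local[*]").appName("pivot-demo").getOrCreate()
import spark.implicits._

val courseSales = Seq(
  ("dotNET", 2012, 10000.0), ("dotNET", 2012, 5000.0), ("dotNET", 2013, 48000.0),
  ("Java",   2012, 20000.0), ("Java",   2013, 30000.0)
).toDF("course", "year", "earnings")

// Before this patch, pivoting on a grouping expression (rather than an attribute)
// failed with an AnalysisException during pivot resolution.
courseSales
  .groupBy(substring(col("course"), 0, 1).as("foo"))
  .pivot("year", Seq(2012, 2013))
  .sum("earnings")
  .show()
```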
spark git commit: [SPARK-17760][SQL] AnalysisException with dataframe pivot when groupBy column is not attribute
Repository: spark Updated Branches: refs/heads/branch-2.1 4432a2a83 -> 5dbcd4fcf [SPARK-17760][SQL] AnalysisException with dataframe pivot when groupBy column is not attribute ## What changes were proposed in this pull request? Fixes AnalysisException for pivot queries that have group by columns that are expressions and not attributes by substituting the expressions output attribute in the second aggregation and final projection. ## How was this patch tested? existing and additional unit tests Author: Andrew Ray Closes #16177 from aray/SPARK-17760. (cherry picked from commit f1fca81b165c5a673f7d86b268e04ea42a6c267e) Signed-off-by: Herman van Hovell Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5dbcd4fc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5dbcd4fc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5dbcd4fc Branch: refs/heads/branch-2.1 Commit: 5dbcd4fcfbc14ba8c17e1cb364ca45b99aa90708 Parents: 4432a2a Author: Andrew Ray Authored: Wed Dec 7 04:44:14 2016 -0800 Committer: Herman van Hovell Committed: Wed Dec 7 04:44:25 2016 -0800 -- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala| 5 +++-- .../scala/org/apache/spark/sql/DataFramePivotSuite.scala | 8 2 files changed, 11 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5dbcd4fc/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index f738ae8..9ca9901 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -404,14 +404,15 @@ class Analyzer( .toAggregateExpression() , "__pivot_" + a.sql)() } - val secondAgg = Aggregate(groupByExprs, groupByExprs ++ pivotAggs, firstAgg) + val groupByExprsAttr = groupByExprs.map(_.toAttribute) + val secondAgg = Aggregate(groupByExprsAttr, groupByExprsAttr ++ pivotAggs, firstAgg) val pivotAggAttribute = pivotAggs.map(_.toAttribute) val pivotOutputs = pivotValues.zipWithIndex.flatMap { case (value, i) => aggregates.zip(pivotAggAttribute).map { case (aggregate, pivotAtt) => Alias(ExtractValue(pivotAtt, Literal(i), resolver), outputName(value, aggregate))() } } - Project(groupByExprs ++ pivotOutputs, secondAgg) + Project(groupByExprsAttr ++ pivotOutputs, secondAgg) } else { val pivotAggregates: Seq[NamedExpression] = pivotValues.flatMap { value => def ifExpr(expr: Expression) = { http://git-wip-us.apache.org/repos/asf/spark/blob/5dbcd4fc/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala index 1bbe135..a8d854c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala @@ -208,4 +208,12 @@ class DataFramePivotSuite extends QueryTest with SharedSQLContext{ ) } + test("pivot with column definition in groupby") { +checkAnswer( + courseSales.groupBy(substring(col("course"), 0, 1).as("foo")) +.pivot("year", Seq(2012, 2013)) +.sum("earnings"), + Row("d", 15000.0, 48000.0) :: Row("J", 2.0, 3.0) :: Nil +) + } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For 
additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-17760][SQL] AnalysisException with dataframe pivot when groupBy column is not attribute
Repository: spark Updated Branches: refs/heads/master c496d03b5 -> f1fca81b1 [SPARK-17760][SQL] AnalysisException with dataframe pivot when groupBy column is not attribute ## What changes were proposed in this pull request? Fixes AnalysisException for pivot queries that have group by columns that are expressions and not attributes by substituting the expressions output attribute in the second aggregation and final projection. ## How was this patch tested? existing and additional unit tests Author: Andrew Ray Closes #16177 from aray/SPARK-17760. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f1fca81b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f1fca81b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f1fca81b Branch: refs/heads/master Commit: f1fca81b165c5a673f7d86b268e04ea42a6c267e Parents: c496d03 Author: Andrew Ray Authored: Wed Dec 7 04:44:14 2016 -0800 Committer: Herman van Hovell Committed: Wed Dec 7 04:44:14 2016 -0800 -- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala| 5 +++-- .../scala/org/apache/spark/sql/DataFramePivotSuite.scala | 8 2 files changed, 11 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f1fca81b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index ed6e17a..58f98d5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -463,14 +463,15 @@ class Analyzer( .toAggregateExpression() , "__pivot_" + a.sql)() } - val secondAgg = Aggregate(groupByExprs, groupByExprs ++ pivotAggs, firstAgg) + val groupByExprsAttr = groupByExprs.map(_.toAttribute) + val secondAgg = Aggregate(groupByExprsAttr, groupByExprsAttr ++ pivotAggs, firstAgg) val pivotAggAttribute = pivotAggs.map(_.toAttribute) val pivotOutputs = pivotValues.zipWithIndex.flatMap { case (value, i) => aggregates.zip(pivotAggAttribute).map { case (aggregate, pivotAtt) => Alias(ExtractValue(pivotAtt, Literal(i), resolver), outputName(value, aggregate))() } } - Project(groupByExprs ++ pivotOutputs, secondAgg) + Project(groupByExprsAttr ++ pivotOutputs, secondAgg) } else { val pivotAggregates: Seq[NamedExpression] = pivotValues.flatMap { value => def ifExpr(expr: Expression) = { http://git-wip-us.apache.org/repos/asf/spark/blob/f1fca81b/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala index 1bbe135..a8d854c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala @@ -208,4 +208,12 @@ class DataFramePivotSuite extends QueryTest with SharedSQLContext{ ) } + test("pivot with column definition in groupby") { +checkAnswer( + courseSales.groupBy(substring(col("course"), 0, 1).as("foo")) +.pivot("year", Seq(2012, 2013)) +.sum("earnings"), + Row("d", 15000.0, 48000.0) :: Row("J", 2.0, 3.0) :: Nil +) + } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-18208][SHUFFLE] Executor OOM due to a growing LongArray in BytesToBytesMap
Repository: spark Updated Branches: refs/heads/branch-2.0 f5c5a07bd -> e05ad8830 [SPARK-18208][SHUFFLE] Executor OOM due to a growing LongArray in BytesToBytesMap ## What changes were proposed in this pull request? BytesToBytesMap currently does not release the in-memory storage (the longArray variable) after it spills to disk. This is typically not a problem during aggregation because the longArray should be much smaller than the pages, and because we grow the longArray at a conservative rate. However this can lead to an OOM when an already running task is allocated more than its fair share, this can happen because of a scheduling delay. In this case the longArray can grow beyond the fair share of memory for the task. This becomes problematic when the task spills and the long array is not freed, that causes subsequent memory allocation requests to be denied by the memory manager resulting in an OOM. This PR fixes this issuing by freeing the longArray when the BytesToBytesMap spills. ## How was this patch tested? Existing tests and tested on realworld workloads. Author: Jie Xiong Author: jiexiong Closes #15722 from jiexiong/jie_oom_fix. (cherry picked from commit c496d03b5289f7c604661a12af86f6accddcf125) Signed-off-by: Herman van Hovell Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e05ad883 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e05ad883 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e05ad883 Branch: refs/heads/branch-2.0 Commit: e05ad8830e204acaf7cee4daef0ed44db9a158f3 Parents: f5c5a07 Author: Jie Xiong Authored: Wed Dec 7 04:33:30 2016 -0800 Committer: Herman van Hovell Committed: Wed Dec 7 04:34:04 2016 -0800 -- .../java/org/apache/spark/unsafe/map/BytesToBytesMap.java | 7 +-- 1 file changed, 5 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e05ad883/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java -- diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java index dc04025..947db7f 100644 --- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java +++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java @@ -169,6 +169,8 @@ public final class BytesToBytesMap extends MemoryConsumer { private long peakMemoryUsedBytes = 0L; + private final int initialCapacity; + private final BlockManager blockManager; private final SerializerManager serializerManager; private volatile MapIterator destructiveIterator = null; @@ -201,6 +203,7 @@ public final class BytesToBytesMap extends MemoryConsumer { throw new IllegalArgumentException("Page size " + pageSizeBytes + " cannot exceed " + TaskMemoryManager.MAXIMUM_PAGE_SIZE_BYTES); } +this.initialCapacity = initialCapacity; allocate(initialCapacity); } @@ -897,12 +900,12 @@ public final class BytesToBytesMap extends MemoryConsumer { public void reset() { numKeys = 0; numValues = 0; -longArray.zeroOut(); - +freeArray(longArray); while (dataPages.size() > 0) { MemoryBlock dataPage = dataPages.removeLast(); freePage(dataPage); } +allocate(initialCapacity); currentPage = null; pageCursor = 0; } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-18208][SHUFFLE] Executor OOM due to a growing LongArray in BytesToBytesMap
Repository: spark Updated Branches: refs/heads/branch-2.1 51754d6df -> 4432a2a83 [SPARK-18208][SHUFFLE] Executor OOM due to a growing LongArray in BytesToBytesMap ## What changes were proposed in this pull request? BytesToBytesMap currently does not release the in-memory storage (the longArray variable) after it spills to disk. This is typically not a problem during aggregation because the longArray should be much smaller than the pages, and because we grow the longArray at a conservative rate. However this can lead to an OOM when an already running task is allocated more than its fair share, this can happen because of a scheduling delay. In this case the longArray can grow beyond the fair share of memory for the task. This becomes problematic when the task spills and the long array is not freed, that causes subsequent memory allocation requests to be denied by the memory manager resulting in an OOM. This PR fixes this issuing by freeing the longArray when the BytesToBytesMap spills. ## How was this patch tested? Existing tests and tested on realworld workloads. Author: Jie Xiong Author: jiexiong Closes #15722 from jiexiong/jie_oom_fix. (cherry picked from commit c496d03b5289f7c604661a12af86f6accddcf125) Signed-off-by: Herman van Hovell Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4432a2a8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4432a2a8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4432a2a8 Branch: refs/heads/branch-2.1 Commit: 4432a2a8386f951775957f352e4ba223c6ce4fa3 Parents: 51754d6 Author: Jie Xiong Authored: Wed Dec 7 04:33:30 2016 -0800 Committer: Herman van Hovell Committed: Wed Dec 7 04:33:50 2016 -0800 -- .../java/org/apache/spark/unsafe/map/BytesToBytesMap.java | 7 +-- 1 file changed, 5 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4432a2a8/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java -- diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java index d2fcdea..44120e5 100644 --- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java +++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java @@ -170,6 +170,8 @@ public final class BytesToBytesMap extends MemoryConsumer { private long peakMemoryUsedBytes = 0L; + private final int initialCapacity; + private final BlockManager blockManager; private final SerializerManager serializerManager; private volatile MapIterator destructiveIterator = null; @@ -202,6 +204,7 @@ public final class BytesToBytesMap extends MemoryConsumer { throw new IllegalArgumentException("Page size " + pageSizeBytes + " cannot exceed " + TaskMemoryManager.MAXIMUM_PAGE_SIZE_BYTES); } +this.initialCapacity = initialCapacity; allocate(initialCapacity); } @@ -902,12 +905,12 @@ public final class BytesToBytesMap extends MemoryConsumer { public void reset() { numKeys = 0; numValues = 0; -longArray.zeroOut(); - +freeArray(longArray); while (dataPages.size() > 0) { MemoryBlock dataPage = dataPages.removeLast(); freePage(dataPage); } +allocate(initialCapacity); currentPage = null; pageCursor = 0; } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-18208][SHUFFLE] Executor OOM due to a growing LongArray in BytesToBytesMap
Repository: spark Updated Branches: refs/heads/master 79f5f281b -> c496d03b5 [SPARK-18208][SHUFFLE] Executor OOM due to a growing LongArray in BytesToBytesMap ## What changes were proposed in this pull request? BytesToBytesMap currently does not release the in-memory storage (the longArray variable) after it spills to disk. This is typically not a problem during aggregation because the longArray should be much smaller than the pages, and because we grow the longArray at a conservative rate. However, this can lead to an OOM when an already running task is allocated more than its fair share, which can happen because of a scheduling delay. In this case the longArray can grow beyond the fair share of memory for the task. This becomes problematic when the task spills and the long array is not freed, which causes subsequent memory allocation requests to be denied by the memory manager, resulting in an OOM. This PR fixes this issue by freeing the longArray when the BytesToBytesMap spills. ## How was this patch tested? Existing tests, plus testing on real-world workloads. Author: Jie Xiong Author: jiexiong Closes #15722 from jiexiong/jie_oom_fix. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c496d03b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c496d03b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c496d03b Branch: refs/heads/master Commit: c496d03b5289f7c604661a12af86f6accddcf125 Parents: 79f5f28 Author: Jie Xiong Authored: Wed Dec 7 04:33:30 2016 -0800 Committer: Herman van Hovell Committed: Wed Dec 7 04:33:30 2016 -0800 -- .../java/org/apache/spark/unsafe/map/BytesToBytesMap.java | 7 +-- 1 file changed, 5 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c496d03b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java -- diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java index d2fcdea..44120e5 100644 --- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java +++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java @@ -170,6 +170,8 @@ public final class BytesToBytesMap extends MemoryConsumer { private long peakMemoryUsedBytes = 0L; + private final int initialCapacity; + private final BlockManager blockManager; private final SerializerManager serializerManager; private volatile MapIterator destructiveIterator = null; @@ -202,6 +204,7 @@ public final class BytesToBytesMap extends MemoryConsumer { throw new IllegalArgumentException("Page size " + pageSizeBytes + " cannot exceed " + TaskMemoryManager.MAXIMUM_PAGE_SIZE_BYTES); } +this.initialCapacity = initialCapacity; allocate(initialCapacity); } @@ -902,12 +905,12 @@ public final class BytesToBytesMap extends MemoryConsumer { public void reset() { numKeys = 0; numValues = 0; -longArray.zeroOut(); - +freeArray(longArray); while (dataPages.size() > 0) { MemoryBlock dataPage = dataPages.removeLast(); freePage(dataPage); } +allocate(initialCapacity); currentPage = null; pageCursor = 0; } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
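A back-of-the-envelope sketch of why releasing the long array on reset() matters. The sizing assumption (roughly two 8-byte slots per entry in the long array) is not shown in this diff and is stated here only for illustration:

```
// Estimated footprint of the long array alone, assuming ~2 longs per entry.
def longArrayBytes(capacity: Long): Long = capacity * 2 * 8L

// After growing to 64M entries the array would pin on the order of 1 GiB:
println(longArrayBytes(64L * 1024 * 1024) / (1024 * 1024))  // ~1024 MiB

// zeroOut() only clears the contents, so that reservation stays charged to the task;
// freeArray(longArray) followed by allocate(initialCapacity) — the change above —
// hands the memory back to the memory manager when the map spills and is reset.
```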
spark git commit: [SPARK-18678][ML] Skewed reservoir sampling in SamplingUtils
Repository: spark Updated Branches: refs/heads/branch-2.1 99c293eea -> 51754d6df [SPARK-18678][ML] Skewed reservoir sampling in SamplingUtils ## What changes were proposed in this pull request? Fix reservoir sampling bias for small k. An off-by-one error meant that the probability of replacement was slightly too high -- k/(l-1) after l element instead of k/l, which matters for small k. ## How was this patch tested? Existing test plus new test case. Author: Sean Owen Closes #16129 from srowen/SPARK-18678. (cherry picked from commit 79f5f281bb69cb2de9f64006180abd753e8ae427) Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/51754d6d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/51754d6d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/51754d6d Branch: refs/heads/branch-2.1 Commit: 51754d6df703c02ecb23ec1779889602ff8fb038 Parents: 99c293e Author: Sean Owen Authored: Wed Dec 7 17:34:45 2016 +0800 Committer: Sean Owen Committed: Wed Dec 7 17:34:57 2016 +0800 -- R/pkg/inst/tests/testthat/test_mllib.R | 9 + .../org/apache/spark/util/random/SamplingUtils.scala | 5 - .../apache/spark/util/random/SamplingUtilsSuite.scala | 13 + 3 files changed, 22 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/51754d6d/R/pkg/inst/tests/testthat/test_mllib.R -- diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R index d7aa965..9f810be 100644 --- a/R/pkg/inst/tests/testthat/test_mllib.R +++ b/R/pkg/inst/tests/testthat/test_mllib.R @@ -1007,10 +1007,11 @@ test_that("spark.randomForest", { model <- spark.randomForest(data, Employed ~ ., "regression", maxDepth = 5, maxBins = 16, numTrees = 20, seed = 123) predictions <- collect(predict(model, data)) - expect_equal(predictions$prediction, c(60.379, 61.096, 60.636, 62.258, - 63.736, 64.296, 64.868, 64.300, - 66.709, 67.697, 67.966, 67.252, - 68.866, 69.593, 69.195, 69.658), + expect_equal(predictions$prediction, c(60.32820, 61.22315, 60.69025, 62.11070, + 63.53160, 64.05470, 65.12710, 64.30450, + 66.70910, 67.86125, 68.08700, 67.21865, + 68.89275, 69.53180, 69.39640, 69.68250), + tolerance = 1e-4) stats <- summary(model) expect_equal(stats$numTrees, 20) http://git-wip-us.apache.org/repos/asf/spark/blob/51754d6d/core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala b/core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala index 297524c..a7e0075 100644 --- a/core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala @@ -56,11 +56,14 @@ private[spark] object SamplingUtils { val rand = new XORShiftRandom(seed) while (input.hasNext) { val item = input.next() +l += 1 +// There are k elements in the reservoir, and the l-th element has been +// consumed. It should be chosen with probability k/l. 
The expression +// below is a random long chosen uniformly from [0,l) val replacementIndex = (rand.nextDouble() * l).toLong if (replacementIndex < k) { reservoir(replacementIndex.toInt) = item } -l += 1 } (reservoir, l) } http://git-wip-us.apache.org/repos/asf/spark/blob/51754d6d/core/src/test/scala/org/apache/spark/util/random/SamplingUtilsSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/util/random/SamplingUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/random/SamplingUtilsSuite.scala index 667a4db..55c5dd5 100644 --- a/core/src/test/scala/org/apache/spark/util/random/SamplingUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/random/SamplingUtilsSuite.scala @@ -44,6 +44,19 @@ class SamplingUtilsSuite extends SparkFunSuite { assert(sample3.length === 10) } + test("SPARK-18678 reservoirSampleAndCount with tiny input") { +val input = Seq(0, 1) +val counts = new Array[Int](input.size) +for (i <- 0 until 500) { + val (samples, inputSize) = SamplingUtils.reservoirSampleAndCount(input.iterator, 1) + assert(inputSize === 2) +
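A self-contained Scala sketch of the corrected logic in the diff above (simplified: plain `Random` instead of `XORShiftRandom`, `Int` items; not the actual `SamplingUtils` code):

```
import scala.util.Random

def reservoirSampleAndCount(input: Iterator[Int], k: Int, seed: Long = 42L): (Array[Int], Long) = {
  val reservoir = new Array[Int](k)
  var i = 0
  while (i < k && input.hasNext) { reservoir(i) = input.next(); i += 1 }
  if (i < k) return (reservoir.take(i), i.toLong)   // input smaller than k: return it all

  var l = i.toLong
  val rand = new Random(seed)
  while (input.hasNext) {
    val item = input.next()
    l += 1                                                  // count the element BEFORE drawing,
    val replacementIndex = (rand.nextDouble() * l).toLong   // so the replacement probability is k/l, not k/(l-1)
    if (replacementIndex < k) reservoir(replacementIndex.toInt) = item
  }
  (reservoir, l)
}
```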
spark git commit: [SPARK-18678][ML] Skewed reservoir sampling in SamplingUtils
Repository: spark Updated Branches: refs/heads/master b82802713 -> 79f5f281b [SPARK-18678][ML] Skewed reservoir sampling in SamplingUtils ## What changes were proposed in this pull request? Fix reservoir sampling bias for small k. An off-by-one error meant that the probability of replacement was slightly too high -- k/(l-1) after the l-th element instead of k/l, which matters for small k. ## How was this patch tested? Existing test plus new test case. Author: Sean Owen Closes #16129 from srowen/SPARK-18678. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/79f5f281 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/79f5f281 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/79f5f281 Branch: refs/heads/master Commit: 79f5f281bb69cb2de9f64006180abd753e8ae427 Parents: b828027 Author: Sean Owen Authored: Wed Dec 7 17:34:45 2016 +0800 Committer: Sean Owen Committed: Wed Dec 7 17:34:45 2016 +0800 -- R/pkg/inst/tests/testthat/test_mllib.R | 9 + .../org/apache/spark/util/random/SamplingUtils.scala | 5 - .../apache/spark/util/random/SamplingUtilsSuite.scala | 13 + 3 files changed, 22 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/79f5f281/R/pkg/inst/tests/testthat/test_mllib.R -- diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R index 0802a2a..4758e40 100644 --- a/R/pkg/inst/tests/testthat/test_mllib.R +++ b/R/pkg/inst/tests/testthat/test_mllib.R @@ -1007,10 +1007,11 @@ test_that("spark.randomForest", { model <- spark.randomForest(data, Employed ~ ., "regression", maxDepth = 5, maxBins = 16, numTrees = 20, seed = 123) predictions <- collect(predict(model, data)) - expect_equal(predictions$prediction, c(60.379, 61.096, 60.636, 62.258, - 63.736, 64.296, 64.868, 64.300, - 66.709, 67.697, 67.966, 67.252, - 68.866, 69.593, 69.195, 69.658), + expect_equal(predictions$prediction, c(60.32820, 61.22315, 60.69025, 62.11070, + 63.53160, 64.05470, 65.12710, 64.30450, + 66.70910, 67.86125, 68.08700, 67.21865, + 68.89275, 69.53180, 69.39640, 69.68250), + tolerance = 1e-4) stats <- summary(model) expect_equal(stats$numTrees, 20) http://git-wip-us.apache.org/repos/asf/spark/blob/79f5f281/core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala b/core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala index 297524c..a7e0075 100644 --- a/core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala @@ -56,11 +56,14 @@ private[spark] object SamplingUtils { val rand = new XORShiftRandom(seed) while (input.hasNext) { val item = input.next() +l += 1 +// There are k elements in the reservoir, and the l-th element has been +// consumed. It should be chosen with probability k/l.
The expression +// below is a random long chosen uniformly from [0,l) val replacementIndex = (rand.nextDouble() * l).toLong if (replacementIndex < k) { reservoir(replacementIndex.toInt) = item } -l += 1 } (reservoir, l) } http://git-wip-us.apache.org/repos/asf/spark/blob/79f5f281/core/src/test/scala/org/apache/spark/util/random/SamplingUtilsSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/util/random/SamplingUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/random/SamplingUtilsSuite.scala index 667a4db..55c5dd5 100644 --- a/core/src/test/scala/org/apache/spark/util/random/SamplingUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/random/SamplingUtilsSuite.scala @@ -44,6 +44,19 @@ class SamplingUtilsSuite extends SparkFunSuite { assert(sample3.length === 10) } + test("SPARK-18678 reservoirSampleAndCount with tiny input") { +val input = Seq(0, 1) +val counts = new Array[Int](input.size) +for (i <- 0 until 500) { + val (samples, inputSize) = SamplingUtils.reservoirSampleAndCount(input.iterator, 1) + assert(inputSize === 2) + assert(samples.length === 1) + counts(samples.head) += 1 +} +// If correct, should be true
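The new suite test (truncated above) checks the bias empirically with k = 1 on a two-element input. A hedged sketch of the same idea follows; the package and object names are hypothetical (SamplingUtils is private[spark], so such a check would have to live under an org.apache.spark package), and the trial count and tolerance are illustrative, not the assertions the real test uses.

```scala
// Hypothetical location: SamplingUtils is private[spark], so this only compiles
// inside Spark's own source tree under an org.apache.spark.* package.
package org.apache.spark.util.random

object ReservoirBiasCheck {
  def main(args: Array[String]): Unit = {
    val trials = 5000
    val counts = Array(0, 0)
    for (_ <- 0 until trials) {
      // k = 1 sample out of a 2-element input, as in the SPARK-18678 test case.
      val (sample, inputSize) = SamplingUtils.reservoirSampleAndCount(Iterator(0, 1), 1)
      assert(inputSize == 2 && sample.length == 1)
      counts(sample.head) += 1
    }
    // With the fix each element wins about half the time; with the old off-by-one
    // the second element replaced the first with probability k/(l-1) = 1, i.e. always.
    println(s"element 0: ${counts(0)}, element 1: ${counts(1)}")
    // Loose illustrative sanity bound, not a formal statistical test.
    assert(math.abs(counts(0) - counts(1)) < trials / 5)
  }
}
```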
spark git commit: [SPARK-18701][ML] Fix Poisson GLM failure due to wrong initialization
Repository: spark Updated Branches: refs/heads/master 90b59d1bf -> b82802713 [SPARK-18701][ML] Fix Poisson GLM failure due to wrong initialization Poisson GLM fails for many standard data sets (see example in test or JIRA). The issue is incorrect initialization leading to almost zero probability and weights. Specifically, the mean is initialized as the response, which could be zero. Applying the log link results in very negative numbers (protected against -Inf), which again leads to close to zero probability and weights in the weighted least squares. Fix and test are included in the commits. ## What changes were proposed in this pull request? Update initialization in Poisson GLM ## How was this patch tested? Add test in GeneralizedLinearRegressionSuite srowen sethah yanboliang HyukjinKwon mengxr Author: actuaryzhang Closes #16131 from actuaryzhang/master. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b8280271 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b8280271 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b8280271 Branch: refs/heads/master Commit: b8280271396eb74638da6546d76bbb2d06c7011b Parents: 90b59d1 Author: actuaryzhang Authored: Wed Dec 7 16:37:25 2016 +0800 Committer: Sean Owen Committed: Wed Dec 7 16:37:25 2016 +0800 -- .../GeneralizedLinearRegression.scala | 6 +- .../GeneralizedLinearRegressionSuite.scala | 21 +++- 2 files changed, 17 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b8280271/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala index 770a257..f137c8c 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -505,7 +505,11 @@ object GeneralizedLinearRegression extends DefaultParamsReadable[GeneralizedLine override def initialize(y: Double, weight: Double): Double = { require(y >= 0.0, "The response variable of Poisson family " + s"should be non-negative, but got $y") - y + /* +Force Poisson mean > 0 to avoid numerical instability in IRLS. +R uses y + 0.1 for initialization. See poisson()$initialize. 
+ */ + math.max(y, 0.1) } override def variance(mu: Double): Double = mu http://git-wip-us.apache.org/repos/asf/spark/blob/b8280271/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala index 4fab216..3e9e1fc 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala @@ -89,11 +89,14 @@ class GeneralizedLinearRegressionSuite xVariance = Array(0.7, 1.2), nPoints = 1, seed, noiseLevel = 0.01, family = "poisson", link = "log").toDF() -datasetPoissonLogWithZero = generateGeneralizedLinearRegressionInput( - intercept = -1.5, coefficients = Array(0.22, 0.06), xMean = Array(2.9, 10.5), - xVariance = Array(0.7, 1.2), nPoints = 100, seed, noiseLevel = 0.01, - family = "poisson", link = "log") - .map{x => LabeledPoint(if (x.label < 0.7) 0.0 else x.label, x.features)}.toDF() +datasetPoissonLogWithZero = Seq( + LabeledPoint(0.0, Vectors.dense(18, 1.0)), + LabeledPoint(1.0, Vectors.dense(12, 0.0)), + LabeledPoint(0.0, Vectors.dense(15, 0.0)), + LabeledPoint(0.0, Vectors.dense(13, 2.0)), + LabeledPoint(0.0, Vectors.dense(15, 1.0)), + LabeledPoint(1.0, Vectors.dense(16, 1.0)) +).toDF() datasetPoissonIdentity = generateGeneralizedLinearRegressionInput( intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 10.5), @@ -480,12 +483,12 @@ class GeneralizedLinearRegressionSuite model <- glm(formula, family="poisson", data=data) print(as.vector(coef(model))) } - [1] 0.4272661 -0.1565423 - [1] -3.6911354 0.6214301 0.1295814 + [1] -0.0457441 -0.6833928 + [1] 1.8121235 -0.1747493 -0.5815417 */ val expected = Seq( - Vectors.dense(0.0, 0.4272661, -0.1565423), - Vecto
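To see why the one-line change matters, here is a toy numeric illustration (not the GeneralizedLinearRegression code): with the log link, starting the mean at a zero response sends the linear predictor to -Infinity and drives the IRLS working weight (which equals mu for a Poisson family) to zero, while the clamped start stays finite. The constant 0.1 mirrors the patch and R's poisson()$initialize; the object and method names are made up for the example.

```scala
object PoissonInitIllustration {
  // Log link used by a Poisson GLM: eta = log(mu).
  def link(mu: Double): Double = math.log(mu)

  // Old initialization: mu0 = y, which can be exactly zero for count data.
  def initOld(y: Double): Double = y

  // Fixed initialization (mirrors the patch): force the starting mean above zero.
  def initNew(y: Double): Double = math.max(y, 0.1)

  def main(args: Array[String]): Unit = {
    val y = 0.0
    val etaOld = link(initOld(y)) // -Infinity: the row's IRLS weight (= mu) collapses toward 0
    val etaNew = link(initNew(y)) // log(0.1) ~ -2.3: finite, so the weighted LS step stays well-posed
    println(s"old start: eta = $etaOld, new start: eta = $etaNew")
  }
}
```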
spark git commit: [SPARK-18701][ML] Fix Poisson GLM failure due to wrong initialization
Repository: spark Updated Branches: refs/heads/branch-2.1 340e9aea4 -> 99c293eea [SPARK-18701][ML] Fix Poisson GLM failure due to wrong initialization Poisson GLM fails for many standard data sets (see example in test or JIRA). The issue is incorrect initialization leading to almost zero probability and weights. Specifically, the mean is initialized as the response, which could be zero. Applying the log link results in very negative numbers (protected against -Inf), which again leads to close to zero probability and weights in the weighted least squares. Fix and test are included in the commits. ## What changes were proposed in this pull request? Update initialization in Poisson GLM ## How was this patch tested? Add test in GeneralizedLinearRegressionSuite srowen sethah yanboliang HyukjinKwon mengxr Author: actuaryzhang Closes #16131 from actuaryzhang/master. (cherry picked from commit b8280271396eb74638da6546d76bbb2d06c7011b) Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/99c293ee Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/99c293ee Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/99c293ee Branch: refs/heads/branch-2.1 Commit: 99c293eeaa9733fc424404d04a9671e9525a1e36 Parents: 340e9ae Author: actuaryzhang Authored: Wed Dec 7 16:37:25 2016 +0800 Committer: Sean Owen Committed: Wed Dec 7 16:37:37 2016 +0800 -- .../GeneralizedLinearRegression.scala | 6 +- .../GeneralizedLinearRegressionSuite.scala | 21 +++- 2 files changed, 17 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/99c293ee/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala index 770a257..f137c8c 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -505,7 +505,11 @@ object GeneralizedLinearRegression extends DefaultParamsReadable[GeneralizedLine override def initialize(y: Double, weight: Double): Double = { require(y >= 0.0, "The response variable of Poisson family " + s"should be non-negative, but got $y") - y + /* +Force Poisson mean > 0 to avoid numerical instability in IRLS. +R uses y + 0.1 for initialization. See poisson()$initialize. 
+ */ + math.max(y, 0.1) } override def variance(mu: Double): Double = mu http://git-wip-us.apache.org/repos/asf/spark/blob/99c293ee/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala index 4fab216..3e9e1fc 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala @@ -89,11 +89,14 @@ class GeneralizedLinearRegressionSuite xVariance = Array(0.7, 1.2), nPoints = 1, seed, noiseLevel = 0.01, family = "poisson", link = "log").toDF() -datasetPoissonLogWithZero = generateGeneralizedLinearRegressionInput( - intercept = -1.5, coefficients = Array(0.22, 0.06), xMean = Array(2.9, 10.5), - xVariance = Array(0.7, 1.2), nPoints = 100, seed, noiseLevel = 0.01, - family = "poisson", link = "log") - .map{x => LabeledPoint(if (x.label < 0.7) 0.0 else x.label, x.features)}.toDF() +datasetPoissonLogWithZero = Seq( + LabeledPoint(0.0, Vectors.dense(18, 1.0)), + LabeledPoint(1.0, Vectors.dense(12, 0.0)), + LabeledPoint(0.0, Vectors.dense(15, 0.0)), + LabeledPoint(0.0, Vectors.dense(13, 2.0)), + LabeledPoint(0.0, Vectors.dense(15, 1.0)), + LabeledPoint(1.0, Vectors.dense(16, 1.0)) +).toDF() datasetPoissonIdentity = generateGeneralizedLinearRegressionInput( intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 10.5), @@ -480,12 +483,12 @@ class GeneralizedLinearRegressionSuite model <- glm(formula, family="poisson", data=data) print(as.vector(coef(model))) } - [1] 0.4272661 -0.1565423 - [1] -3.6911354 0.6214301 0.1295814 + [1] -0.0457441 -0.6833928 + [1] 1.8121235 -0.1747493 -0.
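Seen from the user's side, the same fix means a Poisson fit through the public Scala API no longer trips over zero labels. The sketch below uses the zero-heavy rows from the new test fixture; the object name, appName/master settings, and the printed fields are illustrative choices, not part of the patch.

```scala
import org.apache.spark.ml.feature.LabeledPoint
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.regression.GeneralizedLinearRegression
import org.apache.spark.sql.SparkSession

object PoissonZeroLabelExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("poisson-zero-labels").master("local[2]").getOrCreate()
    import spark.implicits._

    // Same zero-heavy data as the new suite fixture: several rows have label 0.0,
    // which made the old mu0 = y initialization blow up under the log link.
    val df = Seq(
      LabeledPoint(0.0, Vectors.dense(18, 1.0)),
      LabeledPoint(1.0, Vectors.dense(12, 0.0)),
      LabeledPoint(0.0, Vectors.dense(15, 0.0)),
      LabeledPoint(0.0, Vectors.dense(13, 2.0)),
      LabeledPoint(0.0, Vectors.dense(15, 1.0)),
      LabeledPoint(1.0, Vectors.dense(16, 1.0))
    ).toDF()

    val glr = new GeneralizedLinearRegression()
      .setFamily("poisson")
      .setLink("log")

    // With the clamped initialization the IRLS iterations start from finite weights.
    val model = glr.fit(df)
    println(s"coefficients: ${model.coefficients}, intercept: ${model.intercept}")

    spark.stop()
  }
}
```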
spark git commit: [SPARK-18686][SPARKR][ML] Several cleanup and improvements for spark.logit.
Repository: spark Updated Branches: refs/heads/branch-2.1 3750c6e9b -> 340e9aea4 [SPARK-18686][SPARKR][ML] Several cleanup and improvements for spark.logit. ## What changes were proposed in this pull request? Several cleanup and improvements for ```spark.logit```: * ```summary``` should return coefficients matrix, and should output labels for each class if the model is multinomial logistic regression model. * ```summary``` should not return ```areaUnderROC, roc, pr, ...```, since most of them are DataFrame which are less important for R users. Meanwhile, these metrics ignore instance weights (setting all to 1.0) which will be changed in later Spark version. In case it will introduce breaking changes, we do not expose them currently. * SparkR test improvement: comparing the training result with native R glmnet. * Remove argument ```aggregationDepth``` from ```spark.logit```, since it's an expert Param(related with Spark architecture and job execution) that would be used rarely by R users. ## How was this patch tested? Unit tests. The ```summary``` output after this change: multinomial logistic regression: ``` > df <- suppressWarnings(createDataFrame(iris)) > model <- spark.logit(df, Species ~ ., regParam = 0.5) > summary(model) $coefficients versicolor virginica setosa (Intercept) 1.514031-2.609108 1.095077 Sepal_Length 0.02511006 0.2649821 -0.2900921 Sepal_Width -0.5291215 -0.02016446 0.549286 Petal_Length 0.03647411 0.1544119 -0.190886 Petal_Width 0.000236092 0.4195804 -0.4198165 ``` binomial logistic regression: ``` > df <- suppressWarnings(createDataFrame(iris)) > training <- df[df$Species %in% c("versicolor", "virginica"), ] > model <- spark.logit(training, Species ~ ., regParam = 0.5) > summary(model) $coefficients Estimate (Intercept) -6.053815 Sepal_Length 0.2449379 Sepal_Width 0.1648321 Petal_Length 0.4730718 Petal_Width 1.031947 ``` Author: Yanbo Liang Closes #16117 from yanboliang/spark-18686. (cherry picked from commit 90b59d1bf262b41c3a5f780697f504030f9d079c) Signed-off-by: Yanbo Liang Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/340e9aea Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/340e9aea Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/340e9aea Branch: refs/heads/branch-2.1 Commit: 340e9aea4853805c42b8739004d93efe8fe16ba4 Parents: 3750c6e Author: Yanbo Liang Authored: Wed Dec 7 00:31:11 2016 -0800 Committer: Yanbo Liang Committed: Wed Dec 7 00:32:32 2016 -0800 -- R/pkg/R/mllib.R | 86 +++-- R/pkg/inst/tests/testthat/test_mllib.R | 183 +-- .../spark/ml/r/LogisticRegressionWrapper.scala | 81 3 files changed, 203 insertions(+), 147 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/340e9aea/R/pkg/R/mllib.R -- diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R index eed8293..074e9cb 100644 --- a/R/pkg/R/mllib.R +++ b/R/pkg/R/mllib.R @@ -733,8 +733,6 @@ setMethod("predict", signature(object = "KMeansModel"), #' excepting that at most one value may be 0. The class with largest value p/t is predicted, where p #' is the original probability of that class and t is the class's threshold. #' @param weightCol The weight column name. -#' @param aggregationDepth depth for treeAggregate (>= 2). If the dimensions of features or the number of partitions -#' are large, this param could be adjusted to a larger size. #' @param probabilityCol column name for predicted class conditional probabilities. #' @param ... additional arguments passed to the method. 
#' @return \code{spark.logit} returns a fitted logistic regression model @@ -746,45 +744,35 @@ setMethod("predict", signature(object = "KMeansModel"), #' \dontrun{ #' sparkR.session() #' # binary logistic regression -#' label <- c(0.0, 0.0, 0.0, 1.0, 1.0) -#' features <- c(1.1419053, 0.9194079, -0.9498666, -1.1069903, 0.2809776) -#' binary_data <- as.data.frame(cbind(label, features)) -#' binary_df <- createDataFrame(binary_data) -#' blr_model <- spark.logit(binary_df, label ~ features, thresholds = 1.0) -#' blr_predict <- collect(select(predict(blr_model, binary_df), "prediction")) -#' -#' # summary of binary logistic regression -#' blr_summary <- summary(blr_model) -#' blr_fmeasure <- collect(select(blr_summary$fMeasureByThreshold, "threshold", "F-Measure")) +#' df <- createDataFrame(iris) +#' training <- df[df$Species %in% c("versicolor", "virginica"), ] +#' model <- spark.logit(training, Species ~ ., regParam = 0.5) +#' summary <- summary(model) +#' +#' # fitted values on training data +#' fitted <- predict(model, training) +#' #'
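Since the reworked summary() centers on the coefficient matrix, a Scala-side sketch of where those numbers come from may help: spark.logit wraps org.apache.spark.ml.classification.LogisticRegression, whose coefficientMatrix and interceptVector are what $coefficients reshapes for a multinomial model. The CSV path, column handling, and object name below are assumptions for illustration, not the wrapper code itself.

```scala
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.RFormula
import org.apache.spark.sql.SparkSession

object MultinomialLogitSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("logit-coefficients").master("local[2]").getOrCreate()

    // Illustrative path; any DataFrame with the usual iris columns works.
    val iris = spark.read.option("header", "true").option("inferSchema", "true").csv("data/iris.csv")

    // RFormula plays the role of `Species ~ .` in the SparkR call: it indexes the
    // label and assembles the remaining columns into a features vector.
    val prepared = new RFormula().setFormula("Species ~ .").fit(iris).transform(iris)

    val model = new LogisticRegression().setRegParam(0.5).fit(prepared)

    // classes x features matrix plus per-class intercepts; this is what the new
    // SparkR summary() exposes as $coefficients for a multinomial model.
    println(model.coefficientMatrix)
    println(model.interceptVector)

    spark.stop()
  }
}
```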
spark git commit: [SPARK-18686][SPARKR][ML] Several cleanup and improvements for spark.logit.
Repository: spark Updated Branches: refs/heads/master 5c6bcdbda -> 90b59d1bf [SPARK-18686][SPARKR][ML] Several cleanup and improvements for spark.logit. ## What changes were proposed in this pull request? Several cleanup and improvements for ```spark.logit```: * ```summary``` should return coefficients matrix, and should output labels for each class if the model is multinomial logistic regression model. * ```summary``` should not return ```areaUnderROC, roc, pr, ...```, since most of them are DataFrame which are less important for R users. Meanwhile, these metrics ignore instance weights (setting all to 1.0) which will be changed in later Spark version. In case it will introduce breaking changes, we do not expose them currently. * SparkR test improvement: comparing the training result with native R glmnet. * Remove argument ```aggregationDepth``` from ```spark.logit```, since it's an expert Param(related with Spark architecture and job execution) that would be used rarely by R users. ## How was this patch tested? Unit tests. The ```summary``` output after this change: multinomial logistic regression: ``` > df <- suppressWarnings(createDataFrame(iris)) > model <- spark.logit(df, Species ~ ., regParam = 0.5) > summary(model) $coefficients versicolor virginica setosa (Intercept) 1.514031-2.609108 1.095077 Sepal_Length 0.02511006 0.2649821 -0.2900921 Sepal_Width -0.5291215 -0.02016446 0.549286 Petal_Length 0.03647411 0.1544119 -0.190886 Petal_Width 0.000236092 0.4195804 -0.4198165 ``` binomial logistic regression: ``` > df <- suppressWarnings(createDataFrame(iris)) > training <- df[df$Species %in% c("versicolor", "virginica"), ] > model <- spark.logit(training, Species ~ ., regParam = 0.5) > summary(model) $coefficients Estimate (Intercept) -6.053815 Sepal_Length 0.2449379 Sepal_Width 0.1648321 Petal_Length 0.4730718 Petal_Width 1.031947 ``` Author: Yanbo Liang Closes #16117 from yanboliang/spark-18686. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/90b59d1b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/90b59d1b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/90b59d1b Branch: refs/heads/master Commit: 90b59d1bf262b41c3a5f780697f504030f9d079c Parents: 5c6bcdb Author: Yanbo Liang Authored: Wed Dec 7 00:31:11 2016 -0800 Committer: Yanbo Liang Committed: Wed Dec 7 00:31:11 2016 -0800 -- R/pkg/R/mllib.R | 86 +++-- R/pkg/inst/tests/testthat/test_mllib.R | 183 +-- .../spark/ml/r/LogisticRegressionWrapper.scala | 81 3 files changed, 203 insertions(+), 147 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/90b59d1b/R/pkg/R/mllib.R -- diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R index eed8293..074e9cb 100644 --- a/R/pkg/R/mllib.R +++ b/R/pkg/R/mllib.R @@ -733,8 +733,6 @@ setMethod("predict", signature(object = "KMeansModel"), #' excepting that at most one value may be 0. The class with largest value p/t is predicted, where p #' is the original probability of that class and t is the class's threshold. #' @param weightCol The weight column name. -#' @param aggregationDepth depth for treeAggregate (>= 2). If the dimensions of features or the number of partitions -#' are large, this param could be adjusted to a larger size. #' @param probabilityCol column name for predicted class conditional probabilities. #' @param ... additional arguments passed to the method. 
#' @return \code{spark.logit} returns a fitted logistic regression model @@ -746,45 +744,35 @@ setMethod("predict", signature(object = "KMeansModel"), #' \dontrun{ #' sparkR.session() #' # binary logistic regression -#' label <- c(0.0, 0.0, 0.0, 1.0, 1.0) -#' features <- c(1.1419053, 0.9194079, -0.9498666, -1.1069903, 0.2809776) -#' binary_data <- as.data.frame(cbind(label, features)) -#' binary_df <- createDataFrame(binary_data) -#' blr_model <- spark.logit(binary_df, label ~ features, thresholds = 1.0) -#' blr_predict <- collect(select(predict(blr_model, binary_df), "prediction")) -#' -#' # summary of binary logistic regression -#' blr_summary <- summary(blr_model) -#' blr_fmeasure <- collect(select(blr_summary$fMeasureByThreshold, "threshold", "F-Measure")) +#' df <- createDataFrame(iris) +#' training <- df[df$Species %in% c("versicolor", "virginica"), ] +#' model <- spark.logit(training, Species ~ ., regParam = 0.5) +#' summary <- summary(model) +#' +#' # fitted values on training data +#' fitted <- predict(model, training) +#' #' # save fitted model to input path #' path <- "path/to/model" -#' write.ml(blr_model, path) +#' write.ml(m