spark git commit: [SQL][DOC] updating doc for JSON source to link to jsonlines.org
Repository: spark Updated Branches: refs/heads/master 1dbe9896b -> 44c8bfda7 [SQL][DOC] updating doc for JSON source to link to jsonlines.org ## What changes were proposed in this pull request? API and programming guide doc changes for Scala, Python and R. ## How was this patch tested? manual test Author: Felix Cheung Closes #15629 from felixcheung/jsondoc. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/44c8bfda Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/44c8bfda Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/44c8bfda Branch: refs/heads/master Commit: 44c8bfda793b7655e2bd1da5e9915a09ed9d42ce Parents: 1dbe989 Author: Felix Cheung Authored: Wed Oct 26 23:06:11 2016 -0700 Committer: Felix Cheung Committed: Wed Oct 26 23:06:11 2016 -0700 -- R/pkg/R/DataFrame.R | 3 ++- R/pkg/R/SQLContext.R| 3 ++- docs/sparkr.md | 2 +- docs/sql-programming-guide.md | 22 python/pyspark/sql/readwriter.py| 5 +++-- python/pyspark/sql/streaming.py | 3 ++- .../org/apache/spark/sql/DataFrameReader.scala | 14 +++-- .../org/apache/spark/sql/DataFrameWriter.scala | 3 ++- .../spark/sql/streaming/DataStreamReader.scala | 3 ++- 9 files changed, 35 insertions(+), 23 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/44c8bfda/R/pkg/R/DataFrame.R -- diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index be34e4b..1df8bbf 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -761,7 +761,8 @@ setMethod("toJSON", #' Save the contents of SparkDataFrame as a JSON file #' -#' Save the contents of a SparkDataFrame as a JSON file (one object per line). Files written out +#' Save the contents of a SparkDataFrame as a JSON file (\href{http://jsonlines.org/}{ +#' JSON Lines text format or newline-delimited JSON}). Files written out #' with this method can be read back in as a SparkDataFrame using read.json(). #' #' @param x A SparkDataFrame http://git-wip-us.apache.org/repos/asf/spark/blob/44c8bfda/R/pkg/R/SQLContext.R -- diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R index 0d6a229..216ca51 100644 --- a/R/pkg/R/SQLContext.R +++ b/R/pkg/R/SQLContext.R @@ -324,7 +324,8 @@ setMethod("toDF", signature(x = "RDD"), #' Create a SparkDataFrame from a JSON file. #' -#' Loads a JSON file (one object per line), returning the result as a SparkDataFrame +#' Loads a JSON file (\href{http://jsonlines.org/}{JSON Lines text format or newline-delimited JSON} +#' ), returning the result as a SparkDataFrame #' It goes through the entire dataset once to determine the schema. #' #' @param path Path of file to read. A vector of multiple paths is allowed. http://git-wip-us.apache.org/repos/asf/spark/blob/44c8bfda/docs/sparkr.md -- diff --git a/docs/sparkr.md b/docs/sparkr.md index c1829ef..f30bd40 100644 --- a/docs/sparkr.md +++ b/docs/sparkr.md @@ -135,7 +135,7 @@ sparkR.session(sparkPackages = "com.databricks:spark-avro_2.11:3.0.0") {% endhighlight %} -We can see how to use data sources using an example JSON input file. Note that the file that is used here is _not_ a typical JSON file. Each line in the file must contain a separate, self-contained valid JSON object. As a consequence, a regular multi-line JSON file will most often fail. +We can see how to use data sources using an example JSON input file. Note that the file that is used here is _not_ a typical JSON file. Each line in the file must contain a separate, self-contained valid JSON object. 
For more information, please see [JSON Lines text format, also called newline-delimited JSON](http://jsonlines.org/). As a consequence, a regular multi-line JSON file will most often fail. {% highlight r %} http://git-wip-us.apache.org/repos/asf/spark/blob/44c8bfda/docs/sql-programming-guide.md -- diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 064af41..b9be7a7 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -316,7 +316,7 @@ Serializable and has getters and setters for all of its fields. Spark SQL can convert an RDD of Row objects to a DataFrame, inferring the datatypes. Rows are constructed by passing a list of key/value pairs as kwargs to the Row class. The keys of this list define the column names of the table, -and the types are inferred by sampling the whole dataset, similar to the inference that is performed on JSON files.
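For illustration, a minimal Scala sketch of the JSON Lines convention the updated docs link to, using the stock `people.json` resource shipped with Spark (already in one-object-per-line format):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("JsonLinesRead").master("local[*]").getOrCreate()

// Each input line is a complete, self-contained JSON object, e.g.
//   {"name":"Michael"}
//   {"name":"Andy", "age":30}
// A single pretty-printed, multi-line JSON document would fail to parse here.
val people = spark.read.json("examples/src/main/resources/people.json")
people.printSchema()
people.show()

spark.stop()
```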
spark git commit: [SPARK-17157][SPARKR][FOLLOW-UP] doc fixes
Repository: spark Updated Branches: refs/heads/master d3b4831d0 -> 1dbe9896b [SPARK-17157][SPARKR][FOLLOW-UP] doc fixes ## What changes were proposed in this pull request? a couple of small late finding fixes for doc ## How was this patch tested? manually wangmiao1981 Author: Felix Cheung Closes #15650 from felixcheung/logitfix. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1dbe9896 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1dbe9896 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1dbe9896 Branch: refs/heads/master Commit: 1dbe9896b7f30538a5fad2f5d718d035c7906936 Parents: d3b4831 Author: Felix Cheung Authored: Wed Oct 26 23:02:54 2016 -0700 Committer: Felix Cheung Committed: Wed Oct 26 23:02:54 2016 -0700 -- R/pkg/R/mllib.R | 26 +++--- 1 file changed, 11 insertions(+), 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1dbe9896/R/pkg/R/mllib.R -- diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R index e441db9..629f284 100644 --- a/R/pkg/R/mllib.R +++ b/R/pkg/R/mllib.R @@ -111,8 +111,9 @@ setClass("LogisticRegressionModel", representation(jobj = "jobj")) #' @export #' @seealso \link{spark.glm}, \link{glm}, #' @seealso \link{spark.als}, \link{spark.gaussianMixture}, \link{spark.isoreg}, \link{spark.kmeans}, -#' @seealso \link{spark.lda}, \link{spark.mlp}, \link{spark.naiveBayes}, \link{spark.survreg} -#' @seealso \link{spark.logit}, \link{read.ml} +#' @seealso \link{spark.lda}, \link{spark.logit}, \link{spark.mlp}, \link{spark.naiveBayes}, +#' @seealso \link{spark.survreg} +#' @seealso \link{read.ml} NULL #' Makes predictions from a MLlib model @@ -124,7 +125,7 @@ NULL #' @export #' @seealso \link{spark.glm}, \link{glm}, #' @seealso \link{spark.als}, \link{spark.gaussianMixture}, \link{spark.isoreg}, \link{spark.kmeans}, -#' @seealso \link{spark.mlp}, \link{spark.naiveBayes}, \link{spark.survreg}, \link{spark.logit} +#' @seealso \link{spark.logit}, \link{spark.mlp}, \link{spark.naiveBayes}, \link{spark.survreg} NULL write_internal <- function(object, path, overwrite = FALSE) { @@ -671,14 +672,13 @@ setMethod("predict", signature(object = "KMeansModel"), #' @param tol convergence tolerance of iterations. #' @param fitIntercept whether to fit an intercept term. Default is TRUE. #' @param family the name of family which is a description of the label distribution to be used in the model. -#' Supported options: +#' Supported options: Default is "auto". #' \itemize{ #' \item{"auto": Automatically select the family based on the number of classes: #' If number of classes == 1 || number of classes == 2, set to "binomial". #' Else, set to "multinomial".} #' \item{"binomial": Binary logistic regression with pivoting.} -#' \item{"multinomial": Multinomial logistic (softmax) regression without pivoting. -#' Default is "auto".} +#' \item{"multinomial": Multinomial logistic (softmax) regression without pivoting.} #' } #' @param standardization whether to standardize the training features before fitting the model. The coefficients #'of models will be always returned on the original scale, so it will be transparent for @@ -687,14 +687,10 @@ setMethod("predict", signature(object = "KMeansModel"), #' @param thresholds in binary classification, in range [0, 1]. If the estimated probability of class label 1 #' is > threshold, then predict 1, else 0. A high threshold encourages the model to predict 0 #' more often; a low threshold encourages the model to predict 1 more often. 
Note: Setting this with -#' threshold p is equivalent to setting thresholds c(1-p, p). When threshold is set, any user-set -#' value for thresholds will be cleared. If both threshold and thresholds are set, then they must be -#' equivalent. In multiclass (or binary) classification to adjust the probability of +#' threshold p is equivalent to setting thresholds c(1-p, p). In multiclass (or binary) classification to adjust the probability of #' predicting each class. Array must have length equal to the number of classes, with values > 0, #' excepting that at most one value may be 0. The class with largest value p/t is predicted, where p -#' is the original probability of that class and t is the class's threshold. Note: When thresholds -#' is set, any user-set value for threshold will be cleared.
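The threshold/thresholds equivalence described in these docs can be checked against the Scala estimator the R wrapper delegates to; a small sketch of the binary case:

```scala
import org.apache.spark.ml.classification.LogisticRegression

// For binary classification, setThreshold(p) and setThresholds(Array(1 - p, p))
// describe the same decision rule: class 1 is predicted when its estimated
// probability exceeds p.
val viaThreshold  = new LogisticRegression().setThreshold(0.8)
val viaThresholds = new LogisticRegression().setThresholds(Array(0.2, 0.8))

// When only thresholds is set, getThreshold derives
// threshold = 1 / (1 + thresholds(0) / thresholds(1)).
println(viaThreshold.getThreshold)   // 0.8
println(viaThresholds.getThreshold)  // 0.8
```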
spark git commit: [SPARK-18132] Fix checkstyle
Repository: spark Updated Branches: refs/heads/branch-2.0 dcf2f090c -> 1a4be51d6 [SPARK-18132] Fix checkstyle This PR fixes checkstyle. Author: Yin Huai Closes #15656 from yhuai/fix-format. (cherry picked from commit d3b4831d009905185ad74096ce3ecfa934bc191d) Signed-off-by: Yin Huai Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1a4be51d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1a4be51d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1a4be51d Branch: refs/heads/branch-2.0 Commit: 1a4be51d64eaafe2fa0e69d0c3c81f7b40051427 Parents: dcf2f09 Author: Yin Huai Authored: Wed Oct 26 22:22:23 2016 -0700 Committer: Yin Huai Committed: Wed Oct 26 22:22:55 2016 -0700 -- .../spark/util/collection/unsafe/sort/UnsafeExternalSorter.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1a4be51d/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java -- diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index 6e03064..56d54a1 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -142,9 +142,10 @@ public final class UnsafeExternalSorter extends MemoryConsumer { this.recordComparator = recordComparator; this.prefixComparator = prefixComparator; // Use getSizeAsKb (not bytes) to maintain backwards compatibility for units -// this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024; +// this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024 this.fileBufferSizeBytes = 32 * 1024; -// The spill metrics are stored in a new ShuffleWriteMetrics, and then discarded (this fixes SPARK-16827). +// The spill metrics are stored in a new ShuffleWriteMetrics, +// and then discarded (this fixes SPARK-16827). // TODO: Instead, separate spill metrics should be stored and reported (tracked in SPARK-3577). this.writeMetrics = new ShuffleWriteMetrics(); - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-18132] Fix checkstyle
Repository: spark Updated Branches: refs/heads/master dd4f088c1 -> d3b4831d0 [SPARK-18132] Fix checkstyle This PR fixes checkstyle. Author: Yin Huai Closes #15656 from yhuai/fix-format. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d3b4831d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d3b4831d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d3b4831d Branch: refs/heads/master Commit: d3b4831d009905185ad74096ce3ecfa934bc191d Parents: dd4f088 Author: Yin Huai Authored: Wed Oct 26 22:22:23 2016 -0700 Committer: Yin Huai Committed: Wed Oct 26 22:22:23 2016 -0700 -- .../spark/util/collection/unsafe/sort/UnsafeExternalSorter.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d3b4831d/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java -- diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index 7835017..dcae4a3 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -143,9 +143,10 @@ public final class UnsafeExternalSorter extends MemoryConsumer { this.recordComparator = recordComparator; this.prefixComparator = prefixComparator; // Use getSizeAsKb (not bytes) to maintain backwards compatibility for units -// this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024; +// this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024 this.fileBufferSizeBytes = 32 * 1024; -// The spill metrics are stored in a new ShuffleWriteMetrics, and then discarded (this fixes SPARK-16827). +// The spill metrics are stored in a new ShuffleWriteMetrics, +// and then discarded (this fixes SPARK-16827). // TODO: Instead, separate spill metrics should be stored and reported (tracked in SPARK-3577). this.writeMetrics = new ShuffleWriteMetrics(); - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-18009][SQL] Fix ClassCastException while calling toLocalIterator() on dataframe produced by RunnableCommand
Repository: spark Updated Branches: refs/heads/branch-2.0 ea205e376 -> dcf2f090c [SPARK-18009][SQL] Fix ClassCastException while calling toLocalIterator() on dataframe produced by RunnableCommand A short code snippet that uses toLocalIterator() on a dataframe produced by a RunnableCommand reproduces the problem. toLocalIterator() is called by thriftserver when `spark.sql.thriftServer.incrementalCollect`is set to handle queries producing large result set. **Before** ```SQL scala> spark.sql("show databases") res0: org.apache.spark.sql.DataFrame = [databaseName: string] scala> res0.toLocalIterator() 16/10/26 03:00:24 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast to org.apache.spark.sql.catalyst.expressions.UnsafeRow ``` **After** ```SQL scala> spark.sql("drop database databases") res30: org.apache.spark.sql.DataFrame = [] scala> spark.sql("show databases") res31: org.apache.spark.sql.DataFrame = [databaseName: string] scala> res31.toLocalIterator().asScala foreach println [default] [parquet] ``` Added a test in DDLSuite Author: Dilip Biswal Closes #15642 from dilipbiswal/SPARK-18009. (cherry picked from commit dd4f088c1df6abd728e5544a17ba85322bedfe4c) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dcf2f090 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dcf2f090 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dcf2f090 Branch: refs/heads/branch-2.0 Commit: dcf2f090cab768203e9767f050612d2838368c4f Parents: ea205e3 Author: Dilip Biswal Authored: Thu Oct 27 13:12:14 2016 +0800 Committer: Wenchen Fan Committed: Thu Oct 27 13:14:32 2016 +0800 -- .../org/apache/spark/sql/execution/command/commands.scala | 2 ++ .../org/apache/spark/sql/execution/command/DDLSuite.scala | 7 +++ 2 files changed, 9 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/dcf2f090/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala index 698c625..d82e54e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala @@ -66,6 +66,8 @@ case class ExecutedCommandExec(cmd: RunnableCommand) extends SparkPlan { override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray + override def executeToIterator: Iterator[InternalRow] = sideEffectResult.toIterator + override def executeTake(limit: Int): Array[InternalRow] = sideEffectResult.take(limit).toArray protected override def doExecute(): RDD[InternalRow] = { http://git-wip-us.apache.org/repos/asf/spark/blob/dcf2f090/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index caa2fca..252064d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -1489,4 +1489,11 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { assert(sql("show user functions").count() === 1L) } } + + 
test("SPARK-18009 calling toLocalIterator on commands") { +import scala.collection.JavaConverters._ +val df = sql("show databases") +val rows: Seq[Row] = df.toLocalIterator().asScala.toSeq +assert(rows.length > 0) + } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-18009][SQL] Fix ClassCastException while calling toLocalIterator() on dataframe produced by RunnableCommand
Repository: spark Updated Branches: refs/heads/master f1aeed8b0 -> dd4f088c1 [SPARK-18009][SQL] Fix ClassCastException while calling toLocalIterator() on dataframe produced by RunnableCommand ## What changes were proposed in this pull request? A short code snippet that uses toLocalIterator() on a dataframe produced by a RunnableCommand reproduces the problem. toLocalIterator() is called by thriftserver when `spark.sql.thriftServer.incrementalCollect`is set to handle queries producing large result set. **Before** ```SQL scala> spark.sql("show databases") res0: org.apache.spark.sql.DataFrame = [databaseName: string] scala> res0.toLocalIterator() 16/10/26 03:00:24 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast to org.apache.spark.sql.catalyst.expressions.UnsafeRow ``` **After** ```SQL scala> spark.sql("drop database databases") res30: org.apache.spark.sql.DataFrame = [] scala> spark.sql("show databases") res31: org.apache.spark.sql.DataFrame = [databaseName: string] scala> res31.toLocalIterator().asScala foreach println [default] [parquet] ``` ## How was this patch tested? Added a test in DDLSuite Author: Dilip Biswal Closes #15642 from dilipbiswal/SPARK-18009. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dd4f088c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dd4f088c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dd4f088c Branch: refs/heads/master Commit: dd4f088c1df6abd728e5544a17ba85322bedfe4c Parents: f1aeed8 Author: Dilip Biswal Authored: Thu Oct 27 13:12:14 2016 +0800 Committer: Wenchen Fan Committed: Thu Oct 27 13:12:14 2016 +0800 -- .../org/apache/spark/sql/execution/command/commands.scala | 2 ++ .../org/apache/spark/sql/execution/command/DDLSuite.scala | 7 +++ 2 files changed, 9 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/dd4f088c/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala index 698c625..d82e54e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala @@ -66,6 +66,8 @@ case class ExecutedCommandExec(cmd: RunnableCommand) extends SparkPlan { override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray + override def executeToIterator: Iterator[InternalRow] = sideEffectResult.toIterator + override def executeTake(limit: Int): Array[InternalRow] = sideEffectResult.take(limit).toArray protected override def doExecute(): RDD[InternalRow] = { http://git-wip-us.apache.org/repos/asf/spark/blob/dd4f088c/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index de326f8..b989d01 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -1805,4 +1805,11 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { } } } + + test("SPARK-18009 calling toLocalIterator on commands") { +import 
scala.collection.JavaConverters._ +val df = sql("show databases") +val rows: Seq[Row] = df.toLocalIterator().asScala.toSeq +assert(rows.length > 0) + } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
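A minimal sketch of the pattern this change fixes, runnable in spark-shell (where `spark` is predefined); the thrift server exercises the same path when `spark.sql.thriftServer.incrementalCollect` is enabled:

```scala
import scala.collection.JavaConverters._

// SHOW DATABASES is backed by a RunnableCommand; its result can now be
// consumed incrementally instead of collecting everything at once.
val it = spark.sql("show databases").toLocalIterator().asScala
it.foreach(println)
```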
spark git commit: [SPARK-17770][CATALYST] making ObjectType public
Repository: spark Updated Branches: refs/heads/master 5b27598ff -> f1aeed8b0 [SPARK-17770][CATALYST] making ObjectType public ## What changes were proposed in this pull request? In order to facilitate the writing of additional Encoders, I proposed opening up the ObjectType SQL DataType. This DataType is used extensively in the JavaBean Encoder, but would also be useful in writing other custom encoders. As mentioned by marmbrus, it is understood that the Expressions API is subject to potential change. ## How was this patch tested? The change only affects the visibility of the ObjectType class, and the existing SQL test suite still runs without error. Author: ALeksander Eskilson Closes #15453 from bdrillard/master. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f1aeed8b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f1aeed8b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f1aeed8b Branch: refs/heads/master Commit: f1aeed8b022e043de2eb38b30187dcc36ee8dcdb Parents: 5b27598 Author: ALeksander Eskilson Authored: Wed Oct 26 18:03:31 2016 -0700 Committer: Michael Armbrust Committed: Wed Oct 26 18:03:31 2016 -0700 -- .../scala/org/apache/spark/sql/types/ObjectType.scala | 12 +++- 1 file changed, 7 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f1aeed8b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ObjectType.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ObjectType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ObjectType.scala index c741a2d..b18fba2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ObjectType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ObjectType.scala @@ -19,7 +19,10 @@ package org.apache.spark.sql.types import scala.language.existentials -private[sql] object ObjectType extends AbstractDataType { +import org.apache.spark.annotation.InterfaceStability + +@InterfaceStability.Evolving +object ObjectType extends AbstractDataType { override private[sql] def defaultConcreteType: DataType = throw new UnsupportedOperationException("null literals can't be casted to ObjectType") @@ -32,11 +35,10 @@ private[sql] object ObjectType extends AbstractDataType { } /** - * Represents a JVM object that is passing through Spark SQL expression evaluation. Note this - * is only used internally while converting into the internal format and is not intended for use - * outside of the execution engine. + * Represents a JVM object that is passing through Spark SQL expression evaluation. */ -private[sql] case class ObjectType(cls: Class[_]) extends DataType { +@InterfaceStability.Evolving +case class ObjectType(cls: Class[_]) extends DataType { override def defaultSize: Int = 4096 def asNullable: DataType = this - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
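With `ObjectType` public, custom encoder code can name the type directly; a tiny sketch (the UUID class is just an arbitrary illustration):

```scala
import org.apache.spark.sql.types.ObjectType

// ObjectType wraps a JVM class so instances can pass through Catalyst
// expression evaluation while a custom encoder converts them to and from
// Spark SQL's internal row format.
val uuidType = ObjectType(classOf[java.util.UUID])
println(uuidType.cls)          // class java.util.UUID
println(uuidType.defaultSize)  // fixed estimate of 4096 bytes
```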
spark git commit: [SPARK-16963][STREAMING][SQL] Changes to Source trait and related implementation classes
Repository: spark Updated Branches: refs/heads/branch-2.0 72b3cff33 -> ea205e376 [SPARK-16963][STREAMING][SQL] Changes to Source trait and related implementation classes ## What changes were proposed in this pull request? This PR contains changes to the Source trait such that the scheduler can notify data sources when it is safe to discard buffered data. Summary of changes: * Added a method `commit(end: Offset)` that tells the Source that is OK to discard all offsets up `end`, inclusive. * Changed the semantics of a `None` value for the `getBatch` method to mean "from the very beginning of the stream"; as opposed to "all data present in the Source's buffer". * Added notes that the upper layers of the system will never call `getBatch` with a start value less than the last value passed to `commit`. * Added a `lastCommittedOffset` method to allow the scheduler to query the status of each Source on restart. This addition is not strictly necessary, but it seemed like a good idea -- Sources will be maintaining their own persistent state, and there may be bugs in the checkpointing code. * The scheduler in `StreamExecution.scala` now calls `commit` on its stream sources after marking each batch as complete in its checkpoint. * `MemoryStream` now cleans committed batches out of its internal buffer. * `TextSocketSource` now cleans committed batches from its internal buffer. ## How was this patch tested? Existing regression tests already exercise the new code. Author: frreiss Closes #14553 from frreiss/fred-16963. (cherry picked from commit 5b27598ff50cb08e7570fade458da0a3d4d4eabc) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ea205e37 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ea205e37 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ea205e37 Branch: refs/heads/branch-2.0 Commit: ea205e376d555869519ee59186f53ed573ccee39 Parents: 72b3cff Author: frreiss Authored: Wed Oct 26 17:33:08 2016 -0700 Committer: Shixiong Zhu Committed: Wed Oct 26 17:33:16 2016 -0700 -- .../execution/streaming/FileStreamSource.scala | 9 +++ .../spark/sql/execution/streaming/Source.scala | 22 -- .../execution/streaming/StreamExecution.scala | 32 ++--- .../spark/sql/execution/streaming/memory.scala | 47 +++-- .../spark/sql/execution/streaming/socket.scala | 72 .../sql/streaming/StreamingQuerySuite.scala | 8 +-- 6 files changed, 154 insertions(+), 36 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ea205e37/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 5ada238..c47033a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -176,6 +176,15 @@ class FileStreamSource( override def toString: String = s"FileStreamSource[$qualifiedBasePath]" + /** + * Informs the source that Spark has completed processing all data for offsets less than or + * equal to `end` and will only request offsets greater than `end` in the future. + */ + override def commit(end: Offset): Unit = { +// No-op for now; FileStreamSource currently garbage-collects files based on timestamp +// and the value of the maxFileAge parameter. 
+ } + override def stop() {} } http://git-wip-us.apache.org/repos/asf/spark/blob/ea205e37/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala index 9711478..f3bd5bf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala @@ -30,16 +30,30 @@ trait Source { /** Returns the schema of the data from this source */ def schema: StructType - /** Returns the maximum available offset for this source. */ + /** + * Returns the maximum available offset for this source. + * Returns `None` if this source has never received any data. + */ def getOffset: Option[Offset] /** - * Returns the data that is between the offsets (`start`, `end`]. When `start` is `None` then - * the batch should begin with the first available record
spark git commit: [SPARK-16963][STREAMING][SQL] Changes to Source trait and related implementation classes
Repository: spark Updated Branches: refs/heads/master a76846cfb -> 5b27598ff [SPARK-16963][STREAMING][SQL] Changes to Source trait and related implementation classes ## What changes were proposed in this pull request? This PR contains changes to the Source trait such that the scheduler can notify data sources when it is safe to discard buffered data. Summary of changes: * Added a method `commit(end: Offset)` that tells the Source that is OK to discard all offsets up `end`, inclusive. * Changed the semantics of a `None` value for the `getBatch` method to mean "from the very beginning of the stream"; as opposed to "all data present in the Source's buffer". * Added notes that the upper layers of the system will never call `getBatch` with a start value less than the last value passed to `commit`. * Added a `lastCommittedOffset` method to allow the scheduler to query the status of each Source on restart. This addition is not strictly necessary, but it seemed like a good idea -- Sources will be maintaining their own persistent state, and there may be bugs in the checkpointing code. * The scheduler in `StreamExecution.scala` now calls `commit` on its stream sources after marking each batch as complete in its checkpoint. * `MemoryStream` now cleans committed batches out of its internal buffer. * `TextSocketSource` now cleans committed batches from its internal buffer. ## How was this patch tested? Existing regression tests already exercise the new code. Author: frreiss Closes #14553 from frreiss/fred-16963. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5b27598f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5b27598f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5b27598f Branch: refs/heads/master Commit: 5b27598ff50cb08e7570fade458da0a3d4d4eabc Parents: a76846c Author: frreiss Authored: Wed Oct 26 17:33:08 2016 -0700 Committer: Shixiong Zhu Committed: Wed Oct 26 17:33:08 2016 -0700 -- .../execution/streaming/FileStreamSource.scala | 9 +++ .../spark/sql/execution/streaming/Source.scala | 22 -- .../execution/streaming/StreamExecution.scala | 32 ++--- .../spark/sql/execution/streaming/memory.scala | 47 +++-- .../spark/sql/execution/streaming/socket.scala | 72 .../sql/streaming/StreamingQuerySuite.scala | 8 +-- 6 files changed, 154 insertions(+), 36 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5b27598f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 115edf7..a392b82 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -176,6 +176,15 @@ class FileStreamSource( override def toString: String = s"FileStreamSource[$qualifiedBasePath]" + /** + * Informs the source that Spark has completed processing all data for offsets less than or + * equal to `end` and will only request offsets greater than `end` in the future. + */ + override def commit(end: Offset): Unit = { +// No-op for now; FileStreamSource currently garbage-collects files based on timestamp +// and the value of the maxFileAge parameter. 
+ } + override def stop() {} } http://git-wip-us.apache.org/repos/asf/spark/blob/5b27598f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala index 9711478..f3bd5bf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala @@ -30,16 +30,30 @@ trait Source { /** Returns the schema of the data from this source */ def schema: StructType - /** Returns the maximum available offset for this source. */ + /** + * Returns the maximum available offset for this source. + * Returns `None` if this source has never received any data. + */ def getOffset: Option[Offset] /** - * Returns the data that is between the offsets (`start`, `end`]. When `start` is `None` then - * the batch should begin with the first available record. This method must always return the - * same data for a particular `start` and `end` pair. + * Return
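To make the new contract concrete, here is a compilable skeleton of a hypothetical buffering source honoring `commit`; `getBatch` is left unimplemented since constructing the DataFrame is source-specific, and this internal API is subject to change:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
import org.apache.spark.sql.types.StructType

class BufferingSource(override val schema: StructType) extends Source {
  // Batches still buffered in memory, keyed by a monotonically increasing offset.
  private var buffer = Vector.empty[(Long, DataFrame)]

  // None until the source has received any data, per the clarified contract.
  override def getOffset: Option[Offset] =
    buffer.lastOption.map { case (off, _) => LongOffset(off) }

  // start = None now means "from the very beginning of the stream", not
  // "whatever happens to be buffered".
  override def getBatch(start: Option[Offset], end: Offset): DataFrame = ???

  // The scheduler will never again ask for offsets <= end, so those
  // batches can safely be discarded.
  override def commit(end: Offset): Unit = end match {
    case LongOffset(committed) =>
      buffer = buffer.filterNot { case (off, _) => off <= committed }
    case _ => // unknown offset type; keep everything
  }

  override def stop(): Unit = buffer = Vector.empty
}
```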
spark git commit: [SPARK-18126][SPARK-CORE] getIteratorZipWithIndex accepts negative value as index
Repository: spark Updated Branches: refs/heads/master 29cea8f33 -> a76846cfb [SPARK-18126][SPARK-CORE] getIteratorZipWithIndex accepts negative value as index ## What changes were proposed in this pull request? (Please fill in changes proposed in this fix) `Utils.getIteratorZipWithIndex` was added to deal with number of records > 2147483647 in one partition. method `getIteratorZipWithIndex` accepts `startIndex` < 0, which leads to negative index. This PR just adds a defensive check on `startIndex` to make sure it is >= 0. ## How was this patch tested? Add a new unit test. Author: Miao Wang Closes #15639 from wangmiao1981/zip. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a76846cf Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a76846cf Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a76846cf Branch: refs/heads/master Commit: a76846cfb1c2d6c8f4d647426030b59de20d9433 Parents: 29cea8f Author: Miao Wang Authored: Thu Oct 27 01:17:32 2016 +0200 Committer: Reynold Xin Committed: Thu Oct 27 01:17:32 2016 +0200 -- core/src/main/scala/org/apache/spark/util/Utils.scala | 1 + core/src/test/scala/org/apache/spark/util/UtilsSuite.scala | 3 +++ 2 files changed, 4 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a76846cf/core/src/main/scala/org/apache/spark/util/Utils.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index e57eb0d..6027b07 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1765,6 +1765,7 @@ private[spark] object Utils extends Logging { */ def getIteratorZipWithIndex[T](iterator: Iterator[T], startIndex: Long): Iterator[(T, Long)] = { new Iterator[(T, Long)] { + require(startIndex >= 0, "startIndex should be >= 0.") var index: Long = startIndex - 1L def hasNext: Boolean = iterator.hasNext def next(): (T, Long) = { http://git-wip-us.apache.org/repos/asf/spark/blob/a76846cf/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index aeb2969..15ef32f 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -401,6 +401,9 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { assert(iterator.toArray === Array( (0, -1L + Int.MaxValue), (1, 0L + Int.MaxValue), (2, 1L + Int.MaxValue) )) +intercept[IllegalArgumentException] { + Utils.getIteratorZipWithIndex(Iterator(0, 1, 2), -1L) +} } test("doesDirectoryContainFilesNewerThan") { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
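For illustration, a standalone re-creation of the utility with the new guard (the real method lives in `org.apache.spark.util.Utils` and is `private[spark]`):

```scala
def getIteratorZipWithIndex[T](iterator: Iterator[T], startIndex: Long): Iterator[(T, Long)] = {
  new Iterator[(T, Long)] {
    require(startIndex >= 0, "startIndex should be >= 0.")  // the defensive check this PR adds
    var index: Long = startIndex - 1L  // Long, so counting survives past Int.MaxValue records
    def hasNext: Boolean = iterator.hasNext
    def next(): (T, Long) = {
      index += 1L
      (iterator.next(), index)
    }
  }
}

getIteratorZipWithIndex(Iterator('a', 'b', 'c'), Int.MaxValue.toLong).foreach(println)
// (a,2147483647), (b,2147483648), (c,2147483649)

try getIteratorZipWithIndex(Iterator(0), -1L)
catch { case e: IllegalArgumentException => println(s"rejected: ${e.getMessage}") }
```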
spark git commit: [SPARK-17157][SPARKR] Add multiclass logistic regression SparkR Wrapper
Repository: spark Updated Branches: refs/heads/master 5b7d403c1 -> 29cea8f33 [SPARK-17157][SPARKR] Add multiclass logistic regression SparkR Wrapper ## What changes were proposed in this pull request? As we discussed in #14818, I added a separate R wrapper spark.logit for logistic regression. This single interface supports both binary and multinomial logistic regression. It also has "predict" and "summary" for binary logistic regression. ## How was this patch tested? New unit tests are added. Author: wm...@hotmail.com Closes #15365 from wangmiao1981/glm. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/29cea8f3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/29cea8f3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/29cea8f3 Branch: refs/heads/master Commit: 29cea8f332aa3750f8ff7c3b9e705d107278da4b Parents: 5b7d403 Author: wm...@hotmail.com Authored: Wed Oct 26 16:12:55 2016 -0700 Committer: Felix Cheung Committed: Wed Oct 26 16:12:55 2016 -0700 -- R/pkg/NAMESPACE | 3 +- R/pkg/R/generics.R | 4 + R/pkg/R/mllib.R | 192 ++- R/pkg/inst/tests/testthat/test_mllib.R | 55 ++ .../spark/ml/r/LogisticRegressionWrapper.scala | 157 +++ .../scala/org/apache/spark/ml/r/RWrappers.scala | 2 + 6 files changed, 410 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/29cea8f3/R/pkg/NAMESPACE -- diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index eb314f4..7a89c01 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -43,7 +43,8 @@ exportMethods("glm", "spark.isoreg", "spark.gaussianMixture", "spark.als", - "spark.kstest") + "spark.kstest", + "spark.logit") # Job group lifecycle management methods export("setJobGroup", http://git-wip-us.apache.org/repos/asf/spark/blob/29cea8f3/R/pkg/R/generics.R -- diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index 4569fe4..107e1c6 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -1375,6 +1375,10 @@ setGeneric("spark.gaussianMixture", standardGeneric("spark.gaussianMixture") }) +#' @rdname spark.logit +#' @export +setGeneric("spark.logit", function(data, formula, ...) { standardGeneric("spark.logit") }) + #' @param object a fitted ML model object. #' @param path the directory where the model is saved. #' @param ... additional argument(s) passed to the method. http://git-wip-us.apache.org/repos/asf/spark/blob/29cea8f3/R/pkg/R/mllib.R -- diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R index bf182be..e441db9 100644 --- a/R/pkg/R/mllib.R +++ b/R/pkg/R/mllib.R @@ -95,6 +95,13 @@ setClass("ALSModel", representation(jobj = "jobj")) #' @note KSTest since 2.1.0 setClass("KSTest", representation(jobj = "jobj")) +#' S4 class that represents an LogisticRegressionModel +#' +#' @param jobj a Java object reference to the backing Scala LogisticRegressionModel +#' @export +#' @note LogisticRegressionModel since 2.1.0 +setClass("LogisticRegressionModel", representation(jobj = "jobj")) + #' Saves the MLlib model to the input path #' #' Saves the MLlib model to the input path. 
For more information, see the specific @@ -105,7 +112,7 @@ setClass("KSTest", representation(jobj = "jobj")) #' @seealso \link{spark.glm}, \link{glm}, #' @seealso \link{spark.als}, \link{spark.gaussianMixture}, \link{spark.isoreg}, \link{spark.kmeans}, #' @seealso \link{spark.lda}, \link{spark.mlp}, \link{spark.naiveBayes}, \link{spark.survreg} -#' @seealso \link{read.ml} +#' @seealso \link{spark.logit}, \link{read.ml} NULL #' Makes predictions from a MLlib model @@ -117,7 +124,7 @@ NULL #' @export #' @seealso \link{spark.glm}, \link{glm}, #' @seealso \link{spark.als}, \link{spark.gaussianMixture}, \link{spark.isoreg}, \link{spark.kmeans}, -#' @seealso \link{spark.mlp}, \link{spark.naiveBayes}, \link{spark.survreg} +#' @seealso \link{spark.mlp}, \link{spark.naiveBayes}, \link{spark.survreg}, \link{spark.logit} NULL write_internal <- function(object, path, overwrite = FALSE) { @@ -647,6 +654,170 @@ setMethod("predict", signature(object = "KMeansModel"), predict_internal(object, newData) }) +#' Logistic Regression Model +#' +#' Fits an logistic regression model against a Spark DataFrame. It supports "binomial": Binary logistic regression +#' with pivoting; "multinomial": Multinomial logistic (softmax) regression without pivoting, similar
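A rough Scala-side equivalent of what spark.logit wires up, sketched against the estimator the wrapper delegates to (assumes a spark-shell `spark` session; the path points at a stock Spark dataset):

```scala
import org.apache.spark.ml.classification.LogisticRegression

val training = spark.read.format("libsvm")
  .load("data/mllib/sample_multiclass_classification_data.txt")

// "auto" resolves to "binomial" for up to two classes and "multinomial" otherwise,
// matching the family semantics documented for spark.logit.
val lr = new LogisticRegression()
  .setFamily("multinomial")  // or "binomial" / "auto"
  .setMaxIter(100)
  .setRegParam(0.1)

val model = lr.fit(training)
println(s"Coefficients:\n${model.coefficientMatrix}")
println(s"Intercepts: ${model.interceptVector}")
```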
spark git commit: [SPARK-18094][SQL][TESTS] Move group analytics test cases from `SQLQuerySuite` into a query file test.
Repository: spark Updated Branches: refs/heads/master dcdda1978 -> 5b7d403c1 [SPARK-18094][SQL][TESTS] Move group analytics test cases from `SQLQuerySuite` into a query file test. ## What changes were proposed in this pull request? Currently we have several test cases for group analytics(ROLLUP/CUBE/GROUPING SETS) in `SQLQuerySuite`, should better move them into a query file test. The following test cases are moved to `group-analytics.sql`: ``` test("rollup") test("grouping sets when aggregate functions containing groupBy columns") test("cube") test("grouping sets") test("grouping and grouping_id") test("grouping and grouping_id in having") test("grouping and grouping_id in sort") ``` This is followup work of #15582 ## How was this patch tested? Modified query file `group-analytics.sql`, which will be tested by `SQLQueryTestSuite`. Author: jiangxingbo Closes #15624 from jiangxb1987/group-analytics-test. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5b7d403c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5b7d403c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5b7d403c Branch: refs/heads/master Commit: 5b7d403c1819c32a6a5b87d470f8de1a8ad7a987 Parents: dcdda19 Author: jiangxingbo Authored: Wed Oct 26 23:51:16 2016 +0200 Committer: Reynold Xin Committed: Wed Oct 26 23:51:16 2016 +0200 -- .../sql-tests/inputs/group-analytics.sql| 46 +++- .../sql-tests/results/group-analytics.sql.out | 247 ++- .../org/apache/spark/sql/SQLQuerySuite.scala| 189 -- 3 files changed, 290 insertions(+), 192 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5b7d403c/sql/core/src/test/resources/sql-tests/inputs/group-analytics.sql -- diff --git a/sql/core/src/test/resources/sql-tests/inputs/group-analytics.sql b/sql/core/src/test/resources/sql-tests/inputs/group-analytics.sql index 2f78349..f813538 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/group-analytics.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/group-analytics.sql @@ -10,4 +10,48 @@ SELECT a, b, SUM(b) FROM testData GROUP BY a, b WITH CUBE; -- ROLLUP on overlapping columns SELECT a + b, b, SUM(a - b) FROM testData GROUP BY a + b, b WITH ROLLUP; -SELECT a, b, SUM(b) FROM testData GROUP BY a, b WITH ROLLUP; \ No newline at end of file +SELECT a, b, SUM(b) FROM testData GROUP BY a, b WITH ROLLUP; + +CREATE OR REPLACE TEMPORARY VIEW courseSales AS SELECT * FROM VALUES +("dotNET", 2012, 1), ("Java", 2012, 2), ("dotNET", 2012, 5000), ("dotNET", 2013, 48000), ("Java", 2013, 3) +AS courseSales(course, year, earnings); + +-- ROLLUP +SELECT course, year, SUM(earnings) FROM courseSales GROUP BY ROLLUP(course, year) ORDER BY course, year; + +-- CUBE +SELECT course, year, SUM(earnings) FROM courseSales GROUP BY CUBE(course, year) ORDER BY course, year; + +-- GROUPING SETS +SELECT course, year, SUM(earnings) FROM courseSales GROUP BY course, year GROUPING SETS(course, year); +SELECT course, year, SUM(earnings) FROM courseSales GROUP BY course, year GROUPING SETS(course); +SELECT course, year, SUM(earnings) FROM courseSales GROUP BY course, year GROUPING SETS(year); + +-- GROUPING SETS with aggregate functions containing groupBy columns +SELECT course, SUM(earnings) AS sum FROM courseSales +GROUP BY course, earnings GROUPING SETS((), (course), (course, earnings)) ORDER BY course, sum; +SELECT course, SUM(earnings) AS sum, GROUPING_ID(course, earnings) FROM courseSales +GROUP BY course, earnings GROUPING SETS((), (course), (course, earnings)) ORDER 
BY course, sum; + +-- GROUPING/GROUPING_ID +SELECT course, year, GROUPING(course), GROUPING(year), GROUPING_ID(course, year) FROM courseSales +GROUP BY CUBE(course, year); +SELECT course, year, GROUPING(course) FROM courseSales GROUP BY course, year; +SELECT course, year, GROUPING_ID(course, year) FROM courseSales GROUP BY course, year; +SELECT course, year, grouping__id FROM courseSales GROUP BY CUBE(course, year); + +-- GROUPING/GROUPING_ID in having clause +SELECT course, year FROM courseSales GROUP BY CUBE(course, year) +HAVING GROUPING(year) = 1 AND GROUPING_ID(course, year) > 0; +SELECT course, year FROM courseSales GROUP BY course, year HAVING GROUPING(course) > 0; +SELECT course, year FROM courseSales GROUP BY course, year HAVING GROUPING_ID(course) > 0; +SELECT course, year FROM courseSales GROUP BY CUBE(course, year) HAVING grouping__id > 0; + +-- GROUPING/GROUPING_ID in orderBy clause +SELECT course, year, GROUPING(course), GROUPING(year) FROM courseSales GROUP BY CUBE(course, year) +ORDER BY GROUPING(course), GROUPING(year), course, year; +SELECT course, year, GROUPING_ID(course, year) FROM courseSales GROUP BY
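For readers new to these operators, a short sketch of the equivalence the moved tests exercise (assumes a spark-shell session where the `courseSales` view from the test input above exists):

```scala
// ROLLUP(course, year) is shorthand for GROUPING SETS ((course, year), (course), ());
// CUBE(course, year) additionally includes the (year) grouping set.
val rollupEquivalent = spark.sql(
  """SELECT course, year, SUM(earnings)
    |FROM courseSales
    |GROUP BY course, year GROUPING SETS((course, year), (course), ())
    |ORDER BY course, year""".stripMargin)
rollupEquivalent.show()
```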
[2/2] spark git commit: Preparing development version 2.0.3-SNAPSHOT
Preparing development version 2.0.3-SNAPSHOT Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/72b3cff3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/72b3cff3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/72b3cff3 Branch: refs/heads/branch-2.0 Commit: 72b3cff33643529c8d363e0ae1a3fe2bd00ff4fd Parents: 1c2908e Author: Patrick Wendell Authored: Wed Oct 26 14:02:05 2016 -0700 Committer: Patrick Wendell Committed: Wed Oct 26 14:02:05 2016 -0700 -- R/pkg/DESCRIPTION | 2 +- assembly/pom.xml | 2 +- common/network-common/pom.xml | 2 +- common/network-shuffle/pom.xml| 2 +- common/network-yarn/pom.xml | 2 +- common/sketch/pom.xml | 2 +- common/tags/pom.xml | 2 +- common/unsafe/pom.xml | 2 +- core/pom.xml | 2 +- docs/_config.yml | 4 ++-- examples/pom.xml | 2 +- external/docker-integration-tests/pom.xml | 2 +- external/flume-assembly/pom.xml | 2 +- external/flume-sink/pom.xml | 2 +- external/flume/pom.xml| 2 +- external/java8-tests/pom.xml | 2 +- external/kafka-0-10-assembly/pom.xml | 2 +- external/kafka-0-10-sql/pom.xml | 2 +- external/kafka-0-10/pom.xml | 2 +- external/kafka-0-8-assembly/pom.xml | 2 +- external/kafka-0-8/pom.xml| 2 +- external/kinesis-asl-assembly/pom.xml | 2 +- external/kinesis-asl/pom.xml | 2 +- external/spark-ganglia-lgpl/pom.xml | 2 +- graphx/pom.xml| 2 +- launcher/pom.xml | 2 +- mllib-local/pom.xml | 2 +- mllib/pom.xml | 2 +- pom.xml | 2 +- repl/pom.xml | 2 +- sql/catalyst/pom.xml | 2 +- sql/core/pom.xml | 2 +- sql/hive-thriftserver/pom.xml | 2 +- sql/hive/pom.xml | 2 +- streaming/pom.xml | 2 +- tools/pom.xml | 2 +- yarn/pom.xml | 2 +- 37 files changed, 38 insertions(+), 38 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/72b3cff3/R/pkg/DESCRIPTION -- diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION index dfb7e22..0b01ca8 100644 --- a/R/pkg/DESCRIPTION +++ b/R/pkg/DESCRIPTION @@ -1,7 +1,7 @@ Package: SparkR Type: Package Title: R Frontend for Apache Spark -Version: 2.0.2 +Version: 2.0.3 Date: 2016-08-27 Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"), email = "shiva...@cs.berkeley.edu"), http://git-wip-us.apache.org/repos/asf/spark/blob/72b3cff3/assembly/pom.xml -- diff --git a/assembly/pom.xml b/assembly/pom.xml index 58feedc..de09fce 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 -2.0.2 +2.0.3-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/72b3cff3/common/network-common/pom.xml -- diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml index a75d222..2ee104f 100644 --- a/common/network-common/pom.xml +++ b/common/network-common/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.0.2 +2.0.3-SNAPSHOT ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/72b3cff3/common/network-shuffle/pom.xml -- diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml index 828a407..b20f9e2 100644 --- a/common/network-shuffle/pom.xml +++ b/common/network-shuffle/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.0.2 +2.0.3-SNAPSHOT ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/72b3cff3/common/network-yarn/pom.xml -- diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml index 30891f3..06895c6 100644 --- a/common/network-yarn/pom.xml +++ b/common/network-yarn/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.0.2 +2.0.3-SNAPSHOT ../../pom.xml 
http://git-wip-us.apache.org/repos/asf/spark/blob/72b3cff3/common/sketch/pom.xml
[1/2] spark git commit: Preparing Spark release v2.0.2-rc1
Repository: spark Updated Branches: refs/heads/branch-2.0 76b71eef4 -> 72b3cff33 Preparing Spark release v2.0.2-rc1 Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1c2908ee Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1c2908ee Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1c2908ee Branch: refs/heads/branch-2.0 Commit: 1c2908eeb8890fdc91413a3f5bad2bb3d114db6c Parents: 76b71ee Author: Patrick Wendell Authored: Wed Oct 26 14:01:59 2016 -0700 Committer: Patrick Wendell Committed: Wed Oct 26 14:01:59 2016 -0700 -- assembly/pom.xml | 2 +- common/network-common/pom.xml | 2 +- common/network-shuffle/pom.xml| 2 +- common/network-yarn/pom.xml | 2 +- common/sketch/pom.xml | 2 +- common/tags/pom.xml | 2 +- common/unsafe/pom.xml | 2 +- core/pom.xml | 2 +- docs/_config.yml | 2 +- examples/pom.xml | 2 +- external/docker-integration-tests/pom.xml | 2 +- external/flume-assembly/pom.xml | 2 +- external/flume-sink/pom.xml | 2 +- external/flume/pom.xml| 2 +- external/java8-tests/pom.xml | 2 +- external/kafka-0-10-assembly/pom.xml | 2 +- external/kafka-0-10-sql/pom.xml | 2 +- external/kafka-0-10/pom.xml | 2 +- external/kafka-0-8-assembly/pom.xml | 2 +- external/kafka-0-8/pom.xml| 2 +- external/kinesis-asl-assembly/pom.xml | 2 +- external/kinesis-asl/pom.xml | 2 +- external/spark-ganglia-lgpl/pom.xml | 2 +- graphx/pom.xml| 2 +- launcher/pom.xml | 2 +- mllib-local/pom.xml | 2 +- mllib/pom.xml | 2 +- pom.xml | 2 +- repl/pom.xml | 2 +- sql/catalyst/pom.xml | 2 +- sql/core/pom.xml | 2 +- sql/hive-thriftserver/pom.xml | 2 +- sql/hive/pom.xml | 2 +- streaming/pom.xml | 2 +- tools/pom.xml | 2 +- yarn/pom.xml | 2 +- 36 files changed, 36 insertions(+), 36 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1c2908ee/assembly/pom.xml -- diff --git a/assembly/pom.xml b/assembly/pom.xml index ca6daa2..58feedc 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 -2.0.2-SNAPSHOT +2.0.2 ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/1c2908ee/common/network-common/pom.xml -- diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml index c727f54..a75d222 100644 --- a/common/network-common/pom.xml +++ b/common/network-common/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.0.2-SNAPSHOT +2.0.2 ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/1c2908ee/common/network-shuffle/pom.xml -- diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml index e335a89..828a407 100644 --- a/common/network-shuffle/pom.xml +++ b/common/network-shuffle/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.0.2-SNAPSHOT +2.0.2 ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/1c2908ee/common/network-yarn/pom.xml -- diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml index 8e64f56..30891f3 100644 --- a/common/network-yarn/pom.xml +++ b/common/network-yarn/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.0.2-SNAPSHOT +2.0.2 ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/1c2908ee/common/sketch/pom.xml -- diff --git a/common/sketch/pom.xml b/common/sketch/pom.xml index 94c75d6..ea2f4f5 100644 --- a/common/sketch/pom.xml +++ b/common/sketch/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.0.2-SNAPSHOT +2.0.2 ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/1c2908ee/common/tags/pom.xml -- diff --git 
a/common/tags/pom.xml b/common/tags/pom.xml
[spark] Git Push Summary
Repository: spark Updated Tags: refs/tags/v2.0.2-rc1 [created] 1c2908eeb - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-14300][DOCS][MLLIB] Scala MLlib examples code merge and clean up
Repository: spark Updated Branches: refs/heads/master fb0a8a8dd -> dcdda1978 [SPARK-14300][DOCS][MLLIB] Scala MLlib examples code merge and clean up ## What changes were proposed in this pull request? https://issues.apache.org/jira/browse/SPARK-14300 Duplicated code found in scala/examples/mllib, below all deleted in this PR: - DenseGaussianMixture.scala - StreamingLinearRegression.scala ## delete reasons: delete: mllib/DenseGaussianMixture.scala - duplicate of mllib/GaussianMixtureExample delete: mllib/StreamingLinearRegression.scala - duplicate of mllib/StreamingLinearRegressionExample When merging and cleaning those code, be sure not disturb the previous example on and off blocks. ## How was this patch tested? Test with `SKIP_API=1 jekyll` manually to make sure that works well. Author: Xin Ren Closes #12195 from keypointt/SPARK-14300. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dcdda197 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dcdda197 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dcdda197 Branch: refs/heads/master Commit: dcdda19785a272969fb1e3ec18382403aaad6c91 Parents: fb0a8a8 Author: Xin Ren Authored: Wed Oct 26 13:33:23 2016 -0700 Committer: Joseph K. Bradley Committed: Wed Oct 26 13:33:23 2016 -0700 -- .../examples/mllib/DenseGaussianMixture.scala | 75 .../mllib/StreamingLinearRegression.scala | 73 --- .../StreamingLinearRegressionExample.scala | 19 + 3 files changed, 19 insertions(+), 148 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/dcdda197/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGaussianMixture.scala -- diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGaussianMixture.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGaussianMixture.scala deleted file mode 100644 index 90b817b..000 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGaussianMixture.scala +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// scalastyle:off println -package org.apache.spark.examples.mllib - -import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.mllib.clustering.GaussianMixture -import org.apache.spark.mllib.linalg.Vectors - -/** - * An example Gaussian Mixture Model EM app. Run with - * {{{ - * ./bin/run-example mllib.DenseGaussianMixture - * }}} - * If you use it as a template to create your own app, please use `spark-submit` to submit your app. 
- */ -object DenseGaussianMixture { - def main(args: Array[String]): Unit = { -if (args.length < 3) { - println("usage: DenseGmmEM [maxIterations]") -} else { - val maxIterations = if (args.length > 3) args(3).toInt else 100 - run(args(0), args(1).toInt, args(2).toDouble, maxIterations) -} - } - - private def run(inputFile: String, k: Int, convergenceTol: Double, maxIterations: Int) { -val conf = new SparkConf().setAppName("Gaussian Mixture Model EM example") -val ctx = new SparkContext(conf) - -val data = ctx.textFile(inputFile).map { line => - Vectors.dense(line.trim.split(' ').map(_.toDouble)) -}.cache() - -val clusters = new GaussianMixture() - .setK(k) - .setConvergenceTol(convergenceTol) - .setMaxIterations(maxIterations) - .run(data) - -for (i <- 0 until clusters.k) { - println("weight=%f\nmu=%s\nsigma=\n%s\n" format -(clusters.weights(i), clusters.gaussians(i).mu, clusters.gaussians(i).sigma)) -} - -println("The membership value of each vector to all mixture components (first <= 100):") -val membership = clusters.predictSoft(data) -membership.take(100).foreach { x => - print(" " + x.mkString(",")) -} -println() -println("Cluster labels (first <= 100):") -val clusterLabels = clusters.predict(data) -clusterLabels.take(10
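For reference, the flow the deleted example wrapped survives in the retained `mllib/GaussianMixtureExample`. A minimal sketch distilled from the code above (hedged: the input path, `k`, and tolerance are placeholders):

```
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.clustering.GaussianMixture
import org.apache.spark.mllib.linalg.Vectors

val sc = new SparkContext(new SparkConf().setAppName("GaussianMixtureSketch"))
// Placeholder input: one whitespace-separated dense vector per line.
val data = sc.textFile("data/gmm_input.txt")
  .map(line => Vectors.dense(line.trim.split(' ').map(_.toDouble)))
  .cache()
val model = new GaussianMixture()
  .setK(2)                  // assumed number of components
  .setConvergenceTol(0.01)  // assumed tolerance
  .run(data)
model.weights.indices.foreach { i =>
  println(s"weight=${model.weights(i)}\nmu=${model.gaussians(i).mu}\nsigma=\n${model.gaussians(i).sigma}")
}
```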
spark git commit: [SPARK-17961][SPARKR][SQL] Add storageLevel to DataFrame for SparkR
Repository: spark Updated Branches: refs/heads/master ea3605e82 -> fb0a8a8dd [SPARK-17961][SPARKR][SQL] Add storageLevel to DataFrame for SparkR ## What changes were proposed in this pull request? Add storageLevel to DataFrame for SparkR. This is similar to this PR: https://github.com/apache/spark/pull/13780 but in R I do not create a class for `StorageLevel`; instead I add a method `storageLevelToString`. ## How was this patch tested? A test was added. Author: WeichenXu Closes #15516 from WeichenXu123/storageLevel_df_r. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fb0a8a8d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fb0a8a8d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fb0a8a8d Branch: refs/heads/master Commit: fb0a8a8dd7e8985676a846684b956e2d988875c6 Parents: ea3605e Author: WeichenXu Authored: Wed Oct 26 13:26:43 2016 -0700 Committer: Felix Cheung Committed: Wed Oct 26 13:26:43 2016 -0700 -- R/pkg/NAMESPACE | 1 + R/pkg/R/DataFrame.R | 28 +- R/pkg/R/RDD.R | 2 +- R/pkg/R/generics.R| 6 +++- R/pkg/R/utils.R | 41 ++ R/pkg/inst/tests/testthat/test_sparkSQL.R | 5 +++- 6 files changed, 79 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fb0a8a8d/R/pkg/NAMESPACE -- diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index 8718185..eb314f4 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -124,6 +124,7 @@ exportMethods("arrange", "selectExpr", "show", "showDF", + "storageLevel", "subset", "summarize", "summary", http://git-wip-us.apache.org/repos/asf/spark/blob/fb0a8a8d/R/pkg/R/DataFrame.R -- diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index b6ce838..be34e4b 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -633,7 +633,7 @@ setMethod("persist", #' @param ... further arguments to be passed to or from other methods. #' #' @family SparkDataFrame functions -#' @rdname unpersist-methods +#' @rdname unpersist #' @aliases unpersist,SparkDataFrame-method #' @name unpersist #' @export @@ -654,6 +654,32 @@ setMethod("unpersist", x }) +#' StorageLevel +#' +#' Get storagelevel of this SparkDataFrame. +#' +#' @param x the SparkDataFrame to get the storageLevel. +#' +#' @family SparkDataFrame functions +#' @rdname storageLevel +#' @aliases storageLevel,SparkDataFrame-method +#' @name storageLevel +#' @export +#' @examples +#'\dontrun{ +#' sparkR.session() +#' path <- "path/to/file.json" +#' df <- read.json(path) +#' persist(df, "MEMORY_AND_DISK") +#' storageLevel(df) +#'} +#' @note storageLevel since 2.1.0 +setMethod("storageLevel", + signature(x = "SparkDataFrame"), + function(x) { +storageLevelToString(callJMethod(x@sdf, "storageLevel")) + }) + #' Repartition #' #' The following options for repartition are possible: http://git-wip-us.apache.org/repos/asf/spark/blob/fb0a8a8d/R/pkg/R/RDD.R -- diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R index 6cd0704..0f1162f 100644 --- a/R/pkg/R/RDD.R +++ b/R/pkg/R/RDD.R @@ -261,7 +261,7 @@ setMethod("persistRDD", #' cache(rdd) # rdd@@env$isCached == TRUE #' unpersistRDD(rdd) # rdd@@env$isCached == FALSE #'} -#' @rdname unpersist-methods +#' @rdname unpersist #' @aliases unpersist,RDD-method #' @noRd setMethod("unpersistRDD", http://git-wip-us.apache.org/repos/asf/spark/blob/fb0a8a8d/R/pkg/R/generics.R -- diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index 5549cd7..4569fe4 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -691,6 +691,10 @@ setGeneric("selectExpr", function(x, expr, ...) 
{ standardGeneric("selectExpr") #' @export setGeneric("showDF", function(x, ...) { standardGeneric("showDF") }) +# @rdname storageLevel +# @export +setGeneric("storageLevel", function(x) { standardGeneric("storageLevel") }) + #' @rdname subset #' @export setGeneric("subset", function(x, ...) { standardGeneric("subset") }) @@ -715,7 +719,7 @@ setGeneric("union", function(x, y) { standardGeneric("union") }) #' @export setGeneric("unionAll", function(x, y) { standardGeneric("unionAll") }) -#' @rdname unpersist-methods +#' @rdname unpersist #' @export setGeneric("unpersist", function(x, ...) { standardGeneric("unpersist") }) http://git-wip-us.apache.org/repos/asf/spark/blob/f
spark git commit: [SPARK-13747][SQL] Fix concurrent executions in ForkJoinPool for SQL (branch 2.0)
Repository: spark Updated Branches: refs/heads/branch-2.0 b482b3d58 -> 76b71eef4 [SPARK-13747][SQL] Fix concurrent executions in ForkJoinPool for SQL (branch 2.0) ## What changes were proposed in this pull request? Backport #15520 to 2.0. ## How was this patch tested? Jenkins Author: Shixiong Zhu Closes #15646 from zsxwing/SPARK-13747-2.0. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/76b71eef Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/76b71eef Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/76b71eef Branch: refs/heads/branch-2.0 Commit: 76b71eef46a3b932d6de7f831f0245ea27e3dfe7 Parents: b482b3d Author: Shixiong Zhu Authored: Wed Oct 26 13:21:46 2016 -0700 Committer: Shixiong Zhu Committed: Wed Oct 26 13:21:46 2016 -0700 -- .../org/apache/spark/util/ThreadUtils.scala | 21 scalastyle-config.xml | 1 + .../apache/spark/sql/execution/SparkPlan.scala | 2 +- .../exchange/BroadcastExchangeExec.scala| 3 ++- 4 files changed, 25 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/76b71eef/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala index 5a6dbc8..d093e7b 100644 --- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala @@ -194,4 +194,25 @@ private[spark] object ThreadUtils { throw new SparkException("Exception thrown in awaitResult: ", t) } } + + /** + * Calls [[Awaitable.result]] directly to avoid using `ForkJoinPool`'s `BlockingContext`, wraps + * and re-throws any exceptions with nice stack track. + * + * Codes running in the user's thread may be in a thread of Scala ForkJoinPool. As concurrent + * executions in ForkJoinPool may see some [[ThreadLocal]] value unexpectedly, this method + * basically prevents ForkJoinPool from running other tasks in the current waiting thread. + */ + @throws(classOf[SparkException]) + def awaitResultInForkJoinSafely[T](awaitable: Awaitable[T], atMost: Duration): T = { +try { + // `awaitPermission` is not actually used anywhere so it's safe to pass in null here. + // See SPARK-13747. + val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait] + awaitable.result(Duration.Inf)(awaitPermission) +} catch { + case NonFatal(t) => +throw new SparkException("Exception thrown in awaitResult: ", t) +} + } } http://git-wip-us.apache.org/repos/asf/spark/blob/76b71eef/scalastyle-config.xml -- diff --git a/scalastyle-config.xml b/scalastyle-config.xml index 7fe0697..81d57d7 100644 --- a/scalastyle-config.xml +++ b/scalastyle-config.xml @@ -200,6 +200,7 @@ This file is divided into 3 sections: // scalastyle:off awaitresult Await.result(...) // scalastyle:on awaitresult + If your codes use ThreadLocal and may run in threads created by the user, use ThreadUtils.awaitResultInForkJoinSafely instead. 
]]> http://git-wip-us.apache.org/repos/asf/spark/blob/76b71eef/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index 79cb409..fa40414 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -166,7 +166,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ protected def waitForSubqueries(): Unit = synchronized { // fill in the result of subqueries subqueryResults.foreach { case (e, futureResult) => - val rows = ThreadUtils.awaitResult(futureResult, Duration.Inf) + val rows = ThreadUtils.awaitResultInForkJoinSafely(futureResult, Duration.Inf) if (rows.length > 1) { sys.error(s"more than one row returned by a subquery used as an expression:\n${e.plan}") } http://git-wip-us.apache.org/repos/asf/spark/blob/76b71eef/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange
spark git commit: [MINOR][ML] Refactor clustering summary.
Repository: spark Updated Branches: refs/heads/master 7d10631c1 -> ea3605e82 [MINOR][ML] Refactor clustering summary. ## What changes were proposed in this pull request? Abstract ```ClusteringSummary``` from ```KMeansSummary```, ```GaussianMixtureSummary``` and ```BisectingSummary```, and eliminate duplicated pieces of code. ## How was this patch tested? Existing tests. Author: Yanbo Liang Closes #1 from yanboliang/clustering-summary. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ea3605e8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ea3605e8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ea3605e8 Branch: refs/heads/master Commit: ea3605e82545031a00235ee0f449e1e2418674e8 Parents: 7d10631 Author: Yanbo Liang Authored: Wed Oct 26 11:48:54 2016 -0700 Committer: Joseph K. Bradley Committed: Wed Oct 26 11:48:54 2016 -0700 -- .../spark/ml/clustering/BisectingKMeans.scala | 36 +++-- .../spark/ml/clustering/ClusteringSummary.scala | 54 .../spark/ml/clustering/GaussianMixture.scala | 37 -- .../org/apache/spark/ml/clustering/KMeans.scala | 36 +++-- 4 files changed, 80 insertions(+), 83 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ea3605e8/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala index ef2d918..2718dd9 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala @@ -288,35 +288,15 @@ object BisectingKMeans extends DefaultParamsReadable[BisectingKMeans] { * :: Experimental :: * Summary of BisectingKMeans. * - * @param predictions [[DataFrame]] produced by [[BisectingKMeansModel.transform()]] - * @param predictionCol Name for column of predicted clusters in `predictions` - * @param featuresCol Name for column of features in `predictions` - * @param k Number of clusters + * @param predictions [[DataFrame]] produced by [[BisectingKMeansModel.transform()]]. + * @param predictionCol Name for column of predicted clusters in `predictions`. + * @param featuresCol Name for column of features in `predictions`. + * @param k Number of clusters. */ @Since("2.1.0") @Experimental class BisectingKMeansSummary private[clustering] ( -@Since("2.1.0") @transient val predictions: DataFrame, -@Since("2.1.0") val predictionCol: String, -@Since("2.1.0") val featuresCol: String, -@Since("2.1.0") val k: Int) extends Serializable { - - /** - * Cluster centers of the transformed data. - */ - @Since("2.1.0") - @transient lazy val cluster: DataFrame = predictions.select(predictionCol) - - /** - * Size of (number of data points in) each cluster. 
- */ - @Since("2.1.0") - lazy val clusterSizes: Array[Long] = { -val sizes = Array.fill[Long](k)(0) -cluster.groupBy(predictionCol).count().select(predictionCol, "count").collect().foreach { - case Row(cluster: Int, count: Long) => sizes(cluster) = count -} -sizes - } - -} +predictions: DataFrame, +predictionCol: String, +featuresCol: String, +k: Int) extends ClusteringSummary(predictions, predictionCol, featuresCol, k) http://git-wip-us.apache.org/repos/asf/spark/blob/ea3605e8/mllib/src/main/scala/org/apache/spark/ml/clustering/ClusteringSummary.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/ClusteringSummary.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/ClusteringSummary.scala new file mode 100644 index 000..8b5f525 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/ClusteringSummary.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations und
spark git commit: [SPARK-18104][DOC] Don't build KafkaSource doc
Repository: spark Updated Branches: refs/heads/master fa7d9d708 -> 7d10631c1 [SPARK-18104][DOC] Don't build KafkaSource doc ## What changes were proposed in this pull request? We don't need to build docs for KafkaSource because users should access Kafka through the data source APIs; all KafkaSource APIs are internal. ## How was this patch tested? Verified manually. Author: Shixiong Zhu Closes #15630 from zsxwing/kafka-unidoc. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7d10631c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7d10631c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7d10631c Branch: refs/heads/master Commit: 7d10631c16b980adf1f55378c128436310daed65 Parents: fa7d9d7 Author: Shixiong Zhu Authored: Wed Oct 26 11:16:20 2016 -0700 Committer: Shixiong Zhu Committed: Wed Oct 26 11:16:20 2016 -0700 -- project/SparkBuild.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7d10631c/project/SparkBuild.scala -- diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 88d5dc9..2d3a95b 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -714,9 +714,9 @@ object Unidoc { publish := {}, unidocProjectFilter in(ScalaUnidoc, unidoc) := - inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, yarn, tags, streamingKafka010), + inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, yarn, tags, streamingKafka010, sqlKafka010), unidocProjectFilter in(JavaUnidoc, unidoc) := - inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, yarn, tags, streamingKafka010), + inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, yarn, tags, streamingKafka010, sqlKafka010), unidocAllClasspaths in (ScalaUnidoc, unidoc) := { ignoreClasspaths((unidocAllClasspaths in (ScalaUnidoc, unidoc)).value)
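Since all `KafkaSource` APIs are internal, the supported entry point is the generic data source API. A hedged sketch (broker address and topic name are placeholders; assumes an active `SparkSession` named `spark` with the sql-kafka package on the classpath):

```
val kafkaDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")  // placeholder brokers
  .option("subscribe", "topic1")                    // placeholder topic
  .load()
kafkaDf.printSchema()  // key, value, topic, partition, offset, timestamp, ...
```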
spark git commit: [SPARK-18104][DOC] Don't build KafkaSource doc
Repository: spark Updated Branches: refs/heads/branch-2.0 5b81b0102 -> b482b3d58 [SPARK-18104][DOC] Don't build KafkaSource doc ## What changes were proposed in this pull request? We don't need to build docs for KafkaSource because users should access Kafka through the data source APIs; all KafkaSource APIs are internal. ## How was this patch tested? Verified manually. Author: Shixiong Zhu Closes #15630 from zsxwing/kafka-unidoc. (cherry picked from commit 7d10631c16b980adf1f55378c128436310daed65) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b482b3d5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b482b3d5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b482b3d5 Branch: refs/heads/branch-2.0 Commit: b482b3d586543744bdfe462351d945b9761b54c8 Parents: 5b81b01 Author: Shixiong Zhu Authored: Wed Oct 26 11:16:20 2016 -0700 Committer: Shixiong Zhu Committed: Wed Oct 26 11:16:27 2016 -0700 -- project/SparkBuild.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b482b3d5/project/SparkBuild.scala -- diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 7f7a65f..98f1e23 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -713,9 +713,9 @@ object Unidoc { publish := {}, unidocProjectFilter in(ScalaUnidoc, unidoc) := - inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, yarn, tags, streamingKafka010), + inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, yarn, tags, streamingKafka010, sqlKafka010), unidocProjectFilter in(JavaUnidoc, unidoc) := - inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, yarn, tags, streamingKafka010), + inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, yarn, tags, streamingKafka010, sqlKafka010), unidocAllClasspaths in (ScalaUnidoc, unidoc) := { ignoreClasspaths((unidocAllClasspaths in (ScalaUnidoc, unidoc)).value)
spark git commit: [SPARK-18063][SQL] Failed to infer constraints over multiple aliases
Repository: spark Updated Branches: refs/heads/branch-2.0 773fbfef1 -> 5b81b0102 [SPARK-18063][SQL] Failed to infer constraints over multiple aliases ## What changes were proposed in this pull request? The `UnaryNode.getAliasedConstraints` function fails to replace all expressions by their aliases when the constraints contain more than one expression to be replaced. For example: ``` val tr = LocalRelation('a.int, 'b.string, 'c.int) val multiAlias = tr.where('a === 'c + 10).select('a.as('x), 'c.as('y)) multiAlias.analyze.constraints ``` currently outputs: ``` ExpressionSet(Seq( IsNotNull(resolveColumn(multiAlias.analyze, "x")), IsNotNull(resolveColumn(multiAlias.analyze, "y")) ) ``` The constraint `resolveColumn(multiAlias.analyze, "x") === resolveColumn(multiAlias.analyze, "y") + 10)` is missing. ## How was this patch tested? Added new test cases in `ConstraintPropagationSuite`. Author: jiangxingbo Closes #15597 from jiangxb1987/alias-constraints. (cherry picked from commit fa7d9d70825a6816495d239da925d0087f7cb94f) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5b81b010 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5b81b010 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5b81b010 Branch: refs/heads/branch-2.0 Commit: 5b81b01026bc215c7982a640a794cd36ea720959 Parents: 773fbfe Author: jiangxingbo Authored: Wed Oct 26 20:12:20 2016 +0200 Committer: Reynold Xin Committed: Wed Oct 26 20:12:44 2016 +0200 -- .../sql/catalyst/plans/logical/LogicalPlan.scala| 16 ++-- .../catalyst/plans/ConstraintPropagationSuite.scala | 8 2 files changed, 18 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5b81b010/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 6d77991..9c152fb88 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -293,15 +293,19 @@ abstract class UnaryNode extends LogicalPlan { * expressions with the corresponding alias */ protected def getAliasedConstraints(projectList: Seq[NamedExpression]): Set[Expression] = { -projectList.flatMap { +var allConstraints = child.constraints.asInstanceOf[Set[Expression]] +projectList.foreach { case a @ Alias(e, _) => -child.constraints.map(_ transform { +// For every alias in `projectList`, replace the reference in constraints by its attribute. +allConstraints ++= allConstraints.map(_ transform { case expr: Expression if expr.semanticEquals(e) => a.toAttribute -}).union(Set(EqualNullSafe(e, a.toAttribute))) - case _ => -Set.empty[Expression] -}.toSet +}) +allConstraints += EqualNullSafe(e, a.toAttribute) + case _ => // Don't change. 
+} + +allConstraints -- child.constraints } override protected def validConstraints: Set[Expression] = child.constraints http://git-wip-us.apache.org/repos/asf/spark/blob/5b81b010/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala index 8d6a49a..8068ce9 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala @@ -128,8 +128,16 @@ class ConstraintPropagationSuite extends SparkFunSuite { ExpressionSet(Seq(resolveColumn(aliasedRelation.analyze, "x") > 10, IsNotNull(resolveColumn(aliasedRelation.analyze, "x")), resolveColumn(aliasedRelation.analyze, "b") <=> resolveColumn(aliasedRelation.analyze, "y"), +resolveColumn(aliasedRelation.analyze, "z") <=> resolveColumn(aliasedRelation.analyze, "x"), resolveColumn(aliasedRelation.analyze, "z") > 10, IsNotNull(resolveColumn(aliasedRelation.analyze, "z") + +val multiAlias = tr.where('a === 'c + 10).select('a.as('x), 'c.as('y)) +verifyConstraints(multiAlias.analyze.constraints, + ExpressionSet(Seq(IsNotNull(resolveColumn(multiAlias.a
spark git commit: [SPARK-18063][SQL] Failed to infer constraints over multiple aliases
Repository: spark Updated Branches: refs/heads/master 7ac70e7ba -> fa7d9d708 [SPARK-18063][SQL] Failed to infer constraints over multiple aliases ## What changes were proposed in this pull request? The `UnaryNode.getAliasedConstraints` function fails to replace all expressions by their aliases when the constraints contain more than one expression to be replaced. For example: ``` val tr = LocalRelation('a.int, 'b.string, 'c.int) val multiAlias = tr.where('a === 'c + 10).select('a.as('x), 'c.as('y)) multiAlias.analyze.constraints ``` currently outputs: ``` ExpressionSet(Seq( IsNotNull(resolveColumn(multiAlias.analyze, "x")), IsNotNull(resolveColumn(multiAlias.analyze, "y")) ) ``` The constraint `resolveColumn(multiAlias.analyze, "x") === resolveColumn(multiAlias.analyze, "y") + 10)` is missing. ## How was this patch tested? Added new test cases in `ConstraintPropagationSuite`. Author: jiangxingbo Closes #15597 from jiangxb1987/alias-constraints. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fa7d9d70 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fa7d9d70 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fa7d9d70 Branch: refs/heads/master Commit: fa7d9d70825a6816495d239da925d0087f7cb94f Parents: 7ac70e7 Author: jiangxingbo Authored: Wed Oct 26 20:12:20 2016 +0200 Committer: Reynold Xin Committed: Wed Oct 26 20:12:20 2016 +0200 -- .../sql/catalyst/plans/logical/LogicalPlan.scala| 16 ++-- .../catalyst/plans/ConstraintPropagationSuite.scala | 8 2 files changed, 18 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fa7d9d70/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 0972547..b0a4145 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -293,15 +293,19 @@ abstract class UnaryNode extends LogicalPlan { * expressions with the corresponding alias */ protected def getAliasedConstraints(projectList: Seq[NamedExpression]): Set[Expression] = { -projectList.flatMap { +var allConstraints = child.constraints.asInstanceOf[Set[Expression]] +projectList.foreach { case a @ Alias(e, _) => -child.constraints.map(_ transform { +// For every alias in `projectList`, replace the reference in constraints by its attribute. +allConstraints ++= allConstraints.map(_ transform { case expr: Expression if expr.semanticEquals(e) => a.toAttribute -}).union(Set(EqualNullSafe(e, a.toAttribute))) - case _ => -Set.empty[Expression] -}.toSet +}) +allConstraints += EqualNullSafe(e, a.toAttribute) + case _ => // Don't change. 
+} + +allConstraints -- child.constraints } override protected def validConstraints: Set[Expression] = child.constraints http://git-wip-us.apache.org/repos/asf/spark/blob/fa7d9d70/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala index 8d6a49a..8068ce9 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala @@ -128,8 +128,16 @@ class ConstraintPropagationSuite extends SparkFunSuite { ExpressionSet(Seq(resolveColumn(aliasedRelation.analyze, "x") > 10, IsNotNull(resolveColumn(aliasedRelation.analyze, "x")), resolveColumn(aliasedRelation.analyze, "b") <=> resolveColumn(aliasedRelation.analyze, "y"), +resolveColumn(aliasedRelation.analyze, "z") <=> resolveColumn(aliasedRelation.analyze, "x"), resolveColumn(aliasedRelation.analyze, "z") > 10, IsNotNull(resolveColumn(aliasedRelation.analyze, "z") + +val multiAlias = tr.where('a === 'c + 10).select('a.as('x), 'c.as('y)) +verifyConstraints(multiAlias.analyze.constraints, + ExpressionSet(Seq(IsNotNull(resolveColumn(multiAlias.analyze, "x")), +IsNotNull(resolveColumn(multiAlias.analyze, "y")), +resolveColumn(multiAlias
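A toy restatement of the fixed algorithm (plain strings stand in for Catalyst expressions, with no tokenization; names are illustrative only): each alias rewrite is applied cumulatively to the whole constraint set, so cross-alias constraints such as `x = y + 10` are now derived.

```
val childConstraints = Set("isnotnull(a)", "isnotnull(c)", "(a = (c + 10))")
val aliases = Seq("a" -> "x", "c" -> "y")

var all = childConstraints
for ((expr, attr) <- aliases) {
  all = all ++ all.map(_.replace(expr, attr))  // rewrite everything derived so far
  all += s"($expr <=> $attr)"                  // the EqualNullSafe constraint
}
println((all -- childConstraints).mkString("\n"))  // includes "(x = (y + 10))"
```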
spark git commit: [SPARK-13747][SQL] Fix concurrent executions in ForkJoinPool for SQL
Repository: spark Updated Branches: refs/heads/master 312ea3f7f -> 7ac70e7ba [SPARK-13747][SQL] Fix concurrent executions in ForkJoinPool for SQL ## What changes were proposed in this pull request? Calling `Await.result` will allow other tasks to be run on the same thread when using ForkJoinPool. However, SQL uses a `ThreadLocal` execution id to trace Spark jobs launched by a query, which doesn't work perfectly in ForkJoinPool. This PR just uses `Awaitable.result` instead to prevent ForkJoinPool from running other tasks in the current waiting thread. ## How was this patch tested? Jenkins Author: Shixiong Zhu Closes #15520 from zsxwing/SPARK-13747. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7ac70e7b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7ac70e7b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7ac70e7b Branch: refs/heads/master Commit: 7ac70e7ba8d610a45c21a70dc28e4c989c19451b Parents: 312ea3f Author: Shixiong Zhu Authored: Wed Oct 26 10:36:36 2016 -0700 Committer: Shixiong Zhu Committed: Wed Oct 26 10:36:36 2016 -0700 -- .../org/apache/spark/util/ThreadUtils.scala | 21 scalastyle-config.xml | 1 + .../sql/execution/basicPhysicalOperators.scala | 2 +- .../exchange/BroadcastExchangeExec.scala| 3 ++- 4 files changed, 25 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7ac70e7b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala index 5a6dbc8..d093e7b 100644 --- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala @@ -194,4 +194,25 @@ private[spark] object ThreadUtils { throw new SparkException("Exception thrown in awaitResult: ", t) } } + + /** + * Calls [[Awaitable.result]] directly to avoid using `ForkJoinPool`'s `BlockingContext`, wraps + * and re-throws any exceptions with nice stack track. + * + * Codes running in the user's thread may be in a thread of Scala ForkJoinPool. As concurrent + * executions in ForkJoinPool may see some [[ThreadLocal]] value unexpectedly, this method + * basically prevents ForkJoinPool from running other tasks in the current waiting thread. + */ + @throws(classOf[SparkException]) + def awaitResultInForkJoinSafely[T](awaitable: Awaitable[T], atMost: Duration): T = { +try { + // `awaitPermission` is not actually used anywhere so it's safe to pass in null here. + // See SPARK-13747. + val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait] + awaitable.result(Duration.Inf)(awaitPermission) +} catch { + case NonFatal(t) => +throw new SparkException("Exception thrown in awaitResult: ", t) +} + } } http://git-wip-us.apache.org/repos/asf/spark/blob/7ac70e7b/scalastyle-config.xml -- diff --git a/scalastyle-config.xml b/scalastyle-config.xml index 7fe0697..81d57d7 100644 --- a/scalastyle-config.xml +++ b/scalastyle-config.xml @@ -200,6 +200,7 @@ This file is divided into 3 sections: // scalastyle:off awaitresult Await.result(...) // scalastyle:on awaitresult + If your codes use ThreadLocal and may run in threads created by the user, use ThreadUtils.awaitResultInForkJoinSafely instead. 
]]> http://git-wip-us.apache.org/repos/asf/spark/blob/7ac70e7b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index 37d750e..a5291e0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -570,7 +570,7 @@ case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode { } override def executeCollect(): Array[InternalRow] = { -ThreadUtils.awaitResult(relationFuture, Duration.Inf) +ThreadUtils.awaitResultInForkJoinSafely(relationFuture, Duration.Inf) } } http://git-wip-us.apache.org/repos/asf/spark/blob/7ac70e7b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala -- diff --git a/sql/core/src/main/scala
spark git commit: [SPARK-16304] LinkageError should not crash Spark executor
Repository: spark Updated Branches: refs/heads/branch-2.0 b4a7b6551 -> 773fbfef1 [SPARK-16304] LinkageError should not crash Spark executor ## What changes were proposed in this pull request? This patch updates the failure-handling logic so the Spark executor does not crash when it sees a LinkageError. ## How was this patch tested? Added an end-to-end test in FailureSuite. Author: petermaxlee Closes #13982 from petermaxlee/SPARK-16304. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/773fbfef Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/773fbfef Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/773fbfef Branch: refs/heads/branch-2.0 Commit: 773fbfef1929b64229fbf97a91c45cdb1ec1fb1f Parents: b4a7b65 Author: petermaxlee Authored: Wed Jul 6 10:46:22 2016 -0700 Committer: Shixiong Zhu Committed: Wed Oct 26 10:27:54 2016 -0700 -- core/src/main/scala/org/apache/spark/util/Utils.scala | 6 +- core/src/test/scala/org/apache/spark/FailureSuite.scala | 9 + 2 files changed, 14 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/773fbfef/core/src/main/scala/org/apache/spark/util/Utils.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 1686edb..b9cf721 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1977,7 +1977,11 @@ private[spark] object Utils extends Logging { /** Returns true if the given exception was fatal. See docs for scala.util.control.NonFatal. */ def isFatalError(e: Throwable): Boolean = { e match { - case NonFatal(_) | _: InterruptedException | _: NotImplementedError | _: ControlThrowable => + case NonFatal(_) | + _: InterruptedException | + _: NotImplementedError | + _: ControlThrowable | + _: LinkageError => false case _ => true http://git-wip-us.apache.org/repos/asf/spark/blob/773fbfef/core/src/test/scala/org/apache/spark/FailureSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/FailureSuite.scala b/core/src/test/scala/org/apache/spark/FailureSuite.scala index 132f636..d805c67 100644 --- a/core/src/test/scala/org/apache/spark/FailureSuite.scala +++ b/core/src/test/scala/org/apache/spark/FailureSuite.scala @@ -253,6 +253,15 @@ class FailureSuite extends SparkFunSuite with LocalSparkContext { rdd.count() } + test("SPARK-16304: Link error should not crash executor") { +sc = new SparkContext("local[1,2]", "test") +intercept[SparkException] { + sc.parallelize(1 to 2).foreach { i => +throw new LinkageError() + } +} + } + // TODO: Need to add tests with shuffle fetch failures. }
spark git commit: [SPARK-17748][FOLLOW-UP][ML] Reorg variables of WeightedLeastSquares.
Repository: spark Updated Branches: refs/heads/master 4bee95407 -> 312ea3f7f [SPARK-17748][FOLLOW-UP][ML] Reorg variables of WeightedLeastSquares. ## What changes were proposed in this pull request? This is follow-up work of #15394. Reorg some variables of ```WeightedLeastSquares``` and fix one minor issue of ```WeightedLeastSquaresSuite```. ## How was this patch tested? Existing tests. Author: Yanbo Liang Closes #15621 from yanboliang/spark-17748. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/312ea3f7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/312ea3f7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/312ea3f7 Branch: refs/heads/master Commit: 312ea3f7f65532818e11016d6d780ad47485175f Parents: 4bee954 Author: Yanbo Liang Authored: Wed Oct 26 09:28:28 2016 -0700 Committer: Yanbo Liang Committed: Wed Oct 26 09:28:28 2016 -0700 -- .../spark/ml/optim/WeightedLeastSquares.scala | 139 +++ .../ml/optim/WeightedLeastSquaresSuite.scala| 15 +- 2 files changed, 86 insertions(+), 68 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/312ea3f7/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala index 2223f12..90c24e1 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala @@ -101,23 +101,19 @@ private[ml] class WeightedLeastSquares( summary.validate() logInfo(s"Number of instances: ${summary.count}.") val k = if (fitIntercept) summary.k + 1 else summary.k +val numFeatures = summary.k val triK = summary.triK val wSum = summary.wSum -val bBar = summary.bBar -val bbBar = summary.bbBar -val aBar = summary.aBar -val aStd = summary.aStd -val abBar = summary.abBar -val aaBar = summary.aaBar -val numFeatures = abBar.size + val rawBStd = summary.bStd +val rawBBar = summary.bBar // if b is constant (rawBStd is zero), then b cannot be scaled. In this case -// setting bStd=abs(bBar) ensures that b is not scaled anymore in l-bfgs algorithm. -val bStd = if (rawBStd == 0.0) math.abs(bBar) else rawBStd +// setting bStd=abs(rawBBar) ensures that b is not scaled anymore in l-bfgs algorithm. 
+val bStd = if (rawBStd == 0.0) math.abs(rawBBar) else rawBStd if (rawBStd == 0) { - if (fitIntercept || bBar == 0.0) { -if (bBar == 0.0) { + if (fitIntercept || rawBBar == 0.0) { +if (rawBBar == 0.0) { logWarning(s"Mean and standard deviation of the label are zero, so the coefficients " + s"and the intercept will all be zero; as a result, training is not needed.") } else { @@ -126,7 +122,7 @@ private[ml] class WeightedLeastSquares( s"training is not needed.") } val coefficients = new DenseVector(Array.ofDim(numFeatures)) -val intercept = bBar +val intercept = rawBBar val diagInvAtWA = new DenseVector(Array(0D)) return new WeightedLeastSquaresModel(coefficients, intercept, diagInvAtWA, Array(0D)) } else { @@ -137,53 +133,70 @@ private[ml] class WeightedLeastSquares( } } -// scale aBar to standardized space in-place -val aBarValues = aBar.values -var j = 0 -while (j < numFeatures) { - if (aStd(j) == 0.0) { -aBarValues(j) = 0.0 - } else { -aBarValues(j) /= aStd(j) - } - j += 1 -} +val bBar = summary.bBar / bStd +val bbBar = summary.bbBar / (bStd * bStd) -// scale abBar to standardized space in-place -val abBarValues = abBar.values +val aStd = summary.aStd val aStdValues = aStd.values -j = 0 -while (j < numFeatures) { - if (aStdValues(j) == 0.0) { -abBarValues(j) = 0.0 - } else { -abBarValues(j) /= (aStdValues(j) * bStd) + +val aBar = { + val _aBar = summary.aBar + val _aBarValues = _aBar.values + var i = 0 + // scale aBar to standardized space in-place + while (i < numFeatures) { +if (aStdValues(i) == 0.0) { + _aBarValues(i) = 0.0 +} else { + _aBarValues(i) /= aStdValues(i) +} +i += 1 } - j += 1 + _aBar } +val aBarValues = aBar.values -// scale aaBar to standardized space in-place -val aaBarValues = aaBar.values -j = 0 -var p = 0 -while (j < numFeatures) { - val aStdJ = aStdValues(j) +val abBar = { + val _abBar = sum
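The rescaling in this block can be stated compactly: every summary statistic is divided by the standard deviations of the columns it involves, with zero-variance features zeroed out. A hedged standalone sketch (array shapes assumed, not Spark internals):

```
def standardize(
    bBar: Double, bStd: Double,
    abBar: Array[Double], aStd: Array[Double]): (Double, Array[Double]) = {
  val b = bBar / bStd  // label statistic scaled by the label std
  val ab = abBar.indices.map { j =>
    // zero-variance features contribute nothing, matching the code above
    if (aStd(j) == 0.0) 0.0 else abBar(j) / (aStd(j) * bStd)
  }.toArray
  (b, ab)
}
```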
spark git commit: [SPARK-18093][SQL] Fix default value test in SQLConfSuite to work rega…
Repository: spark Updated Branches: refs/heads/branch-2.0 192c1dd4f -> b4a7b6551 [SPARK-18093][SQL] Fix default value test in SQLConfSuite to work regardless of warehouse dir's existence ## What changes were proposed in this pull request? Appending a trailing slash, if there isn't one already, for the sake of comparing the two paths. It doesn't take away from the essence of the check, but removes any potential mismatch due to lack of trailing slash. ## How was this patch tested? Ran unit tests and they passed. Author: Mark Grover Closes #15623 from markgrover/spark-18093. (cherry picked from commit 4bee9540790a40acb74db4b0b44c364c4b3f537d) Signed-off-by: Marcelo Vanzin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b4a7b655 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b4a7b655 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b4a7b655 Branch: refs/heads/branch-2.0 Commit: b4a7b6551d5db8f280656624b19bebf43fcfb819 Parents: 192c1dd Author: Mark Grover Authored: Wed Oct 26 09:07:30 2016 -0700 Committer: Marcelo Vanzin Committed: Wed Oct 26 09:07:42 2016 -0700 -- .../scala/org/apache/spark/sql/internal/SQLConfSuite.scala| 7 +-- 1 file changed, 5 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b4a7b655/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala index a230344..41e011b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala @@ -212,12 +212,15 @@ class SQLConfSuite extends QueryTest with SharedSQLContext { } test("default value of WAREHOUSE_PATH") { + val original = spark.conf.get(SQLConf.WAREHOUSE_PATH) try { // to get the default value, always unset it spark.conf.unset(SQLConf.WAREHOUSE_PATH.key) - assert(new Path(Utils.resolveURI("spark-warehouse")).toString === -spark.sessionState.conf.warehousePath + "/") + // JVM adds a trailing slash if the directory exists and leaves it as-is, if it doesn't + // In our comparison, strip trailing slash off of both sides, to account for such cases + assert(new Path(Utils.resolveURI("spark-warehouse")).toString.stripSuffix("/") === spark +.sessionState.conf.warehousePath.stripSuffix("/")) } finally { sql(s"set ${SQLConf.WAREHOUSE_PATH}=$original") }
spark git commit: [SPARK-18093][SQL] Fix default value test in SQLConfSuite to work rega…
Repository: spark Updated Branches: refs/heads/master 3c023570b -> 4bee95407 [SPARK-18093][SQL] Fix default value test in SQLConfSuite to work regardless of warehouse dir's existence ## What changes were proposed in this pull request? Appending a trailing slash, if there isn't one already, for the sake of comparing the two paths. It doesn't take away from the essence of the check, but removes any potential mismatch due to lack of trailing slash. ## How was this patch tested? Ran unit tests and they passed. Author: Mark Grover Closes #15623 from markgrover/spark-18093. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4bee9540 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4bee9540 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4bee9540 Branch: refs/heads/master Commit: 4bee9540790a40acb74db4b0b44c364c4b3f537d Parents: 3c02357 Author: Mark Grover Authored: Wed Oct 26 09:07:30 2016 -0700 Committer: Marcelo Vanzin Committed: Wed Oct 26 09:07:30 2016 -0700 -- .../scala/org/apache/spark/sql/internal/SQLConfSuite.scala| 7 +-- 1 file changed, 5 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4bee9540/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala index a89a43f..11d4693 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala @@ -215,12 +215,15 @@ class SQLConfSuite extends QueryTest with SharedSQLContext { } test("default value of WAREHOUSE_PATH") { + val original = spark.conf.get(SQLConf.WAREHOUSE_PATH) try { // to get the default value, always unset it spark.conf.unset(SQLConf.WAREHOUSE_PATH.key) - assert(new Path(Utils.resolveURI("spark-warehouse")).toString === -spark.sessionState.conf.warehousePath + "/") + // JVM adds a trailing slash if the directory exists and leaves it as-is, if it doesn't + // In our comparison, strip trailing slash off of both sides, to account for such cases + assert(new Path(Utils.resolveURI("spark-warehouse")).toString.stripSuffix("/") === spark +.sessionState.conf.warehousePath.stripSuffix("/")) } finally { sql(s"set ${SQLConf.WAREHOUSE_PATH}=$original") }
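The comparison logic in miniature (a hedged standalone helper, not Spark code): stripping any trailing slash from both sides makes the check pass whether or not the JVM appended one for an existing directory.

```
def samePath(a: String, b: String): Boolean =
  a.stripSuffix("/") == b.stripSuffix("/")

samePath("file:/tmp/spark-warehouse/", "file:/tmp/spark-warehouse")  // true
```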
svn commit: r16706 - in /release/spark: spark-1.5.2/ spark-1.6.1/ spark-2.0.0-preview/ spark-2.0.0/
Author: srowen Date: Wed Oct 26 15:25:03 2016 New Revision: 16706 Log: Remove non-current Spark releases from dist Removed: release/spark/spark-1.5.2/ release/spark/spark-1.6.1/ release/spark/spark-2.0.0/ release/spark/spark-2.0.0-preview/
spark git commit: [SPARK-17733][SQL] InferFiltersFromConstraints rule never terminates for query
Repository: spark Updated Branches: refs/heads/branch-2.0 c2cce2e60 -> 192c1dd4f [SPARK-17733][SQL] InferFiltersFromConstraints rule never terminates for query ## What changes were proposed in this pull request? The functions `QueryPlan.inferAdditionalConstraints` and `UnaryNode.getAliasedConstraints` can produce a non-converging set of constraints through recursive deductions. For instance, if we have two constraints of the form (where a is an alias): `a = b, a = f(b, c)` Applying both these rules in the next iteration would infer: `f(b, c) = f(f(b, c), c)` As this process repeats, the iteration won't converge, and the set of constraints will grow larger and larger until OOM. ~~To fix this problem, we collect aliases from expressions and skip inferring constraints if we are about to transform an `Expression` into another that contains it.~~ To fix this problem, we apply an additional check in `inferAdditionalConstraints`: when it's possible to generate a recursive constraint, we skip generating it. ## How was this patch tested? Added new test cases in `SQLQuerySuite`/`InferFiltersFromConstraintsSuite`. Author: jiangxingbo Closes #15319 from jiangxb1987/constraints. (cherry picked from commit 3c023570b28bc1ed24f5b2448311130fd1777fd3) Signed-off-by: Herman van Hovell Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/192c1dd4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/192c1dd4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/192c1dd4 Branch: refs/heads/branch-2.0 Commit: 192c1dd4fef4931b508b02d38a8c2405aa4785a0 Parents: c2cce2e Author: jiangxingbo Authored: Wed Oct 26 17:09:48 2016 +0200 Committer: Herman van Hovell Committed: Wed Oct 26 17:10:01 2016 +0200 -- .../spark/sql/catalyst/plans/QueryPlan.scala| 88 ++-- .../InferFiltersFromConstraintsSuite.scala | 87 ++- .../spark/sql/catalyst/plans/PlanTest.scala | 25 +- .../org/apache/spark/sql/SQLQuerySuite.scala| 5 +- 4 files changed, 191 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/192c1dd4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index 9c60590..41c4e00 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -68,26 +68,104 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT case _ => Seq.empty[Attribute] } + // Collect aliases from expressions, so we may avoid producing recursive constraints. + private lazy val aliasMap = AttributeMap( +(expressions ++ children.flatMap(_.expressions)).collect { + case a: Alias => (a.toAttribute, a.child) +}) + /** * Infers an additional set of constraints from a given set of equality constraints. * For e.g., if an operator has constraints of the form (`a = 5`, `a = b`), this returns an - * additional constraint of the form `b = 5` + * additional constraint of the form `b = 5`. + * + * [SPARK-17733] We explicitly prevent producing recursive constraints of the form `a = f(a, b)` + * as they are often useless and can lead to a non-converging set of constraints. 
*/ private def inferAdditionalConstraints(constraints: Set[Expression]): Set[Expression] = { +val constraintClasses = generateEquivalentConstraintClasses(constraints) + var inferredConstraints = Set.empty[Expression] constraints.foreach { case eq @ EqualTo(l: Attribute, r: Attribute) => -inferredConstraints ++= (constraints - eq).map(_ transform { - case a: Attribute if a.semanticEquals(l) => r +val candidateConstraints = constraints - eq +inferredConstraints ++= candidateConstraints.map(_ transform { + case a: Attribute if a.semanticEquals(l) && +!isRecursiveDeduction(r, constraintClasses) => r }) -inferredConstraints ++= (constraints - eq).map(_ transform { - case a: Attribute if a.semanticEquals(r) => l +inferredConstraints ++= candidateConstraints.map(_ transform { + case a: Attribute if a.semanticEquals(r) && +!isRecursiveDeduction(l, constraintClasses) => l }) case _ => // No inference } inferredConstraints -- constraints } + /* + * Generate a sequence of expression sets from constraints, where each set stores an equivalence + * class of expressions. For example, Set(`a = b`, `b = c`, `e =
spark git commit: [SPARK-17733][SQL] InferFiltersFromConstraints rule never terminates for query
Repository: spark Updated Branches: refs/heads/master 402205ddf -> 3c023570b [SPARK-17733][SQL] InferFiltersFromConstraints rule never terminates for query ## What changes were proposed in this pull request? The functions `QueryPlan.inferAdditionalConstraints` and `UnaryNode.getAliasedConstraints` can produce a non-converging set of constraints through recursive deductions. For instance, if we have two constraints of the form (where a is an alias): `a = b, a = f(b, c)` Applying both these rules in the next iteration would infer: `f(b, c) = f(f(b, c), c)` As this process repeats, the iteration won't converge, and the set of constraints will grow larger and larger until OOM. ~~To fix this problem, we collect aliases from expressions and skip inferring constraints if we are about to transform an `Expression` into another that contains it.~~ To fix this problem, we apply an additional check in `inferAdditionalConstraints`: when it's possible to generate a recursive constraint, we skip generating it. ## How was this patch tested? Added new test cases in `SQLQuerySuite`/`InferFiltersFromConstraintsSuite`. Author: jiangxingbo Closes #15319 from jiangxb1987/constraints. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3c023570 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3c023570 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3c023570 Branch: refs/heads/master Commit: 3c023570b28bc1ed24f5b2448311130fd1777fd3 Parents: 402205d Author: jiangxingbo Authored: Wed Oct 26 17:09:48 2016 +0200 Committer: Herman van Hovell Committed: Wed Oct 26 17:09:48 2016 +0200 -- .../spark/sql/catalyst/plans/QueryPlan.scala| 88 ++-- .../InferFiltersFromConstraintsSuite.scala | 87 ++- .../spark/sql/catalyst/plans/PlanTest.scala | 25 +- .../org/apache/spark/sql/SQLQuerySuite.scala| 5 +- 4 files changed, 191 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3c023570/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index 0fb6e7d..45ee296 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -68,26 +68,104 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT case _ => Seq.empty[Attribute] } + // Collect aliases from expressions, so we may avoid producing recursive constraints. + private lazy val aliasMap = AttributeMap( +(expressions ++ children.flatMap(_.expressions)).collect { + case a: Alias => (a.toAttribute, a.child) +}) + /** * Infers an additional set of constraints from a given set of equality constraints. * For e.g., if an operator has constraints of the form (`a = 5`, `a = b`), this returns an - * additional constraint of the form `b = 5` + * additional constraint of the form `b = 5`. + * + * [SPARK-17733] We explicitly prevent producing recursive constraints of the form `a = f(a, b)` + * as they are often useless and can lead to a non-converging set of constraints. 
*/ private def inferAdditionalConstraints(constraints: Set[Expression]): Set[Expression] = { +val constraintClasses = generateEquivalentConstraintClasses(constraints) + var inferredConstraints = Set.empty[Expression] constraints.foreach { case eq @ EqualTo(l: Attribute, r: Attribute) => -inferredConstraints ++= (constraints - eq).map(_ transform { - case a: Attribute if a.semanticEquals(l) => r +val candidateConstraints = constraints - eq +inferredConstraints ++= candidateConstraints.map(_ transform { + case a: Attribute if a.semanticEquals(l) && +!isRecursiveDeduction(r, constraintClasses) => r }) -inferredConstraints ++= (constraints - eq).map(_ transform { - case a: Attribute if a.semanticEquals(r) => l +inferredConstraints ++= candidateConstraints.map(_ transform { + case a: Attribute if a.semanticEquals(r) && +!isRecursiveDeduction(l, constraintClasses) => l }) case _ => // No inference } inferredConstraints -- constraints } + /* + * Generate a sequence of expression sets from constraints, where each set stores an equivalence + * class of expressions. For example, Set(`a = b`, `b = c`, `e = f`) will generate the following + * expression sets: (Set(a, b, c), Set(e, f)). This will be used to search
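A toy illustration of the divergence being prevented (plain strings instead of Catalyst expressions): starting from `a = b` and `a = f(b, c)`, substitution yields `b = f(b, c)`, and feeding that equality back into its own right-hand side never converges.

```
var rhs = "f(b, c)"
for (i <- 1 to 3) {
  rhs = rhs.replace("b", "f(b, c)")  // substitute the equality into itself
  println(s"iteration $i: b = $rhs")
}
// iteration 1: b = f(f(b, c), c)
// iteration 2: b = f(f(f(b, c), c), c)
// ...
```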
spark-website git commit: Link to archive.apache.org for Spark releases besides 2.0.1 and 1.6.2; comment/remove some dead JS in this file
Repository: spark-website Updated Branches: refs/heads/asf-site 4150a7329 -> f284a2687 Link to archive.apache.org for Spark releases besides 2.0.1 and 1.6.2; comment/remove some dead JS in this file Project: http://git-wip-us.apache.org/repos/asf/spark-website/repo Commit: http://git-wip-us.apache.org/repos/asf/spark-website/commit/f284a268 Tree: http://git-wip-us.apache.org/repos/asf/spark-website/tree/f284a268 Diff: http://git-wip-us.apache.org/repos/asf/spark-website/diff/f284a268 Branch: refs/heads/asf-site Commit: f284a2687ad16348dab55bb92b7c334f4e1d0e15 Parents: 4150a73 Author: Sean Owen Authored: Tue Oct 25 14:42:42 2016 +0100 Committer: Sean Owen Committed: Tue Oct 25 14:42:42 2016 +0100 -- js/downloads.js | 82 --- site/js/downloads.js | 82 --- 2 files changed, 84 insertions(+), 80 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark-website/blob/f284a268/js/downloads.js -- diff --git a/js/downloads.js b/js/downloads.js index e04352f..9e31b50 100644 --- a/js/downloads.js +++ b/js/downloads.js @@ -3,8 +3,8 @@ releases = {}; -function addRelease(version, releaseDate, packages, downloadable, stable) { - releases[version] = {released: releaseDate, packages: packages, downloadable: downloadable, stable: stable}; +function addRelease(version, releaseDate, packages, stable) { + releases[version] = {released: releaseDate, packages: packages, stable: stable}; } var sources = {pretty: "Source Code", tag: "sources"}; @@ -16,38 +16,38 @@ var hadoop2p3 = {pretty: "Pre-built for Hadoop 2.3", tag: "hadoop2.3"}; var hadoop2p4 = {pretty: "Pre-built for Hadoop 2.4", tag: "hadoop2.4"}; var hadoop2p6 = {pretty: "Pre-built for Hadoop 2.6", tag: "hadoop2.6"}; var hadoop2p7 = {pretty: "Pre-built for Hadoop 2.7 and later", tag: "hadoop2.7"}; -var mapr3 = {pretty: "Pre-built for MapR 3.X", tag: "mapr3"}; -var mapr4 = {pretty: "Pre-built for MapR 4.X", tag: "mapr4"}; +//var mapr3 = {pretty: "Pre-built for MapR 3.X", tag: "mapr3"}; +//var mapr4 = {pretty: "Pre-built for MapR 4.X", tag: "mapr4"}; // 0.7+ -var packagesV1 = [hadoop1, cdh4, sources]; +//var packagesV1 = [hadoop1, cdh4, sources]; // 0.8.1+ -var packagesV2 = [hadoop2].concat(packagesV1); +//var packagesV2 = [hadoop2].concat(packagesV1); // 1.0.1+ -var packagesV3 = [mapr3, mapr4].concat(packagesV2); +//var packagesV3 = [mapr3, mapr4].concat(packagesV2); // 1.1.0+ -var packagesV4 = [hadoop2p4, hadoop2p3, mapr3, mapr4].concat(packagesV1); +//var packagesV4 = [hadoop2p4, hadoop2p3, mapr3, mapr4].concat(packagesV1); // 1.3.1+ -var packagesV5 = [hadoop2p6].concat(packagesV4); +//var packagesV5 = [hadoop2p6].concat(packagesV4); // 1.4.0+ -var packagesV6 = [hadoop2p6, hadoop2p4, hadoop2p3, hadoopFree].concat(packagesV1); +var packagesV6 = [hadoop2p6, hadoop2p4, hadoop2p3, hadoopFree, hadoop1, cdh4, sources]; // 2.0.0+ var packagesV7 = [hadoop2p7, hadoop2p6, hadoop2p4, hadoop2p3, hadoopFree, sources]; // addRelease("2.0.0-preview", new Date("05/24/2016"), sources.concat(packagesV7), true, false); -addRelease("2.0.1", new Date("10/03/2016"), packagesV7, true, true); -addRelease("2.0.0", new Date("07/26/2016"), packagesV7, true, true); -addRelease("1.6.2", new Date("06/25/2016"), packagesV6, true, true); -addRelease("1.6.1", new Date("03/09/2016"), packagesV6, true, true); -addRelease("1.6.0", new Date("01/04/2016"), packagesV6, true, true); -addRelease("1.5.2", new Date("11/09/2015"), packagesV6, true, true); -addRelease("1.5.1", new Date("10/02/2015"), packagesV6, true, true); -addRelease("1.5.0", new Date("9/09/2015"), packagesV6, true, true); 
-addRelease("1.4.1", new Date("7/15/2015"), packagesV6, true, true); -addRelease("1.4.0", new Date("6/11/2015"), packagesV6, true, true); -addRelease("1.3.1", new Date("4/17/2015"), packagesV5, true, true); -addRelease("1.3.0", new Date("3/13/2015"), packagesV4, true, true); +addRelease("2.0.1", new Date("10/03/2016"), packagesV7, true); +addRelease("2.0.0", new Date("07/26/2016"), packagesV7, true); +addRelease("1.6.2", new Date("06/25/2016"), packagesV6, true); +addRelease("1.6.1", new Date("03/09/2016"), packagesV6, true); +addRelease("1.6.0", new Date("01/04/2016"), packagesV6, true); +addRelease("1.5.2", new Date("11/09/2015"), packagesV6, true); +addRelease("1.5.1", new Date("10/02/2015"), packagesV6, true); +addRelease("1.5.0", new Date("9/09/2015"), packagesV6, true); +addRelease("1.4.1", new Date("7/15/2015"), packagesV6, true); +addRelease("1.4.0", new Date("6/11/2015"), packagesV6, true); +// addRelease("1.3.1", new Date("4/17/2015"), packagesV5, true, true); +// addRelease("1.3.0", new Date("3/13/2015"), packagesV4, true, true); // addRelease("1.2.2", new Date("4/17/20
spark git commit: [SPARK-17802] Improved caller context logging.
Repository: spark Updated Branches: refs/heads/master 5d0f81da4 -> 402205ddf [SPARK-17802] Improved caller context logging. ## What changes were proposed in this pull request? [SPARK-16757](https://issues.apache.org/jira/browse/SPARK-16757) sets the hadoop `CallerContext` when calling hadoop/hdfs APIs to make spark applications more diagnosable in hadoop/hdfs logs. However, the `org.apache.hadoop.ipc.CallerContext` class was only added in [hadoop 2.8](https://issues.apache.org/jira/browse/HDFS-9184), which is not officially released yet. So each time `utils.CallerContext.setCurrentContext()` is called (e.g. [when a task is created](https://github.com/apache/spark/blob/b678e46/core/src/main/scala/org/apache/spark/scheduler/Task.scala#L95-L96)), a "java.lang.ClassNotFoundException: org.apache.hadoop.ipc.CallerContext" error is logged, which pollutes the spark logs when there are lots of tasks. This patch improves this behaviour by only logging the `ClassNotFoundException` once. ## How was this patch tested? Existing tests. Author: Shuai Lin Closes #15377 from lins05/spark-17802-improve-callercontext-logging. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/402205dd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/402205dd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/402205dd Branch: refs/heads/master Commit: 402205ddf749e7478683ce1b0443df63b46b03fd Parents: 5d0f81d Author: Shuai Lin Authored: Wed Oct 26 14:31:47 2016 +0200 Committer: Sean Owen Committed: Wed Oct 26 14:31:47 2016 +0200 -- .../scala/org/apache/spark/util/Utils.scala | 48 ++-- .../org/apache/spark/util/UtilsSuite.scala | 7 +-- 2 files changed, 36 insertions(+), 19 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/402205dd/core/src/main/scala/org/apache/spark/util/Utils.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index bfc6094..e57eb0d 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2508,6 +2508,26 @@ private[spark] object Utils extends Logging { } } +private[util] object CallerContext extends Logging { + val callerContextSupported: Boolean = { +SparkHadoopUtil.get.conf.getBoolean("hadoop.caller.context.enabled", false) && { + try { +// scalastyle:off classforname +Class.forName("org.apache.hadoop.ipc.CallerContext") +Class.forName("org.apache.hadoop.ipc.CallerContext$Builder") +// scalastyle:on classforname +true + } catch { +case _: ClassNotFoundException => + false +case NonFatal(e) => + logWarning("Fail to load the CallerContext class", e) + false + } +} + } +} + /** * An utility class used to set up Spark caller contexts to HDFS and Yarn. The `context` will be * constructed by parameters passed in. @@ -2554,21 +2574,21 @@ private[spark] class CallerContext( * Set up the caller context [[context]] by invoking Hadoop CallerContext API of * [[org.apache.hadoop.ipc.CallerContext]], which was added in hadoop 2.8.
*/ - def setCurrentContext(): Boolean = { -var succeed = false -try { - // scalastyle:off classforname - val callerContext = Class.forName("org.apache.hadoop.ipc.CallerContext") - val Builder = Class.forName("org.apache.hadoop.ipc.CallerContext$Builder") - // scalastyle:on classforname - val builderInst = Builder.getConstructor(classOf[String]).newInstance(context) - val hdfsContext = Builder.getMethod("build").invoke(builderInst) - callerContext.getMethod("setCurrent", callerContext).invoke(null, hdfsContext) - succeed = true -} catch { - case NonFatal(e) => logInfo("Fail to set Spark caller context", e) + def setCurrentContext(): Unit = { +if (CallerContext.callerContextSupported) { + try { +// scalastyle:off classforname +val callerContext = Class.forName("org.apache.hadoop.ipc.CallerContext") +val builder = Class.forName("org.apache.hadoop.ipc.CallerContext$Builder") +// scalastyle:on classforname +val builderInst = builder.getConstructor(classOf[String]).newInstance(context) +val hdfsContext = builder.getMethod("build").invoke(builderInst) +callerContext.getMethod("setCurrent", callerContext).invoke(null, hdfsContext) + } catch { +case NonFatal(e) => + logWarning("Fail to set Spark caller context", e) + } } -succeed } } http://git-wip-us.apache.org/repos/asf/spark/blob/402205dd/core/src/test/scala/org/
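The probe-once pattern above is easy to lift out of Spark's `Utils`. Below is a minimal, self-contained sketch of the same idea: the `Class.forName` check runs a single time, when the object initializes, so a missing class produces one log line instead of one per task. The object names are illustrative (not Spark's), `println` stands in for `logWarning`, and the reflective calls mirror the diff above:

```scala
import scala.util.control.NonFatal

object ReflectionProbe {
  // Evaluated once, at object initialization: any ClassNotFoundException
  // is reported a single time and the result is cached for all callers.
  val supported: Boolean =
    try {
      Class.forName("org.apache.hadoop.ipc.CallerContext")
      Class.forName("org.apache.hadoop.ipc.CallerContext$Builder")
      true
    } catch {
      case _: ClassNotFoundException =>
        System.err.println("CallerContext class not found; caller context disabled")
        false
      case NonFatal(e) =>
        System.err.println(s"Failed to probe CallerContext: ${e.getMessage}")
        false
    }
}

object CallerContextSketch {
  // Call sites guard cheaply; the reflective invocation only happens
  // when the Hadoop 2.8+ classes are known to be present.
  def setCurrentContext(context: String): Unit =
    if (ReflectionProbe.supported) {
      val callerContext = Class.forName("org.apache.hadoop.ipc.CallerContext")
      val builder = Class.forName("org.apache.hadoop.ipc.CallerContext$Builder")
      val builderInst = builder.getConstructor(classOf[String]).newInstance(context)
      val hdfsContext = builder.getMethod("build").invoke(builderInst)
      callerContext.getMethod("setCurrent", callerContext).invoke(null, hdfsContext)
    }
}
```

The design point is that class availability cannot change during the lifetime of the JVM, so there is no reason to re-probe (and re-log) on every task.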
spark git commit: [SPARK-4411][WEB UI] Add "kill" link for jobs in the UI
Repository: spark Updated Branches: refs/heads/master 297813647 -> 5d0f81da4 [SPARK-4411][WEB UI] Add "kill" link for jobs in the UI ## What changes were proposed in this pull request? Currently users can kill stages via the web ui but not jobs directly (jobs are killed if one of their stages is). I've added the ability to kill jobs via the web ui. This code change is based on #4823 by lianhuiwang and updated to work with the latest code, matching how stages are currently killed. In general I've copied the kill stage code, warnings and note comments and all. I also updated applicable tests and documentation. ## How was this patch tested? Manually tested and dev/run-tests ![screen shot 2016-10-11 at 4 49 43 pm](https://cloud.githubusercontent.com/assets/13952758/19292857/12f1b7c0-8fd4-11e6-8982-210249f7b697.png) Author: Alex Bozarth Author: Lianhui Wang Closes #15441 from ajbozarth/spark4411. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5d0f81da Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5d0f81da Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5d0f81da Branch: refs/heads/master Commit: 5d0f81da49e86ee93ecf679a20d024ea2cb8b3d3 Parents: 2978136 Author: Alex Bozarth Authored: Wed Oct 26 14:26:54 2016 +0200 Committer: Sean Owen Committed: Wed Oct 26 14:26:54 2016 +0200 -- .../scala/org/apache/spark/ui/SparkUI.scala | 11 ++--- .../org/apache/spark/ui/jobs/AllJobsPage.scala | 34 +++--- .../org/apache/spark/ui/jobs/JobsTab.scala | 17 +++ .../org/apache/spark/ui/jobs/StageTable.scala | 5 ++- .../org/apache/spark/ui/jobs/StagesTab.scala| 17 +++ .../org/apache/spark/ui/UISeleniumSuite.scala | 47 docs/configuration.md | 2 +- 7 files changed, 104 insertions(+), 29 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5d0f81da/core/src/main/scala/org/apache/spark/ui/SparkUI.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index ef71db8..f631a04 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -58,14 +58,13 @@ private[spark] class SparkUI private ( val killEnabled = sc.map(_.conf.getBoolean("spark.ui.killEnabled", true)).getOrElse(false) - - val stagesTab = new StagesTab(this) - var appId: String = _ /** Initialize all components of the server.
*/ def initialize() { -attachTab(new JobsTab(this)) +val jobsTab = new JobsTab(this) +attachTab(jobsTab) +val stagesTab = new StagesTab(this) attachTab(stagesTab) attachTab(new StorageTab(this)) attachTab(new EnvironmentTab(this)) @@ -73,7 +72,9 @@ private[spark] class SparkUI private ( attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static")) attachHandler(createRedirectHandler("/", "/jobs/", basePath = basePath)) attachHandler(ApiRootResource.getServletHandler(this)) -// This should be POST only, but, the YARN AM proxy won't proxy POSTs +// These should be POST only, but, the YARN AM proxy won't proxy POSTs +attachHandler(createRedirectHandler( + "/jobs/job/kill", "/jobs/", jobsTab.handleKillRequest, httpMethods = Set("GET", "POST"))) attachHandler(createRedirectHandler( "/stages/stage/kill", "/stages/", stagesTab.handleKillRequest, httpMethods = Set("GET", "POST"))) http://git-wip-us.apache.org/repos/asf/spark/blob/5d0f81da/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala index f671309..173fc3c 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala @@ -218,7 +218,8 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { request: HttpServletRequest, tableHeaderId: String, jobTag: String, - jobs: Seq[JobUIData]): Seq[Node] = { + jobs: Seq[JobUIData], + killEnabled: Boolean): Seq[Node] = { val allParameters = request.getParameterMap.asScala.toMap val parameterOtherTable = allParameters.filterNot(_._1.startsWith(jobTag)) .map(para => para._1 + "=" + para._2(0)) @@ -264,6 +265,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { parameterOtherTable, parent.jobProgresslistener.stageIdToInfo, parent.jobProgresslistener.stageIdToData, +killEnabled,
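The new `/jobs/job/kill` endpoint follows the existing stage-kill path: a redirect handler parses the request, cancels the job, and sends the browser back to the jobs page. A condensed sketch of the shape of such a handler, assuming a hypothetical `id` request parameter and a standalone class name (the real logic lives in `JobsTab.handleKillRequest`, not shown in full in this diff); `SparkContext.cancelJob` is the public API that does the actual work:

```scala
import javax.servlet.http.HttpServletRequest
import org.apache.spark.SparkContext

class JobKillHandler(sc: SparkContext, killEnabled: Boolean) {
  def handleKillRequest(request: HttpServletRequest): Unit =
    if (killEnabled) {
      Option(request.getParameter("id"))                  // "id" is an assumed name
        .flatMap(id => scala.util.Try(id.toInt).toOption) // ignore malformed input
        .foreach { jobId =>
          // Cancels the job and all of its active stages.
          sc.cancelJob(jobId)
          // Brief pause so the kill is visible on the page the user is
          // redirected back to, mirroring the stage-kill handler's trick.
          Thread.sleep(100)
        }
    }
}
```

As the diff notes, the handler is registered for both GET and POST only because the YARN AM proxy will not forward POSTs; a kill is a state-changing action and would otherwise be POST-only.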
spark git commit: [SPARK-18027][YARN] .sparkStaging not clean on RM ApplicationNotFoundException
Repository: spark Updated Branches: refs/heads/master 6c7d094ec -> 297813647 [SPARK-18027][YARN] .sparkStaging not clean on RM ApplicationNotFoundException ## What changes were proposed in this pull request? Clean up the YARN staging dir on all `KILLED`/`FAILED` paths in `monitorApplication` ## How was this patch tested? Existing tests Author: Sean Owen Closes #15598 from srowen/SPARK-18027. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/29781364 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/29781364 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/29781364 Branch: refs/heads/master Commit: 297813647508480d7b4b5bccd02b93b8b914301f Parents: 6c7d094 Author: Sean Owen Authored: Wed Oct 26 14:23:11 2016 +0200 Committer: Sean Owen Committed: Wed Oct 26 14:23:11 2016 +0200 -- yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 2 ++ 1 file changed, 2 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/29781364/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala -- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 6e4f68c..55e4a83 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -1059,9 +1059,11 @@ private[spark] class Client( } catch { case e: ApplicationNotFoundException => logError(s"Application $appId not found.") +cleanupStagingDir(appId) return (YarnApplicationState.KILLED, FinalApplicationStatus.KILLED) case NonFatal(e) => logError(s"Failed to contact YARN for application $appId.", e) +// Don't necessarily clean up staging dir because status is unknown return (YarnApplicationState.FAILED, FinalApplicationStatus.FAILED) } val state = report.getYarnApplicationState
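The two catch branches encode different levels of certainty, which a standalone sketch makes explicit: when the ResourceManager reports `ApplicationNotFoundException`, the app is definitively gone and `.sparkStaging` can be deleted; on any other failure the status is unknown, so the directory is left in place. In this sketch `Report`, `getReport`, and `cleanupStagingDir` are stand-ins for the real `Client` internals:

```scala
import scala.util.control.NonFatal
import org.apache.hadoop.yarn.api.records.{FinalApplicationStatus, YarnApplicationState}
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException

object StagingCleanupSketch {
  // Stubs standing in for the real Client internals (assumptions).
  case class Report(state: YarnApplicationState, status: FinalApplicationStatus)
  def getReport(appId: String): Report =
    throw new ApplicationNotFoundException(s"Application $appId not found")
  def cleanupStagingDir(appId: String): Unit =
    println(s"deleting .sparkStaging for $appId")

  def pollOnce(appId: String): (YarnApplicationState, FinalApplicationStatus) =
    try {
      val r = getReport(appId)
      (r.state, r.status)
    } catch {
      case _: ApplicationNotFoundException =>
        // The RM no longer knows the app, so it cannot be running: safe to clean.
        cleanupStagingDir(appId)
        (YarnApplicationState.KILLED, FinalApplicationStatus.KILLED)
      case NonFatal(_) =>
        // Status unknown: the app may still be running, so keep the dir.
        (YarnApplicationState.FAILED, FinalApplicationStatus.FAILED)
    }
}
```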
spark git commit: [SPARK-18022][SQL] java.lang.NullPointerException instead of real exception when saving DF to MySQL
Repository: spark Updated Branches: refs/heads/branch-2.0 912487eaf -> c2cce2e60 [SPARK-18022][SQL] java.lang.NullPointerException instead of real exception when saving DF to MySQL ## What changes were proposed in this pull request? If the next exception in a JDBC SQLException chain is null, don't set it as the cause or add it as a suppressed exception ## How was this patch tested? Existing tests Author: Sean Owen Closes #15599 from srowen/SPARK-18022. (cherry picked from commit 6c7d094ec4d45a05c1ec8a418e507e45f5a88b7d) Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c2cce2e6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c2cce2e6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c2cce2e6 Branch: refs/heads/branch-2.0 Commit: c2cce2e60c491d5ff2b1eb1f30eb507f0d40dae1 Parents: 912487e Author: Sean Owen Authored: Wed Oct 26 14:19:40 2016 +0200 Committer: Sean Owen Committed: Wed Oct 26 14:19:59 2016 +0200 -- .../apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c2cce2e6/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 7a8b825..2869e80 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -236,7 +236,7 @@ object JdbcUtils extends Logging { } catch { case e: SQLException => val cause = e.getNextException -if (e.getCause != cause) { +if (cause != null && e.getCause != cause) { if (e.getCause == null) { e.initCause(cause) } else {
spark git commit: [SPARK-18022][SQL] java.lang.NullPointerException instead of real exception when saving DF to MySQL
Repository: spark Updated Branches: refs/heads/master 93b8ad184 -> 6c7d094ec [SPARK-18022][SQL] java.lang.NullPointerException instead of real exception when saving DF to MySQL ## What changes were proposed in this pull request? If the next exception in a JDBC SQLException chain is null, don't set it as the cause or add it as a suppressed exception ## How was this patch tested? Existing tests Author: Sean Owen Closes #15599 from srowen/SPARK-18022. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6c7d094e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6c7d094e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6c7d094e Branch: refs/heads/master Commit: 6c7d094ec4d45a05c1ec8a418e507e45f5a88b7d Parents: 93b8ad1 Author: Sean Owen Authored: Wed Oct 26 14:19:40 2016 +0200 Committer: Sean Owen Committed: Wed Oct 26 14:19:40 2016 +0200 -- .../apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6c7d094e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index e32db73..41edb65 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -607,7 +607,7 @@ object JdbcUtils extends Logging { } catch { case e: SQLException => val cause = e.getNextException -if (e.getCause != cause) { +if (cause != null && e.getCause != cause) { if (e.getCause == null) { e.initCause(cause) } else {
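The guard is easiest to see in isolation: `SQLException.getNextException` may legitimately return null, and `Throwable.addSuppressed(null)` throws a NullPointerException, which is what masked the real database error. A minimal sketch of the fixed logic (the same guard as the diff, but not the `JdbcUtils` code itself):

```scala
import java.sql.SQLException

object NextExceptionGuard {
  def attachNextException(e: SQLException): Unit = {
    val cause = e.getNextException // may be null when there is no chained exception
    if (cause != null && e.getCause != cause) { // the null check added by this patch
      if (e.getCause == null) e.initCause(cause)
      else e.addSuppressed(cause) // would throw NPE here if cause were null
    }
  }

  def main(args: Array[String]): Unit = {
    val e = new SQLException("real driver error")
    e.initCause(new RuntimeException("lower-level cause"))
    attachNextException(e) // no next exception: the guard short-circuits safely
    println(s"caller still sees: ${e.getMessage}")
  }
}
```

Without the guard, the failing path is `cause == null` with `e.getCause` already set: `e.getCause != null` is true, so the code falls into `addSuppressed(null)` and the NPE replaces the informative SQLException.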
spark git commit: [SPARK-17693][SQL] Fixed Insert Failure To Data Source Tables when the Schema has the Comment Field
Repository: spark Updated Branches: refs/heads/master 12b3e8d2e -> 93b8ad184 [SPARK-17693][SQL] Fixed Insert Failure To Data Source Tables when the Schema has the Comment Field ### What changes were proposed in this pull request?
```SQL
CREATE TABLE tab1(col1 int COMMENT 'a', col2 int) USING parquet
INSERT INTO TABLE tab1 SELECT 1, 2
```
The insert attempt will fail if the target table has a column with comments. The error is confusing to external users:
```
assertion failed: No plan for InsertIntoTable Relation[col1#15,col2#16] parquet, false, false
+- Project [1 AS col1#19, 2 AS col2#20]
   +- OneRowRelation$
```
This PR fixes the above bug by also checking the metadata when comparing the schema of the table and the query; if they do not match, the metadata is copied over as well. This is an alternative to https://github.com/apache/spark/pull/15266 ### How was this patch tested? Added a test case Author: gatorsmile Closes #15615 from gatorsmile/insertDataSourceTableWithCommentSolution2. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/93b8ad18 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/93b8ad18 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/93b8ad18 Branch: refs/heads/master Commit: 93b8ad184aa3634f340d43a8bdf99836ef3d4f6c Parents: 12b3e8d Author: gatorsmile Authored: Wed Oct 26 00:38:34 2016 -0700 Committer: gatorsmile Committed: Wed Oct 26 00:38:34 2016 -0700 -- .../spark/sql/execution/datasources/rules.scala | 10 - .../apache/spark/sql/sources/InsertSuite.scala | 42 2 files changed, 50 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/93b8ad18/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index cf501cd..4647b11 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -248,10 +248,16 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] { expectedOutput: Seq[Attribute]): InsertIntoTable = { val newChildOutput = expectedOutput.zip(insert.child.output).map { case (expected, actual) => -if (expected.dataType.sameType(actual.dataType) && expected.name == actual.name) { +if (expected.dataType.sameType(actual.dataType) && +expected.name == actual.name && +expected.metadata == actual.metadata) { actual } else { - Alias(Cast(actual, expected.dataType), expected.name)() + // Renaming is needed for handling the following cases like + // 1) Column names/types do not match, e.g., INSERT INTO TABLE tab1 SELECT 1, 2 + // 2) Target tables have column metadata + Alias(Cast(actual, expected.dataType), expected.name)( +explicitMetadata = Option(expected.metadata)) } } http://git-wip-us.apache.org/repos/asf/spark/blob/93b8ad18/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala index 5eb5464..4a85b59 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala @@ -185,6 +185,48 @@ class InsertSuite extends DataSourceTest with SharedSQLContext { ) } + test("INSERT INTO 
TABLE with Comment in columns") { +val tabName = "tab1" +withTable(tabName) { + sql( +s""" + |CREATE TABLE $tabName(col1 int COMMENT 'a', col2 int) + |USING parquet + """.stripMargin) + sql(s"INSERT INTO TABLE $tabName SELECT 1, 2") + + checkAnswer( +sql(s"SELECT col1, col2 FROM $tabName"), +Row(1, 2) :: Nil + ) +} + } + + test("INSERT INTO TABLE - complex type but different names") { +val tab1 = "tab1" +val tab2 = "tab2" +withTable(tab1, tab2) { + sql( +s""" + |CREATE TABLE $tab1 (s struct) + |USING parquet + """.stripMargin) + sql(s"INSERT INTO TABLE $tab1 SELECT named_struct('col1','1','col2','2')") + + sql( +s""" + |CREATE TABLE $tab2 (p struct) + |USING parquet + """.stripMargin) + sql(s"INSERT INTO TABLE $tab2 SELECT * FROM $tab1") + +
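The failing scenario from the PR description can be reproduced end to end with a few lines; a small sketch follows, in which the app name and local master are illustrative:

```scala
import org.apache.spark.sql.SparkSession

// Repro of the scenario this patch fixes: inserting into a data source
// table whose column carries a COMMENT. Before the fix, the INSERT below
// failed with "assertion failed: No plan for InsertIntoTable".
object CommentedColumnInsert {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SPARK-17693-repro")
      .master("local[*]")
      .getOrCreate()

    spark.sql("CREATE TABLE tab1(col1 int COMMENT 'a', col2 int) USING parquet")
    spark.sql("INSERT INTO TABLE tab1 SELECT 1, 2")
    spark.sql("SELECT col1, col2 FROM tab1").show() // expect a single row: 1, 2

    spark.stop()
  }
}
```

The key to the fix is that a column comment lives in the attribute's metadata, so schema comparison must include metadata; when it differs, the `Alias` is built with `explicitMetadata = Option(expected.metadata)` so the target table's metadata is carried onto the cast output.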