[spark] branch branch-3.0 updated: [SPARK-30984][SS] Add UI test for Structured Streaming UI
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 9c583b8 [SPARK-30984][SS] Add UI test for Structured Streaming UI 9c583b8 is described below commit 9c583b8aff4f3d5799524619f4997281ae428da5 Author: Shixiong Zhu AuthorDate: Wed Mar 4 13:55:34 2020 +0800 [SPARK-30984][SS] Add UI test for Structured Streaming UI ### What changes were proposed in this pull request? - Add a UI test for Structured Streaming UI - Fix the unsafe usages of `SimpleDateFormat` by using a ThreadLocal shared object. - Use `start` to replace `submission` to be consistent with the API `StreamingQuery.start()`. ### Why are the changes needed? Structured Streaming UI is missing UI tests. ### Does this PR introduce any user-facing change? No ### How was this patch tested? The new test. Closes #27732 from zsxwing/ss-ui-test. 
Authored-by: Shixiong Zhu Signed-off-by: Wenchen Fan (cherry picked from commit ebfff7af6a9b2d672871317d30161cdafaa32ca4) Signed-off-by: Wenchen Fan --- sql/core/pom.xml | 10 ++ .../sql/execution/streaming/ProgressReporter.scala | 2 +- .../sql/execution/streaming/StreamExecution.scala | 4 +- .../sql/streaming/StreamingQueryListener.scala | 4 +- .../sql/streaming/ui/StreamingQueryPage.scala | 22 ++-- .../ui/StreamingQueryStatisticsPage.scala | 38 +++--- .../ui/StreamingQueryStatusListener.scala | 13 +- .../apache/spark/sql/streaming/ui/UIUtils.scala| 17 +++ .../streaming/StreamingQueryListenerSuite.scala| 6 +- .../sql/streaming/ui/StreamingQueryPageSuite.scala | 2 +- .../ui/StreamingQueryStatusListenerSuite.scala | 9 +- .../spark/sql/streaming/ui/UISeleniumSuite.scala | 145 + 12 files changed, 221 insertions(+), 51 deletions(-) diff --git a/sql/core/pom.xml b/sql/core/pom.xml index 0e664ec..37da614 100644 --- a/sql/core/pom.xml +++ b/sql/core/pom.xml @@ -150,6 +150,16 @@ mockito-core test + + org.seleniumhq.selenium + selenium-java + test + + + org.seleniumhq.selenium + selenium-htmlunit-driver + test + target/scala-${scala.binary.version}/classes diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala index f20291e..feb151a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala @@ -349,7 +349,7 @@ trait ProgressReporter extends Logging { result } - private def formatTimestamp(millis: Long): String = { + protected def formatTimestamp(millis: Long): String = { timestampFormat.format(new Date(millis)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 
8b3534b..8006437 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -307,8 +307,8 @@ abstract class StreamExecution( } // `postEvent` does not throw non fatal exception. - val submissionTime = triggerClock.getTimeMillis() - postEvent(new QueryStartedEvent(id, runId, name, submissionTime)) + val startTimestamp = triggerClock.getTimeMillis() + postEvent(new QueryStartedEvent(id, runId, name, formatTimestamp(startTimestamp))) // Unblock starting thread startLatch.countDown() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala index dd842cd..7ae38c7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala @@ -82,7 +82,7 @@ object StreamingQueryListener { * @param id A unique query id that persists across restarts. See `StreamingQuery.id()`. * @param runId A query id that is unique for every start/restart. See `StreamingQuery.runId()`. * @param name User-specified name of the query, null if not specified. - * @param submissionTime The timestamp to start a query. + * @param timestamp The timestamp to start a query. * @since 2.1.0 */ @Evolving @@ -90,7 +90,7 @@ object StreamingQueryListener { val id: UUID, val ru
[spark] branch master updated: [SPARK-30984][SS] Add UI test for Structured Streaming UI
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new ebfff7a [SPARK-30984][SS] Add UI test for Structured Streaming UI ebfff7a is described below commit ebfff7af6a9b2d672871317d30161cdafaa32ca4 Author: Shixiong Zhu AuthorDate: Wed Mar 4 13:55:34 2020 +0800 [SPARK-30984][SS] Add UI test for Structured Streaming UI ### What changes were proposed in this pull request? - Add a UI test for Structured Streaming UI - Fix the unsafe usages of `SimpleDateFormat` by using a ThreadLocal shared object. - Use `start` to replace `submission` to be consistent with the API `StreamingQuery.start()`. ### Why are the changes needed? Structured Streaming UI is missing UI tests. ### Does this PR introduce any user-facing change? No ### How was this patch tested? The new test. Closes #27732 from zsxwing/ss-ui-test. Authored-by: Shixiong Zhu Signed-off-by: Wenchen Fan --- sql/core/pom.xml | 10 ++ .../sql/execution/streaming/ProgressReporter.scala | 2 +- .../sql/execution/streaming/StreamExecution.scala | 4 +- .../sql/streaming/StreamingQueryListener.scala | 4 +- .../sql/streaming/ui/StreamingQueryPage.scala | 22 ++-- .../ui/StreamingQueryStatisticsPage.scala | 38 +++--- .../ui/StreamingQueryStatusListener.scala | 13 +- .../apache/spark/sql/streaming/ui/UIUtils.scala| 17 +++ .../streaming/StreamingQueryListenerSuite.scala| 6 +- .../sql/streaming/ui/StreamingQueryPageSuite.scala | 2 +- .../ui/StreamingQueryStatusListenerSuite.scala | 9 +- .../spark/sql/streaming/ui/UISeleniumSuite.scala | 145 + 12 files changed, 221 insertions(+), 51 deletions(-) diff --git a/sql/core/pom.xml b/sql/core/pom.xml index 6cdc63b..c95fe3c 100644 --- a/sql/core/pom.xml +++ b/sql/core/pom.xml @@ -150,6 +150,16 @@ mockito-core test + + org.seleniumhq.selenium + selenium-java + test + + + org.seleniumhq.selenium + 
selenium-htmlunit-driver + test + target/scala-${scala.binary.version}/classes diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala index f20291e..feb151a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala @@ -349,7 +349,7 @@ trait ProgressReporter extends Logging { result } - private def formatTimestamp(millis: Long): String = { + protected def formatTimestamp(millis: Long): String = { timestampFormat.format(new Date(millis)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 8b3534b..8006437 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -307,8 +307,8 @@ abstract class StreamExecution( } // `postEvent` does not throw non fatal exception. 
- val submissionTime = triggerClock.getTimeMillis() - postEvent(new QueryStartedEvent(id, runId, name, submissionTime)) + val startTimestamp = triggerClock.getTimeMillis() + postEvent(new QueryStartedEvent(id, runId, name, formatTimestamp(startTimestamp))) // Unblock starting thread startLatch.countDown() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala index dd842cd..7ae38c7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala @@ -82,7 +82,7 @@ object StreamingQueryListener { * @param id A unique query id that persists across restarts. See `StreamingQuery.id()`. * @param runId A query id that is unique for every start/restart. See `StreamingQuery.runId()`. * @param name User-specified name of the query, null if not specified. - * @param submissionTime The timestamp to start a query. + * @param timestamp The timestamp to start a query. * @since 2.1.0 */ @Evolving @@ -90,7 +90,7 @@ object StreamingQueryListener { val id: UUID, val runId: UUID, val name: String, - val submissionTime: Long) extends Event + val timestamp: String)
[spark] branch branch-3.0 updated: [SPARK-30289][FOLLOWUP][DOC] Update the migration guide for `spark.sql.legacy.ctePrecedencePolicy`
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 4d489c8 [SPARK-30289][FOLLOWUP][DOC] Update the migration guide for `spark.sql.legacy.ctePrecedencePolicy` 4d489c8 is described below commit 4d489c85aa56816992d12908298e13bfd6fb4a0d Author: Yuanjian Li AuthorDate: Wed Mar 4 13:56:02 2020 +0900 [SPARK-30289][FOLLOWUP][DOC] Update the migration guide for `spark.sql.legacy.ctePrecedencePolicy` ### What changes were proposed in this pull request? Fix the migration guide document for `spark.sql.legacy.ctePrecedence.enabled`, which is introduced in #27579. ### Why are the changes needed? The config value changed. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Document only. Closes #27782 from xuanyuanking/SPARK-30829-follow. Authored-by: Yuanjian Li Signed-off-by: HyukjinKwon (cherry picked from commit f7f1948a8c7f7638f34d6a485abc9b866b87700f) Signed-off-by: HyukjinKwon --- docs/sql-migration-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md index a9a39ce..0050061 100644 --- a/docs/sql-migration-guide.md +++ b/docs/sql-migration-guide.md @@ -101,7 +101,7 @@ license: | - Since Spark 3.0, if files or subdirectories disappear during recursive directory listing (i.e. they appear in an intermediate listing but then cannot be read or listed during later phases of the recursive directory listing, due to either concurrent file deletions or object store consistency issues) then the listing will fail with an exception unless `spark.sql.files.ignoreMissingFiles` is `true` (default `false`). In previous versions, these missing files or subdirectories would be i [...] 
- - Since Spark 3.0, Spark throws an AnalysisException if name conflict is detected in the nested WITH clause by default. It forces the users to choose the specific substitution order they wanted, which is controlled by `spark.sql.legacy.ctePrecedence.enabled`. If set to false (which is recommended), inner CTE definitions take precedence over outer definitions. For example, set the config to `false`, `WITH t AS (SELECT 1), t2 AS (WITH t AS (SELECT 2) SELECT * FROM t) SELECT * FROM t2` re [...] + - Since Spark 3.0, `spark.sql.legacy.ctePrecedencePolicy` is introduced to control the behavior for name conflicting in the nested WITH clause. By default value `EXCEPTION`, Spark throws an AnalysisException, it forces users to choose the specific substitution order they wanted. If set to `CORRECTED` (which is recommended), inner CTE definitions take precedence over outer definitions. For example, set the config to `CORRECTED`, `WITH t AS (SELECT 1), t2 AS (WITH t AS (SELECT 2) SELECT * FR [...] - Since Spark 3.0, the `add_months` function does not adjust the resulting date to a last day of month if the original date is a last day of months. For example, `select add_months(DATE'2019-02-28', 1)` results `2019-03-28`. In Spark version 2.4 and earlier, the resulting date is adjusted when the original date is a last day of months. For example, adding a month to `2019-02-28` results in `2019-03-31`. - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (e1b3e9a -> f7f1948)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from e1b3e9a [SPARK-29212][ML][PYSPARK] Add common classes without using JVM backend add f7f1948 [SPARK-30289][FOLLOWUP][DOC] Update the migration guide for `spark.sql.legacy.ctePrecedencePolicy` No new revisions were added by this update. Summary of changes: docs/sql-migration-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-29212][ML][PYSPARK] Add common classes without using JVM backend
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new e1b3e9a [SPARK-29212][ML][PYSPARK] Add common classes without using JVM backend e1b3e9a is described below commit e1b3e9a3d25978dc0ad4609ecbc157ea1eebe2dd Author: zero323 AuthorDate: Wed Mar 4 12:20:02 2020 +0800 [SPARK-29212][ML][PYSPARK] Add common classes without using JVM backend ### What changes were proposed in this pull request? Implement common base ML classes (`Predictor`, `PredictionModel`, `Classifier`, `ClassificationModel`, `ProbabilisticClassifier`, `ProbabilisticClassificationModel`, `Regressor`, `RegressionModel`) for non-Java backends. Note - `Predictor` and `JavaClassifier` should be abstract as `_fit` method is not implemented. - `PredictionModel` should be abstract as `_transform` is not implemented. ### Why are the changes needed? To provide extension points for non-JVM algorithms, as well as a public (as opposed to `Java*` variants, which are commonly described in docstrings as private) hierarchy which can be used to distinguish between different classes of predictors. For longer discussion see [SPARK-29212](https://issues.apache.org/jira/browse/SPARK-29212) and / or https://github.com/apache/spark/pull/25776. ### Does this PR introduce any user-facing change? It adds new base classes as listed above, but effective interfaces (method resolution order notwithstanding) stay the same. Additionally "private" `Java*` classes in `ml.regression` and `ml.classification` have been renamed to follow PEP-8 conventions (added leading underscore). It is for discussion if the same should be done to equivalent classes from `ml.wrapper`. 
If we take `JavaClassifier` as an example, type hierarchy will change from ![old pyspark ml classification JavaClassifier](https://user-images.githubusercontent.com/1554276/72657093-5c0b0c80-39a0-11ea-9069-a897d75de483.png) to ![new pyspark ml classification _JavaClassifier](https://user-images.githubusercontent.com/1554276/72657098-64fbde00-39a0-11ea-8f80-01187a5ea5a6.png) Similarly the old model ![old pyspark ml classification JavaClassificationModel](https://user-images.githubusercontent.com/1554276/72657103-7513bd80-39a0-11ea-9ffc-59eb6ab61fde.png) will become ![new pyspark ml classification _JavaClassificationModel](https://user-images.githubusercontent.com/1554276/72657110-80ff7f80-39a0-11ea-9f5c-fe408664e827.png) ### How was this patch tested? Existing unit tests. Closes #27245 from zero323/SPARK-29212. Authored-by: zero323 Signed-off-by: zhengruifeng --- python/pyspark/ml/__init__.py | 6 +- python/pyspark/ml/base.py | 81 - python/pyspark/ml/classification.py | 158 +- python/pyspark/ml/regression.py | 71 ++- python/pyspark/ml/tests/test_param.py | 6 +- python/pyspark/ml/wrapper.py | 52 ++- 6 files changed, 258 insertions(+), 116 deletions(-) diff --git a/python/pyspark/ml/__init__.py b/python/pyspark/ml/__init__.py index d99a253..47fc78e 100644 --- a/python/pyspark/ml/__init__.py +++ b/python/pyspark/ml/__init__.py @@ -19,13 +19,15 @@ DataFrame-based machine learning APIs to let users quickly assemble and configure practical machine learning pipelines. 
""" -from pyspark.ml.base import Estimator, Model, Transformer, UnaryTransformer +from pyspark.ml.base import Estimator, Model, Predictor, PredictionModel, \ +Transformer, UnaryTransformer from pyspark.ml.pipeline import Pipeline, PipelineModel from pyspark.ml import classification, clustering, evaluation, feature, fpm, \ image, pipeline, recommendation, regression, stat, tuning, util, linalg, param __all__ = [ -"Transformer", "UnaryTransformer", "Estimator", "Model", "Pipeline", "PipelineModel", +"Transformer", "UnaryTransformer", "Estimator", "Model", +"Predictor", "PredictionModel", "Pipeline", "PipelineModel", "classification", "clustering", "evaluation", "feature", "fpm", "image", "recommendation", "regression", "stat", "tuning", "util", "linalg", "param", ] diff --git a/python/pyspark/ml/base.py b/python/pyspark/ml/base.py index 542cb25..b8df5a3 100644 --- a/python/pyspark/ml/base.py +++ b/python/pyspark/ml/base.py @@ -15,7 +15,7 @@ # limitations under the License. # -from abc import ABCMeta, abstractmethod +from abc import ABCMeta, abstractmethod, abstractproperty import copy import threading @@ -246,3 +246,82 @@ class UnaryTransformer(HasInputCol, HasOutputCol, Transformer): transformedDataset = dataset.withColumn(self.getOutputCol(),
[spark] branch master updated (380e887 -> 111e903)
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 380e887 [SPARK-30999][SQL] Don't cancel a QueryStageExec which failed before call doMaterialize add 111e903 [SPARK-30770][ML] avoid vector conversion in GMM.transform No new revisions were added by this update. Summary of changes: .../stat/distribution/MultivariateGaussian.scala | 34 +- .../distribution/MultivariateGaussianSuite.scala | 8 + .../spark/ml/clustering/GaussianMixture.scala | 29 +- python/pyspark/ml/clustering.py| 27 - 4 files changed, 50 insertions(+), 48 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (380e887 -> 111e903)
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 380e887 [SPARK-30999][SQL] Don't cancel a QueryStageExec which failed before call doMaterialize add 111e903 [SPARK-30770][ML] avoid vector conversion in GMM.transform No new revisions were added by this update. Summary of changes: .../stat/distribution/MultivariateGaussian.scala | 34 +- .../distribution/MultivariateGaussianSuite.scala | 8 + .../spark/ml/clustering/GaussianMixture.scala | 29 +- python/pyspark/ml/clustering.py| 27 - 4 files changed, 50 insertions(+), 48 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.0 updated: [SPARK-30999][SQL] Don't cancel a QueryStageExec which failed before call doMaterialize
This is an automated email from the ASF dual-hosted git repository. lixiao pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 2732980 [SPARK-30999][SQL] Don't cancel a QueryStageExec which failed before call doMaterialize 2732980 is described below commit 27329806c36d0b403153fe1ad0077acb72d92606 Author: yi.wu AuthorDate: Tue Mar 3 13:40:51 2020 -0800 [SPARK-30999][SQL] Don't cancel a QueryStageExec which failed before call doMaterialize ### What changes were proposed in this pull request? This PR proposes to not cancel a `QueryStageExec` which failed before calling `doMaterialize`. Besides, this PR also includes 2 minor improvements: * fail fast when stage failed before calling `doMaterialize` * format Exception with Cause ### Why are the changes needed? For a stage which failed before materializing the lazy value (e.g. `inputRDD`), calling `cancel` on it could re-trigger the same failure again, e.g. executing child node again(see `AdaptiveQueryExecSuite`.`SPARK-30291: AQE should catch the exceptions when doing materialize` for example). And finally, the same failure will be counted 2 times, one is for materialize error and another is for cancel error. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Updated test. Closes #27752 from Ngone51/avoid_cancel_finished_stage. 
Authored-by: yi.wu Signed-off-by: gatorsmile (cherry picked from commit 380e8876316d6ef5a74358be2a04ab20e8b6e7ca) Signed-off-by: gatorsmile --- .../execution/adaptive/AdaptiveSparkPlanExec.scala | 23 +- .../adaptive/AdaptiveQueryExecSuite.scala | 3 ++- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index 4036424..c018ca4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -165,7 +165,7 @@ case class AdaptiveSparkPlanExec( stagesToReplace = result.newStages ++ stagesToReplace executionId.foreach(onUpdatePlan) - // Start materialization of all new stages. + // Start materialization of all new stages and fail fast if any stages failed eagerly result.newStages.foreach { stage => try { stage.materialize().onComplete { res => @@ -176,7 +176,10 @@ case class AdaptiveSparkPlanExec( } }(AdaptiveSparkPlanExec.executionContext) } catch { - case e: Throwable => events.offer(StageFailure(stage, e)) + case e: Throwable => +val ex = new SparkException( + s"Early failed query stage found: ${stage.treeString}", e) +cleanUpAndThrowException(Seq(ex), Some(stage.id)) } } } @@ -192,13 +195,12 @@ case class AdaptiveSparkPlanExec( stage.resultOption = Some(res) case StageFailure(stage, ex) => errors.append( - new SparkException(s"Failed to materialize query stage: ${stage.treeString}." + -s" and the cause is ${ex.getMessage}", ex)) + new SparkException(s"Failed to materialize query stage: ${stage.treeString}.", ex)) } // In case of errors, we cancel all running stages and throw exception. if (errors.nonEmpty) { - cleanUpAndThrowException(errors) + cleanUpAndThrowException(errors, None) } // Try re-optimizing and re-planning. 
Adopt the new plan if its cost is equal to or less @@ -522,9 +524,13 @@ case class AdaptiveSparkPlanExec( * Cancel all running stages with best effort and throw an Exception containing all stage * materialization errors and stage cancellation errors. */ - private def cleanUpAndThrowException(errors: Seq[SparkException]): Unit = { + private def cleanUpAndThrowException( + errors: Seq[SparkException], + earlyFailedStage: Option[Int]): Unit = { val runningStages = currentPhysicalPlan.collect { - case s: QueryStageExec => s + // earlyFailedStage is the stage which failed before calling doMaterialize, + // so we should avoid calling cancel on it to re-trigger the failure again. + case s: QueryStageExec if !earlyFailedStage.contains(s.id) => s } val cancelErrors = new mutable.ArrayBuffer[SparkException]() try { @@ -539,8 +545,7 @@ case class AdaptiveSparkPlanExec( } } finally { val ex =
[spark] branch master updated (4a1d273 -> 380e887)
This is an automated email from the ASF dual-hosted git repository. lixiao pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 4a1d273 [SPARK-30997][SQL] Fix an analysis failure in generators with aggregate functions add 380e887 [SPARK-30999][SQL] Don't cancel a QueryStageExec which failed before call doMaterialize No new revisions were added by this update. Summary of changes: .../execution/adaptive/AdaptiveSparkPlanExec.scala | 23 +- .../adaptive/AdaptiveQueryExecSuite.scala | 3 ++- 2 files changed, 16 insertions(+), 10 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.0 updated: [SPARK-30997][SQL] Fix an analysis failure in generators with aggregate functions
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 7d853ab [SPARK-30997][SQL] Fix an analysis failure in generators with aggregate functions 7d853ab is described below commit 7d853ab6eba479a7cc5d8839b4fc497bc6b6d4c8 Author: Takeshi Yamamuro AuthorDate: Tue Mar 3 12:25:12 2020 -0800 [SPARK-30997][SQL] Fix an analysis failure in generators with aggregate functions ### What changes were proposed in this pull request? We have supported generators in SQL aggregate expressions by SPARK-28782. But, the generator(explode) query with aggregate functions in DataFrame failed as follows; ``` // SPARK-28782: Generator support in aggregate expressions scala> spark.range(3).toDF("id").createOrReplaceTempView("t") scala> sql("select explode(array(min(id), max(id))) from t").show() +---+ |col| +---+ | 0| | 2| +---+ // A failure case handled in this pr scala> spark.range(3).select(explode(array(min($"id"), max($"id".show() org.apache.spark.sql.AnalysisException: The query operator `Generate` contains one or more unsupported expression types Aggregate, Window or Generate. 
Invalid expressions: [min(`id`), max(`id`)];; Project [col#46L] +- Generate explode(array(min(id#42L), max(id#42L))), false, [col#46L] +- Range (0, 3, step=1, splits=Some(4)) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.failAnalysis(CheckAnalysis.scala:49) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.failAnalysis$(CheckAnalysis.scala:48) at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:129) ``` The root cause is that `ExtractGenerator` wrongly replaces a project w/ aggregate functions before `GlobalAggregates` replaces it with an aggregate as follows; ``` scala> sql("SET spark.sql.optimizer.planChangeLog.level=warn") scala> spark.range(3).select(explode(array(min($"id"), max($"id".show() 20/03/01 12:51:58 WARN HiveSessionStateBuilder$$anon$1: === Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences === !'Project [explode(array(min('id), max('id))) AS List()] 'Project [explode(array(min(id#72L), max(id#72L))) AS List()] +- Range (0, 3, step=1, splits=Some(4)) +- Range (0, 3, step=1, splits=Some(4)) 20/03/01 12:51:58 WARN HiveSessionStateBuilder$$anon$1: === Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractGenerator === !'Project [explode(array(min(id#72L), max(id#72L))) AS List()] Project [col#76L] !+- Range (0, 3, step=1, splits=Some(4)) +- Generate explode(array(min(id#72L), max(id#72L))), false, [col#76L] ! +- Range (0, 3, step=1, splits=Some(4)) 20/03/01 12:51:58 WARN HiveSessionStateBuilder$$anon$1: === Result of Batch Resolution === !'Project [explode(array(min('id), max('id))) AS List()] Project [col#76L] !+- Range (0, 3, step=1, splits=Some(4)) +- Generate explode(array(min(id#72L), max(id#72L))), false, [col#76L] ! +- Range (0, 3, step=1, splits=Some(4)) // the analysis failed here... ``` To avoid the case in `ExtractGenerator`, this PR adds a condition to ignore generators having aggregate functions. 
A correct sequence of rules is as follows; ``` 20/03/01 13:19:06 WARN HiveSessionStateBuilder$$anon$1: === Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences === !'Project [explode(array(min('id), max('id))) AS List()] 'Project [explode(array(min(id#27L), max(id#27L))) AS List()] +- Range (0, 3, step=1, splits=Some(4)) +- Range (0, 3, step=1, splits=Some(4)) 20/03/01 13:19:06 WARN HiveSessionStateBuilder$$anon$1: === Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$GlobalAggregates === !'Project [explode(array(min(id#27L), max(id#27L))) AS List()] 'Aggregate [explode(array(min(id#27L), max(id#27L))) AS List()] +- Range (0, 3, step=1, splits=Some(4)) +- Range (0, 3, step=1, splits=Some(4)) 20/03/01 13:19:06 WARN HiveSessionStateBuilder$$anon$1: === Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractGenerator === !'Aggregate [explode(array(min(id#27L), max(id#27L))) AS List()] 'Project [explode(_gen_input_0#31) AS List()] !+- Range (0, 3, step=1, splits=Some(4)) +- Aggregate [array(min(id#27L), max(id#27L)) AS _gen_input_0#31] !
[spark] branch branch-3.0 updated: [SPARK-30997][SQL] Fix an analysis failure in generators with aggregate functions
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 7d853ab [SPARK-30997][SQL] Fix an analysis failure in generators with aggregate functions 7d853ab is described below commit 7d853ab6eba479a7cc5d8839b4fc497bc6b6d4c8 Author: Takeshi Yamamuro AuthorDate: Tue Mar 3 12:25:12 2020 -0800 [SPARK-30997][SQL] Fix an analysis failure in generators with aggregate functions ### What changes were proposed in this pull request? We have supported generators in SQL aggregate expressions by SPARK-28782. But, the generator(explode) query with aggregate functions in DataFrame failed as follows; ``` // SPARK-28782: Generator support in aggregate expressions scala> spark.range(3).toDF("id").createOrReplaceTempView("t") scala> sql("select explode(array(min(id), max(id))) from t").show() +---+ |col| +---+ | 0| | 2| +---+ // A failure case handled in this pr scala> spark.range(3).select(explode(array(min($"id"), max($"id".show() org.apache.spark.sql.AnalysisException: The query operator `Generate` contains one or more unsupported expression types Aggregate, Window or Generate. 
Invalid expressions: [min(`id`), max(`id`)];; Project [col#46L] +- Generate explode(array(min(id#42L), max(id#42L))), false, [col#46L] +- Range (0, 3, step=1, splits=Some(4)) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.failAnalysis(CheckAnalysis.scala:49) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.failAnalysis$(CheckAnalysis.scala:48) at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:129) ``` The root cause is that `ExtractGenerator` wrongly replaces a project w/ aggregate functions before `GlobalAggregates` replaces it with an aggregate as follows; ``` scala> sql("SET spark.sql.optimizer.planChangeLog.level=warn") scala> spark.range(3).select(explode(array(min($"id"), max($"id".show() 20/03/01 12:51:58 WARN HiveSessionStateBuilder$$anon$1: === Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences === !'Project [explode(array(min('id), max('id))) AS List()] 'Project [explode(array(min(id#72L), max(id#72L))) AS List()] +- Range (0, 3, step=1, splits=Some(4)) +- Range (0, 3, step=1, splits=Some(4)) 20/03/01 12:51:58 WARN HiveSessionStateBuilder$$anon$1: === Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractGenerator === !'Project [explode(array(min(id#72L), max(id#72L))) AS List()] Project [col#76L] !+- Range (0, 3, step=1, splits=Some(4)) +- Generate explode(array(min(id#72L), max(id#72L))), false, [col#76L] ! +- Range (0, 3, step=1, splits=Some(4)) 20/03/01 12:51:58 WARN HiveSessionStateBuilder$$anon$1: === Result of Batch Resolution === !'Project [explode(array(min('id), max('id))) AS List()] Project [col#76L] !+- Range (0, 3, step=1, splits=Some(4)) +- Generate explode(array(min(id#72L), max(id#72L))), false, [col#76L] ! +- Range (0, 3, step=1, splits=Some(4)) // the analysis failed here... ``` To avoid the case in `ExtractGenerator`, this PR adds a condition to ignore generators having aggregate functions. 
A correct sequence of rules is as follows; ``` 20/03/01 13:19:06 WARN HiveSessionStateBuilder$$anon$1: === Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences === !'Project [explode(array(min('id), max('id))) AS List()] 'Project [explode(array(min(id#27L), max(id#27L))) AS List()] +- Range (0, 3, step=1, splits=Some(4)) +- Range (0, 3, step=1, splits=Some(4)) 20/03/01 13:19:06 WARN HiveSessionStateBuilder$$anon$1: === Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$GlobalAggregates === !'Project [explode(array(min(id#27L), max(id#27L))) AS List()] 'Aggregate [explode(array(min(id#27L), max(id#27L))) AS List()] +- Range (0, 3, step=1, splits=Some(4)) +- Range (0, 3, step=1, splits=Some(4)) 20/03/01 13:19:06 WARN HiveSessionStateBuilder$$anon$1: === Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractGenerator === !'Aggregate [explode(array(min(id#27L), max(id#27L))) AS List()] 'Project [explode(_gen_input_0#31) AS List()] !+- Range (0, 3, step=1, splits=Some(4)) +- Aggregate [array(min(id#27L), max(id#27L)) AS _gen_input_0#31] !
[spark] branch master updated (c263c15 -> 4a1d273)
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from c263c15 [SPARK-31015][SQL] Star(*) expression fails when used with qualified column names for v2 tables add 4a1d273 [SPARK-30997][SQL] Fix an analysis failure in generators with aggregate functions No new revisions were added by this update. Summary of changes: .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 14 ++ .../spark/sql/catalyst/analysis/AnalysisErrorSuite.scala | 15 +++ .../org/apache/spark/sql/GeneratorFunctionSuite.scala | 5 + 3 files changed, 34 insertions(+) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (c263c15 -> 4a1d273)
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from c263c15 [SPARK-31015][SQL] Star(*) expression fails when used with qualified column names for v2 tables add 4a1d273 [SPARK-30997][SQL] Fix an analysis failure in generators with aggregate functions No new revisions were added by this update. Summary of changes: .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 14 ++ .../spark/sql/catalyst/analysis/AnalysisErrorSuite.scala | 15 +++ .../org/apache/spark/sql/GeneratorFunctionSuite.scala | 5 + 3 files changed, 34 insertions(+) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.0 updated: [SPARK-31015][SQL] Star(*) expression fails when used with qualified column names for v2 tables
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 1c23be3 [SPARK-31015][SQL] Star(*) expression fails when used with qualified column names for v2 tables 1c23be3 is described below commit 1c23be3b8addaa5e15d29de788b906bcbdae953b Author: Terry Kim AuthorDate: Wed Mar 4 00:55:26 2020 +0800 [SPARK-31015][SQL] Star(*) expression fails when used with qualified column names for v2 tables ### What changes were proposed in this pull request? For a v2 table created with `CREATE TABLE testcat.ns1.ns2.tbl (id bigint, name string) USING foo`, the following works as expected ``` SELECT testcat.ns1.ns2.tbl.id FROM testcat.ns1.ns2.tbl ``` , but a query with qualified column name with star(*) ``` SELECT testcat.ns1.ns2.tbl.* FROM testcat.ns1.ns2.tbl [info] org.apache.spark.sql.AnalysisException: cannot resolve 'testcat.ns1.ns2.tbl.*' given input columns 'id, name'; ``` fails to resolve. And this PR proposes to fix this issue. ### Why are the changes needed? To fix a bug as described above. ### Does this PR introduce any user-facing change? Yes, now `SELECT testcat.ns1.ns2.tbl.* FROM testcat.ns1.ns2.tbl` works as expected. ### How was this patch tested? Added new test. Closes #27766 from imback82/fix_star_expression. 
Authored-by: Terry Kim Signed-off-by: Wenchen Fan (cherry picked from commit c263c154080e54dd07aaa584913773314c3528e5) Signed-off-by: Wenchen Fan --- .../spark/sql/catalyst/analysis/unresolved.scala | 35 -- .../spark/sql/connector/DataSourceV2SQLSuite.scala | 27 + 2 files changed, 40 insertions(+), 22 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index 608f39c..6048d98 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -298,35 +298,26 @@ abstract class Star extends LeafExpression with NamedExpression { case class UnresolvedStar(target: Option[Seq[String]]) extends Star with Unevaluable { /** - * Returns true if the nameParts match the qualifier of the attribute + * Returns true if the nameParts is a subset of the last elements of qualifier of the attribute. * - * There are two checks: i) Check if the nameParts match the qualifier fully. - * E.g. SELECT db.t1.* FROM db1.t1 In this case, the nameParts is Seq("db1", "t1") and - * qualifier of the attribute is Seq("db1","t1") - * ii) If (i) is not true, then check if nameParts is only a single element and it - * matches the table portion of the qualifier - * - * E.g. SELECT t1.* FROM db1.t1 In this case nameParts is Seq("t1") and - * qualifier is Seq("db1","t1") - * SELECT a.* FROM db1.t1 AS a - * In this case nameParts is Seq("a") and qualifier for - * attribute is Seq("a") + * For example, the following should all return true: + * - `SELECT ns1.ns2.t.* FROM ns1.n2.t` where nameParts is Seq("ns1", "ns2", "t") and + * qualifier is Seq("ns1", "ns2", "t"). + * - `SELECT ns2.t.* FROM ns1.n2.t` where nameParts is Seq("ns2", "t") and + * qualifier is Seq("ns1", "ns2", "t"). 
+ * - `SELECT t.* FROM ns1.n2.t` where nameParts is Seq("t") and + * qualifier is Seq("ns1", "ns2", "t"). */ private def matchedQualifier( attribute: Attribute, nameParts: Seq[String], resolver: Resolver): Boolean = { -val qualifierList = attribute.qualifier - -val matched = nameParts.corresponds(qualifierList)(resolver) || { - // check if it matches the table portion of the qualifier - if (nameParts.length == 1 && qualifierList.nonEmpty) { -resolver(nameParts.head, qualifierList.last) - } else { -false - } +val qualifierList = if (nameParts.length == attribute.qualifier.length) { + attribute.qualifier +} else { + attribute.qualifier.takeRight(nameParts.length) } -matched +nameParts.corresponds(qualifierList)(resolver) } override def expand( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index c074b335..bccdce7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -2342,6 +2342,33 @@ class DataSourceV2SQLSuite assert(e2.message.contains("It is not allowed to a
[spark] branch master updated (3ff2135 -> c263c15)
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 3ff2135 [SPARK-30049][SQL] SQL fails to parse when comment contains an unmatched quote character add c263c15 [SPARK-31015][SQL] Star(*) expression fails when used with qualified column names for v2 tables No new revisions were added by this update. Summary of changes: .../spark/sql/catalyst/analysis/unresolved.scala | 35 -- .../spark/sql/connector/DataSourceV2SQLSuite.scala | 27 + 2 files changed, 40 insertions(+), 22 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (3ff2135 -> c263c15)
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 3ff2135 [SPARK-30049][SQL] SQL fails to parse when comment contains an unmatched quote character add c263c15 [SPARK-31015][SQL] Star(*) expression fails when used with qualified column names for v2 tables No new revisions were added by this update. Summary of changes: .../spark/sql/catalyst/analysis/unresolved.scala | 35 -- .../spark/sql/connector/DataSourceV2SQLSuite.scala | 27 + 2 files changed, 40 insertions(+), 22 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-30049][SQL] SQL fails to parse when comment contains an unmatched quote character
This is an automated email from the ASF dual-hosted git repository. tgraves pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 3ff2135 [SPARK-30049][SQL] SQL fails to parse when comment contains an unmatched quote character 3ff2135 is described below commit 3ff213568694e265466d8480b61fd38f4fd8fdff Author: Javier AuthorDate: Tue Mar 3 09:55:15 2020 -0600 [SPARK-30049][SQL] SQL fails to parse when comment contains an unmatched quote character ### What changes were proposed in this pull request? A SQL statement that contains a comment with an unmatched quote character can lead to a parse error: - Added a insideComment flag in the splitter method to avoid checking single and double quotes within a comment: ``` spark-sql> SELECT 1 -- someone's comment here > ; Error in query: extraneous input ';' expecting (line 2, pos 0) == SQL == SELECT 1 -- someone's comment here ; ^^^ ``` ### Why are the changes needed? This misbehaviour was not present on previous spark versions. ### Does this PR introduce any user-facing change? - No ### How was this patch tested? - New tests were added. Closes #27321 from javierivanov/SPARK-30049B. 
Lead-authored-by: Javier Co-authored-by: Javier Fuentes Signed-off-by: Thomas Graves --- .../sql/hive/thriftserver/SparkSQLCLIDriver.scala | 24 ++ .../spark/sql/hive/thriftserver/CliSuite.scala | 22 2 files changed, 42 insertions(+), 4 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala index b665d4a..19f7ea8 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala @@ -509,24 +509,40 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging { private def splitSemiColon(line: String): JList[String] = { var insideSingleQuote = false var insideDoubleQuote = false +var insideComment = false var escape = false var beginIndex = 0 +var endIndex = line.length val ret = new JArrayList[String] + for (index <- 0 until line.length) { - if (line.charAt(index) == '\'') { + if (line.charAt(index) == '\'' && !insideComment) { // take a look to see if it is escaped if (!escape) { // flip the boolean variable insideSingleQuote = !insideSingleQuote } - } else if (line.charAt(index) == '\"') { + } else if (line.charAt(index) == '\"' && !insideComment) { // take a look to see if it is escaped if (!escape) { // flip the boolean variable insideDoubleQuote = !insideDoubleQuote } + } else if (line.charAt(index) == '-') { +val hasNext = index + 1 < line.length +if (insideDoubleQuote || insideSingleQuote || insideComment) { + // Ignores '-' in any case of quotes or comment. + // Avoids to start a comment(--) within a quoted segment or already in a comment. + // Sample query: select "quoted value --" + //^^ avoids starting a comment if it's inside quotes. 
+} else if (hasNext && line.charAt(index + 1) == '-') { + // ignore quotes and ; + insideComment = true + // ignore eol + endIndex = index +} } else if (line.charAt(index) == ';') { -if (insideSingleQuote || insideDoubleQuote) { +if (insideSingleQuote || insideDoubleQuote || insideComment) { // do not split } else { // split, do not include ; itself @@ -543,7 +559,7 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging { escape = true } } -ret.add(line.substring(beginIndex)) +ret.add(line.substring(beginIndex, endIndex)) ret } } diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala index 6609701..43aafc3 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala @@ -400,4 +400,26 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging { -> "1.00" ) } + + test("SPARK-30049 Should not complain for quotes in commented lines") { +run
[spark] branch branch-3.0 updated: [SPARK-30049][SQL] SQL fails to parse when comment contains an unmatched quote character
This is an automated email from the ASF dual-hosted git repository. tgraves pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 4be2a79 [SPARK-30049][SQL] SQL fails to parse when comment contains an unmatched quote character 4be2a79 is described below commit 4be2a79c7a9a1b1e3b0c3704e94da19c2b87ba47 Author: Javier AuthorDate: Tue Mar 3 09:55:15 2020 -0600 [SPARK-30049][SQL] SQL fails to parse when comment contains an unmatched quote character ### What changes were proposed in this pull request? A SQL statement that contains a comment with an unmatched quote character can lead to a parse error: - Added a insideComment flag in the splitter method to avoid checking single and double quotes within a comment: ``` spark-sql> SELECT 1 -- someone's comment here > ; Error in query: extraneous input ';' expecting (line 2, pos 0) == SQL == SELECT 1 -- someone's comment here ; ^^^ ``` ### Why are the changes needed? This misbehaviour was not present on previous spark versions. ### Does this PR introduce any user-facing change? - No ### How was this patch tested? - New tests were added. Closes #27321 from javierivanov/SPARK-30049B. 
Lead-authored-by: Javier Co-authored-by: Javier Fuentes Signed-off-by: Thomas Graves (cherry picked from commit 3ff213568694e265466d8480b61fd38f4fd8fdff) Signed-off-by: Thomas Graves --- .../sql/hive/thriftserver/SparkSQLCLIDriver.scala | 24 ++ .../spark/sql/hive/thriftserver/CliSuite.scala | 22 2 files changed, 42 insertions(+), 4 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala index b665d4a..19f7ea8 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala @@ -509,24 +509,40 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging { private def splitSemiColon(line: String): JList[String] = { var insideSingleQuote = false var insideDoubleQuote = false +var insideComment = false var escape = false var beginIndex = 0 +var endIndex = line.length val ret = new JArrayList[String] + for (index <- 0 until line.length) { - if (line.charAt(index) == '\'') { + if (line.charAt(index) == '\'' && !insideComment) { // take a look to see if it is escaped if (!escape) { // flip the boolean variable insideSingleQuote = !insideSingleQuote } - } else if (line.charAt(index) == '\"') { + } else if (line.charAt(index) == '\"' && !insideComment) { // take a look to see if it is escaped if (!escape) { // flip the boolean variable insideDoubleQuote = !insideDoubleQuote } + } else if (line.charAt(index) == '-') { +val hasNext = index + 1 < line.length +if (insideDoubleQuote || insideSingleQuote || insideComment) { + // Ignores '-' in any case of quotes or comment. + // Avoids to start a comment(--) within a quoted segment or already in a comment. 
+ // Sample query: select "quoted value --" + //^^ avoids starting a comment if it's inside quotes. +} else if (hasNext && line.charAt(index + 1) == '-') { + // ignore quotes and ; + insideComment = true + // ignore eol + endIndex = index +} } else if (line.charAt(index) == ';') { -if (insideSingleQuote || insideDoubleQuote) { +if (insideSingleQuote || insideDoubleQuote || insideComment) { // do not split } else { // split, do not include ; itself @@ -543,7 +559,7 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging { escape = true } } -ret.add(line.substring(beginIndex)) +ret.add(line.substring(beginIndex, endIndex)) ret } } diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala index 6609701..43aafc3 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala @@ -400,4 +400,26 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging { -> "1.00
[spark] branch branch-3.0 updated: [SPARK-30388][CORE] Mark running map stages of finished job as finished, and cancel running tasks
This is an automated email from the ASF dual-hosted git repository. tgraves pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 0aace99 [SPARK-30388][CORE] Mark running map stages of finished job as finished, and cancel running tasks 0aace99 is described below commit 0aace99d1162348269848665725c7db2541807cc Author: xuesenliang AuthorDate: Tue Mar 3 09:29:43 2020 -0600 [SPARK-30388][CORE] Mark running map stages of finished job as finished, and cancel running tasks ### What changes were proposed in this pull request? When a job finished, its running (re-submitted) map stages should be marked as finished if not used by other jobs. The running tasks of these stages are cancelled. And the ListenerBus should be notified too, otherwise, these map stage items will stay on the "Active Stages" page of web UI and never gone. For example: Suppose job 0 has two stages: map stage 0 and result stage 1. Map stage 0 has two partitions, and its result stage 1 has two partitions too. **Steps to reproduce the bug:** 1. map stage 0:start task 0(```TID 0```) and task 1 (```TID 1```), then both finished successfully. 2. result stage 1: start task 0(```TID 2```) and task 1 (```TID 3```) 3. result stage 1: task 0(```TID 2```) finished successfully 4. result stage 1: speculative task 1.1(```TID 4```) launched, but then failed due to FetchFailedException. 5. driver re-submits map stage 0 and result stage 1. 6. map stage 0 (retry 1): task0(```TID 5```) launched 7. result stage 1: task 1(```TID 3```) finished successfully, so job 0 finished. 8. map stage 0 is removed from ```runningStages``` and ```stageIdToStage```, because it doesn't belong to any job. ``` private def DAGScheduler#cleanupStateForJobAndIndependentStages(job: ActiveJob): HashSet[Stage] = { ... 
stageIdToStage.filterKeys(stageId => registeredStages.get.contains(stageId)).foreach { case (stageId, stage) => ... def removeStage(stageId: Int): Unit = { for (stage <- stageIdToStage.get(stageId)) { if (runningStages.contains(stage)) { logDebug("Removing running stage %d".format(stageId)) runningStages -= stage } ... } stageIdToStage -= stageId } jobSet -= job.jobId if (jobSet.isEmpty) { // no other job needs this stage removeStage(stageId) } } ... } ``` 9. map stage 0 (retry 1): task0(TID 5) finished successfully, but its stage 0 is not in ```stageIdToStage```, so the stage not ```markStageAsFinished``` ``` private[scheduler] def DAGScheduler#handleTaskCompletion(event: CompletionEvent): Unit = { val task = event.task val stageId = task.stageId ... if (!stageIdToStage.contains(task.stageId)) { postTaskEnd(event) // Skip all the actions if the stage has been cancelled. return } ... ``` Relevant spark driver logs as follows: ``` 20/01/02 11:21:45 INFO DAGScheduler: Got job 0 (main at NativeMethodAccessorImpl.java:0) with 2 output partitions 20/01/02 11:21:45 INFO DAGScheduler: Final stage: ResultStage 1 (main at NativeMethodAccessorImpl.java:0) 20/01/02 11:21:45 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0) 20/01/02 11:21:45 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0) 20/01/02 11:21:45 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[2] at main at NativeMethodAccessorImpl.java:0), which has no missing parents 20/01/02 11:21:45 INFO DAGScheduler: Submitting 2 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[2] at main at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0, 1)) 20/01/02 11:21:45 INFO YarnClusterScheduler: Adding task set 0.0 with 2 tasks 20/01/02 11:21:45 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 9.179.143.4, executor 1, partition 0, PROCESS_LOCAL, 7704 bytes) 20/01/02 11:21:45 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 9.76.13.26, 
executor 2, partition 1, PROCESS_LOCAL, 7705 bytes) 20/01/02 11:22:18 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 32491 ms on 9.179.143.4 (executor 1) (1/2) 20/01/02 11:22:26 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 40544 ms on 9.76.13.26 (executor 2) (2/2) 20/01/02 11:22:26 INFO DAGScheduler: ShuffleMapStage 0 (main at NativeMethodAccessorImpl.java:0) finished in 40.854 s 20/01/02 11:22:26 INFO YarnClus
[spark] branch master updated: [SPARK-30388][CORE] Mark running map stages of finished job as finished, and cancel running tasks
This is an automated email from the ASF dual-hosted git repository. tgraves pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 7a4cf33 [SPARK-30388][CORE] Mark running map stages of finished job as finished, and cancel running tasks 7a4cf33 is described below commit 7a4cf339d7082b576624940253e8283de9e83e19 Author: xuesenliang AuthorDate: Tue Mar 3 09:29:43 2020 -0600 [SPARK-30388][CORE] Mark running map stages of finished job as finished, and cancel running tasks ### What changes were proposed in this pull request? When a job finished, its running (re-submitted) map stages should be marked as finished if not used by other jobs. The running tasks of these stages are cancelled. And the ListenerBus should be notified too, otherwise, these map stage items will stay on the "Active Stages" page of web UI and never gone. For example: Suppose job 0 has two stages: map stage 0 and result stage 1. Map stage 0 has two partitions, and its result stage 1 has two partitions too. **Steps to reproduce the bug:** 1. map stage 0:start task 0(```TID 0```) and task 1 (```TID 1```), then both finished successfully. 2. result stage 1: start task 0(```TID 2```) and task 1 (```TID 3```) 3. result stage 1: task 0(```TID 2```) finished successfully 4. result stage 1: speculative task 1.1(```TID 4```) launched, but then failed due to FetchFailedException. 5. driver re-submits map stage 0 and result stage 1. 6. map stage 0 (retry 1): task0(```TID 5```) launched 7. result stage 1: task 1(```TID 3```) finished successfully, so job 0 finished. 8. map stage 0 is removed from ```runningStages``` and ```stageIdToStage```, because it doesn't belong to any job. ``` private def DAGScheduler#cleanupStateForJobAndIndependentStages(job: ActiveJob): HashSet[Stage] = { ... stageIdToStage.filterKeys(stageId => registeredStages.get.contains(stageId)).foreach { case (stageId, stage) => ... 
def removeStage(stageId: Int): Unit = { for (stage <- stageIdToStage.get(stageId)) { if (runningStages.contains(stage)) { logDebug("Removing running stage %d".format(stageId)) runningStages -= stage } ... } stageIdToStage -= stageId } jobSet -= job.jobId if (jobSet.isEmpty) { // no other job needs this stage removeStage(stageId) } } ... } ``` 9. map stage 0 (retry 1): task0(TID 5) finished successfully, but its stage 0 is not in ```stageIdToStage```, so the stage not ```markStageAsFinished``` ``` private[scheduler] def DAGScheduler#handleTaskCompletion(event: CompletionEvent): Unit = { val task = event.task val stageId = task.stageId ... if (!stageIdToStage.contains(task.stageId)) { postTaskEnd(event) // Skip all the actions if the stage has been cancelled. return } ... ``` Relevant spark driver logs as follows: ``` 20/01/02 11:21:45 INFO DAGScheduler: Got job 0 (main at NativeMethodAccessorImpl.java:0) with 2 output partitions 20/01/02 11:21:45 INFO DAGScheduler: Final stage: ResultStage 1 (main at NativeMethodAccessorImpl.java:0) 20/01/02 11:21:45 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0) 20/01/02 11:21:45 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0) 20/01/02 11:21:45 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[2] at main at NativeMethodAccessorImpl.java:0), which has no missing parents 20/01/02 11:21:45 INFO DAGScheduler: Submitting 2 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[2] at main at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0, 1)) 20/01/02 11:21:45 INFO YarnClusterScheduler: Adding task set 0.0 with 2 tasks 20/01/02 11:21:45 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 9.179.143.4, executor 1, partition 0, PROCESS_LOCAL, 7704 bytes) 20/01/02 11:21:45 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 9.76.13.26, executor 2, partition 1, PROCESS_LOCAL, 7705 bytes) 20/01/02 11:22:18 INFO TaskSetManager: Finished task 0.0 in 
stage 0.0 (TID 0) in 32491 ms on 9.179.143.4 (executor 1) (1/2) 20/01/02 11:22:26 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 40544 ms on 9.76.13.26 (executor 2) (2/2) 20/01/02 11:22:26 INFO DAGScheduler: ShuffleMapStage 0 (main at NativeMethodAccessorImpl.java:0) finished in 40.854 s 20/01/02 11:22:26 INFO YarnClusterSched
[spark] branch branch-3.0 updated: [SPARK-30994][CORE] Update xerces to 2.12.0
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 591bfd9 [SPARK-30994][CORE] Update xerces to 2.12.0 591bfd9 is described below commit 591bfd9466887fd41112945343722b573dc90a5b Author: Sean Owen AuthorDate: Tue Mar 3 09:27:18 2020 -0600 [SPARK-30994][CORE] Update xerces to 2.12.0 ### What changes were proposed in this pull request? Manage up the version of Xerces that Hadoop uses (and potentially user apps) to 2.12.0 to match https://issues.apache.org/jira/browse/HADOOP-16530 ### Why are the changes needed? Picks up bug and security fixes: https://www.xml.com/news/2018-05-apache-xerces-j-2120/ ### Does this PR introduce any user-facing change? Should be no behavior changes. ### How was this patch tested? Existing tests. Closes #27746 from srowen/SPARK-30994. Authored-by: Sean Owen Signed-off-by: Sean Owen (cherry picked from commit 97d9a22b04cc831b7de19db735154c69f696773c) Signed-off-by: Sean Owen --- dev/deps/spark-deps-hadoop-2.7-hive-1.2 | 2 +- dev/deps/spark-deps-hadoop-2.7-hive-2.3 | 2 +- pom.xml | 6 ++ 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/dev/deps/spark-deps-hadoop-2.7-hive-1.2 b/dev/deps/spark-deps-hadoop-2.7-hive-1.2 index 113b7d7..364b825 100644 --- a/dev/deps/spark-deps-hadoop-2.7-hive-1.2 +++ b/dev/deps/spark-deps-hadoop-2.7-hive-1.2 @@ -201,7 +201,7 @@ super-csv/2.2.0//super-csv-2.2.0.jar threeten-extra/1.5.0//threeten-extra-1.5.0.jar univocity-parsers/2.8.3//univocity-parsers-2.8.3.jar xbean-asm7-shaded/4.15//xbean-asm7-shaded-4.15.jar -xercesImpl/2.9.1//xercesImpl-2.9.1.jar +xercesImpl/2.12.0//xercesImpl-2.12.0.jar xmlenc/0.52//xmlenc-0.52.jar xz/1.5//xz-1.5.jar zjsonpatch/0.3.0//zjsonpatch-0.3.0.jar diff --git a/dev/deps/spark-deps-hadoop-2.7-hive-2.3 b/dev/deps/spark-deps-hadoop-2.7-hive-2.3 index 7b7423a..62d1436 100644 --- 
a/dev/deps/spark-deps-hadoop-2.7-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-2.7-hive-2.3 @@ -215,7 +215,7 @@ transaction-api/1.1//transaction-api-1.1.jar univocity-parsers/2.8.3//univocity-parsers-2.8.3.jar velocity/1.5//velocity-1.5.jar xbean-asm7-shaded/4.15//xbean-asm7-shaded-4.15.jar -xercesImpl/2.9.1//xercesImpl-2.9.1.jar +xercesImpl/2.12.0//xercesImpl-2.12.0.jar xmlenc/0.52//xmlenc-0.52.jar xz/1.5//xz-1.5.jar zjsonpatch/0.3.0//zjsonpatch-0.3.0.jar diff --git a/pom.xml b/pom.xml index 9d36faf..f98d132 100644 --- a/pom.xml +++ b/pom.xml @@ -1083,6 +1083,12 @@ + + +xerces +xercesImpl +2.12.0 + org.apache.avro avro - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (a4aaee0 -> 97d9a22)
This is an automated email from the ASF dual-hosted git repository. srowen pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from a4aaee0 [MINOR][DOCS] ForeachBatch java example fix add 97d9a22 [SPARK-30994][CORE] Update xerces to 2.12.0 No new revisions were added by this update. Summary of changes: dev/deps/spark-deps-hadoop-2.7-hive-1.2 | 2 +- dev/deps/spark-deps-hadoop-2.7-hive-2.3 | 2 +- pom.xml | 6 ++ 3 files changed, 8 insertions(+), 2 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-30388][CORE] Mark running map stages of finished job as finished, and cancel running tasks
This is an automated email from the ASF dual-hosted git repository. tgraves pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 2d22498 [SPARK-30388][CORE] Mark running map stages of finished job as finished, and cancel running tasks 2d22498 is described below commit 2d22498d6a1f290f9ca54404a6e83ed4b61431d2 Author: xuesenliang AuthorDate: Tue Mar 3 09:27:07 2020 -0600 [SPARK-30388][CORE] Mark running map stages of finished job as finished, and cancel running tasks ### What changes were proposed in this pull request? When a job finished, its running (re-submitted) map stages should be marked as finished if not used by other jobs. The running tasks of these stages are cancelled. And the ListenerBus should be notified too, otherwise, these map stage items will stay on the "Active Stages" page of web UI and never gone. For example: Suppose job 0 has two stages: map stage 0 and result stage 1. Map stage 0 has two partitions, and its result stage 1 has two partitions too. **Steps to reproduce the bug:** 1. map stage 0:start task 0(```TID 0```) and task 1 (```TID 1```), then both finished successfully. 2. result stage 1: start task 0(```TID 2```) and task 1 (```TID 3```) 3. result stage 1: task 0(```TID 2```) finished successfully 4. result stage 1: speculative task 1.1(```TID 4```) launched, but then failed due to FetchFailedException. 5. driver re-submits map stage 0 and result stage 1. 6. map stage 0 (retry 1): task0(```TID 5```) launched 7. result stage 1: task 1(```TID 3```) finished successfully, so job 0 finished. 8. map stage 0 is removed from ```runningStages``` and ```stageIdToStage```, because it doesn't belong to any job. ``` private def DAGScheduler#cleanupStateForJobAndIndependentStages(job: ActiveJob): HashSet[Stage] = { ... stageIdToStage.filterKeys(stageId => registeredStages.get.contains(stageId)).foreach { case (stageId, stage) => ... 
def removeStage(stageId: Int): Unit = { for (stage <- stageIdToStage.get(stageId)) { if (runningStages.contains(stage)) { logDebug("Removing running stage %d".format(stageId)) runningStages -= stage } ... } stageIdToStage -= stageId } jobSet -= job.jobId if (jobSet.isEmpty) { // no other job needs this stage removeStage(stageId) } } ... } ``` 9. map stage 0 (retry 1): task0(TID 5) finished successfully, but its stage 0 is not in ```stageIdToStage```, so the stage not ```markStageAsFinished``` ``` private[scheduler] def DAGScheduler#handleTaskCompletion(event: CompletionEvent): Unit = { val task = event.task val stageId = task.stageId ... if (!stageIdToStage.contains(task.stageId)) { postTaskEnd(event) // Skip all the actions if the stage has been cancelled. return } ... ``` Relevant spark driver logs as follows: ``` 20/01/02 11:21:45 INFO DAGScheduler: Got job 0 (main at NativeMethodAccessorImpl.java:0) with 2 output partitions 20/01/02 11:21:45 INFO DAGScheduler: Final stage: ResultStage 1 (main at NativeMethodAccessorImpl.java:0) 20/01/02 11:21:45 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0) 20/01/02 11:21:45 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0) 20/01/02 11:21:45 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[2] at main at NativeMethodAccessorImpl.java:0), which has no missing parents 20/01/02 11:21:45 INFO DAGScheduler: Submitting 2 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[2] at main at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0, 1)) 20/01/02 11:21:45 INFO YarnClusterScheduler: Adding task set 0.0 with 2 tasks 20/01/02 11:21:45 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 9.179.143.4, executor 1, partition 0, PROCESS_LOCAL, 7704 bytes) 20/01/02 11:21:45 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 9.76.13.26, executor 2, partition 1, PROCESS_LOCAL, 7705 bytes) 20/01/02 11:22:18 INFO TaskSetManager: Finished task 0.0 in 
stage 0.0 (TID 0) in 32491 ms on 9.179.143.4 (executor 1) (1/2) 20/01/02 11:22:26 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 40544 ms on 9.76.13.26 (executor 2) (2/2) 20/01/02 11:22:26 INFO DAGScheduler: ShuffleMapStage 0 (main at NativeMethodAccessorImpl.java:0) finished in 40.854 s 20/01/02 11:22:26 INFO YarnClusterSched
[spark] branch branch-2.4 updated: [MINOR][DOCS] ForeachBatch Java example fix
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-2.4 by this push: new 0ea91da [MINOR][DOCS] ForeachBatch java example fix 0ea91da is described below commit 0ea91dab2ed2808edaf42398eda0614df26358d1 Author: roland-ondeviceresearch AuthorDate: Tue Mar 3 09:24:33 2020 -0600 [MINOR][DOCS] ForeachBatch java example fix ### What changes were proposed in this pull request? ForEachBatch Java example was incorrect ### Why are the changes needed? Example did not compile ### Does this PR introduce any user-facing change? Yes, to docs. ### How was this patch tested? In IDE. Closes #27740 from roland1982/foreachwriter_java_example_fix. Authored-by: roland-ondeviceresearch Signed-off-by: Sean Owen (cherry picked from commit a4aaee01fa8e71d51f49b24889d862422e0727c7) Signed-off-by: Sean Owen --- docs/structured-streaming-programming-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 515ba07..dce4b35 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -2059,7 +2059,7 @@ streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => {% highlight java %} streamingDatasetOfString.writeStream().foreachBatch( - new VoidFunction2, Long> { + new VoidFunction2, Long>() { public void call(Dataset dataset, Long batchId) { // Transform and write batchDF } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.0 updated: [MINOR][DOCS] ForeachBatch Java example fix
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new e332198 [MINOR][DOCS] ForeachBatch java example fix e332198 is described below commit e332198f5aab2eec30dedfab66296382d31d3245 Author: roland-ondeviceresearch AuthorDate: Tue Mar 3 09:24:33 2020 -0600 [MINOR][DOCS] ForeachBatch java example fix ### What changes were proposed in this pull request? ForEachBatch Java example was incorrect ### Why are the changes needed? Example did not compile ### Does this PR introduce any user-facing change? Yes, to docs. ### How was this patch tested? In IDE. Closes #27740 from roland1982/foreachwriter_java_example_fix. Authored-by: roland-ondeviceresearch Signed-off-by: Sean Owen (cherry picked from commit a4aaee01fa8e71d51f49b24889d862422e0727c7) Signed-off-by: Sean Owen --- docs/structured-streaming-programming-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 47d8994..1776d23 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -2105,7 +2105,7 @@ streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => {% highlight java %} streamingDatasetOfString.writeStream().foreachBatch( - new VoidFunction2, Long> { + new VoidFunction2, Long>() { public void call(Dataset dataset, Long batchId) { // Transform and write batchDF } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-2.4 updated: [MINOR][DOCS] ForeachBatch Java example fix
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-2.4 by this push: new 0ea91da [MINOR][DOCS] ForeachBatch java example fix 0ea91da is described below commit 0ea91dab2ed2808edaf42398eda0614df26358d1 Author: roland-ondeviceresearch AuthorDate: Tue Mar 3 09:24:33 2020 -0600 [MINOR][DOCS] ForeachBatch java example fix ### What changes were proposed in this pull request? ForEachBatch Java example was incorrect ### Why are the changes needed? Example did not compile ### Does this PR introduce any user-facing change? Yes, to docs. ### How was this patch tested? In IDE. Closes #27740 from roland1982/foreachwriter_java_example_fix. Authored-by: roland-ondeviceresearch Signed-off-by: Sean Owen (cherry picked from commit a4aaee01fa8e71d51f49b24889d862422e0727c7) Signed-off-by: Sean Owen --- docs/structured-streaming-programming-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 515ba07..dce4b35 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -2059,7 +2059,7 @@ streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => {% highlight java %} streamingDatasetOfString.writeStream().foreachBatch( - new VoidFunction2, Long> { + new VoidFunction2, Long>() { public void call(Dataset dataset, Long batchId) { // Transform and write batchDF } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (b5166aa -> a4aaee0)
This is an automated email from the ASF dual-hosted git repository. srowen pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from b5166aa [SPARK-31013][CORE][WEBUI] InMemoryStore: improve removeAllByIndexValues over natural key index add a4aaee0 [MINOR][DOCS] ForeachBatch java example fix No new revisions were added by this update. Summary of changes: docs/structured-streaming-programming-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-2.4 updated: [SPARK-30998][SQL][2.4] ClassCastException when a generator has nested inner generators
This is an automated email from the ASF dual-hosted git repository. yamamuro pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-2.4 by this push: new f4c8c48 [SPARK-30998][SQL][2.4] ClassCastException when a generator having nested inner generators f4c8c48 is described below commit f4c8c4892197b8c5425a8013a09e9b379444e6fc Author: Takeshi Yamamuro AuthorDate: Tue Mar 3 23:47:40 2020 +0900 [SPARK-30998][SQL][2.4] ClassCastException when a generator having nested inner generators ### What changes were proposed in this pull request? A query below failed in branch-2.4; ``` scala> sql("select array(array(1, 2), array(3)) ar").select(explode(explode($"ar"))).show() 20/03/01 13:51:56 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)/ 1] java.lang.ClassCastException: scala.collection.mutable.ArrayOps$ofRef cannot be cast to org.apache.spark.sql.catalyst.util.ArrayData at org.apache.spark.sql.catalyst.expressions.ExplodeBase.eval(generators.scala:313) at org.apache.spark.sql.execution.GenerateExec.$anonfun$doExecute$8(GenerateExec.scala:108) at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490) at scala.collection.Iterator$ConcatIterator.hasNext(Iterator.scala:222) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) ... ``` This pr modified the `hasNestedGenerator` code in `ExtractGenerator` for correctly catching nested inner generators. This backport PR comes from https://github.com/apache/spark/pull/27750# ### Why are the changes needed? A bug fix. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Added tests. Closes #27769 from maropu/SPARK-20998-BRANCH-2.4. 
Authored-by: Takeshi Yamamuro Signed-off-by: Takeshi Yamamuro --- .../apache/spark/sql/catalyst/analysis/Analyzer.scala | 16 +--- .../sql/catalyst/analysis/AnalysisErrorSuite.scala| 19 +++ .../org/apache/spark/sql/GeneratorFunctionSuite.scala | 8 3 files changed, 40 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 0fedf7f..61f77be 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1681,10 +1681,20 @@ class Analyzer( } private def hasNestedGenerator(expr: NamedExpression): Boolean = { + def hasInnerGenerator(g: Generator): Boolean = g match { +// Since `GeneratorOuter` is just a wrapper of generators, we skip it here +case go: GeneratorOuter => + hasInnerGenerator(go.child) +case _ => + g.children.exists { _.find { +case _: Generator => true +case _ => false + }.isDefined } + } CleanupAliases.trimNonTopLevelAliases(expr) match { -case UnresolvedAlias(_: Generator, _) => false -case Alias(_: Generator, _) => false -case MultiAlias(_: Generator, _) => false +case UnresolvedAlias(g: Generator, _) => hasInnerGenerator(g) +case Alias(g: Generator, _) => hasInnerGenerator(g) +case MultiAlias(g: Generator, _) => hasInnerGenerator(g) case other => hasGenerator(other) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index 45319aa..337902f 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -395,6 +395,25 @@ class AnalysisErrorSuite extends AnalysisTest { ) errorTest( 
+"SPARK-30998: unsupported nested inner generators", +{ + val nestedListRelation = LocalRelation( +AttributeReference("nestedList", ArrayType(ArrayType(IntegerType)))()) + nestedListRelation.select(Explode(Explode($"nestedList"))) +}, +"Generators are not supported when it's nested in expressions, but got: " + + "explode(explode(nestedList))" :: Nil + ) + + errorTest( +"SPARK-30998: unsupported nested inner generators for aggregates", +testRelation.select(Explode(Explode( + CreateArray(CreateArray(min($"a") :: max($"a") :: Nil) :: Nil, +"Generators are not supported when it's nested in expressions, but got: " + + "explode(explode(array(array(min(a), max(a)" :
[spark] branch branch-2.4 updated: [SPARK-30998][SQL][2.4] ClassCastException when a generator has nested inner generators
This is an automated email from the ASF dual-hosted git repository. yamamuro pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-2.4 by this push: new f4c8c48 [SPARK-30998][SQL][2.4] ClassCastException when a generator having nested inner generators f4c8c48 is described below commit f4c8c4892197b8c5425a8013a09e9b379444e6fc Author: Takeshi Yamamuro AuthorDate: Tue Mar 3 23:47:40 2020 +0900 [SPARK-30998][SQL][2.4] ClassCastException when a generator having nested inner generators ### What changes were proposed in this pull request? A query below failed in branch-2.4; ``` scala> sql("select array(array(1, 2), array(3)) ar").select(explode(explode($"ar"))).show() 20/03/01 13:51:56 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)/ 1] java.lang.ClassCastException: scala.collection.mutable.ArrayOps$ofRef cannot be cast to org.apache.spark.sql.catalyst.util.ArrayData at org.apache.spark.sql.catalyst.expressions.ExplodeBase.eval(generators.scala:313) at org.apache.spark.sql.execution.GenerateExec.$anonfun$doExecute$8(GenerateExec.scala:108) at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490) at scala.collection.Iterator$ConcatIterator.hasNext(Iterator.scala:222) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) ... ``` This pr modified the `hasNestedGenerator` code in `ExtractGenerator` for correctly catching nested inner generators. This backport PR comes from https://github.com/apache/spark/pull/27750# ### Why are the changes needed? A bug fix. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Added tests. Closes #27769 from maropu/SPARK-20998-BRANCH-2.4. 
Authored-by: Takeshi Yamamuro Signed-off-by: Takeshi Yamamuro --- .../apache/spark/sql/catalyst/analysis/Analyzer.scala | 16 +--- .../sql/catalyst/analysis/AnalysisErrorSuite.scala| 19 +++ .../org/apache/spark/sql/GeneratorFunctionSuite.scala | 8 3 files changed, 40 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 0fedf7f..61f77be 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1681,10 +1681,20 @@ class Analyzer( } private def hasNestedGenerator(expr: NamedExpression): Boolean = { + def hasInnerGenerator(g: Generator): Boolean = g match { +// Since `GeneratorOuter` is just a wrapper of generators, we skip it here +case go: GeneratorOuter => + hasInnerGenerator(go.child) +case _ => + g.children.exists { _.find { +case _: Generator => true +case _ => false + }.isDefined } + } CleanupAliases.trimNonTopLevelAliases(expr) match { -case UnresolvedAlias(_: Generator, _) => false -case Alias(_: Generator, _) => false -case MultiAlias(_: Generator, _) => false +case UnresolvedAlias(g: Generator, _) => hasInnerGenerator(g) +case Alias(g: Generator, _) => hasInnerGenerator(g) +case MultiAlias(g: Generator, _) => hasInnerGenerator(g) case other => hasGenerator(other) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index 45319aa..337902f 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -395,6 +395,25 @@ class AnalysisErrorSuite extends AnalysisTest { ) errorTest( 
+"SPARK-30998: unsupported nested inner generators", +{ + val nestedListRelation = LocalRelation( +AttributeReference("nestedList", ArrayType(ArrayType(IntegerType)))()) + nestedListRelation.select(Explode(Explode($"nestedList"))) +}, +"Generators are not supported when it's nested in expressions, but got: " + + "explode(explode(nestedList))" :: Nil + ) + + errorTest( +"SPARK-30998: unsupported nested inner generators for aggregates", +testRelation.select(Explode(Explode( + CreateArray(CreateArray(min($"a") :: max($"a") :: Nil) :: Nil, +"Generators are not supported when it's nested in expressions, but got: " + + "explode(explode(array(array(min(a), max(a)" :
[spark] branch master updated: [SPARK-31013][CORE][WEBUI] InMemoryStore: improve removeAllByIndexValues over natural key index
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new b5166aa [SPARK-31013][CORE][WEBUI] InMemoryStore: improve removeAllByIndexValues over natural key index b5166aa is described below commit b5166aac1f2da7a2e90fc3a4932cfd411286843a Author: Gengliang Wang AuthorDate: Tue Mar 3 19:34:19 2020 +0800 [SPARK-31013][CORE][WEBUI] InMemoryStore: improve removeAllByIndexValues over natural key index ### What changes were proposed in this pull request? The method `removeAllByIndexValues` in KVStore is to delete all the objects which have certain values in the given index. However, in the current implementation of `InMemoryStore`, when the given index is the natural key index, there is no special handling for it and a linear search over all the task data is performed. We can improve it by deleting the natural keys directly from the internal hashmap. ### Why are the changes needed? Better performance if the given index for `removeAllByIndexValues` is the natural key index in `InMemoryStore` ### Does this PR introduce any user-facing change? No ### How was this patch tested? Enhance the existing test. Closes #27763 from gengliangwang/useNaturalIndex. 
Authored-by: Gengliang Wang Signed-off-by: Wenchen Fan --- .../org/apache/spark/util/kvstore/InMemoryStore.java | 16 +++- .../apache/spark/util/kvstore/InMemoryStoreSuite.java | 18 -- 2 files changed, 23 insertions(+), 11 deletions(-) diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java index db08740..f1bebb4 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java @@ -231,11 +231,16 @@ public class InMemoryStore implements KVStore { } int countingRemoveAllByIndexValues(String index, Collection indexValues) { - if (hasNaturalParentIndex && naturalParentIndexName.equals(index)) { + int count = 0; + if (KVIndex.NATURAL_INDEX_NAME.equals(index)) { +for (Object naturalKey : indexValues) { + count += delete(asKey(naturalKey)) ? 1 : 0; +} +return count; + } else if (hasNaturalParentIndex && naturalParentIndexName.equals(index)) { // If there is a parent index for the natural index and `index` happens to be it, // Spark can use the `parentToChildrenMap` to get the related natural keys, and then // delete them from `data`. 
-int count = 0; for (Object indexValue : indexValues) { Comparable parentKey = asKey(indexValue); NaturalKeys children = parentToChildrenMap.getOrDefault(parentKey, new NaturalKeys()); @@ -271,9 +276,9 @@ public class InMemoryStore implements KVStore { } } -public void delete(Object key) { - data.remove(asKey(key)); - if (hasNaturalParentIndex) { +public boolean delete(Object key) { + boolean entryExists = data.remove(asKey(key)) != null; + if (entryExists && hasNaturalParentIndex) { for (NaturalKeys v : parentToChildrenMap.values()) { if (v.remove(asKey(key))) { // `v` can be empty after removing the natural key and we can remove it from @@ -284,6 +289,7 @@ public class InMemoryStore implements KVStore { } } } + return entryExists; } public int size() { diff --git a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/InMemoryStoreSuite.java b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/InMemoryStoreSuite.java index 9e34225..da52676 100644 --- a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/InMemoryStoreSuite.java +++ b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/InMemoryStoreSuite.java @@ -158,23 +158,29 @@ public class InMemoryStoreSuite { assertEquals(9, store.count(ArrayKeyIndexType.class)); +// Try removing non-existing keys +assert(!store.removeAllByIndexValues( + ArrayKeyIndexType.class, + KVIndex.NATURAL_INDEX_NAME, + ImmutableSet.of(new int[] {10, 10, 10}, new int[] { 3, 3, 3 }))); +assertEquals(9, store.count(ArrayKeyIndexType.class)); -store.removeAllByIndexValues( +assert(store.removeAllByIndexValues( ArrayKeyIndexType.class, KVIndex.NATURAL_INDEX_NAME, - ImmutableSet.of(new int[] {0, 0, 0}, new int[] { 2, 2, 2 })); + ImmutableSet.of(new int[] {0, 0, 0}, new int[] { 2, 2, 2 }))); assertEquals(7, store.count(ArrayKeyIndexType.class)); -store.removeAllByIndexValues( +assert(store.
[spark] branch branch-3.0 updated: [SPARK-30998][SQL] ClassCastException when a generator has nested inner generators
This is an automated email from the ASF dual-hosted git repository. yamamuro pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new ded0a72 [SPARK-30998][SQL] ClassCastException when a generator having nested inner generators ded0a72 is described below commit ded0a72d81c1d34753be8a156126312506fb50b1 Author: Takeshi Yamamuro AuthorDate: Tue Mar 3 19:00:33 2020 +0900 [SPARK-30998][SQL] ClassCastException when a generator having nested inner generators ### What changes were proposed in this pull request? A query below failed in the master; ``` scala> sql("select array(array(1, 2), array(3)) ar").select(explode(explode($"ar"))).show() 20/03/01 13:51:56 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)/ 1] java.lang.ClassCastException: scala.collection.mutable.ArrayOps$ofRef cannot be cast to org.apache.spark.sql.catalyst.util.ArrayData at org.apache.spark.sql.catalyst.expressions.ExplodeBase.eval(generators.scala:313) at org.apache.spark.sql.execution.GenerateExec.$anonfun$doExecute$8(GenerateExec.scala:108) at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490) at scala.collection.Iterator$ConcatIterator.hasNext(Iterator.scala:222) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) ... ``` This pr modified the `hasNestedGenerator` code in `ExtractGenerator` for correctly catching nested inner generators. ### Why are the changes needed? A bug fix. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Added tests. Closes #27750 from maropu/HandleNestedGenerators. 
Authored-by: Takeshi Yamamuro Signed-off-by: Takeshi Yamamuro (cherry picked from commit 313e62c376acab30e546df253b28452a664d3e73) Signed-off-by: Takeshi Yamamuro --- .../apache/spark/sql/catalyst/analysis/Analyzer.scala | 16 +--- .../sql/catalyst/analysis/AnalysisErrorSuite.scala| 19 +++ .../org/apache/spark/sql/GeneratorFunctionSuite.scala | 8 3 files changed, 40 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 3d79799..486b952 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -2164,10 +2164,20 @@ class Analyzer( } private def hasNestedGenerator(expr: NamedExpression): Boolean = { + def hasInnerGenerator(g: Generator): Boolean = g match { +// Since `GeneratorOuter` is just a wrapper of generators, we skip it here +case go: GeneratorOuter => + hasInnerGenerator(go.child) +case _ => + g.children.exists { _.find { +case _: Generator => true +case _ => false + }.isDefined } + } CleanupAliases.trimNonTopLevelAliases(expr) match { -case UnresolvedAlias(_: Generator, _) => false -case Alias(_: Generator, _) => false -case MultiAlias(_: Generator, _) => false +case UnresolvedAlias(g: Generator, _) => hasInnerGenerator(g) +case Alias(g: Generator, _) => hasInnerGenerator(g) +case MultiAlias(g: Generator, _) => hasInnerGenerator(g) case other => hasGenerator(other) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index 8f62b0b..3db1053 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -434,6 +434,25 @@ class AnalysisErrorSuite extends AnalysisTest { ) errorTest( +"SPARK-30998: unsupported nested inner generators", +{ + val nestedListRelation = LocalRelation( +AttributeReference("nestedList", ArrayType(ArrayType(IntegerType)))()) + nestedListRelation.select(Explode(Explode($"nestedList"))) +}, +"Generators are not supported when it's nested in expressions, but got: " + + "explode(explode(nestedList))" :: Nil + ) + + errorTest( +"SPARK-30998: unsupported nested inner generators for aggregates", +testRelation.select(Explode(Explode( + CreateArray(CreateArray(min($"a") :: max($"a") :: Nil) :: Nil, +"Generators are not supported when it's nested in expressions, but got: " + + "explode(explode(array(arr
[spark] branch master updated (1fac06c -> 313e62c)
This is an automated email from the ASF dual-hosted git repository. yamamuro pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 1fac06c Revert "[SPARK-30808][SQL] Enable Java 8 time API in Thrift server" add 313e62c [SPARK-30998][SQL] ClassCastException when a generator having nested inner generators No new revisions were added by this update. Summary of changes: .../apache/spark/sql/catalyst/analysis/Analyzer.scala | 16 +--- .../sql/catalyst/analysis/AnalysisErrorSuite.scala| 19 +++ .../org/apache/spark/sql/GeneratorFunctionSuite.scala | 8 3 files changed, 40 insertions(+), 3 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-2.4 updated: [SPARK-30993][SQL][2.4] Use its sql type for UDT when checking the type of length (fixed/var) or mutable
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-2.4 by this push: new 7216099 [SPARK-30993][SQL][2.4] Use its sql type for UDT when checking the type of length (fixed/var) or mutable 7216099 is described below commit 72160991dc524456a136bca3b0c86359f66f37c4 Author: Jungtaek Lim (HeartSaVioR) AuthorDate: Tue Mar 3 17:50:43 2020 +0800 [SPARK-30993][SQL][2.4] Use its sql type for UDT when checking the type of length (fixed/var) or mutable ### What changes were proposed in this pull request? This patch fixes the bug of UnsafeRow which misses to handle the UDT specifically, in `isFixedLength` and `isMutable`. These methods don't check its SQL type for UDT, always treating UDT as variable-length, and non-mutable. It doesn't bring any issue if UDT is used to represent complicated type, but when UDT is used to represent some type which is matched with fixed length of SQL type, it exposes the chance of correctness issues, as these informations sometimes decide how the value should be handled. We got report from user mailing list which suspected as mapGroupsWithState looks like handling UDT incorrectly, but after some investigation it was from GenerateUnsafeRowJoiner in shuffle phase. https://github.com/apache/spark/blob/0e2ca11d80c3921387d7b077cb64c3a0c06b08d7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoiner.scala#L32-L43 Here updating position should not happen on fixed-length column, but due to this bug, the value of UDT having fixed-length as sql type would be modified, which actually corrupts the value. ### Why are the changes needed? Misclassifying of the type of length for UDT can corrupt the value when the row is presented to the input of GenerateUnsafeRowJoiner, which brings correctness issue. 
### Does this PR introduce any user-facing change? No. ### How was this patch tested? New UT added. Closes #27761 from HeartSaVioR/SPARK-30993-branch-2.4. Authored-by: Jungtaek Lim (HeartSaVioR) Signed-off-by: Wenchen Fan --- .../spark/sql/catalyst/expressions/UnsafeRow.java | 8 + .../codegen/GenerateUnsafeRowJoinerSuite.scala | 41 - .../apache/spark/sql/UserDefinedTypeSuite.scala| 42 ++ 3 files changed, 90 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java index ee2b67a..a2440d9 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java @@ -95,6 +95,10 @@ public final class UnsafeRow extends InternalRow implements Externalizable, Kryo } public static boolean isFixedLength(DataType dt) { +if (dt instanceof UserDefinedType) { + return isFixedLength(((UserDefinedType) dt).sqlType()); +} + if (dt instanceof DecimalType) { return ((DecimalType) dt).precision() <= Decimal.MAX_LONG_DIGITS(); } else { @@ -103,6 +107,10 @@ public final class UnsafeRow extends InternalRow implements Externalizable, Kryo } public static boolean isMutable(DataType dt) { +if (dt instanceof UserDefinedType) { + return isMutable(((UserDefinedType) dt).sqlType()); +} + return mutableFieldTypes.contains(dt) || dt instanceof DecimalType; } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoinerSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoinerSuite.scala index 75c6bee..a5057d0 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoinerSuite.scala +++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoinerSuite.scala @@ -17,13 +17,15 @@ package org.apache.spark.sql.catalyst.expressions.codegen +import java.time.{LocalDateTime, ZoneOffset} + import scala.util.Random import org.apache.spark.SparkFunSuite import org.apache.spark.sql.RandomDataGenerator import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder -import org.apache.spark.sql.catalyst.expressions.{JoinedRow, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, JoinedRow, UnsafeProjection, UnsafeRow} import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -99,6 +101,23 @@ class GenerateUns