[spark] branch branch-3.0 updated: [SPARK-30984][SS] Add UI test for Structured Streaming UI

2020-03-03 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 9c583b8  [SPARK-30984][SS] Add UI test for Structured Streaming UI
9c583b8 is described below

commit 9c583b8aff4f3d5799524619f4997281ae428da5
Author: Shixiong Zhu 
AuthorDate: Wed Mar 4 13:55:34 2020 +0800

[SPARK-30984][SS] Add UI test for Structured Streaming UI

### What changes were proposed in this pull request?

- Add a UI test for Structured Streaming UI
- Fix the unsafe usages of `SimpleDateFormat` by using a ThreadLocal shared 
object (see the sketch after this list).
- Use `start` to replace `submission` to be consistent with the API 
`StreamingQuery.start()`.
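
For context, a minimal sketch of the `ThreadLocal` pattern referred to above (illustrative only, not the code this commit adds): `SimpleDateFormat` is not thread-safe, so each thread gets its own instance.

```
import java.text.SimpleDateFormat
import java.util.Date

// SimpleDateFormat keeps mutable parsing state, so sharing a single instance
// across threads is unsafe. A ThreadLocal gives every thread its own copy.
object SafeTimestampFormat {
  private val formatter = new ThreadLocal[SimpleDateFormat] {
    override def initialValue(): SimpleDateFormat =
      new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // pattern is illustrative
  }

  def format(millis: Long): String = formatter.get().format(new Date(millis))
}
```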

### Why are the changes needed?

Structured Streaming UI is missing UI tests.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

The new test.
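
For readers unfamiliar with the setup, a hedged sketch of the kind of check a Selenium/HtmlUnit-based UI test can make (not the actual `UISeleniumSuite` code; the URL and the expected page text are placeholders):

```
import org.openqa.selenium.htmlunit.HtmlUnitDriver

// Drive the web UI headlessly and assert on the rendered page content.
val driver = new HtmlUnitDriver()
try {
  driver.get("http://localhost:4040/StreamingQuery/") // placeholder UI address
  assert(driver.getPageSource.contains("Streaming Query"))
} finally {
  driver.quit()
}
```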

Closes #27732 from zsxwing/ss-ui-test.

Authored-by: Shixiong Zhu 
Signed-off-by: Wenchen Fan 
(cherry picked from commit ebfff7af6a9b2d672871317d30161cdafaa32ca4)
Signed-off-by: Wenchen Fan 
---
 sql/core/pom.xml   |  10 ++
 .../sql/execution/streaming/ProgressReporter.scala |   2 +-
 .../sql/execution/streaming/StreamExecution.scala  |   4 +-
 .../sql/streaming/StreamingQueryListener.scala |   4 +-
 .../sql/streaming/ui/StreamingQueryPage.scala  |  22 ++--
 .../ui/StreamingQueryStatisticsPage.scala  |  38 +++---
 .../ui/StreamingQueryStatusListener.scala  |  13 +-
 .../apache/spark/sql/streaming/ui/UIUtils.scala|  17 +++
 .../streaming/StreamingQueryListenerSuite.scala|   6 +-
 .../sql/streaming/ui/StreamingQueryPageSuite.scala |   2 +-
 .../ui/StreamingQueryStatusListenerSuite.scala |   9 +-
 .../spark/sql/streaming/ui/UISeleniumSuite.scala   | 145 +
 12 files changed, 221 insertions(+), 51 deletions(-)

diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index 0e664ec..37da614 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -150,6 +150,16 @@
   mockito-core
   test
 
+
+  org.seleniumhq.selenium
+  selenium-java
+  test
+
+
+  org.seleniumhq.selenium
+  selenium-htmlunit-driver
+  test
+
   
   
 
target/scala-${scala.binary.version}/classes
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index f20291e..feb151a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -349,7 +349,7 @@ trait ProgressReporter extends Logging {
 result
   }
 
-  private def formatTimestamp(millis: Long): String = {
+  protected def formatTimestamp(millis: Long): String = {
 timestampFormat.format(new Date(millis))
   }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 8b3534b..8006437 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -307,8 +307,8 @@ abstract class StreamExecution(
   }
 
   // `postEvent` does not throw non fatal exception.
-  val submissionTime = triggerClock.getTimeMillis()
-  postEvent(new QueryStartedEvent(id, runId, name, submissionTime))
+  val startTimestamp = triggerClock.getTimeMillis()
+  postEvent(new QueryStartedEvent(id, runId, name, 
formatTimestamp(startTimestamp)))
 
   // Unblock starting thread
   startLatch.countDown()
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
index dd842cd..7ae38c7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
@@ -82,7 +82,7 @@ object StreamingQueryListener {
* @param id A unique query id that persists across restarts. See 
`StreamingQuery.id()`.
* @param runId A query id that is unique for every start/restart. See 
`StreamingQuery.runId()`.
* @param name User-specified name of the query, null if not specified.
-   * @param submissionTime The timestamp to start a query.
+   * @param timestamp The timestamp to start a query.
* @since 2.1.0
*/
   @Evolving
@@ -90,7 +90,7 @@ object StreamingQueryListener {
   val id: UUID,
   val ru

[spark] branch master updated: [SPARK-30984][SS] Add UI test for Structured Streaming UI

2020-03-03 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new ebfff7a  [SPARK-30984][SS] Add UI test for Structured Streaming UI
ebfff7a is described below

commit ebfff7af6a9b2d672871317d30161cdafaa32ca4
Author: Shixiong Zhu 
AuthorDate: Wed Mar 4 13:55:34 2020 +0800

[SPARK-30984][SS] Add UI test for Structured Streaming UI

### What changes were proposed in this pull request?

- Add a UI test for Structured Streaming UI
- Fix the unsafe usages of `SimpleDateFormat` by using a ThreadLocal shared 
object.
- Use `start` to replace `submission` to be consistent with the API 
`StreamingQuery.start()` (a short listener sketch follows this list).
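
As a hedged illustration of the rename (the diff below changes the started-event field from `submissionTime: Long` to `timestamp: String`), a user-defined listener might read it like this (sketch only; registration assumes a running `SparkSession` named `spark`):

```
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// After this change the started event carries a formatted timestamp string
// rather than epoch millis.
class LoggingListener extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit =
    println(s"query ${event.name} (run ${event.runId}) started at ${event.timestamp}")
  override def onQueryProgress(event: QueryProgressEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
}

// spark.streams.addListener(new LoggingListener)
```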

### Why are the changes needed?

Structured Streaming UI is missing UI tests.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

The new test.

Closes #27732 from zsxwing/ss-ui-test.

Authored-by: Shixiong Zhu 
Signed-off-by: Wenchen Fan 
---
 sql/core/pom.xml   |  10 ++
 .../sql/execution/streaming/ProgressReporter.scala |   2 +-
 .../sql/execution/streaming/StreamExecution.scala  |   4 +-
 .../sql/streaming/StreamingQueryListener.scala |   4 +-
 .../sql/streaming/ui/StreamingQueryPage.scala  |  22 ++--
 .../ui/StreamingQueryStatisticsPage.scala  |  38 +++---
 .../ui/StreamingQueryStatusListener.scala  |  13 +-
 .../apache/spark/sql/streaming/ui/UIUtils.scala|  17 +++
 .../streaming/StreamingQueryListenerSuite.scala|   6 +-
 .../sql/streaming/ui/StreamingQueryPageSuite.scala |   2 +-
 .../ui/StreamingQueryStatusListenerSuite.scala |   9 +-
 .../spark/sql/streaming/ui/UISeleniumSuite.scala   | 145 +
 12 files changed, 221 insertions(+), 51 deletions(-)

diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index 6cdc63b..c95fe3c 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -150,6 +150,16 @@
   mockito-core
   test
 
+
+  org.seleniumhq.selenium
+  selenium-java
+  test
+
+
+  org.seleniumhq.selenium
+  selenium-htmlunit-driver
+  test
+
   
   
 
target/scala-${scala.binary.version}/classes
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index f20291e..feb151a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -349,7 +349,7 @@ trait ProgressReporter extends Logging {
 result
   }
 
-  private def formatTimestamp(millis: Long): String = {
+  protected def formatTimestamp(millis: Long): String = {
 timestampFormat.format(new Date(millis))
   }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 8b3534b..8006437 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -307,8 +307,8 @@ abstract class StreamExecution(
   }
 
   // `postEvent` does not throw non fatal exception.
-  val submissionTime = triggerClock.getTimeMillis()
-  postEvent(new QueryStartedEvent(id, runId, name, submissionTime))
+  val startTimestamp = triggerClock.getTimeMillis()
+  postEvent(new QueryStartedEvent(id, runId, name, 
formatTimestamp(startTimestamp)))
 
   // Unblock starting thread
   startLatch.countDown()
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
index dd842cd..7ae38c7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
@@ -82,7 +82,7 @@ object StreamingQueryListener {
* @param id A unique query id that persists across restarts. See 
`StreamingQuery.id()`.
* @param runId A query id that is unique for every start/restart. See 
`StreamingQuery.runId()`.
* @param name User-specified name of the query, null if not specified.
-   * @param submissionTime The timestamp to start a query.
+   * @param timestamp The timestamp to start a query.
* @since 2.1.0
*/
   @Evolving
@@ -90,7 +90,7 @@ object StreamingQueryListener {
   val id: UUID,
   val runId: UUID,
   val name: String,
-  val submissionTime: Long) extends Event
+  val timestamp: String) 

[spark] branch branch-3.0 updated: [SPARK-30289][FOLLOWUP][DOC] Update the migration guide for `spark.sql.legacy.ctePrecedencePolicy`

2020-03-03 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 4d489c8  [SPARK-30289][FOLLOWUP][DOC] Update the migration guide for 
`spark.sql.legacy.ctePrecedencePolicy`
4d489c8 is described below

commit 4d489c85aa56816992d12908298e13bfd6fb4a0d
Author: Yuanjian Li 
AuthorDate: Wed Mar 4 13:56:02 2020 +0900

[SPARK-30289][FOLLOWUP][DOC] Update the migration guide for 
`spark.sql.legacy.ctePrecedencePolicy`

### What changes were proposed in this pull request?
Fix the migration guide document for 
`spark.sql.legacy.ctePrecedence.enabled`, which is introduced in #27579.

### Why are the changes needed?
The config value changed.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Document only.

Closes #27782 from xuanyuanking/SPARK-30829-follow.

Authored-by: Yuanjian Li 
Signed-off-by: HyukjinKwon 
(cherry picked from commit f7f1948a8c7f7638f34d6a485abc9b866b87700f)
Signed-off-by: HyukjinKwon 
---
 docs/sql-migration-guide.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index a9a39ce..0050061 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -101,7 +101,7 @@ license: |
 
   - Since Spark 3.0, if files or subdirectories disappear during recursive 
directory listing (i.e. they appear in an intermediate listing but then cannot 
be read or listed during later phases of the recursive directory listing, due 
to either concurrent file deletions or object store consistency issues) then 
the listing will fail with an exception unless 
`spark.sql.files.ignoreMissingFiles` is `true` (default `false`). In previous 
versions, these missing files or subdirectories would be i [...]
 
-  - Since Spark 3.0, Spark throws an AnalysisException if name conflict is 
detected in the nested WITH clause by default. It forces the users to choose 
the specific substitution order they wanted, which is controlled by 
`spark.sql.legacy.ctePrecedence.enabled`. If set to false (which is 
recommended), inner CTE definitions take precedence over outer definitions. For 
example, set the config to `false`, `WITH t AS (SELECT 1), t2 AS (WITH t AS 
(SELECT 2) SELECT * FROM t) SELECT * FROM t2` re [...]
+  - Since Spark 3.0, `spark.sql.legacy.ctePrecedencePolicy` is introduced to 
control the behavior for name conflicting in the nested WITH clause. By default 
value `EXCEPTION`, Spark throws an AnalysisException, it forces users to choose 
the specific substitution order they wanted. If set to `CORRECTED` (which is 
recommended), inner CTE definitions take precedence over outer definitions. For 
example, set the config to `false`, `WITH t AS (SELECT 1), t2 AS (WITH t AS 
(SELECT 2) SELECT * FR [...]
 
   - Since Spark 3.0, the `add_months` function does not adjust the resulting 
date to a last day of month if the original date is a last day of months. For 
example, `select add_months(DATE'2019-02-28', 1)` results `2019-03-28`. In 
Spark version 2.4 and earlier, the resulting date is adjusted when the original 
date is a last day of months. For example, adding a month to `2019-02-28` 
results in `2019-03-31`.
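
Going back to the `ctePrecedencePolicy` entry above, a minimal usage sketch of the described behaviour (illustrative; assumes a running `SparkSession` named `spark`, and that `CORRECTED` makes the inner definition win as the entry states):

```
// With CORRECTED, the inner CTE definition takes precedence over the outer one.
spark.conf.set("spark.sql.legacy.ctePrecedencePolicy", "CORRECTED")

spark.sql(
  """WITH t AS (SELECT 1),
    |     t2 AS (WITH t AS (SELECT 2) SELECT * FROM t)
    |SELECT * FROM t2""".stripMargin).show()
// expected output: 2
```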
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (e1b3e9a -> f7f1948)

2020-03-03 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from e1b3e9a  [SPARK-29212][ML][PYSPARK] Add common classes without using 
JVM backend
 add f7f1948  [SPARK-30289][FOLLOWUP][DOC] Update the migration guide for 
`spark.sql.legacy.ctePrecedencePolicy`

No new revisions were added by this update.

Summary of changes:
 docs/sql-migration-guide.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-29212][ML][PYSPARK] Add common classes without using JVM backend

2020-03-03 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new e1b3e9a  [SPARK-29212][ML][PYSPARK] Add common classes without using 
JVM backend
e1b3e9a is described below

commit e1b3e9a3d25978dc0ad4609ecbc157ea1eebe2dd
Author: zero323 
AuthorDate: Wed Mar 4 12:20:02 2020 +0800

[SPARK-29212][ML][PYSPARK] Add common classes without using JVM backend

### What changes were proposed in this pull request?

Implement common base ML classes (`Predictor`, `PredictionModel`, 
`Classifier`, `ClassificationModel`, `ProbabilisticClassifier`, 
`ProbabilisticClassificationModel`, `Regressor`, `RegressionModel`) for 
non-Java backends.

Note

- `Predictor` and `JavaClassifier` should be abstract, as the `_fit` method is 
not implemented.
- `PredictionModel` should be abstract, as `_transform` is not implemented.

### Why are the changes needed?

To provide extension points for non-JVM algorithms, as well as a public 
hierarchy (as opposed to the `Java*` variants, which are commonly described in 
docstrings as private) that can be used to distinguish between different 
classes of predictors.

For longer discussion see 
[SPARK-29212](https://issues.apache.org/jira/browse/SPARK-29212) and / or 
https://github.com/apache/spark/pull/25776.

### Does this PR introduce any user-facing change?

It adds new base classes as listed above, but effective interfaces (method 
resolution order notwithstanding) stay the same.

Additionally "private" `Java*` classes in`ml.regression` and 
`ml.classification` have been renamed to follow PEP-8 conventions (added 
leading underscore).

It is open for discussion whether the same should be done for the equivalent 
classes in `ml.wrapper`.

If we take `JavaClassifier` as an example, the type hierarchy will change from

![old pyspark ml classification 
JavaClassifier](https://user-images.githubusercontent.com/1554276/72657093-5c0b0c80-39a0-11ea-9069-a897d75de483.png)

to

![new pyspark ml classification 
_JavaClassifier](https://user-images.githubusercontent.com/1554276/72657098-64fbde00-39a0-11ea-8f80-01187a5ea5a6.png)

Similarly the old model

![old pyspark ml classification 
JavaClassificationModel](https://user-images.githubusercontent.com/1554276/72657103-7513bd80-39a0-11ea-9ffc-59eb6ab61fde.png)

will become

![new pyspark ml classification 
_JavaClassificationModel](https://user-images.githubusercontent.com/1554276/72657110-80ff7f80-39a0-11ea-9f5c-fe408664e827.png)

### How was this patch tested?

Existing unit tests.

Closes #27245 from zero323/SPARK-29212.

Authored-by: zero323 
Signed-off-by: zhengruifeng 
---
 python/pyspark/ml/__init__.py |   6 +-
 python/pyspark/ml/base.py |  81 -
 python/pyspark/ml/classification.py   | 158 +-
 python/pyspark/ml/regression.py   |  71 ++-
 python/pyspark/ml/tests/test_param.py |   6 +-
 python/pyspark/ml/wrapper.py  |  52 ++-
 6 files changed, 258 insertions(+), 116 deletions(-)

diff --git a/python/pyspark/ml/__init__.py b/python/pyspark/ml/__init__.py
index d99a253..47fc78e 100644
--- a/python/pyspark/ml/__init__.py
+++ b/python/pyspark/ml/__init__.py
@@ -19,13 +19,15 @@
 DataFrame-based machine learning APIs to let users quickly assemble and 
configure practical
 machine learning pipelines.
 """
-from pyspark.ml.base import Estimator, Model, Transformer, UnaryTransformer
+from pyspark.ml.base import Estimator, Model, Predictor, PredictionModel, \
+Transformer, UnaryTransformer
 from pyspark.ml.pipeline import Pipeline, PipelineModel
 from pyspark.ml import classification, clustering, evaluation, feature, fpm, \
 image, pipeline, recommendation, regression, stat, tuning, util, linalg, 
param
 
 __all__ = [
-"Transformer", "UnaryTransformer", "Estimator", "Model", "Pipeline", 
"PipelineModel",
+"Transformer", "UnaryTransformer", "Estimator", "Model",
+"Predictor", "PredictionModel", "Pipeline", "PipelineModel",
 "classification", "clustering", "evaluation", "feature", "fpm", "image",
 "recommendation", "regression", "stat", "tuning", "util", "linalg", 
"param",
 ]
diff --git a/python/pyspark/ml/base.py b/python/pyspark/ml/base.py
index 542cb25..b8df5a3 100644
--- a/python/pyspark/ml/base.py
+++ b/python/pyspark/ml/base.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 #
 
-from abc import ABCMeta, abstractmethod
+from abc import ABCMeta, abstractmethod, abstractproperty
 
 import copy
 import threading
@@ -246,3 +246,82 @@ class UnaryTransformer(HasInputCol, HasOutputCol, 
Transformer):
 transformedDataset = dataset.withColumn(self.getOutputCol(),
 

[spark] branch master updated (380e887 -> 111e903)

2020-03-03 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 380e887  [SPARK-30999][SQL] Don't cancel a QueryStageExec which failed 
before call doMaterialize
 add 111e903  [SPARK-30770][ML] avoid vector conversion in GMM.transform

No new revisions were added by this update.

Summary of changes:
 .../stat/distribution/MultivariateGaussian.scala   | 34 +-
 .../distribution/MultivariateGaussianSuite.scala   |  8 +
 .../spark/ml/clustering/GaussianMixture.scala  | 29 +-
 python/pyspark/ml/clustering.py| 27 -
 4 files changed, 50 insertions(+), 48 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.0 updated: [SPARK-30999][SQL] Don't cancel a QueryStageExec which failed before call doMaterialize

2020-03-03 Thread lixiao
This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 2732980  [SPARK-30999][SQL] Don't cancel a QueryStageExec which failed 
before call doMaterialize
2732980 is described below

commit 27329806c36d0b403153fe1ad0077acb72d92606
Author: yi.wu 
AuthorDate: Tue Mar 3 13:40:51 2020 -0800

[SPARK-30999][SQL] Don't cancel a QueryStageExec which failed before call 
doMaterialize

### What changes were proposed in this pull request?

This PR proposes to not cancel a `QueryStageExec` which failed before 
calling `doMaterialize`.

Besides, this PR also includes 2 minor improvements:

* fail fast when stage failed before calling `doMaterialize`

* format Exception with Cause

### Why are the changes needed?

For a stage that failed before materializing the lazy value (e.g. 
`inputRDD`), calling `cancel` on it could re-trigger the same failure, e.g. by 
executing the child node again (see `AdaptiveQueryExecSuite`.`SPARK-30291: AQE 
should catch the exceptions when doing materialize` for an example). As a 
result, the same failure is counted twice: once as a materialization error and 
once as a cancellation error.
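
A self-contained sketch of the guard this adds (stand-in types, not the actual `AdaptiveSparkPlanExec` code): the stage that already failed before materialization is simply excluded from cancellation.

```
// Stand-in for QueryStageExec, just enough to show the filtering rule.
final case class Stage(id: Int)

// Cancel every running stage except the one that failed before doMaterialize,
// so cancelling it cannot re-trigger the same failure.
def stagesToCancel(running: Seq[Stage], earlyFailedStage: Option[Int]): Seq[Stage] =
  running.filterNot(stage => earlyFailedStage.contains(stage.id))

// stagesToCancel(Seq(Stage(1), Stage(2)), earlyFailedStage = Some(2)) == Seq(Stage(1))
```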

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Updated test.

Closes #27752 from Ngone51/avoid_cancel_finished_stage.

Authored-by: yi.wu 
Signed-off-by: gatorsmile 
(cherry picked from commit 380e8876316d6ef5a74358be2a04ab20e8b6e7ca)
Signed-off-by: gatorsmile 
---
 .../execution/adaptive/AdaptiveSparkPlanExec.scala | 23 +-
 .../adaptive/AdaptiveQueryExecSuite.scala  |  3 ++-
 2 files changed, 16 insertions(+), 10 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 4036424..c018ca4 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -165,7 +165,7 @@ case class AdaptiveSparkPlanExec(
   stagesToReplace = result.newStages ++ stagesToReplace
   executionId.foreach(onUpdatePlan)
 
-  // Start materialization of all new stages.
+  // Start materialization of all new stages and fail fast if any 
stages failed eagerly
   result.newStages.foreach { stage =>
 try {
   stage.materialize().onComplete { res =>
@@ -176,7 +176,10 @@ case class AdaptiveSparkPlanExec(
 }
   }(AdaptiveSparkPlanExec.executionContext)
 } catch {
-  case e: Throwable => events.offer(StageFailure(stage, e))
+  case e: Throwable =>
+val ex = new SparkException(
+  s"Early failed query stage found: ${stage.treeString}", e)
+cleanUpAndThrowException(Seq(ex), Some(stage.id))
 }
   }
 }
@@ -192,13 +195,12 @@ case class AdaptiveSparkPlanExec(
 stage.resultOption = Some(res)
   case StageFailure(stage, ex) =>
 errors.append(
-  new SparkException(s"Failed to materialize query stage: 
${stage.treeString}." +
-s" and the cause is ${ex.getMessage}", ex))
+  new SparkException(s"Failed to materialize query stage: 
${stage.treeString}.", ex))
 }
 
 // In case of errors, we cancel all running stages and throw exception.
 if (errors.nonEmpty) {
-  cleanUpAndThrowException(errors)
+  cleanUpAndThrowException(errors, None)
 }
 
 // Try re-optimizing and re-planning. Adopt the new plan if its cost 
is equal to or less
@@ -522,9 +524,13 @@ case class AdaptiveSparkPlanExec(
* Cancel all running stages with best effort and throw an Exception 
containing all stage
* materialization errors and stage cancellation errors.
*/
-  private def cleanUpAndThrowException(errors: Seq[SparkException]): Unit = {
+  private def cleanUpAndThrowException(
+   errors: Seq[SparkException],
+   earlyFailedStage: Option[Int]): Unit = {
 val runningStages = currentPhysicalPlan.collect {
-  case s: QueryStageExec => s
+  // earlyFailedStage is the stage which failed before calling 
doMaterialize,
+  // so we should avoid calling cancel on it to re-trigger the failure 
again.
+  case s: QueryStageExec if !earlyFailedStage.contains(s.id) => s
 }
 val cancelErrors = new mutable.ArrayBuffer[SparkException]()
 try {
@@ -539,8 +545,7 @@ case class AdaptiveSparkPlanExec(
   }
 } finally {
   val ex = 

[spark] branch master updated (4a1d273 -> 380e887)

2020-03-03 Thread lixiao
This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 4a1d273  [SPARK-30997][SQL] Fix an analysis failure in generators with 
aggregate functions
 add 380e887  [SPARK-30999][SQL] Don't cancel a QueryStageExec which failed 
before call doMaterialize

No new revisions were added by this update.

Summary of changes:
 .../execution/adaptive/AdaptiveSparkPlanExec.scala | 23 +-
 .../adaptive/AdaptiveQueryExecSuite.scala  |  3 ++-
 2 files changed, 16 insertions(+), 10 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.0 updated: [SPARK-30997][SQL] Fix an analysis failure in generators with aggregate functions

2020-03-03 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 7d853ab  [SPARK-30997][SQL] Fix an analysis failure in generators with 
aggregate functions
7d853ab is described below

commit 7d853ab6eba479a7cc5d8839b4fc497bc6b6d4c8
Author: Takeshi Yamamuro 
AuthorDate: Tue Mar 3 12:25:12 2020 -0800

[SPARK-30997][SQL] Fix an analysis failure in generators with aggregate 
functions

### What changes were proposed in this pull request?

We have supported generators in SQL aggregate expressions since SPARK-28782,
but a generator (explode) query with aggregate functions in the DataFrame API 
failed as follows;

```
// SPARK-28782: Generator support in aggregate expressions
scala> spark.range(3).toDF("id").createOrReplaceTempView("t")
scala> sql("select explode(array(min(id), max(id))) from t").show()
+---+
|col|
+---+
|  0|
|  2|
+---+

// A failure case handled in this pr
scala> spark.range(3).select(explode(array(min($"id"), max($"id")))).show()
org.apache.spark.sql.AnalysisException:
The query operator `Generate` contains one or more unsupported
expression types Aggregate, Window or Generate.
Invalid expressions: [min(`id`), max(`id`)];;
Project [col#46L]
+- Generate explode(array(min(id#42L), max(id#42L))), false, [col#46L]
   +- Range (0, 3, step=1, splits=Some(4))

  at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis.failAnalysis(CheckAnalysis.scala:49)
  at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis.failAnalysis$(CheckAnalysis.scala:48)
  at 
org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:129)
```

The root cause is that `ExtractGenerator` wrongly replaces a project w/ 
aggregate functions
before `GlobalAggregates` replaces it with an aggregate as follows;

```
scala> sql("SET spark.sql.optimizer.planChangeLog.level=warn")
scala> spark.range(3).select(explode(array(min($"id"), max($"id")))).show()

20/03/01 12:51:58 WARN HiveSessionStateBuilder$$anon$1:
=== Applying Rule 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences ===
!'Project [explode(array(min('id), max('id))) AS List()]   'Project 
[explode(array(min(id#72L), max(id#72L))) AS List()]
 +- Range (0, 3, step=1, splits=Some(4))   +- Range (0, 3, 
step=1, splits=Some(4))

20/03/01 12:51:58 WARN HiveSessionStateBuilder$$anon$1:
=== Applying Rule 
org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractGenerator ===
!'Project [explode(array(min(id#72L), max(id#72L))) AS List()]   Project 
[col#76L]
!+- Range (0, 3, step=1, splits=Some(4)) +- 
Generate explode(array(min(id#72L), max(id#72L))), false, [col#76L]
!   +- 
Range (0, 3, step=1, splits=Some(4))

20/03/01 12:51:58 WARN HiveSessionStateBuilder$$anon$1:
=== Result of Batch Resolution ===
!'Project [explode(array(min('id), max('id))) AS List()]   Project [col#76L]
!+- Range (0, 3, step=1, splits=Some(4))   +- Generate 
explode(array(min(id#72L), max(id#72L))), false, [col#76L]
! +- Range (0, 
3, step=1, splits=Some(4))

// the analysis failed here...
```

To avoid this case in `ExtractGenerator`, this PR adds a condition to 
ignore generators having aggregate functions.
A correct sequence of rules is as follows;

```
20/03/01 13:19:06 WARN HiveSessionStateBuilder$$anon$1:
=== Applying Rule 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences ===
!'Project [explode(array(min('id), max('id))) AS List()]   'Project 
[explode(array(min(id#27L), max(id#27L))) AS List()]
 +- Range (0, 3, step=1, splits=Some(4))   +- Range (0, 3, 
step=1, splits=Some(4))

20/03/01 13:19:06 WARN HiveSessionStateBuilder$$anon$1:
=== Applying Rule 
org.apache.spark.sql.catalyst.analysis.Analyzer$GlobalAggregates ===
!'Project [explode(array(min(id#27L), max(id#27L))) AS List()]   'Aggregate 
[explode(array(min(id#27L), max(id#27L))) AS List()]
 +- Range (0, 3, step=1, splits=Some(4)) +- Range 
(0, 3, step=1, splits=Some(4))

20/03/01 13:19:06 WARN HiveSessionStateBuilder$$anon$1:
=== Applying Rule 
org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractGenerator ===
!'Aggregate [explode(array(min(id#27L), max(id#27L))) AS List()]   'Project 
[explode(_gen_input_0#31) AS List()]
!+- Range (0, 3, step=1, splits=Some(4))   +- 
Aggregate [array(min(id#27L), max(id#27L)) AS _gen_input_0#31]
!

[spark] branch master updated (c263c15 -> 4a1d273)

2020-03-03 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from c263c15  [SPARK-31015][SQL] Star(*) expression fails when used with 
qualified column names for v2 tables
 add 4a1d273  [SPARK-30997][SQL] Fix an analysis failure in generators with 
aggregate functions

No new revisions were added by this update.

Summary of changes:
 .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 14 ++
 .../spark/sql/catalyst/analysis/AnalysisErrorSuite.scala  | 15 +++
 .../org/apache/spark/sql/GeneratorFunctionSuite.scala |  5 +
 3 files changed, 34 insertions(+)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.0 updated: [SPARK-31015][SQL] Star(*) expression fails when used with qualified column names for v2 tables

2020-03-03 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 1c23be3  [SPARK-31015][SQL] Star(*) expression fails when used with 
qualified column names for v2 tables
1c23be3 is described below

commit 1c23be3b8addaa5e15d29de788b906bcbdae953b
Author: Terry Kim 
AuthorDate: Wed Mar 4 00:55:26 2020 +0800

[SPARK-31015][SQL] Star(*) expression fails when used with qualified column 
names for v2 tables

### What changes were proposed in this pull request?

For a v2 table created with `CREATE TABLE testcat.ns1.ns2.tbl (id bigint, 
name string) USING foo`, the following works as expected
```
SELECT testcat.ns1.ns2.tbl.id FROM testcat.ns1.ns2.tbl
```
, but a query with qualified column name with star(*)
```
SELECT testcat.ns1.ns2.tbl.* FROM testcat.ns1.ns2.tbl
[info]   org.apache.spark.sql.AnalysisException: cannot resolve 
'testcat.ns1.ns2.tbl.*' given input columns 'id, name';
```
fails to resolve. And this PR proposes to fix this issue.
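
The fix (visible in the diff below) matches the star's name parts against a suffix of the attribute qualifier. A standalone sketch of that rule, assuming a simple case-insensitive resolver:

```
// Sketch only, not the Spark code: nameParts must match the trailing elements
// of the attribute qualifier.
def matchedQualifier(qualifier: Seq[String], nameParts: Seq[String]): Boolean = {
  val resolver: (String, String) => Boolean = _.equalsIgnoreCase(_)
  val candidate =
    if (nameParts.length == qualifier.length) qualifier
    else qualifier.takeRight(nameParts.length)
  nameParts.corresponds(candidate)(resolver)
}

// matchedQualifier(Seq("testcat", "ns1", "ns2", "tbl"), Seq("ns2", "tbl")) == true
// matchedQualifier(Seq("testcat", "ns1", "ns2", "tbl"), Seq("ns1", "tbl")) == false
```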

### Why are the changes needed?

To fix the bug described above.

### Does this PR introduce any user-facing change?

Yes, now `SELECT testcat.ns1.ns2.tbl.* FROM testcat.ns1.ns2.tbl` works as 
expected.

### How was this patch tested?

Added new test.

Closes #27766 from imback82/fix_star_expression.

Authored-by: Terry Kim 
Signed-off-by: Wenchen Fan 
(cherry picked from commit c263c154080e54dd07aaa584913773314c3528e5)
Signed-off-by: Wenchen Fan 
---
 .../spark/sql/catalyst/analysis/unresolved.scala   | 35 --
 .../spark/sql/connector/DataSourceV2SQLSuite.scala | 27 +
 2 files changed, 40 insertions(+), 22 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index 608f39c..6048d98 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -298,35 +298,26 @@ abstract class Star extends LeafExpression with 
NamedExpression {
 case class UnresolvedStar(target: Option[Seq[String]]) extends Star with 
Unevaluable {
 
   /**
-   * Returns true if the nameParts match the qualifier of the attribute
+   * Returns true if the nameParts is a subset of the last elements of 
qualifier of the attribute.
*
-   * There are two checks: i) Check if the nameParts match the qualifier fully.
-   * E.g. SELECT db.t1.* FROM db1.t1   In this case, the nameParts is 
Seq("db1", "t1") and
-   * qualifier of the attribute is Seq("db1","t1")
-   * ii) If (i) is not true, then check if nameParts is only a single element 
and it
-   * matches the table portion of the qualifier
-   *
-   * E.g. SELECT t1.* FROM db1.t1  In this case nameParts is Seq("t1") and
-   * qualifier is Seq("db1","t1")
-   * SELECT a.* FROM db1.t1 AS a
-   * In this case nameParts is Seq("a") and qualifier for
-   * attribute is Seq("a")
+   * For example, the following should all return true:
+   *   - `SELECT ns1.ns2.t.* FROM ns1.n2.t` where nameParts is Seq("ns1", 
"ns2", "t") and
+   * qualifier is Seq("ns1", "ns2", "t").
+   *   - `SELECT ns2.t.* FROM ns1.n2.t` where nameParts is Seq("ns2", "t") and
+   * qualifier is Seq("ns1", "ns2", "t").
+   *   - `SELECT t.* FROM ns1.n2.t` where nameParts is Seq("t") and
+   * qualifier is Seq("ns1", "ns2", "t").
*/
   private def matchedQualifier(
   attribute: Attribute,
   nameParts: Seq[String],
   resolver: Resolver): Boolean = {
-val qualifierList = attribute.qualifier
-
-val matched = nameParts.corresponds(qualifierList)(resolver) || {
-  // check if it matches the table portion of the qualifier
-  if (nameParts.length == 1 && qualifierList.nonEmpty) {
-resolver(nameParts.head, qualifierList.last)
-  } else {
-false
-  }
+val qualifierList = if (nameParts.length == attribute.qualifier.length) {
+  attribute.qualifier
+} else {
+  attribute.qualifier.takeRight(nameParts.length)
 }
-matched
+nameParts.corresponds(qualifierList)(resolver)
   }
 
   override def expand(
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index c074b335..bccdce7 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -2342,6 +2342,33 @@ class DataSourceV2SQLSuite
 assert(e2.message.contains("It is not allowed to a

[spark] branch master updated (3ff2135 -> c263c15)

2020-03-03 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 3ff2135  [SPARK-30049][SQL] SQL fails to parse when comment contains 
an unmatched quote character
 add c263c15  [SPARK-31015][SQL] Star(*) expression fails when used with 
qualified column names for v2 tables

No new revisions were added by this update.

Summary of changes:
 .../spark/sql/catalyst/analysis/unresolved.scala   | 35 --
 .../spark/sql/connector/DataSourceV2SQLSuite.scala | 27 +
 2 files changed, 40 insertions(+), 22 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-30049][SQL] SQL fails to parse when comment contains an unmatched quote character

2020-03-03 Thread tgraves
This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 3ff2135  [SPARK-30049][SQL] SQL fails to parse when comment contains 
an unmatched quote character
3ff2135 is described below

commit 3ff213568694e265466d8480b61fd38f4fd8fdff
Author: Javier 
AuthorDate: Tue Mar 3 09:55:15 2020 -0600

[SPARK-30049][SQL] SQL fails to parse when comment contains an unmatched 
quote character

### What changes were proposed in this pull request?

A SQL statement that contains a comment with an unmatched quote character 
can lead to a parse error:
- Added an insideComment flag in the splitter method to avoid checking 
single and double quotes within a comment (see the sketch after this example):
```
spark-sql> SELECT 1 -- someone's comment here
 > ;
Error in query:
extraneous input ';' expecting (line 2, pos 0)

== SQL ==
SELECT 1 -- someone's comment here
;
^^^
```
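
A standalone sketch of the splitting idea (not the actual `SparkSQLCLIDriver` code): quotes and `;` that appear after `--` are ignored, and the comment text is dropped from the statement.

```
def splitStatements(line: String): Seq[String] = {
  var insideSingle, insideDouble, insideComment, escape = false
  var begin = 0
  var end = line.length
  val parts = scala.collection.mutable.ArrayBuffer.empty[String]
  for (i <- 0 until line.length) {
    val c = line.charAt(i)
    if (c == '\'' && !insideComment) { if (!escape) insideSingle = !insideSingle }
    else if (c == '"' && !insideComment) { if (!escape) insideDouble = !insideDouble }
    else if (c == '-' && !insideSingle && !insideDouble && !insideComment &&
             i + 1 < line.length && line.charAt(i + 1) == '-') {
      insideComment = true // everything from here on is comment text
      end = i              // and is not part of the statement
    } else if (c == ';' && !insideSingle && !insideDouble && !insideComment) {
      parts += line.substring(begin, i) // split, excluding the ';' itself
      begin = i + 1
    }
    escape = !escape && c == '\\'
  }
  parts += line.substring(begin, end)
  parts.toSeq
}

// splitStatements("SELECT 1 -- someone's comment here") == Seq("SELECT 1 ")
```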

### Why are the changes needed?

This misbehaviour was not present in previous Spark versions.

### Does this PR introduce any user-facing change?

- No

### How was this patch tested?

- New tests were added.

Closes #27321 from javierivanov/SPARK-30049B.

Lead-authored-by: Javier 
Co-authored-by: Javier Fuentes 
Signed-off-by: Thomas Graves 
---
 .../sql/hive/thriftserver/SparkSQLCLIDriver.scala  | 24 ++
 .../spark/sql/hive/thriftserver/CliSuite.scala | 22 
 2 files changed, 42 insertions(+), 4 deletions(-)

diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index b665d4a..19f7ea8 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -509,24 +509,40 @@ private[hive] class SparkSQLCLIDriver extends CliDriver 
with Logging {
   private def splitSemiColon(line: String): JList[String] = {
 var insideSingleQuote = false
 var insideDoubleQuote = false
+var insideComment = false
 var escape = false
 var beginIndex = 0
+var endIndex = line.length
 val ret = new JArrayList[String]
+
 for (index <- 0 until line.length) {
-  if (line.charAt(index) == '\'') {
+  if (line.charAt(index) == '\'' && !insideComment) {
 // take a look to see if it is escaped
 if (!escape) {
   // flip the boolean variable
   insideSingleQuote = !insideSingleQuote
 }
-  } else if (line.charAt(index) == '\"') {
+  } else if (line.charAt(index) == '\"' && !insideComment) {
 // take a look to see if it is escaped
 if (!escape) {
   // flip the boolean variable
   insideDoubleQuote = !insideDoubleQuote
 }
+  } else if (line.charAt(index) == '-') {
+val hasNext = index + 1 < line.length
+if (insideDoubleQuote || insideSingleQuote || insideComment) {
+  // Ignores '-' in any case of quotes or comment.
+  // Avoids to start a comment(--) within a quoted segment or already 
in a comment.
+  // Sample query: select "quoted value --"
+  //^^ avoids starting a comment 
if it's inside quotes.
+} else if (hasNext && line.charAt(index + 1) == '-') {
+  // ignore quotes and ;
+  insideComment = true
+  // ignore eol
+  endIndex = index
+}
   } else if (line.charAt(index) == ';') {
-if (insideSingleQuote || insideDoubleQuote) {
+if (insideSingleQuote || insideDoubleQuote || insideComment) {
   // do not split
 } else {
   // split, do not include ; itself
@@ -543,7 +559,7 @@ private[hive] class SparkSQLCLIDriver extends CliDriver 
with Logging {
 escape = true
   }
 }
-ret.add(line.substring(beginIndex))
+ret.add(line.substring(beginIndex, endIndex))
 ret
   }
 }
diff --git 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index 6609701..43aafc3 100644
--- 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -400,4 +400,26 @@ class CliSuite extends SparkFunSuite with 
BeforeAndAfterAll with Logging {
 -> "1.00"
 )
   }
+
+  test("SPARK-30049 Should not complain for quotes in commented lines") {
+run

[spark] branch branch-3.0 updated: [SPARK-30049][SQL] SQL fails to parse when comment contains an unmatched quote character

2020-03-03 Thread tgraves
This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 4be2a79  [SPARK-30049][SQL] SQL fails to parse when comment contains 
an unmatched quote character
4be2a79 is described below

commit 4be2a79c7a9a1b1e3b0c3704e94da19c2b87ba47
Author: Javier 
AuthorDate: Tue Mar 3 09:55:15 2020 -0600

[SPARK-30049][SQL] SQL fails to parse when comment contains an unmatched 
quote character

### What changes were proposed in this pull request?

A SQL statement that contains a comment with an unmatched quote character 
can lead to a parse error:
- Added an insideComment flag in the splitter method to avoid checking 
single and double quotes within a comment:
```
spark-sql> SELECT 1 -- someone's comment here
 > ;
Error in query:
extraneous input ';' expecting (line 2, pos 0)

== SQL ==
SELECT 1 -- someone's comment here
;
^^^
```

### Why are the changes needed?

This misbehaviour was not present in previous Spark versions.

### Does this PR introduce any user-facing change?

- No

### How was this patch tested?

- New tests were added.

Closes #27321 from javierivanov/SPARK-30049B.

Lead-authored-by: Javier 
Co-authored-by: Javier Fuentes 
Signed-off-by: Thomas Graves 
(cherry picked from commit 3ff213568694e265466d8480b61fd38f4fd8fdff)
Signed-off-by: Thomas Graves 
---
 .../sql/hive/thriftserver/SparkSQLCLIDriver.scala  | 24 ++
 .../spark/sql/hive/thriftserver/CliSuite.scala | 22 
 2 files changed, 42 insertions(+), 4 deletions(-)

diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index b665d4a..19f7ea8 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -509,24 +509,40 @@ private[hive] class SparkSQLCLIDriver extends CliDriver 
with Logging {
   private def splitSemiColon(line: String): JList[String] = {
 var insideSingleQuote = false
 var insideDoubleQuote = false
+var insideComment = false
 var escape = false
 var beginIndex = 0
+var endIndex = line.length
 val ret = new JArrayList[String]
+
 for (index <- 0 until line.length) {
-  if (line.charAt(index) == '\'') {
+  if (line.charAt(index) == '\'' && !insideComment) {
 // take a look to see if it is escaped
 if (!escape) {
   // flip the boolean variable
   insideSingleQuote = !insideSingleQuote
 }
-  } else if (line.charAt(index) == '\"') {
+  } else if (line.charAt(index) == '\"' && !insideComment) {
 // take a look to see if it is escaped
 if (!escape) {
   // flip the boolean variable
   insideDoubleQuote = !insideDoubleQuote
 }
+  } else if (line.charAt(index) == '-') {
+val hasNext = index + 1 < line.length
+if (insideDoubleQuote || insideSingleQuote || insideComment) {
+  // Ignores '-' in any case of quotes or comment.
+  // Avoids to start a comment(--) within a quoted segment or already 
in a comment.
+  // Sample query: select "quoted value --"
+  //^^ avoids starting a comment 
if it's inside quotes.
+} else if (hasNext && line.charAt(index + 1) == '-') {
+  // ignore quotes and ;
+  insideComment = true
+  // ignore eol
+  endIndex = index
+}
   } else if (line.charAt(index) == ';') {
-if (insideSingleQuote || insideDoubleQuote) {
+if (insideSingleQuote || insideDoubleQuote || insideComment) {
   // do not split
 } else {
   // split, do not include ; itself
@@ -543,7 +559,7 @@ private[hive] class SparkSQLCLIDriver extends CliDriver 
with Logging {
 escape = true
   }
 }
-ret.add(line.substring(beginIndex))
+ret.add(line.substring(beginIndex, endIndex))
 ret
   }
 }
diff --git 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index 6609701..43aafc3 100644
--- 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -400,4 +400,26 @@ class CliSuite extends SparkFunSuite with 
BeforeAndAfterAll with Logging {
 -> "1.00

[spark] branch branch-3.0 updated: [SPARK-30388][CORE] Mark running map stages of finished job as finished, and cancel running tasks

2020-03-03 Thread tgraves
This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 0aace99  [SPARK-30388][CORE] Mark running map stages of finished job 
as finished, and cancel running tasks
0aace99 is described below

commit 0aace99d1162348269848665725c7db2541807cc
Author: xuesenliang 
AuthorDate: Tue Mar 3 09:29:43 2020 -0600

[SPARK-30388][CORE] Mark running map stages of finished job as finished, 
and cancel running tasks

### What changes were proposed in this pull request?

When a job finishes, its running (re-submitted) map stages should be marked 
as finished if they are not used by other jobs, and the running tasks of these 
stages should be cancelled.

The ListenerBus should be notified too; otherwise, these map stage 
items will stay on the "Active Stages" page of the web UI and never go away.

For example:

Suppose job 0 has two stages: map stage 0 and result stage 1. Map stage 0 
has two partitions, and its result stage 1 has two partitions too.

**Steps to reproduce the bug:**
1. map stage 0:start task 0(```TID 0```) and task 1 (```TID 1```), then 
both finished successfully.
2. result stage 1:  start task 0(```TID 2```) and task 1 (```TID 3```)
3. result stage 1:  task 0(```TID 2```) finished successfully
4. result stage 1:  speculative task 1.1(```TID 4```) launched, but then 
failed due to FetchFailedException.
5. driver re-submits map stage 0 and result stage 1.
6. map stage 0 (retry 1): task0(```TID 5```) launched
7. result stage 1: task 1(```TID 3```) finished successfully, so job 0 
finished.
8. map stage 0 is removed from ```runningStages``` and 
```stageIdToStage```, because it doesn't belong to any job.
```
  private def DAGScheduler#cleanupStateForJobAndIndependentStages(job: 
ActiveJob): HashSet[Stage] = {
   ...
  stageIdToStage.filterKeys(stageId => 
registeredStages.get.contains(stageId)).foreach {
case (stageId, stage) =>
...
def removeStage(stageId: Int): Unit = {
  for (stage <- stageIdToStage.get(stageId)) {
if (runningStages.contains(stage)) {
  logDebug("Removing running stage %d".format(stageId))
  runningStages -= stage
}
...
  }
  stageIdToStage -= stageId
}

jobSet -= job.jobId
if (jobSet.isEmpty) { // no other job needs this stage
  removeStage(stageId)
}
  }
  ...
  }

```
9. map stage 0 (retry 1): task0(TID 5) finished successfully, but its stage 
0 is not in ```stageIdToStage```, so ```markStageAsFinished``` is never called for it
```
  private[scheduler] def DAGScheduler#handleTaskCompletion(event: 
CompletionEvent): Unit = {
val task = event.task
val stageId = task.stageId
...
if (!stageIdToStage.contains(task.stageId)) {
  postTaskEnd(event)
  // Skip all the actions if the stage has been cancelled.
  return
}
...
```

 Relevant spark driver logs as follows:

```
20/01/02 11:21:45 INFO DAGScheduler: Got job 0 (main at 
NativeMethodAccessorImpl.java:0) with 2 output partitions
20/01/02 11:21:45 INFO DAGScheduler: Final stage: ResultStage 1 (main at 
NativeMethodAccessorImpl.java:0)
20/01/02 11:21:45 INFO DAGScheduler: Parents of final stage: 
List(ShuffleMapStage 0)
20/01/02 11:21:45 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 
0)

20/01/02 11:21:45 INFO DAGScheduler: Submitting ShuffleMapStage 0 
(MapPartitionsRDD[2] at main at NativeMethodAccessorImpl.java:0), which has no 
missing parents
20/01/02 11:21:45 INFO DAGScheduler: Submitting 2 missing tasks from 
ShuffleMapStage 0 (MapPartitionsRDD[2] at main at 
NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0, 
1))
20/01/02 11:21:45 INFO YarnClusterScheduler: Adding task set 0.0 with 2 
tasks
20/01/02 11:21:45 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 
0, 9.179.143.4, executor 1, partition 0, PROCESS_LOCAL, 7704 bytes)
20/01/02 11:21:45 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 
1, 9.76.13.26, executor 2, partition 1, PROCESS_LOCAL, 7705 bytes)
20/01/02 11:22:18 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 
0) in 32491 ms on 9.179.143.4 (executor 1) (1/2)
20/01/02 11:22:26 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 
1) in 40544 ms on 9.76.13.26 (executor 2) (2/2)
20/01/02 11:22:26 INFO DAGScheduler: ShuffleMapStage 0 (main at 
NativeMethodAccessorImpl.java:0) finished in 40.854 s
20/01/02 11:22:26 INFO YarnClus

[spark] branch master updated: [SPARK-30388][CORE] Mark running map stages of finished job as finished, and cancel running tasks

2020-03-03 Thread tgraves
This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 7a4cf33  [SPARK-30388][CORE] Mark running map stages of finished job 
as finished, and cancel running tasks
7a4cf33 is described below

commit 7a4cf339d7082b576624940253e8283de9e83e19
Author: xuesenliang 
AuthorDate: Tue Mar 3 09:29:43 2020 -0600

[SPARK-30388][CORE] Mark running map stages of finished job as finished, 
and cancel running tasks

### What changes were proposed in this pull request?

When a job finishes, its running (re-submitted) map stages should be marked as finished if they are not used by other jobs, and the running tasks of these stages should be cancelled.

The ListenerBus should be notified too; otherwise, these map stage items stay on the "Active Stages" page of the web UI and never disappear.
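
As a rough sketch of the idea only (not the actual patch; `runningStages`, `markStageAsFinished` and `taskScheduler.killAllTaskAttempts` are existing `DAGScheduler` members, and the helper name is hypothetical):

```scala
// Hypothetical helper called when a job completes: finish its orphaned running map
// stages and kill their tasks, so the ListenerBus gets a SparkListenerStageCompleted
// event and the "Active Stages" page stops listing them.
private def finishOrphanedRunningStages(job: ActiveJob): Unit = {
  runningStages
    .filter(stage => stage.jobIds.forall(_ == job.jobId))  // needed by no other job
    .foreach { stage =>
      taskScheduler.killAllTaskAttempts(
        stage.id, interruptThread = false, reason = s"Job ${job.jobId} has finished")
      markStageAsFinished(stage)  // removes it from runningStages and notifies the bus
    }
}
```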

For example:

Suppose job 0 has two stages: map stage 0 and result stage 1. Map stage 0 
has two partitions, and its result stage 1 has two partitions too.

**Steps to reproduce the bug:**
1. map stage 0: start task 0 (```TID 0```) and task 1 (```TID 1```); both finished successfully.
2. result stage 1: start task 0 (```TID 2```) and task 1 (```TID 3```)
3. result stage 1: task 0 (```TID 2```) finished successfully
4. result stage 1: speculative task 1.1 (```TID 4```) launched, but then failed due to FetchFailedException.
5. driver re-submits map stage 0 and result stage 1.
6. map stage 0 (retry 1): task 0 (```TID 5```) launched
7. result stage 1: task 1 (```TID 3```) finished successfully, so job 0 finished.
8. map stage 0 is removed from ```runningStages``` and 
```stageIdToStage```, because it doesn't belong to any job.
```
  private def DAGScheduler#cleanupStateForJobAndIndependentStages(job: 
ActiveJob): HashSet[Stage] = {
   ...
  stageIdToStage.filterKeys(stageId => 
registeredStages.get.contains(stageId)).foreach {
case (stageId, stage) =>
...
def removeStage(stageId: Int): Unit = {
  for (stage <- stageIdToStage.get(stageId)) {
if (runningStages.contains(stage)) {
  logDebug("Removing running stage %d".format(stageId))
  runningStages -= stage
}
...
  }
  stageIdToStage -= stageId
}

jobSet -= job.jobId
if (jobSet.isEmpty) { // no other job needs this stage
  removeStage(stageId)
}
  }
  ...
  }

```
9. map stage 0 (retry 1): task 0 (TID 5) finished successfully, but its stage 0 is no longer in ```stageIdToStage```, so ```markStageAsFinished``` is never called for it:
```
  private[scheduler] def DAGScheduler#handleTaskCompletion(event: 
CompletionEvent): Unit = {
val task = event.task
val stageId = task.stageId
...
if (!stageIdToStage.contains(task.stageId)) {
  postTaskEnd(event)
  // Skip all the actions if the stage has been cancelled.
  return
}
...
```

The relevant Spark driver logs are as follows:

```
20/01/02 11:21:45 INFO DAGScheduler: Got job 0 (main at 
NativeMethodAccessorImpl.java:0) with 2 output partitions
20/01/02 11:21:45 INFO DAGScheduler: Final stage: ResultStage 1 (main at 
NativeMethodAccessorImpl.java:0)
20/01/02 11:21:45 INFO DAGScheduler: Parents of final stage: 
List(ShuffleMapStage 0)
20/01/02 11:21:45 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 
0)

20/01/02 11:21:45 INFO DAGScheduler: Submitting ShuffleMapStage 0 
(MapPartitionsRDD[2] at main at NativeMethodAccessorImpl.java:0), which has no 
missing parents
20/01/02 11:21:45 INFO DAGScheduler: Submitting 2 missing tasks from 
ShuffleMapStage 0 (MapPartitionsRDD[2] at main at 
NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0, 
1))
20/01/02 11:21:45 INFO YarnClusterScheduler: Adding task set 0.0 with 2 
tasks
20/01/02 11:21:45 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 
0, 9.179.143.4, executor 1, partition 0, PROCESS_LOCAL, 7704 bytes)
20/01/02 11:21:45 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 
1, 9.76.13.26, executor 2, partition 1, PROCESS_LOCAL, 7705 bytes)
20/01/02 11:22:18 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 
0) in 32491 ms on 9.179.143.4 (executor 1) (1/2)
20/01/02 11:22:26 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 
1) in 40544 ms on 9.76.13.26 (executor 2) (2/2)
20/01/02 11:22:26 INFO DAGScheduler: ShuffleMapStage 0 (main at 
NativeMethodAccessorImpl.java:0) finished in 40.854 s
20/01/02 11:22:26 INFO YarnClusterSched

[spark] branch branch-3.0 updated: [SPARK-30994][CORE] Update xerces to 2.12.0

2020-03-03 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 591bfd9  [SPARK-30994][CORE] Update xerces to 2.12.0
591bfd9 is described below

commit 591bfd9466887fd41112945343722b573dc90a5b
Author: Sean Owen 
AuthorDate: Tue Mar 3 09:27:18 2020 -0600

[SPARK-30994][CORE] Update xerces to 2.12.0

### What changes were proposed in this pull request?

Manage up the version of Xerces that Hadoop (and potentially user apps) uses to 2.12.0, to match https://issues.apache.org/jira/browse/HADOOP-16530

### Why are the changes needed?

Picks up bug and security fixes: 
https://www.xml.com/news/2018-05-apache-xerces-j-2120/

### Does this PR introduce any user-facing change?

Should be no behavior changes.

### How was this patch tested?

Existing tests.

Closes #27746 from srowen/SPARK-30994.

Authored-by: Sean Owen 
Signed-off-by: Sean Owen 
(cherry picked from commit 97d9a22b04cc831b7de19db735154c69f696773c)
Signed-off-by: Sean Owen 
---
 dev/deps/spark-deps-hadoop-2.7-hive-1.2 | 2 +-
 dev/deps/spark-deps-hadoop-2.7-hive-2.3 | 2 +-
 pom.xml | 6 ++
 3 files changed, 8 insertions(+), 2 deletions(-)

diff --git a/dev/deps/spark-deps-hadoop-2.7-hive-1.2 
b/dev/deps/spark-deps-hadoop-2.7-hive-1.2
index 113b7d7..364b825 100644
--- a/dev/deps/spark-deps-hadoop-2.7-hive-1.2
+++ b/dev/deps/spark-deps-hadoop-2.7-hive-1.2
@@ -201,7 +201,7 @@ super-csv/2.2.0//super-csv-2.2.0.jar
 threeten-extra/1.5.0//threeten-extra-1.5.0.jar
 univocity-parsers/2.8.3//univocity-parsers-2.8.3.jar
 xbean-asm7-shaded/4.15//xbean-asm7-shaded-4.15.jar
-xercesImpl/2.9.1//xercesImpl-2.9.1.jar
+xercesImpl/2.12.0//xercesImpl-2.12.0.jar
 xmlenc/0.52//xmlenc-0.52.jar
 xz/1.5//xz-1.5.jar
 zjsonpatch/0.3.0//zjsonpatch-0.3.0.jar
diff --git a/dev/deps/spark-deps-hadoop-2.7-hive-2.3 
b/dev/deps/spark-deps-hadoop-2.7-hive-2.3
index 7b7423a..62d1436 100644
--- a/dev/deps/spark-deps-hadoop-2.7-hive-2.3
+++ b/dev/deps/spark-deps-hadoop-2.7-hive-2.3
@@ -215,7 +215,7 @@ transaction-api/1.1//transaction-api-1.1.jar
 univocity-parsers/2.8.3//univocity-parsers-2.8.3.jar
 velocity/1.5//velocity-1.5.jar
 xbean-asm7-shaded/4.15//xbean-asm7-shaded-4.15.jar
-xercesImpl/2.9.1//xercesImpl-2.9.1.jar
+xercesImpl/2.12.0//xercesImpl-2.12.0.jar
 xmlenc/0.52//xmlenc-0.52.jar
 xz/1.5//xz-1.5.jar
 zjsonpatch/0.3.0//zjsonpatch-0.3.0.jar
diff --git a/pom.xml b/pom.xml
index 9d36faf..f98d132 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1083,6 +1083,12 @@
   
 
   
+  
+      <dependency>
+        <groupId>xerces</groupId>
+        <artifactId>xercesImpl</artifactId>
+        <version>2.12.0</version>
+      </dependency>
       <dependency>
         <groupId>org.apache.avro</groupId>
         <artifactId>avro</artifactId>


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (a4aaee0 -> 97d9a22)

2020-03-03 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from a4aaee0  [MINOR][DOCS] ForeachBatch java example fix
 add 97d9a22  [SPARK-30994][CORE] Update xerces to 2.12.0

No new revisions were added by this update.

Summary of changes:
 dev/deps/spark-deps-hadoop-2.7-hive-1.2 | 2 +-
 dev/deps/spark-deps-hadoop-2.7-hive-2.3 | 2 +-
 pom.xml | 6 ++
 3 files changed, 8 insertions(+), 2 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-30388][CORE] Mark running map stages of finished job as finished, and cancel running tasks

2020-03-03 Thread tgraves
This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 2d22498  [SPARK-30388][CORE] Mark running map stages of finished job 
as finished, and cancel running tasks
2d22498 is described below

commit 2d22498d6a1f290f9ca54404a6e83ed4b61431d2
Author: xuesenliang 
AuthorDate: Tue Mar 3 09:27:07 2020 -0600

[SPARK-30388][CORE] Mark running map stages of finished job as finished, 
and cancel running tasks

### What changes were proposed in this pull request?

When a job finishes, its running (re-submitted) map stages should be marked as finished if they are not used by other jobs, and the running tasks of these stages should be cancelled.

The ListenerBus should be notified too; otherwise, these map stage items stay on the "Active Stages" page of the web UI and never disappear.

For example:

Suppose job 0 has two stages: map stage 0 and result stage 1. Map stage 0 
has two partitions, and its result stage 1 has two partitions too.

**Steps to reproduce the bug:**
1. map stage 0: start task 0 (```TID 0```) and task 1 (```TID 1```); both finished successfully.
2. result stage 1: start task 0 (```TID 2```) and task 1 (```TID 3```)
3. result stage 1: task 0 (```TID 2```) finished successfully
4. result stage 1: speculative task 1.1 (```TID 4```) launched, but then failed due to FetchFailedException.
5. driver re-submits map stage 0 and result stage 1.
6. map stage 0 (retry 1): task 0 (```TID 5```) launched
7. result stage 1: task 1 (```TID 3```) finished successfully, so job 0 finished.
8. map stage 0 is removed from ```runningStages``` and 
```stageIdToStage```, because it doesn't belong to any job.
```
  private def DAGScheduler#cleanupStateForJobAndIndependentStages(job: 
ActiveJob): HashSet[Stage] = {
   ...
  stageIdToStage.filterKeys(stageId => 
registeredStages.get.contains(stageId)).foreach {
case (stageId, stage) =>
...
def removeStage(stageId: Int): Unit = {
  for (stage <- stageIdToStage.get(stageId)) {
if (runningStages.contains(stage)) {
  logDebug("Removing running stage %d".format(stageId))
  runningStages -= stage
}
...
  }
  stageIdToStage -= stageId
}

jobSet -= job.jobId
if (jobSet.isEmpty) { // no other job needs this stage
  removeStage(stageId)
}
  }
  ...
  }

```
9. map stage 0 (retry 1): task 0 (TID 5) finished successfully, but its stage 0 is no longer in ```stageIdToStage```, so ```markStageAsFinished``` is never called for it:
```
  private[scheduler] def DAGScheduler#handleTaskCompletion(event: 
CompletionEvent): Unit = {
val task = event.task
val stageId = task.stageId
...
if (!stageIdToStage.contains(task.stageId)) {
  postTaskEnd(event)
  // Skip all the actions if the stage has been cancelled.
  return
}
...
```

The relevant Spark driver logs are as follows:

```
20/01/02 11:21:45 INFO DAGScheduler: Got job 0 (main at 
NativeMethodAccessorImpl.java:0) with 2 output partitions
20/01/02 11:21:45 INFO DAGScheduler: Final stage: ResultStage 1 (main at 
NativeMethodAccessorImpl.java:0)
20/01/02 11:21:45 INFO DAGScheduler: Parents of final stage: 
List(ShuffleMapStage 0)
20/01/02 11:21:45 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 
0)

20/01/02 11:21:45 INFO DAGScheduler: Submitting ShuffleMapStage 0 
(MapPartitionsRDD[2] at main at NativeMethodAccessorImpl.java:0), which has no 
missing parents
20/01/02 11:21:45 INFO DAGScheduler: Submitting 2 missing tasks from 
ShuffleMapStage 0 (MapPartitionsRDD[2] at main at 
NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0, 
1))
20/01/02 11:21:45 INFO YarnClusterScheduler: Adding task set 0.0 with 2 
tasks
20/01/02 11:21:45 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 
0, 9.179.143.4, executor 1, partition 0, PROCESS_LOCAL, 7704 bytes)
20/01/02 11:21:45 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 
1, 9.76.13.26, executor 2, partition 1, PROCESS_LOCAL, 7705 bytes)
20/01/02 11:22:18 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 
0) in 32491 ms on 9.179.143.4 (executor 1) (1/2)
20/01/02 11:22:26 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 
1) in 40544 ms on 9.76.13.26 (executor 2) (2/2)
20/01/02 11:22:26 INFO DAGScheduler: ShuffleMapStage 0 (main at 
NativeMethodAccessorImpl.java:0) finished in 40.854 s
20/01/02 11:22:26 INFO YarnClusterSched

[spark] branch branch-2.4 updated: [MINOR][DOCS] ForeachBatch java example fix

2020-03-03 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new 0ea91da  [MINOR][DOCS] ForeachBatch java example fix
0ea91da is described below

commit 0ea91dab2ed2808edaf42398eda0614df26358d1
Author: roland-ondeviceresearch 
AuthorDate: Tue Mar 3 09:24:33 2020 -0600

[MINOR][DOCS] ForeachBatch java example fix

### What changes were proposed in this pull request?
ForEachBatch Java example was incorrect

### Why are the changes needed?
Example did not compile

### Does this PR introduce any user-facing change?
Yes, to docs.

### How was this patch tested?
In IDE.

Closes #27740 from roland1982/foreachwriter_java_example_fix.

Authored-by: roland-ondeviceresearch 
Signed-off-by: Sean Owen 
(cherry picked from commit a4aaee01fa8e71d51f49b24889d862422e0727c7)
Signed-off-by: Sean Owen 
---
 docs/structured-streaming-programming-guide.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/structured-streaming-programming-guide.md 
b/docs/structured-streaming-programming-guide.md
index 515ba07..dce4b35 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -2059,7 +2059,7 @@ streamingDF.writeStream.foreachBatch { (batchDF: 
DataFrame, batchId: Long) =>
 
 {% highlight java %}
 streamingDatasetOfString.writeStream().foreachBatch(
-  new VoidFunction2<Dataset<String>, Long> {
+  new VoidFunction2<Dataset<String>, Long>() {
     public void call(Dataset<String> dataset, Long batchId) {
   // Transform and write batchDF
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.0 updated: [MINOR][DOCS] ForeachBatch java example fix

2020-03-03 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new e332198  [MINOR][DOCS] ForeachBatch java example fix
e332198 is described below

commit e332198f5aab2eec30dedfab66296382d31d3245
Author: roland-ondeviceresearch 
AuthorDate: Tue Mar 3 09:24:33 2020 -0600

[MINOR][DOCS] ForeachBatch java example fix

### What changes were proposed in this pull request?
ForEachBatch Java example was incorrect

### Why are the changes needed?
Example did not compile

### Does this PR introduce any user-facing change?
Yes, to docs.

### How was this patch tested?
In IDE.

Closes #27740 from roland1982/foreachwriter_java_example_fix.

Authored-by: roland-ondeviceresearch 
Signed-off-by: Sean Owen 
(cherry picked from commit a4aaee01fa8e71d51f49b24889d862422e0727c7)
Signed-off-by: Sean Owen 
---
 docs/structured-streaming-programming-guide.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/structured-streaming-programming-guide.md 
b/docs/structured-streaming-programming-guide.md
index 47d8994..1776d23 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -2105,7 +2105,7 @@ streamingDF.writeStream.foreachBatch { (batchDF: 
DataFrame, batchId: Long) =>
 
 {% highlight java %}
 streamingDatasetOfString.writeStream().foreachBatch(
-  new VoidFunction2<Dataset<String>, Long> {
+  new VoidFunction2<Dataset<String>, Long>() {
     public void call(Dataset<String> dataset, Long batchId) {
   // Transform and write batchDF
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (b5166aa -> a4aaee0)

2020-03-03 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from b5166aa  [SPARK-31013][CORE][WEBUI] InMemoryStore: improve 
removeAllByIndexValues over natural key index
 add a4aaee0  [MINOR][DOCS] ForeachBatch java example fix

No new revisions were added by this update.

Summary of changes:
 docs/structured-streaming-programming-guide.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-2.4 updated: [SPARK-30998][SQL][2.4] ClassCastException when a generator having nested inner generators

2020-03-03 Thread yamamuro
This is an automated email from the ASF dual-hosted git repository.

yamamuro pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new f4c8c48  [SPARK-30998][SQL][2.4] ClassCastException when a generator 
having nested inner generators
f4c8c48 is described below

commit f4c8c4892197b8c5425a8013a09e9b379444e6fc
Author: Takeshi Yamamuro 
AuthorDate: Tue Mar 3 23:47:40 2020 +0900

[SPARK-30998][SQL][2.4] ClassCastException when a generator having nested 
inner generators

### What changes were proposed in this pull request?

The query below failed in branch-2.4:

```
scala> sql("select array(array(1, 2), array(3)) 
ar").select(explode(explode($"ar"))).show()
20/03/01 13:51:56 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 
0)/ 1]
java.lang.ClassCastException: scala.collection.mutable.ArrayOps$ofRef 
cannot be cast to org.apache.spark.sql.catalyst.util.ArrayData
at 
org.apache.spark.sql.catalyst.expressions.ExplodeBase.eval(generators.scala:313)
at 
org.apache.spark.sql.execution.GenerateExec.$anonfun$doExecute$8(GenerateExec.scala:108)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
at scala.collection.Iterator$ConcatIterator.hasNext(Iterator.scala:222)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
...
```

This PR modifies the `hasNestedGenerator` check in `ExtractGenerator` so that it correctly catches nested inner generators.

This backport PR comes from https://github.com/apache/spark/pull/27750#

### Why are the changes needed?

A bug fix.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Added tests.

Closes #27769 from maropu/SPARK-20998-BRANCH-2.4.

Authored-by: Takeshi Yamamuro 
Signed-off-by: Takeshi Yamamuro 
---
 .../apache/spark/sql/catalyst/analysis/Analyzer.scala | 16 +---
 .../sql/catalyst/analysis/AnalysisErrorSuite.scala| 19 +++
 .../org/apache/spark/sql/GeneratorFunctionSuite.scala |  8 
 3 files changed, 40 insertions(+), 3 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 0fedf7f..61f77be 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1681,10 +1681,20 @@ class Analyzer(
 }
 
 private def hasNestedGenerator(expr: NamedExpression): Boolean = {
+  def hasInnerGenerator(g: Generator): Boolean = g match {
+// Since `GeneratorOuter` is just a wrapper of generators, we skip it 
here
+case go: GeneratorOuter =>
+  hasInnerGenerator(go.child)
+case _ =>
+  g.children.exists { _.find {
+case _: Generator => true
+case _ => false
+  }.isDefined }
+  }
   CleanupAliases.trimNonTopLevelAliases(expr) match {
-case UnresolvedAlias(_: Generator, _) => false
-case Alias(_: Generator, _) => false
-case MultiAlias(_: Generator, _) => false
+case UnresolvedAlias(g: Generator, _) => hasInnerGenerator(g)
+case Alias(g: Generator, _) => hasInnerGenerator(g)
+case MultiAlias(g: Generator, _) => hasInnerGenerator(g)
 case other => hasGenerator(other)
   }
 }
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
index 45319aa..337902f 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
@@ -395,6 +395,25 @@ class AnalysisErrorSuite extends AnalysisTest {
   )
 
   errorTest(
+"SPARK-30998: unsupported nested inner generators",
+{
+  val nestedListRelation = LocalRelation(
+AttributeReference("nestedList", ArrayType(ArrayType(IntegerType)))())
+  nestedListRelation.select(Explode(Explode($"nestedList")))
+},
+"Generators are not supported when it's nested in expressions, but got: " +
+  "explode(explode(nestedList))" :: Nil
+  )
+
+  errorTest(
+"SPARK-30998: unsupported nested inner generators for aggregates",
+testRelation.select(Explode(Explode(
+  CreateArray(CreateArray(min($"a") :: max($"a") :: Nil) :: Nil,
+"Generators are not supported when it's nested in expressions, but got: " +
+  "explode(explode(array(array(min(a), max(a)" :


[spark] branch master updated: [SPARK-31013][CORE][WEBUI] InMemoryStore: improve removeAllByIndexValues over natural key index

2020-03-03 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new b5166aa  [SPARK-31013][CORE][WEBUI] InMemoryStore: improve 
removeAllByIndexValues over natural key index
b5166aa is described below

commit b5166aac1f2da7a2e90fc3a4932cfd411286843a
Author: Gengliang Wang 
AuthorDate: Tue Mar 3 19:34:19 2020 +0800

[SPARK-31013][CORE][WEBUI] InMemoryStore: improve removeAllByIndexValues 
over natural key index

### What changes were proposed in this pull request?

The method `removeAllByIndexValues` in KVStore deletes all the objects that have certain values in the given index.
However, in the current implementation of `InMemoryStore`, there is no special handling when the given index is the natural key index, so a linear search over all the task data is performed.

We can improve it by deleting the natural keys directly from the internal hashmap.
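
As a small, self-contained illustration of the call this change speeds up (the `Entry` class is hypothetical; `InMemoryStore`, `KVIndex` and `removeAllByIndexValues` are the existing kvstore API exercised in the test below):

```scala
import java.util.Arrays

import org.apache.spark.util.kvstore.{InMemoryStore, KVIndex}

// Hypothetical entity type; the @KVIndex-annotated method is its natural key.
class Entry(id: String) {
  @KVIndex def naturalKey: String = id
}

val store = new InMemoryStore()
Seq("a", "b", "c").foreach(id => store.write(new Entry(id)))

// Removing by natural keys: after this patch the keys are deleted directly from the
// internal hashmap instead of via a linear scan over every stored object.
store.removeAllByIndexValues(classOf[Entry], KVIndex.NATURAL_INDEX_NAME, Arrays.asList("a", "b"))
assert(store.count(classOf[Entry]) == 1)
```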

### Why are the changes needed?

Better performance when the index given to `removeAllByIndexValues` is the natural key index in `InMemoryStore`.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Enhance the existing test.

Closes #27763 from gengliangwang/useNaturalIndex.

Authored-by: Gengliang Wang 
Signed-off-by: Wenchen Fan 
---
 .../org/apache/spark/util/kvstore/InMemoryStore.java   | 16 +++-
 .../apache/spark/util/kvstore/InMemoryStoreSuite.java  | 18 --
 2 files changed, 23 insertions(+), 11 deletions(-)

diff --git 
a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java 
b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java
index db08740..f1bebb4 100644
--- 
a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java
+++ 
b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java
@@ -231,11 +231,16 @@ public class InMemoryStore implements KVStore {
 }
 
    int countingRemoveAllByIndexValues(String index, Collection<?> indexValues) {
-  if (hasNaturalParentIndex && naturalParentIndexName.equals(index)) {
+  int count = 0;
+  if (KVIndex.NATURAL_INDEX_NAME.equals(index)) {
+for (Object naturalKey : indexValues) {
+  count += delete(asKey(naturalKey)) ? 1 : 0;
+}
+return count;
+  } else if (hasNaturalParentIndex && 
naturalParentIndexName.equals(index)) {
 // If there is a parent index for the natural index and `index` 
happens to be it,
 // Spark can use the `parentToChildrenMap` to get the related natural 
keys, and then
 // delete them from `data`.
-int count = 0;
 for (Object indexValue : indexValues) {
   Comparable parentKey = asKey(indexValue);
   NaturalKeys children = parentToChildrenMap.getOrDefault(parentKey, 
new NaturalKeys());
@@ -271,9 +276,9 @@ public class InMemoryStore implements KVStore {
   }
 }
 
-public void delete(Object key) {
-  data.remove(asKey(key));
-  if (hasNaturalParentIndex) {
+public boolean delete(Object key) {
+  boolean entryExists = data.remove(asKey(key)) != null;
+  if (entryExists && hasNaturalParentIndex) {
 for (NaturalKeys v : parentToChildrenMap.values()) {
   if (v.remove(asKey(key))) {
 // `v` can be empty after removing the natural key and we can 
remove it from
@@ -284,6 +289,7 @@ public class InMemoryStore implements KVStore {
   }
 }
   }
+  return entryExists;
 }
 
 public int size() {
diff --git 
a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/InMemoryStoreSuite.java
 
b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/InMemoryStoreSuite.java
index 9e34225..da52676 100644
--- 
a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/InMemoryStoreSuite.java
+++ 
b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/InMemoryStoreSuite.java
@@ -158,23 +158,29 @@ public class InMemoryStoreSuite {
 
 assertEquals(9, store.count(ArrayKeyIndexType.class));
 
+// Try removing non-existing keys
+assert(!store.removeAllByIndexValues(
+  ArrayKeyIndexType.class,
+  KVIndex.NATURAL_INDEX_NAME,
+  ImmutableSet.of(new int[] {10, 10, 10}, new int[] { 3, 3, 3 })));
+assertEquals(9, store.count(ArrayKeyIndexType.class));
 
-store.removeAllByIndexValues(
+assert(store.removeAllByIndexValues(
   ArrayKeyIndexType.class,
   KVIndex.NATURAL_INDEX_NAME,
-  ImmutableSet.of(new int[] {0, 0, 0}, new int[] { 2, 2, 2 }));
+  ImmutableSet.of(new int[] {0, 0, 0}, new int[] { 2, 2, 2 })));
 assertEquals(7, store.count(ArrayKeyIndexType.class));
 
-store.removeAllByIndexValues(
+assert(store.

[spark] branch branch-3.0 updated: [SPARK-30998][SQL] ClassCastException when a generator having nested inner generators

2020-03-03 Thread yamamuro
This is an automated email from the ASF dual-hosted git repository.

yamamuro pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new ded0a72  [SPARK-30998][SQL] ClassCastException when a generator having 
nested inner generators
ded0a72 is described below

commit ded0a72d81c1d34753be8a156126312506fb50b1
Author: Takeshi Yamamuro 
AuthorDate: Tue Mar 3 19:00:33 2020 +0900

[SPARK-30998][SQL] ClassCastException when a generator having nested inner 
generators

### What changes were proposed in this pull request?

The query below failed on master:

```
scala> sql("select array(array(1, 2), array(3)) 
ar").select(explode(explode($"ar"))).show()
20/03/01 13:51:56 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 
0)/ 1]
java.lang.ClassCastException: scala.collection.mutable.ArrayOps$ofRef 
cannot be cast to org.apache.spark.sql.catalyst.util.ArrayData
at 
org.apache.spark.sql.catalyst.expressions.ExplodeBase.eval(generators.scala:313)
at 
org.apache.spark.sql.execution.GenerateExec.$anonfun$doExecute$8(GenerateExec.scala:108)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
at scala.collection.Iterator$ConcatIterator.hasNext(Iterator.scala:222)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
...
```

This PR modifies the `hasNestedGenerator` check in `ExtractGenerator` so that it correctly catches nested inner generators.
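
As an illustration (not part of the patch), the behavior in a spark-shell session after the fix looks roughly like this; flattening one level at a time remains the supported way to explode nested arrays:

```scala
import org.apache.spark.sql.functions.explode
import spark.implicits._  // assumes a spark-shell session with `spark` in scope

val df = spark.sql("select array(array(1, 2), array(3)) as ar")

// After this fix the nested generator is rejected at analysis time with an error like
// "Generators are not supported when it's nested in expressions ...", instead of
// failing at runtime with the ClassCastException above.
// df.select(explode(explode($"ar"))).show()

// Supported alternative: flatten one level at a time.
df.select(explode($"ar").as("inner")).select(explode($"inner")).show()
```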

### Why are the changes needed?

A bug fix.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Added tests.

Closes #27750 from maropu/HandleNestedGenerators.

Authored-by: Takeshi Yamamuro 
Signed-off-by: Takeshi Yamamuro 
(cherry picked from commit 313e62c376acab30e546df253b28452a664d3e73)
Signed-off-by: Takeshi Yamamuro 
---
 .../apache/spark/sql/catalyst/analysis/Analyzer.scala | 16 +---
 .../sql/catalyst/analysis/AnalysisErrorSuite.scala| 19 +++
 .../org/apache/spark/sql/GeneratorFunctionSuite.scala |  8 
 3 files changed, 40 insertions(+), 3 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 3d79799..486b952 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2164,10 +2164,20 @@ class Analyzer(
 }
 
 private def hasNestedGenerator(expr: NamedExpression): Boolean = {
+  def hasInnerGenerator(g: Generator): Boolean = g match {
+// Since `GeneratorOuter` is just a wrapper of generators, we skip it 
here
+case go: GeneratorOuter =>
+  hasInnerGenerator(go.child)
+case _ =>
+  g.children.exists { _.find {
+case _: Generator => true
+case _ => false
+  }.isDefined }
+  }
   CleanupAliases.trimNonTopLevelAliases(expr) match {
-case UnresolvedAlias(_: Generator, _) => false
-case Alias(_: Generator, _) => false
-case MultiAlias(_: Generator, _) => false
+case UnresolvedAlias(g: Generator, _) => hasInnerGenerator(g)
+case Alias(g: Generator, _) => hasInnerGenerator(g)
+case MultiAlias(g: Generator, _) => hasInnerGenerator(g)
 case other => hasGenerator(other)
   }
 }
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
index 8f62b0b..3db1053 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
@@ -434,6 +434,25 @@ class AnalysisErrorSuite extends AnalysisTest {
   )
 
   errorTest(
+"SPARK-30998: unsupported nested inner generators",
+{
+  val nestedListRelation = LocalRelation(
+AttributeReference("nestedList", ArrayType(ArrayType(IntegerType)))())
+  nestedListRelation.select(Explode(Explode($"nestedList")))
+},
+"Generators are not supported when it's nested in expressions, but got: " +
+  "explode(explode(nestedList))" :: Nil
+  )
+
+  errorTest(
+"SPARK-30998: unsupported nested inner generators for aggregates",
+testRelation.select(Explode(Explode(
+  CreateArray(CreateArray(min($"a") :: max($"a") :: Nil) :: Nil,
+"Generators are not supported when it's nested in expressions, but got: " +
+  "explode(explode(array(arr

[spark] branch master updated (1fac06c -> 313e62c)

2020-03-03 Thread yamamuro
This is an automated email from the ASF dual-hosted git repository.

yamamuro pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 1fac06c  Revert "[SPARK-30808][SQL] Enable Java 8 time API in Thrift 
server"
 add 313e62c  [SPARK-30998][SQL] ClassCastException when a generator having 
nested inner generators

No new revisions were added by this update.

Summary of changes:
 .../apache/spark/sql/catalyst/analysis/Analyzer.scala | 16 +---
 .../sql/catalyst/analysis/AnalysisErrorSuite.scala| 19 +++
 .../org/apache/spark/sql/GeneratorFunctionSuite.scala |  8 
 3 files changed, 40 insertions(+), 3 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-2.4 updated: [SPARK-30993][SQL][2.4] Use its sql type for UDT when checking the type of length (fixed/var) or mutable

2020-03-03 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new 7216099  [SPARK-30993][SQL][2.4] Use its sql type for UDT when 
checking the type of length (fixed/var) or mutable
7216099 is described below

commit 72160991dc524456a136bca3b0c86359f66f37c4
Author: Jungtaek Lim (HeartSaVioR) 
AuthorDate: Tue Mar 3 17:50:43 2020 +0800

[SPARK-30993][SQL][2.4] Use its sql type for UDT when checking the type of 
length (fixed/var) or mutable

### What changes were proposed in this pull request?

This patch fixes a bug in UnsafeRow: `isFixedLength` and `isMutable` do not handle UDTs specifically. These methods don't check a UDT's underlying SQL type, so they always treat a UDT as variable-length and non-mutable.
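
For reference (not part of the patch), this is what these helpers report for plain SQL types; the fix makes a UDT answer the same as its underlying `sqlType` instead of always appearing variable-length and non-mutable:

```scala
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.types._

UnsafeRow.isFixedLength(LongType)    // true: stored inline in the fixed-width region
UnsafeRow.isFixedLength(StringType)  // false: stored in the variable-length region
UnsafeRow.isMutable(IntegerType)     // true: can be updated in place
UnsafeRow.isMutable(StringType)      // false
```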

This causes no issue when a UDT represents a complicated type, but when a UDT is backed by a fixed-length SQL type it opens the door to correctness issues, as this information sometimes decides how the value should be handled.

We got a report on the user mailing list suspecting that mapGroupsWithState handles UDTs incorrectly, but after some investigation the problem turned out to come from GenerateUnsafeRowJoiner in the shuffle phase.


https://github.com/apache/spark/blob/0e2ca11d80c3921387d7b077cb64c3a0c06b08d7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoiner.scala#L32-L43

Here, updating the position should not happen for a fixed-length column, but due to this bug the value of a UDT whose SQL type is fixed-length gets modified, which corrupts the value.

### Why are the changes needed?

Misclassifying the length type of a UDT can corrupt the value when the row is fed into GenerateUnsafeRowJoiner, which is a correctness issue.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

New UT added.

Closes #27761 from HeartSaVioR/SPARK-30993-branch-2.4.

Authored-by: Jungtaek Lim (HeartSaVioR) 
Signed-off-by: Wenchen Fan 
---
 .../spark/sql/catalyst/expressions/UnsafeRow.java  |  8 +
 .../codegen/GenerateUnsafeRowJoinerSuite.scala | 41 -
 .../apache/spark/sql/UserDefinedTypeSuite.scala| 42 ++
 3 files changed, 90 insertions(+), 1 deletion(-)

diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index ee2b67a..a2440d9 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -95,6 +95,10 @@ public final class UnsafeRow extends InternalRow implements 
Externalizable, Kryo
   }
 
   public static boolean isFixedLength(DataType dt) {
+if (dt instanceof UserDefinedType) {
+  return isFixedLength(((UserDefinedType) dt).sqlType());
+}
+
 if (dt instanceof DecimalType) {
   return ((DecimalType) dt).precision() <= Decimal.MAX_LONG_DIGITS();
 } else {
@@ -103,6 +107,10 @@ public final class UnsafeRow extends InternalRow 
implements Externalizable, Kryo
   }
 
   public static boolean isMutable(DataType dt) {
+if (dt instanceof UserDefinedType) {
+  return isMutable(((UserDefinedType) dt).sqlType());
+}
+
 return mutableFieldTypes.contains(dt) || dt instanceof DecimalType;
   }
 
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoinerSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoinerSuite.scala
index 75c6bee..a5057d0 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoinerSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoinerSuite.scala
@@ -17,13 +17,15 @@
 
 package org.apache.spark.sql.catalyst.expressions.codegen
 
+import java.time.{LocalDateTime, ZoneOffset}
+
 import scala.util.Random
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.RandomDataGenerator
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.catalyst.expressions.{JoinedRow, UnsafeProjection, 
UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, 
JoinedRow, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -99,6 +101,23 @@ class GenerateUns