[spark] branch branch-3.3 updated (dd6eca7550c -> 74043ddd0d6)

2022-04-18 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


from dd6eca7550c [SPARK-38825][SQL][TEST][FOLLOWUP] Add test for in(null) 
and notIn(null)
 add 74043ddd0d6 [SPARK-38941][TESTS][SQL][3.3] Skip RocksDB-based test 
case in StreamingJoinSuite on Apple Silicon

No new revisions were added by this update.

Summary of changes:
 .../test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala   | 1 +
 1 file changed, 1 insertion(+)





[spark] branch branch-3.3 updated: [SPARK-38825][SQL][TEST][FOLLOWUP] Add test for in(null) and notIn(null)

2022-04-18 Thread huaxingao
This is an automated email from the ASF dual-hosted git repository.

huaxingao pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new dd6eca7550c [SPARK-38825][SQL][TEST][FOLLOWUP] Add test for in(null) 
and notIn(null)
dd6eca7550c is described below

commit dd6eca7550c25dbcad9f12caf9fccfcad981d33f
Author: huaxingao 
AuthorDate: Mon Apr 18 21:27:57 2022 -0700

[SPARK-38825][SQL][TEST][FOLLOWUP] Add test for in(null) and notIn(null)

### What changes were proposed in this pull request?
Add test for filter `in(null)` and `notIn(null)`

### Why are the changes needed?
to make tests more complete

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

new test
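
A minimal PySpark sketch (illustrative, not part of the patch; assumes a local SparkSession) of the null semantics the new assertions cover:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame([("mary",), ("martin",), (None,)], ["name"])

# IN with a null element: rows equal to a non-null element still match.
df.filter(col("name").isin("mary", None)).show()   # keeps 'mary'
# NOT IN with a null element: the predicate is never TRUE, so no rows survive.
df.filter(~col("name").isin("mary", None)).show()  # empty
# in(null) alone matches nothing, and neither does its negation.
df.filter(col("name").isin(None)).show()           # empty
df.filter(~col("name").isin(None)).show()          # empty
```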

Closes #36248 from huaxingao/inNotIn.

Authored-by: huaxingao 
Signed-off-by: huaxingao 
(cherry picked from commit b760e4a686939bdb837402286b8d3d8b445c5ed4)
Signed-off-by: huaxingao 
---
 .../datasources/parquet/ParquetFilterSuite.scala   | 22 +-
 1 file changed, 17 insertions(+), 5 deletions(-)

diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 71ea474409c..7a09011f27c 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -1905,21 +1905,33 @@ abstract class ParquetFilterSuite extends QueryTest 
with ParquetTest with Shared
   test("SPARK-38825: in and notIn filters") {
 import testImplicits._
 withTempPath { file =>
-  Seq(1, 2, 0, -1, 99, 1000, 3, 7, 
2).toDF("id").coalesce(1).write.mode("overwrite")
+  Seq(1, 2, 0, -1, 99, Integer.MAX_VALUE, 1000, 3, 7, Integer.MIN_VALUE, 2)
+.toDF("id").coalesce(1).write.mode("overwrite")
 .parquet(file.getCanonicalPath)
   var df = spark.read.parquet(file.getCanonicalPath)
-  var in = df.filter(col("id").isin(100, 3, 11, 12, 13))
-  var notIn = df.filter(!col("id").isin(100, 3, 11, 12, 13))
-  checkAnswer(in, Seq(Row(3)))
+  var in = df.filter(col("id").isin(100, 3, 11, 12, 13, Integer.MAX_VALUE, 
Integer.MIN_VALUE))
+  var notIn =
+df.filter(!col("id").isin(100, 3, 11, 12, 13, Integer.MAX_VALUE, 
Integer.MIN_VALUE))
+  checkAnswer(in, Seq(Row(3), Row(-2147483648), Row(2147483647)))
   checkAnswer(notIn, Seq(Row(1), Row(2), Row(0), Row(-1), Row(99), 
Row(1000), Row(7), Row(2)))
 
-  Seq("mary", "martin", "lucy", "alex", "mary", 
"dan").toDF("name").coalesce(1)
+  Seq("mary", "martin", "lucy", "alex", null, "mary", 
"dan").toDF("name").coalesce(1)
 .write.mode("overwrite").parquet(file.getCanonicalPath)
   df = spark.read.parquet(file.getCanonicalPath)
   in = df.filter(col("name").isin("mary", "victor", "leo", "alex"))
   notIn = df.filter(!col("name").isin("mary", "victor", "leo", "alex"))
   checkAnswer(in, Seq(Row("mary"), Row("alex"), Row("mary")))
   checkAnswer(notIn, Seq(Row("martin"), Row("lucy"), Row("dan")))
+
+  in = df.filter(col("name").isin("mary", "victor", "leo", "alex", null))
+  notIn = df.filter(!col("name").isin("mary", "victor", "leo", "alex", 
null))
+  checkAnswer(in, Seq(Row("mary"), Row("alex"), Row("mary")))
+  checkAnswer(notIn, Seq())
+
+  in = df.filter(col("name").isin(null))
+  notIn = df.filter(!col("name").isin(null))
+  checkAnswer(in, Seq())
+  checkAnswer(notIn, Seq())
 }
   }
 }





[spark] branch master updated (242ee22c003 -> b760e4a6869)

2022-04-18 Thread huaxingao
This is an automated email from the ASF dual-hosted git repository.

huaxingao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 242ee22c003 [SPARK-38796][SQL] Update to_number and try_to_number 
functions to restrict S and MI sequence to start or end only
 add b760e4a6869 [SPARK-38825][SQL][TEST][FOLLOWUP] Add test for in(null) 
and notIn(null)

No new revisions were added by this update.

Summary of changes:
 .../datasources/parquet/ParquetFilterSuite.scala   | 22 +-
 1 file changed, 17 insertions(+), 5 deletions(-)





[spark] branch branch-3.3 updated: [SPARK-38796][SQL] Update to_number and try_to_number functions to restrict S and MI sequence to start or end only

2022-04-18 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new 8bd7d886e05 [SPARK-38796][SQL] Update to_number and try_to_number 
functions to restrict S and MI sequence to start or end only
8bd7d886e05 is described below

commit 8bd7d886e0570ed6d01ebbadca83c77821aee93f
Author: Daniel Tenedorio 
AuthorDate: Tue Apr 19 11:18:56 2022 +0800

[SPARK-38796][SQL] Update to_number and try_to_number functions to restrict 
S and MI sequence to start or end only

### What changes were proposed in this pull request?

Update `to_number` and `try_to_number` functions to restrict MI sequence to 
start or end only.

This satisfies the following specification:

```
to_number(expr, fmt)
fmt
  { ' [ MI | S ] [ L | $ ]
  [ 0 | 9 | G | , ] [...]
  [ . | D ]
  [ 0 | 9 ] [...]
  [ L | $ ] [ PR | MI | S ] ' }
```
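
A hedged illustration (not from the patch) of where the sign token may appear after this change; the expected values follow the specification above and Spark's built-in function examples, assuming a local SparkSession:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()

# 'S' at the end of the format consumes a trailing sign.
spark.sql("SELECT to_number('12,454.8-', '99,999.9S')").show()  # expected: -12454.8
# With no sign token in the format, the trailing '-' cannot be consumed;
# try_to_number returns NULL instead of raising an error.
spark.sql("SELECT try_to_number('454-', '999')").show()         # expected: NULL
```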

### Why are the changes needed?

After reviewing the specification, this behavior makes the most sense.

### Does this PR introduce _any_ user-facing change?

Yes, a slight change in the behavior of the format string.

### How was this patch tested?

Existing and updated unit test coverage.

Closes #36154 from dtenedor/mi-anywhere.

Authored-by: Daniel Tenedorio 
Signed-off-by: Wenchen Fan 
(cherry picked from commit 242ee22c00394c29e21bc3de0a93cb6d9746d93c)
Signed-off-by: Wenchen Fan 
---
 .../expressions/numberFormatExpressions.scala  |   4 +-
 .../spark/sql/catalyst/util/ToNumberParser.scala   | 163 -
 .../expressions/StringExpressionsSuite.scala   |  20 +--
 3 files changed, 106 insertions(+), 81 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/numberFormatExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/numberFormatExpressions.scala
index 88947c5c87a..c866bb9af9e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/numberFormatExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/numberFormatExpressions.scala
@@ -46,8 +46,8 @@ import org.apache.spark.unsafe.types.UTF8String
grouping separator relevant for the size of the number.
  '$': Specifies the location of the $ currency sign. This character 
may only be specified
once.
- 'S': Specifies the position of a '-' or '+' sign (optional, only 
allowed once).
- 'MI': Specifies that 'expr' has an optional '-' sign, but no '+' 
(only allowed once).
+ 'S' or 'MI': Specifies the position of a '-' or '+' sign (optional, 
only allowed once at
+   the beginning or end of the format string). Note that 'S' allows 
'-' but 'MI' does not.
  'PR': Only allowed at the end of the format string; specifies that 
'expr' indicates a
negative number with wrapping angled brackets.
('<1>').
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ToNumberParser.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ToNumberParser.scala
index afba683efad..716224983e0 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ToNumberParser.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ToNumberParser.scala
@@ -49,33 +49,56 @@ object ToNumberParser {
   final val WRAPPING_ANGLE_BRACKETS_TO_NEGATIVE_NUMBER_END = 'R'
 
   // This class represents one or more characters that we expect to be present 
in the input string
-  // based on the format string.
+  // based on the format string. The toString method returns a representation 
of each token suitable
+  // for use in error messages.
   abstract class InputToken()
   // Represents some number of digits (0-9).
   abstract class Digits extends InputToken
   // Represents exactly 'num' digits (0-9).
-  case class ExactlyAsManyDigits(num: Int) extends Digits
+  case class ExactlyAsManyDigits(num: Int) extends Digits {
+override def toString: String = "digit sequence"
+  }
   // Represents at most 'num' digits (0-9).
-  case class AtMostAsManyDigits(num: Int) extends Digits
+  case class AtMostAsManyDigits(num: Int) extends Digits {
+override def toString: String = "digit sequence"
+  }
   // Represents one decimal point (.).
-  case class DecimalPoint() extends InputToken
+  case class DecimalPoint() extends InputToken {
+override def toString: String = ". or D"
+  }
   // Represents one thousands separator (,).
-  case class ThousandsSeparator() extends InputToken
+  case class ThousandsSeparator() extends InputToken {
+override def toString: String = ", or G"
+  }
   // Represents one or more groups of Digits 

[spark] branch master updated (175e429cca2 -> 242ee22c003)

2022-04-18 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 175e429cca2 [SPARK-37670][SQL] Support predicate pushdown and column 
pruning for de-duped CTEs
 add 242ee22c003 [SPARK-38796][SQL] Update to_number and try_to_number 
functions to restrict S and MI sequence to start or end only

No new revisions were added by this update.

Summary of changes:
 .../expressions/numberFormatExpressions.scala  |   4 +-
 .../spark/sql/catalyst/util/ToNumberParser.scala   | 163 -
 .../expressions/StringExpressionsSuite.scala   |  20 +--
 3 files changed, 106 insertions(+), 81 deletions(-)





[spark] branch branch-3.3 updated: [SPARK-37670][SQL] Support predicate pushdown and column pruning for de-duped CTEs

2022-04-18 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new 671539de00c [SPARK-37670][SQL] Support predicate pushdown and column 
pruning for de-duped CTEs
671539de00c is described below

commit 671539de00c1da817859de66345e122cac01a2ee
Author: Maryann Xue 
AuthorDate: Tue Apr 19 10:50:07 2022 +0800

[SPARK-37670][SQL] Support predicate pushdown and column pruning for 
de-duped CTEs

### What changes were proposed in this pull request?

This PR adds predicate push-down and column pruning to CTEs that are not 
inlined as well as fixes a few potential correctness issues:
  1) Replace (previously not inlined) CTE refs with Repartition operations 
at the end of logical plan optimization so that WithCTE is not carried over to 
physical plan. As a result, we can simplify the logic of physical planning, as 
well as avoid a correctness issue where the logical link of a physical plan 
node can point to `WithCTE` and lead to unexpected behaviors in AQE, e.g., 
class cast exceptions in DPP.
  2) Pull (not inlined) CTE defs from subqueries up to the main query 
level, in order to avoid creating copies of the same CTE def during predicate 
push-downs and other transformations.
  3) Make CTE IDs more deterministic by starting from 0 for each query.

### Why are the changes needed?

Improve de-duped CTEs' performance with predicate pushdown and column 
pruning; fixes de-duped CTEs' correctness issues.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added UTs.
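
A hedged PySpark sketch (not from the patch) of a query that exercises the new behavior: `rand()` keeps the doubly-referenced CTE from being inlined, so the predicate push-down and the final Repartition-based replacement described above can apply; the exact plan shape may vary by version.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()

# The non-deterministic rand() prevents inlining of the CTE, which is
# referenced twice; the id filters can now be pushed into the CTE definition.
spark.sql("""
    WITH cte AS (SELECT id, rand() AS r FROM range(1000000))
    SELECT * FROM cte WHERE id = 1
    UNION ALL
    SELECT * FROM cte WHERE id = 2
""").explain(True)
```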

Closes #34929 from maryannxue/cte-followup.

Lead-authored-by: Maryann Xue 
Co-authored-by: Wenchen Fan 
Signed-off-by: Wenchen Fan 
(cherry picked from commit 175e429cca29c2314ee029bf009ed5222c0bffad)
Signed-off-by: Wenchen Fan 
---
 .../sql/catalyst/analysis/CTESubstitution.scala|  30 ++-
 .../sql/catalyst/analysis/CheckAnalysis.scala  |   8 +-
 .../spark/sql/catalyst/optimizer/InlineCTE.scala   |  56 ++---
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  64 +++---
 ...ushdownPredicatesAndPruneColumnsForCTEDef.scala | 175 
 .../optimizer/ReplaceCTERefWithRepartition.scala   |  84 
 .../spark/sql/catalyst/plans/QueryPlan.scala   |  31 +++
 .../plans/logical/basicLogicalOperators.scala  |   9 +-
 .../spark/sql/catalyst/analysis/AnalysisTest.scala |   3 +-
 .../spark/sql/execution/QueryExecution.scala   |  23 +--
 .../spark/sql/execution/SparkOptimizer.scala   |   3 +-
 .../apache/spark/sql/execution/SparkPlanner.scala  |   1 -
 .../spark/sql/execution/SparkStrategies.scala  |  31 ---
 .../execution/adaptive/AdaptiveSparkPlanExec.scala |   7 +-
 .../scalar-subquery/scalar-subquery-select.sql |  42 
 .../scalar-subquery/scalar-subquery-select.sql.out | 103 -
 .../approved-plans-v1_4/q23a.sf100/explain.txt | 166 +++
 .../approved-plans-v1_4/q23b.sf100/explain.txt | 190 -
 .../org/apache/spark/sql/CTEInlineSuite.scala  | 229 -
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala |  15 ++
 20 files changed, 962 insertions(+), 308 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
index c0ba3598e4b..976a5d385d8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
@@ -69,13 +69,13 @@ object CTESubstitution extends Rule[LogicalPlan] {
 if (cteDefs.isEmpty) {
   substituted
 } else if (substituted eq lastSubstituted.get) {
-  WithCTE(substituted, cteDefs.toSeq)
+  WithCTE(substituted, cteDefs.sortBy(_.id).toSeq)
 } else {
   var done = false
   substituted.resolveOperatorsWithPruning(_ => !done) {
 case p if p eq lastSubstituted.get =>
   done = true
-  WithCTE(p, cteDefs.toSeq)
+  WithCTE(p, cteDefs.sortBy(_.id).toSeq)
   }
 }
   }
@@ -203,6 +203,7 @@ object CTESubstitution extends Rule[LogicalPlan] {
   cteDefs: mutable.ArrayBuffer[CTERelationDef]): Seq[(String, 
CTERelationDef)] = {
 val resolvedCTERelations = new mutable.ArrayBuffer[(String, 
CTERelationDef)](relations.size)
 for ((name, relation) <- relations) {
+  val lastCTEDefCount = cteDefs.length
   val innerCTEResolved = if (isLegacy) {
 // In legacy mode, outer CTE relations take precedence. Here we don't 
resolve the inner
 // `With` nodes, later we will substitute `UnresolvedRelation`s with 
outer CTE relations.
@@ -211,8 +212,33 @@ object CTESubstitution extends Rule[LogicalPlan] {
   } else {
 // A CTE definition might contain an inner CTE that has a higher 
priority, so traverse and
 // substitute CTE 

[spark] branch master updated: [SPARK-37670][SQL] Support predicate pushdown and column pruning for de-duped CTEs

2022-04-18 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 175e429cca2 [SPARK-37670][SQL] Support predicate pushdown and column 
pruning for de-duped CTEs
175e429cca2 is described below

commit 175e429cca29c2314ee029bf009ed5222c0bffad
Author: Maryann Xue 
AuthorDate: Tue Apr 19 10:50:07 2022 +0800

[SPARK-37670][SQL] Support predicate pushdown and column pruning for 
de-duped CTEs

### What changes were proposed in this pull request?

This PR adds predicate push-down and column pruning to CTEs that are not 
inlined as well as fixes a few potential correctness issues:
  1) Replace (previously not inlined) CTE refs with Repartition operations 
at the end of logical plan optimization so that WithCTE is not carried over to 
physical plan. As a result, we can simplify the logic of physical planning, as 
well as avoid a correctness issue where the logical link of a physical plan 
node can point to `WithCTE` and lead to unexpected behaviors in AQE, e.g., 
class cast exceptions in DPP.
  2) Pull (not inlined) CTE defs from subqueries up to the main query 
level, in order to avoid creating copies of the same CTE def during predicate 
push-downs and other transformations.
  3) Make CTE IDs more deterministic by starting from 0 for each query.

### Why are the changes needed?

Improve de-duped CTEs' performance with predicate pushdown and column 
pruning; fixes de-duped CTEs' correctness issues.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added UTs.

Closes #34929 from maryannxue/cte-followup.

Lead-authored-by: Maryann Xue 
Co-authored-by: Wenchen Fan 
Signed-off-by: Wenchen Fan 
---
 .../sql/catalyst/analysis/CTESubstitution.scala|  30 ++-
 .../sql/catalyst/analysis/CheckAnalysis.scala  |   8 +-
 .../spark/sql/catalyst/optimizer/InlineCTE.scala   |  56 ++---
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  64 +++---
 ...ushdownPredicatesAndPruneColumnsForCTEDef.scala | 175 
 .../optimizer/ReplaceCTERefWithRepartition.scala   |  84 
 .../spark/sql/catalyst/plans/QueryPlan.scala   |  31 +++
 .../plans/logical/basicLogicalOperators.scala  |   9 +-
 .../spark/sql/catalyst/analysis/AnalysisTest.scala |   3 +-
 .../spark/sql/execution/QueryExecution.scala   |  23 +--
 .../spark/sql/execution/SparkOptimizer.scala   |   3 +-
 .../apache/spark/sql/execution/SparkPlanner.scala  |   1 -
 .../spark/sql/execution/SparkStrategies.scala  |  31 ---
 .../execution/adaptive/AdaptiveSparkPlanExec.scala |   7 +-
 .../scalar-subquery/scalar-subquery-select.sql |  42 
 .../scalar-subquery/scalar-subquery-select.sql.out | 103 -
 .../approved-plans-v1_4/q23a.sf100/explain.txt | 166 +++
 .../approved-plans-v1_4/q23b.sf100/explain.txt | 190 -
 .../org/apache/spark/sql/CTEInlineSuite.scala  | 229 -
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala |  15 ++
 20 files changed, 962 insertions(+), 308 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
index c0ba3598e4b..976a5d385d8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
@@ -69,13 +69,13 @@ object CTESubstitution extends Rule[LogicalPlan] {
 if (cteDefs.isEmpty) {
   substituted
 } else if (substituted eq lastSubstituted.get) {
-  WithCTE(substituted, cteDefs.toSeq)
+  WithCTE(substituted, cteDefs.sortBy(_.id).toSeq)
 } else {
   var done = false
   substituted.resolveOperatorsWithPruning(_ => !done) {
 case p if p eq lastSubstituted.get =>
   done = true
-  WithCTE(p, cteDefs.toSeq)
+  WithCTE(p, cteDefs.sortBy(_.id).toSeq)
   }
 }
   }
@@ -203,6 +203,7 @@ object CTESubstitution extends Rule[LogicalPlan] {
   cteDefs: mutable.ArrayBuffer[CTERelationDef]): Seq[(String, 
CTERelationDef)] = {
 val resolvedCTERelations = new mutable.ArrayBuffer[(String, 
CTERelationDef)](relations.size)
 for ((name, relation) <- relations) {
+  val lastCTEDefCount = cteDefs.length
   val innerCTEResolved = if (isLegacy) {
 // In legacy mode, outer CTE relations take precedence. Here we don't 
resolve the inner
 // `With` nodes, later we will substitute `UnresolvedRelation`s with 
outer CTE relations.
@@ -211,8 +212,33 @@ object CTESubstitution extends Rule[LogicalPlan] {
   } else {
 // A CTE definition might 

[spark] branch master updated (eadd5354c45 -> 4b381880379)

2022-04-18 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from eadd5354c45 [SPARK-38903][PYTHON] Implement `ignore_index` of 
`Series.sort_values` and `Series.sort_index`
 add 4b381880379 [SPARK-38940][PYTHON] Test Series' anchor frame for 
in-place updates on Series

No new revisions were added by this update.

Summary of changes:
 python/pyspark/pandas/tests/test_series.py | 27 ++-
 1 file changed, 18 insertions(+), 9 deletions(-)





[spark] branch master updated: [SPARK-38903][PYTHON] Implement `ignore_index` of `Series.sort_values` and `Series.sort_index`

2022-04-18 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new eadd5354c45 [SPARK-38903][PYTHON] Implement `ignore_index` of 
`Series.sort_values` and `Series.sort_index`
eadd5354c45 is described below

commit eadd5354c459114aa1d0fd0a6ca432a2d6249ae9
Author: Xinrong Meng 
AuthorDate: Tue Apr 19 11:06:11 2022 +0900

[SPARK-38903][PYTHON] Implement `ignore_index` of `Series.sort_values` and 
`Series.sort_index`

### What changes were proposed in this pull request?
Implement `ignore_index` of `Series.sort_values` and `Series.sort_index`

### Why are the changes needed?
To reach parity with pandas API.

### Does this PR introduce _any_ user-facing change?
Yes. `ignore_index` is now supported for `Series.sort_values` and `Series.sort_index`.

```py
>>> s = ps.Series([2, 1, 3], index=['b', 'c', 'a'])
>>> s.sort_values(ignore_index=True)
0    1
1    2
2    3
dtype: int64
>>> s.sort_index(ignore_index=True)
0    3
1    2
2    1
dtype: int64
```
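
A small hedged addition (not part of the commit message): per the diff below, `ignore_index=True` also composes with `inplace=True`, resetting the index in place; this assumes a build containing the change (Spark 3.4.0+).

```python
import pyspark.pandas as ps

s = ps.Series([2, 1, 3], index=['b', 'c', 'a'])
s.sort_values(inplace=True, ignore_index=True)  # sorts and relabels the index 0..n-1 in place
print(s)  # expected: values 1, 2, 3 under index 0, 1, 2
```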

### How was this patch tested?
Unit tests.

Closes #36186 from xinrong-databricks/series.ignore_index.

Authored-by: Xinrong Meng 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/pandas/series.py| 61 +++---
 python/pyspark/pandas/tests/test_series.py | 16 +++-
 2 files changed, 63 insertions(+), 14 deletions(-)

diff --git a/python/pyspark/pandas/series.py b/python/pyspark/pandas/series.py
index f4638fe22de..3ac2daa612a 100644
--- a/python/pyspark/pandas/series.py
+++ b/python/pyspark/pandas/series.py
@@ -2703,7 +2703,11 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
 return first_series(DataFrame(internal))
 
 def sort_values(
-self, ascending: bool = True, inplace: bool = False, na_position: str 
= "last"
+self,
+ascending: bool = True,
+inplace: bool = False,
+na_position: str = "last",
+ignore_index: bool = False,
 ) -> Optional["Series"]:
 """
 Sort by the values.
@@ -2720,6 +2724,10 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
  if True, perform operation in-place
 na_position : {'first', 'last'}, default 'last'
  `first` puts NaNs at the beginning, `last` puts NaNs at the end
+ignore_index : bool, default False
+ If True, the resulting axis will be labeled 0, 1, …, n - 1.
+
+ .. versionadded:: 3.4.0
 
 Returns
 ---
@@ -2756,6 +2764,16 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
 0 NaN
 dtype: float64
 
+Sort values descending order and ignoring index
+
+>>> s.sort_values(ascending=False, ignore_index=True)
+0    10.0
+1     5.0
+2     3.0
+3     1.0
+4     NaN
+dtype: float64
+
 Sort values inplace
 
 >>> s.sort_values(ascending=False, inplace=True)
@@ -2802,10 +2820,12 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
 )
 
 if inplace:
+if ignore_index:
+psdf.reset_index(drop=True, inplace=inplace)
 self._update_anchor(psdf)
 return None
 else:
-return first_series(psdf)
+return first_series(psdf.reset_index(drop=True)) if ignore_index 
else first_series(psdf)
 
 def sort_index(
 self,
@@ -2815,6 +2835,7 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
 inplace: bool = False,
 kind: str = None,
 na_position: str = "last",
+ignore_index: bool = False,
 ) -> Optional["Series"]:
 """
 Sort object by labels (along an axis)
@@ -2834,6 +2855,10 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
 na_position : {‘first’, ‘last’}, default ‘last’
 first puts NaNs at the beginning, last puts NaNs at the end. Not 
implemented for
 MultiIndex.
+ignore_index : bool, default False
+If True, the resulting axis will be labeled 0, 1, …, n - 1.
+
+.. versionadded:: 3.4.0
 
 Returns
 ---
@@ -2841,50 +2866,58 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
 
 Examples
 
->>> df = ps.Series([2, 1, np.nan], index=['b', 'a', np.nan])
+>>> s = ps.Series([2, 1, np.nan], index=['b', 'a', np.nan])
 
->>> df.sort_index()
+>>> s.sort_index()
 a  1.0
 b  2.0
NaN    NaN
 dtype: float64
 
->>> df.sort_index(ascending=False)
+>>> s.sort_index(ignore_index=True)
+0    1.0
+1    2.0
+2    NaN
+dtype: float64
+
+>>> s.sort_index(ascending=False)
 b  2.0
 a 

[spark] branch master updated (d2c5c53a5c2 -> 68a7aa5534a)

2022-04-18 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from d2c5c53a5c2 [SPARK-38933][SQL][DOCS] Add examples of window functions 
into SQL docs
 add 68a7aa5534a [SPARK-38880][PYTHON] Implement `numeric_only` parameter 
of `GroupBy.max/min`

No new revisions were added by this update.

Summary of changes:
 python/pyspark/pandas/groupby.py| 85 +++--
 python/pyspark/pandas/tests/test_groupby.py | 48 
 2 files changed, 127 insertions(+), 6 deletions(-)
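
A hedged usage sketch of the new `numeric_only` parameter (illustrative, not from the patch; assumes a build containing this change, i.e. Spark 3.4.0+):

```python
import pyspark.pandas as ps

df = ps.DataFrame({"k": ["a", "a", "b"], "x": [1, 2, 3], "s": ["p", "q", "r"]})
# numeric_only=True drops non-numeric columns such as 's' before aggregating.
print(df.groupby("k").max(numeric_only=True))
```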





[spark] branch branch-3.3 updated: [SPARK-38933][SQL][DOCS] Add examples of window functions into SQL docs

2022-04-18 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new 142b933eebe [SPARK-38933][SQL][DOCS] Add examples of window functions 
into SQL docs
142b933eebe is described below

commit 142b933eebec4feb58ea3643cc55e9480204fe8a
Author: Jiaan Geng 
AuthorDate: Tue Apr 19 10:41:39 2022 +0900

[SPARK-38933][SQL][DOCS] Add examples of window functions into SQL docs

### What changes were proposed in this pull request?
Currently, Spark SQL docs display the window functions without examples.

![image](https://user-images.githubusercontent.com/8486025/163788857-38313a9c-48b2-4b72-bc60-38056d91124e.png)

In fact, Mkdocs also generates the doc `generated-window-funcs-examples.html`.
This PR just updates `sql-ref-functions-builtin.md`.

![image](https://user-images.githubusercontent.com/8486025/163789775-17255e1a-7f7e-4b79-b780-3b04ba55dde7.png)

### Why are the changes needed?
Let SQL docs display the examples of window functions.

### Does this PR introduce _any_ user-facing change?
'No'.
Just update docs.

### How was this patch tested?
Manual tests.
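
For context, a hedged illustration (not from the patch) of the kind of window-function example the generated pages now surface; assumes a local SparkSession:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()
spark.sql("""
    SELECT name, dept, salary,
           RANK() OVER (PARTITION BY dept ORDER BY salary DESC) AS r
    FROM VALUES ('a', 'd1', 10), ('b', 'd1', 20), ('c', 'd2', 30) AS t(name, dept, salary)
""").show()
```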

Closes #36243 from beliefer/SPARK-38933.

Authored-by: Jiaan Geng 
Signed-off-by: Hyukjin Kwon 
(cherry picked from commit d2c5c53a5c21a72b3e00ecc48e6cac6ae73c3c23)
Signed-off-by: Hyukjin Kwon 
---
 docs/sql-ref-functions-builtin.md | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/docs/sql-ref-functions-builtin.md 
b/docs/sql-ref-functions-builtin.md
index cabb83e09fd..08e620b0f4f 100644
--- a/docs/sql-ref-functions-builtin.md
+++ b/docs/sql-ref-functions-builtin.md
@@ -31,6 +31,8 @@ license: |
 {% if static_file.name == 'generated-window-funcs-table.html' %}
 ### Window Functions
 {% include_relative generated-window-funcs-table.html %}
+ Examples
+{% include_relative generated-window-funcs-examples.html %}
 {% break %}
 {% endif %}
 {% endfor %}





[spark] branch master updated (2a1f9767213 -> d2c5c53a5c2)

2022-04-18 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 2a1f9767213 [SPARK-38856][SHUFFLE] Fix a rejectedExecutionException 
error when push-based shuffle is enabled
 add d2c5c53a5c2 [SPARK-38933][SQL][DOCS] Add examples of window functions 
into SQL docs

No new revisions were added by this update.

Summary of changes:
 docs/sql-ref-functions-builtin.md | 2 ++
 1 file changed, 2 insertions(+)





[spark] branch master updated (a826469d0be -> 2a1f9767213)

2022-04-18 Thread mridulm80
This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from a826469d0be [SPARK-38851][CORE][TESTS] Refactor `HistoryServerSuite` 
to add UTs for RocksDB
 add 2a1f9767213 [SPARK-38856][SHUFFLE] Fix a rejectedExecutionException 
error when push-based shuffle is enabled

No new revisions were added by this update.

Summary of changes:
 .../scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala| 10 --
 1 file changed, 4 insertions(+), 6 deletions(-)





[spark] branch master updated: [SPARK-38851][CORE][TESTS] Refactor `HistoryServerSuite` to add UTs for RocksDB

2022-04-18 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new a826469d0be [SPARK-38851][CORE][TESTS] Refactor `HistoryServerSuite` 
to add UTs for RocksDB
a826469d0be is described below

commit a826469d0be60dc344bc38445f621a4ec20861c9
Author: yangjie01 
AuthorDate: Mon Apr 18 12:19:20 2022 -0700

[SPARK-38851][CORE][TESTS] Refactor `HistoryServerSuite` to add UTs for 
RocksDB

### What changes were proposed in this pull request?
This PR expands `HistoryServerSuite` to add UTs for RocksDB scenarios.

### Why are the changes needed?
Add more UTs for RocksDB on Apple Silicon macOS.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

- Pass GA
- Manual test on Apple Silicon environment:

```
build/sbt "core/testOnly *RocksDBBackendHistoryServerSuite*"
```

```
[info] Run completed in 31 seconds, 100 milliseconds.
[info] Total number of tests run: 73
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 73, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```

Closes #36138 from LuciferYang/SPARK-38851.

Authored-by: yangjie01 
Signed-off-by: Dongjoon Hyun 
---
 .../spark/deploy/history/HistoryServerSuite.scala  | 37 ++
 1 file changed, 24 insertions(+), 13 deletions(-)

diff --git 
a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index 25f962aaa65..02a32a80ddd 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -49,6 +49,7 @@ import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.status.api.v1.ApplicationInfo
 import org.apache.spark.status.api.v1.JobData
+import org.apache.spark.tags.ExtendedLevelDBTest
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{ResetSystemProperties, ShutdownHookManager, 
Utils}
 
@@ -63,8 +64,8 @@ import org.apache.spark.util.{ResetSystemProperties, 
ShutdownHookManager, Utils}
  * expectations.  However, in general this should be done with extreme 
caution, as the metrics
  * are considered part of Spark's public api.
  */
-class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with 
Matchers with MockitoSugar
-  with JsonTestUtils with Eventually with WebBrowser with LocalSparkContext
+abstract class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter 
with Matchers
+  with MockitoSugar with JsonTestUtils with Eventually with WebBrowser with 
LocalSparkContext
   with ResetSystemProperties {
 
   private val logDir = getTestResourcePath("spark-events")
@@ -75,6 +76,10 @@ class HistoryServerSuite extends SparkFunSuite with 
BeforeAndAfter with Matchers
   private var server: HistoryServer = null
   private var port: Int = -1
 
+  protected def diskBackend: HybridStoreDiskBackend.Value
+
+  def getExpRoot: File = expRoot
+
   def init(extraConf: (String, String)*): Unit = {
 Utils.deleteRecursively(storeDir)
 assert(storeDir.mkdir())
@@ -85,11 +90,8 @@ class HistoryServerSuite extends SparkFunSuite with 
BeforeAndAfter with Matchers
   .set(LOCAL_STORE_DIR, storeDir.getAbsolutePath())
   .set(EVENT_LOG_STAGE_EXECUTOR_METRICS, true)
   .set(EXECUTOR_PROCESS_TREE_METRICS_ENABLED, true)
+  .set(HYBRID_STORE_DISK_BACKEND, diskBackend.toString)
 conf.setAll(extraConf)
-// Since LevelDB doesn't support Apple Silicon yet, fallback to in-memory 
provider
-if (Utils.isMacOnAppleSilicon) {
-  conf.remove(LOCAL_STORE_DIR)
-}
 provider = new FsHistoryProvider(conf)
 provider.checkForLogs()
 val securityManager = HistoryServer.createSecurityManager(conf)
@@ -393,10 +395,7 @@ class HistoryServerSuite extends SparkFunSuite with 
BeforeAndAfter with Matchers
   .set(EVENT_LOG_ENABLED, true)
   .set(LOCAL_STORE_DIR, storeDir.getAbsolutePath())
   .remove(IS_TESTING)
-// Since LevelDB doesn't support Apple Silicon yet, fallback to in-memory 
provider
-if (Utils.isMacOnAppleSilicon) {
-  myConf.remove(LOCAL_STORE_DIR)
-}
+  .set(HYBRID_STORE_DISK_BACKEND, diskBackend.toString)
 val provider = new FsHistoryProvider(myConf)
 val securityManager = HistoryServer.createSecurityManager(myConf)
 
@@ -715,9 +714,10 @@ object HistoryServerSuite {
 // generate the "expected" results for the characterization tests.  Just 
blindly assume the
 // current behavior is correct, and write out the returned json to the 
test/resource files
 
-val suite = new HistoryServerSuite

[spark] branch master updated: [SPARK-38910][YARN] Clean spark staging before `unregister`

2022-04-18 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 87c744b6050 [SPARK-38910][YARN] Clean spark staging before `unregister`
87c744b6050 is described below

commit 87c744b60507f82e1722f1488f1741cb2bb8e8e5
Author: Angerszh 
AuthorDate: Mon Apr 18 11:18:48 2022 -0500

[SPARK-38910][YARN] Clean spark staging before `unregister`

### What changes were proposed in this pull request?
`ApplicationMaster`'s shutdown
```
  ShutdownHookManager.addShutdownHook(priority) { () =>
try {
  val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf)
  val isLastAttempt = appAttemptId.getAttemptId() >= maxAppAttempts

  if (!finished) {
// The default state of ApplicationMaster is failed if it is 
invoked by shut down hook.
// This behavior is different compared to 1.x version.
// If user application is exited ahead of time by calling 
System.exit(N), here mark
// this application as failed with EXIT_EARLY. For a good 
shutdown, user shouldn't call
// System.exit(0) to terminate the application.
finish(finalStatus,
  ApplicationMaster.EXIT_EARLY,
  "Shutdown hook called before final status was reported.")
  }

  if (!unregistered) {
// we only want to unregister if we don't want the RM to retry
if (finalStatus == FinalApplicationStatus.SUCCEEDED || 
isLastAttempt) {
  unregister(finalStatus, finalMsg)
  cleanupStagingDir(new 
Path(System.getenv("SPARK_YARN_STAGING_DIR")))
}
  }
} catch {
  case e: Throwable =>
logWarning("Ignoring Exception while stopping ApplicationMaster 
from shutdown hook", e)
}
  }
```

`unregister` may throw an exception, so we should clean the staging dir first.

### Why are the changes needed?
Clean staging dir

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

Closes #36207 from AngersZh/SPARK-38910.

Authored-by: Angerszh 
Signed-off-by: Sean Owen 
---
 .../src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 99ad181a542..b15623ceff5 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -261,8 +261,8 @@ private[spark] class ApplicationMaster(
   if (!unregistered) {
 // we only want to unregister if we don't want the RM to retry
 if (finalStatus == FinalApplicationStatus.SUCCEEDED || 
isLastAttempt) {
-  unregister(finalStatus, finalMsg)
   cleanupStagingDir(new 
Path(System.getenv("SPARK_YARN_STAGING_DIR")))
+  unregister(finalStatus, finalMsg)
 }
   }
 } catch {





[spark] branch master updated (dff52d649d1 -> 6d3c1c10c4a)

2022-04-18 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from dff52d649d1 [SPARK-37015][PYTHON] Inline type hints for 
python/pyspark/streaming/dstream.py
 add 6d3c1c10c4a [SPARK-38885][BUILD] Upgrade Netty to 4.1.76

No new revisions were added by this update.

Summary of changes:
 dev/deps/spark-deps-hadoop-2-hive-2.3 | 30 +++---
 dev/deps/spark-deps-hadoop-3-hive-2.3 | 30 +++---
 pom.xml   |  4 ++--
 3 files changed, 32 insertions(+), 32 deletions(-)





[spark] branch branch-3.3 updated: [SPARK-37015][PYTHON] Inline type hints for python/pyspark/streaming/dstream.py

2022-04-18 Thread zero323
This is an automated email from the ASF dual-hosted git repository.

zero323 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new 5544cce1588 [SPARK-37015][PYTHON] Inline type hints for 
python/pyspark/streaming/dstream.py
5544cce1588 is described below

commit 5544cce15885b1f12ae5826cd3bd2d151e1d544a
Author: dch nguyen 
AuthorDate: Mon Apr 18 17:38:32 2022 +0200

[SPARK-37015][PYTHON] Inline type hints for 
python/pyspark/streaming/dstream.py

### What changes were proposed in this pull request?
Inline type hints for python/pyspark/streaming/dstream.py

### Why are the changes needed?
We can take advantage of static type checking within the functions by 
inlining the type hints.
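
A tiny hedged illustration (not from the patch) of the inline-hint style: with annotations in the signature rather than in a separate `.pyi` stub, a checker such as mypy can also verify the function body. The names below are made up for illustration.

```python
from typing import Callable, List, TypeVar

T = TypeVar("T")
U = TypeVar("U")

def map_values(func: Callable[[T], U], values: List[T]) -> List[U]:
    # With inline hints, the body itself is checked against the annotations.
    return [func(v) for v in values]
```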

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing tests

Closes #34324 from dchvn/SPARK-37015.

Lead-authored-by: dch nguyen 
Co-authored-by: dch nguyen 
Signed-off-by: zero323 
(cherry picked from commit dff52d649d1e27baf3b107f75636624e0cfe780f)
Signed-off-by: zero323 
---
 python/pyspark/streaming/context.py  |  22 +--
 python/pyspark/streaming/dstream.py  | 369 +++
 python/pyspark/streaming/dstream.pyi | 211 
 3 files changed, 296 insertions(+), 306 deletions(-)

diff --git a/python/pyspark/streaming/context.py 
b/python/pyspark/streaming/context.py
index 52e5efed063..0be0c7b034a 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -397,12 +397,12 @@ class StreamingContext:
 the transform function parameter will be the same as the order
 of corresponding DStreams in the list.
 """
-jdstreams = [d._jdstream for d in dstreams]  # type: 
ignore[attr-defined]
+jdstreams = [d._jdstream for d in dstreams]
 # change the final serializer to sc.serializer
 func = TransformFunction(
 self._sc,
 lambda t, *rdds: transformFunc(rdds),
-*[d._jrdd_deserializer for d in dstreams],  # type: 
ignore[attr-defined]
+*[d._jrdd_deserializer for d in dstreams],
 )
 
 assert self._jvm is not None
@@ -419,35 +419,31 @@ class StreamingContext:
 raise ValueError("should have at least one DStream to union")
 if len(dstreams) == 1:
 return dstreams[0]
-if len(set(s._jrdd_deserializer for s in dstreams)) > 1:  # type: 
ignore[attr-defined]
+if len(set(s._jrdd_deserializer for s in dstreams)) > 1:
 raise ValueError("All DStreams should have same serializer")
-if len(set(s._slideDuration for s in dstreams)) > 1:  # type: 
ignore[attr-defined]
+if len(set(s._slideDuration for s in dstreams)) > 1:
 raise ValueError("All DStreams should have same slide duration")
 
 assert SparkContext._jvm is not None
 jdstream_cls = 
SparkContext._jvm.org.apache.spark.streaming.api.java.JavaDStream
 jpair_dstream_cls = 
SparkContext._jvm.org.apache.spark.streaming.api.java.JavaPairDStream
 gw = SparkContext._gateway
-if is_instance_of(gw, dstreams[0]._jdstream, jdstream_cls):  # type: 
ignore[attr-defined]
+if is_instance_of(gw, dstreams[0]._jdstream, jdstream_cls):
 cls = jdstream_cls
-elif is_instance_of(
-gw, dstreams[0]._jdstream, jpair_dstream_cls  # type: 
ignore[attr-defined]
-):
+elif is_instance_of(gw, dstreams[0]._jdstream, jpair_dstream_cls):
 cls = jpair_dstream_cls
 else:
-cls_name = (
-dstreams[0]._jdstream.getClass().getCanonicalName()  # type: 
ignore[attr-defined]
-)
+cls_name = dstreams[0]._jdstream.getClass().getCanonicalName()
 raise TypeError("Unsupported Java DStream class %s" % cls_name)
 
 assert gw is not None
 jdstreams = gw.new_array(cls, len(dstreams))
 for i in range(0, len(dstreams)):
-jdstreams[i] = dstreams[i]._jdstream  # type: ignore[attr-defined]
+jdstreams[i] = dstreams[i]._jdstream
 return DStream(
 self._jssc.union(jdstreams),
 self,
-dstreams[0]._jrdd_deserializer,  # type: ignore[attr-defined]
+dstreams[0]._jrdd_deserializer,
 )
 
 def addStreamingListener(self, streamingListener: StreamingListener) -> 
None:
diff --git a/python/pyspark/streaming/dstream.py 
b/python/pyspark/streaming/dstream.py
index f445a78bd95..934b3ae5783 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -19,19 +19,45 @@ import operator
 import time
 from itertools import chain
 from datetime import datetime
+from typing import (
+Any,
+Callable,
+Generic,
+

[spark] branch master updated: [SPARK-37015][PYTHON] Inline type hints for python/pyspark/streaming/dstream.py

2022-04-18 Thread zero323
This is an automated email from the ASF dual-hosted git repository.

zero323 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new dff52d649d1 [SPARK-37015][PYTHON] Inline type hints for 
python/pyspark/streaming/dstream.py
dff52d649d1 is described below

commit dff52d649d1e27baf3b107f75636624e0cfe780f
Author: dch nguyen 
AuthorDate: Mon Apr 18 17:38:32 2022 +0200

[SPARK-37015][PYTHON] Inline type hints for 
python/pyspark/streaming/dstream.py

### What changes were proposed in this pull request?
Inline type hints for python/pyspark/streaming/dstream.py

### Why are the changes needed?
We can take advantage of static type checking within the functions by 
inlining the type hints.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing tests

Closes #34324 from dchvn/SPARK-37015.

Lead-authored-by: dch nguyen 
Co-authored-by: dch nguyen 
Signed-off-by: zero323 
---
 python/pyspark/streaming/context.py  |  22 +--
 python/pyspark/streaming/dstream.py  | 369 +++
 python/pyspark/streaming/dstream.pyi | 211 
 3 files changed, 296 insertions(+), 306 deletions(-)

diff --git a/python/pyspark/streaming/context.py 
b/python/pyspark/streaming/context.py
index 52e5efed063..0be0c7b034a 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -397,12 +397,12 @@ class StreamingContext:
 the transform function parameter will be the same as the order
 of corresponding DStreams in the list.
 """
-jdstreams = [d._jdstream for d in dstreams]  # type: 
ignore[attr-defined]
+jdstreams = [d._jdstream for d in dstreams]
 # change the final serializer to sc.serializer
 func = TransformFunction(
 self._sc,
 lambda t, *rdds: transformFunc(rdds),
-*[d._jrdd_deserializer for d in dstreams],  # type: 
ignore[attr-defined]
+*[d._jrdd_deserializer for d in dstreams],
 )
 
 assert self._jvm is not None
@@ -419,35 +419,31 @@ class StreamingContext:
 raise ValueError("should have at least one DStream to union")
 if len(dstreams) == 1:
 return dstreams[0]
-if len(set(s._jrdd_deserializer for s in dstreams)) > 1:  # type: 
ignore[attr-defined]
+if len(set(s._jrdd_deserializer for s in dstreams)) > 1:
 raise ValueError("All DStreams should have same serializer")
-if len(set(s._slideDuration for s in dstreams)) > 1:  # type: 
ignore[attr-defined]
+if len(set(s._slideDuration for s in dstreams)) > 1:
 raise ValueError("All DStreams should have same slide duration")
 
 assert SparkContext._jvm is not None
 jdstream_cls = 
SparkContext._jvm.org.apache.spark.streaming.api.java.JavaDStream
 jpair_dstream_cls = 
SparkContext._jvm.org.apache.spark.streaming.api.java.JavaPairDStream
 gw = SparkContext._gateway
-if is_instance_of(gw, dstreams[0]._jdstream, jdstream_cls):  # type: 
ignore[attr-defined]
+if is_instance_of(gw, dstreams[0]._jdstream, jdstream_cls):
 cls = jdstream_cls
-elif is_instance_of(
-gw, dstreams[0]._jdstream, jpair_dstream_cls  # type: 
ignore[attr-defined]
-):
+elif is_instance_of(gw, dstreams[0]._jdstream, jpair_dstream_cls):
 cls = jpair_dstream_cls
 else:
-cls_name = (
-dstreams[0]._jdstream.getClass().getCanonicalName()  # type: 
ignore[attr-defined]
-)
+cls_name = dstreams[0]._jdstream.getClass().getCanonicalName()
 raise TypeError("Unsupported Java DStream class %s" % cls_name)
 
 assert gw is not None
 jdstreams = gw.new_array(cls, len(dstreams))
 for i in range(0, len(dstreams)):
-jdstreams[i] = dstreams[i]._jdstream  # type: ignore[attr-defined]
+jdstreams[i] = dstreams[i]._jdstream
 return DStream(
 self._jssc.union(jdstreams),
 self,
-dstreams[0]._jrdd_deserializer,  # type: ignore[attr-defined]
+dstreams[0]._jrdd_deserializer,
 )
 
 def addStreamingListener(self, streamingListener: StreamingListener) -> 
None:
diff --git a/python/pyspark/streaming/dstream.py 
b/python/pyspark/streaming/dstream.py
index f445a78bd95..934b3ae5783 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -19,19 +19,45 @@ import operator
 import time
 from itertools import chain
 from datetime import datetime
+from typing import (
+Any,
+Callable,
+Generic,
+Hashable,
+Iterable,
+List,
+Optional,
+Tuple,
+TypeVar,
+Union,
+TYPE_CHECKING,
+

[spark] branch master updated: [SPARK-38886][SQL] Remove outer join if aggregate functions are duplicate agnostic on streamed side

2022-04-18 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new ca044e62536 [SPARK-38886][SQL] Remove outer join if aggregate 
functions are duplicate agnostic on streamed side
ca044e62536 is described below

commit ca044e62536b4c80acbbbab538a5f61ce074a684
Author: ulysses-you 
AuthorDate: Mon Apr 18 23:22:00 2022 +0800

[SPARK-38886][SQL] Remove outer join if aggregate functions are duplicate 
agnostic on streamed side

### What changes were proposed in this pull request?

Enhance `EliminateOuterJoin` by removing the outer join if it matches two conditions:
- all aggregate functions are duplicate agnostic
- references are coming from stream side

### Why are the changes needed?

If the aggregate's child is an outer join, the aggregate references all come 
from the streamed side, and the aggregate functions are all duplicate 
agnostic, we can remove the outer join.

For example:
```sql
SELECT t1.c1, min(t1.c2) FROM t1 LEFT JOIN t2 ON t1.c1 = t2.c1 GROUP BY 
t1.c1
==>
SELECT t1.c1, min(t1.c2) FROM t1 GROUP BY t1.c1
```
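
A hedged PySpark sketch (not from the patch) of the same pattern; with this rule the optimized plan is expected to drop the left outer join, since `min` is duplicate agnostic and every reference comes from the streamed (left) side. The view names are illustrative.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()
spark.sql("CREATE OR REPLACE TEMP VIEW t1 AS SELECT * FROM VALUES (1, 10), (1, 5), (2, 20) AS data(c1, c2)")
spark.sql("CREATE OR REPLACE TEMP VIEW t2 AS SELECT * FROM VALUES (1), (3) AS data(c1)")

spark.sql("""
    SELECT t1.c1, min(t1.c2) FROM t1 LEFT JOIN t2 ON t1.c1 = t2.c1 GROUP BY t1.c1
""").explain()  # the LEFT JOIN should not appear in the optimized plan
```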

### Does this PR introduce _any_ user-facing change?

Improve performance

### How was this patch tested?

add test

Closes #36177 from ulysses-you/SPARK-38886.

Authored-by: ulysses-you 
Signed-off-by: Wenchen Fan 
---
 .../spark/sql/catalyst/optimizer/joins.scala   | 35 +-
 .../optimizer/AggregateOptimizeSuite.scala | 42 ++
 2 files changed, 68 insertions(+), 9 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
index 45d8c54ea19..b21594deb70 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.optimizer
 import scala.annotation.tailrec
 
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction
 import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -126,11 +127,17 @@ object ReorderJoin extends Rule[LogicalPlan] with 
PredicateHelper {
  * - full outer -> left outer if only the left side has such predicates
  * - full outer -> right outer if only the right side has such predicates
  *
- * 2. Removes outer join if it only has distinct on streamed side
+ * 2. Removes outer join if aggregate is from streamed side and duplicate 
agnostic
+ *
  * {{{
  *   SELECT DISTINCT f1 FROM t1 LEFT JOIN t2 ON t1.id = t2.id  ==>  SELECT 
DISTINCT f1 FROM t1
  * }}}
  *
+ * {{{
+ *   SELECT t1.c1, max(t1.c2) FROM t1 LEFT JOIN t2 ON t1.c1 = t2.c1 GROUP BY 
t1.c1  ==>
+ *   SELECT t1.c1, max(t1.c2) FROM t1 GROUP BY t1.c1
+ * }}}
+ *
  * This rule should be executed before pushing down the Filter
  */
 object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
@@ -166,23 +173,33 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with 
PredicateHelper {
 }
   }
 
+  private def allDuplicateAgnostic(
+  aggregateExpressions: Seq[NamedExpression]): Boolean = {
+!aggregateExpressions.exists(_.exists {
+  case agg: AggregateFunction => 
!EliminateDistinct.isDuplicateAgnostic(agg)
+  case _ => false
+})
+  }
+
   def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning(
 _.containsPattern(OUTER_JOIN), ruleId) {
 case f @ Filter(condition, j @ Join(_, _, RightOuter | LeftOuter | 
FullOuter, _, _)) =>
   val newJoinType = buildNewJoinType(f, j)
   if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType 
= newJoinType))
 
-case a @ Aggregate(_, _, Join(left, _, LeftOuter, _, _))
-if a.groupOnly && a.references.subsetOf(left.outputSet) =>
+case a @ Aggregate(_, aggExprs, Join(left, _, LeftOuter, _, _))
+if a.references.subsetOf(left.outputSet) && 
allDuplicateAgnostic(aggExprs) =>
   a.copy(child = left)
-case a @ Aggregate(_, _, Join(_, right, RightOuter, _, _))
-if a.groupOnly && a.references.subsetOf(right.outputSet) =>
+case a @ Aggregate(_, aggExprs, Join(_, right, RightOuter, _, _))
+if a.references.subsetOf(right.outputSet) && 
allDuplicateAgnostic(aggExprs) =>
   a.copy(child = right)
-case a @ Aggregate(_, _, p @ Project(_, Join(left, _, LeftOuter, _, _)))
-if a.groupOnly && p.references.subsetOf(left.outputSet) =>
+case a @ Aggregate(_, aggExprs, p @ Project(projectList, Join(left, _, 
LeftOuter, _, 

[spark] branch branch-3.1 updated: [SPARK-37643][SQL] when charVarcharAsString is true, for char datatype predicate query should skip rpadding rule

2022-04-18 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
 new c7733d35c35 [SPARK-37643][SQL] when charVarcharAsString is true, for 
char datatype predicate query should skip rpadding rule
c7733d35c35 is described below

commit c7733d35c35748f0953ab371f515619c2db420ef
Author: fhygh <283452...@qq.com>
AuthorDate: Mon Apr 18 23:11:32 2022 +0800

[SPARK-37643][SQL] when charVarcharAsString is true, for char datatype 
predicate query should skip rpadding rule

### What changes were proposed in this pull request?
After the ApplyCharTypePadding rule was added, when a predicate column's data 
type is char and the column value is shorter than the defined length, the 
value is right-padded and the query returns an incorrect result.

### Why are the changes needed?
Fix incorrect query results when the predicate column's data type is char: in 
this case, when charVarcharAsString is true, we should skip the rpadding rule.

### Does this PR introduce _any_ user-facing change?
Before this fix, queries with a char-typed predicate column required setting 
charVarcharAsString to true with care.

### How was this patch tested?
Add a new UT.
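
A hedged PySpark sketch (not from the patch) of the behavior the new UT pins down; assumes a local SparkSession and a build containing this fix.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()
spark.conf.set("spark.sql.legacy.charVarcharAsString", "true")

spark.sql("CREATE TABLE t(b CHAR(5)) USING parquet")
spark.sql("INSERT INTO t VALUES ('abc')")
# With the fix, the 'abc' literal is no longer right-padded to five characters,
# so the predicate matches the stored value.
spark.sql("SELECT b FROM t WHERE b = 'abc'").show()  # expected: one row, 'abc'
```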

Closes #36187 from fhygh/charpredicatequery.

Authored-by: fhygh <283452...@qq.com>
Signed-off-by: Wenchen Fan 
(cherry picked from commit c1ea8b446d00dd0123a0fad93a3e143933419a76)
Signed-off-by: Wenchen Fan 
---
 .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala   |  3 +++
 .../scala/org/apache/spark/sql/CharVarcharTestSuite.scala   | 13 +
 2 files changed, 16 insertions(+)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index aacc801a824..835fb1f3a18 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -3986,6 +3986,9 @@ object ApplyCharTypePadding extends Rule[LogicalPlan] {
   }
 
   override def apply(plan: LogicalPlan): LogicalPlan = {
+if (SQLConf.get.charVarcharAsString) {
+  return plan
+}
 plan.resolveOperatorsUp {
   case operator => operator.transformExpressionsUp {
 case e if !e.childrenResolved => e
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
index 70e69843f74..430ac57651e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
@@ -99,6 +99,19 @@ trait CharVarcharTestSuite extends QueryTest with 
SQLTestUtils {
 }
   }
 
+  test("char type values should not be padded when charVarcharAsString is 
true") {
+withSQLConf(SQLConf.LEGACY_CHAR_VARCHAR_AS_STRING.key -> "true") {
+  withTable("t") {
+sql(s"CREATE TABLE t(a STRING, b CHAR(5), c CHAR(5)) USING $format 
partitioned by (c)")
+sql("INSERT INTO t VALUES ('abc', 'abc', 'abc')")
+checkAnswer(sql("SELECT b FROM t WHERE b='abc'"), Row("abc"))
+checkAnswer(sql("SELECT b FROM t WHERE b in ('abc')"), Row("abc"))
+checkAnswer(sql("SELECT c FROM t WHERE c='abc'"), Row("abc"))
+checkAnswer(sql("SELECT c FROM t WHERE c in ('abc')"), Row("abc"))
+  }
+}
+  }
+
   test("varchar type values length check and trim: partitioned columns") {
 (0 to 5).foreach { n =>
   // SPARK-34192: we need to create a a new table for each round of test 
because of





[spark] branch branch-3.2 updated: [SPARK-37643][SQL] when charVarcharAsString is true, for char datatype predicate query should skip rpadding rule

2022-04-18 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 35ec300eaea [SPARK-37643][SQL] when charVarcharAsString is true, for 
char datatype predicate query should skip rpadding rule
35ec300eaea is described below

commit 35ec300eaeabf7f6e0827f8dc9e7923969e68f00
Author: fhygh <283452...@qq.com>
AuthorDate: Mon Apr 18 23:11:32 2022 +0800

[SPARK-37643][SQL] when charVarcharAsString is true, for char datatype 
predicate query should skip rpadding rule

### What changes were proposed in this pull request?
After the ApplyCharTypePadding rule was added, when the predicate column's data type is char and the stored value is shorter than the declared length, the value is right-padded, so the query returns an incorrect result.

### Why are the changes needed?
Fix incorrect query results when the predicate column's data type is char: when charVarcharAsString is true, the right-padding rule should be skipped.

### Does this PR introduce _any_ user-facing change?
Before this fix, users had to be careful about setting charVarcharAsString to true when querying on a char-typed predicate column.

### How was this patch tested?
Added a new unit test.

Closes #36187 from fhygh/charpredicatequery.

Authored-by: fhygh <283452...@qq.com>
Signed-off-by: Wenchen Fan 
(cherry picked from commit c1ea8b446d00dd0123a0fad93a3e143933419a76)
Signed-off-by: Wenchen Fan 
---
 .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala   |  3 +++
 .../scala/org/apache/spark/sql/CharVarcharTestSuite.scala   | 13 +
 2 files changed, 16 insertions(+)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 21233e9801e..d7bf9f2571c 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -4179,6 +4179,9 @@ object ApplyCharTypePadding extends Rule[LogicalPlan] {
   }
 
   override def apply(plan: LogicalPlan): LogicalPlan = {
+if (SQLConf.get.charVarcharAsString) {
+  return plan
+}
 plan.resolveOperatorsUpWithPruning(_.containsAnyPattern(BINARY_COMPARISON, 
IN)) {
   case operator => operator.transformExpressionsUpWithPruning(
 _.containsAnyPattern(BINARY_COMPARISON, IN)) {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
index 7be54d49a90..88041fc26a0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
@@ -100,6 +100,19 @@ trait CharVarcharTestSuite extends QueryTest with 
SQLTestUtils {
 }
   }
 
+  test("char type values should not be padded when charVarcharAsString is 
true") {
+withSQLConf(SQLConf.LEGACY_CHAR_VARCHAR_AS_STRING.key -> "true") {
+  withTable("t") {
+sql(s"CREATE TABLE t(a STRING, b CHAR(5), c CHAR(5)) USING $format 
partitioned by (c)")
+sql("INSERT INTO t VALUES ('abc', 'abc', 'abc')")
+checkAnswer(sql("SELECT b FROM t WHERE b='abc'"), Row("abc"))
+checkAnswer(sql("SELECT b FROM t WHERE b in ('abc')"), Row("abc"))
+checkAnswer(sql("SELECT c FROM t WHERE c='abc'"), Row("abc"))
+checkAnswer(sql("SELECT c FROM t WHERE c in ('abc')"), Row("abc"))
+  }
+}
+  }
+
   test("varchar type values length check and trim: partitioned columns") {
 (0 to 5).foreach { n =>
   // SPARK-34192: we need to create a a new table for each round of test 
because of


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.3 updated: [SPARK-37643][SQL] when charVarcharAsString is true, for char datatype predicate query should skip rpadding rule

2022-04-18 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new 7f317c93383 [SPARK-37643][SQL] when charVarcharAsString is true, for 
char datatype predicate query should skip rpadding rule
7f317c93383 is described below

commit 7f317c93383b51bd3ce163b9f8b481f2203760f7
Author: fhygh <283452...@qq.com>
AuthorDate: Mon Apr 18 23:11:32 2022 +0800

[SPARK-37643][SQL] when charVarcharAsString is true, for char datatype 
predicate query should skip rpadding rule

### What changes were proposed in this pull request?
After the ApplyCharTypePadding rule was added, when the predicate column's data type is char and the stored value is shorter than the declared length, the value is right-padded, so the query returns an incorrect result.

### Why are the changes needed?
Fix incorrect query results when the predicate column's data type is char: when charVarcharAsString is true, the right-padding rule should be skipped.

### Does this PR introduce _any_ user-facing change?
Before this fix, users had to be careful about setting charVarcharAsString to true when querying on a char-typed predicate column.

### How was this patch tested?
Added a new unit test.

Closes #36187 from fhygh/charpredicatequery.

Authored-by: fhygh <283452...@qq.com>
Signed-off-by: Wenchen Fan 
(cherry picked from commit c1ea8b446d00dd0123a0fad93a3e143933419a76)
Signed-off-by: Wenchen Fan 
---
 .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala   |  3 +++
 .../scala/org/apache/spark/sql/CharVarcharTestSuite.scala   | 13 +
 2 files changed, 16 insertions(+)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 9fdc466b425..1bc8814b334 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -4185,6 +4185,9 @@ object ApplyCharTypePadding extends Rule[LogicalPlan] {
   }
 
   override def apply(plan: LogicalPlan): LogicalPlan = {
+if (SQLConf.get.charVarcharAsString) {
+  return plan
+}
 plan.resolveOperatorsUpWithPruning(_.containsAnyPattern(BINARY_COMPARISON, 
IN)) {
   case operator => operator.transformExpressionsUpWithPruning(
 _.containsAnyPattern(BINARY_COMPARISON, IN)) {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
index 6ade7a7c99e..978e3f8d36d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
@@ -100,6 +100,19 @@ trait CharVarcharTestSuite extends QueryTest with 
SQLTestUtils {
 }
   }
 
+  test("char type values should not be padded when charVarcharAsString is 
true") {
+withSQLConf(SQLConf.LEGACY_CHAR_VARCHAR_AS_STRING.key -> "true") {
+  withTable("t") {
+sql(s"CREATE TABLE t(a STRING, b CHAR(5), c CHAR(5)) USING $format 
partitioned by (c)")
+sql("INSERT INTO t VALUES ('abc', 'abc', 'abc')")
+checkAnswer(sql("SELECT b FROM t WHERE b='abc'"), Row("abc"))
+checkAnswer(sql("SELECT b FROM t WHERE b in ('abc')"), Row("abc"))
+checkAnswer(sql("SELECT c FROM t WHERE c='abc'"), Row("abc"))
+checkAnswer(sql("SELECT c FROM t WHERE c in ('abc')"), Row("abc"))
+  }
+}
+  }
+
   test("varchar type values length check and trim: partitioned columns") {
 (0 to 5).foreach { n =>
   // SPARK-34192: we need to create a a new table for each round of test 
because of


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-37643][SQL] when charVarcharAsString is true, for char datatype predicate query should skip rpadding rule

2022-04-18 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new c1ea8b446d0 [SPARK-37643][SQL] when charVarcharAsString is true, for 
char datatype predicate query should skip rpadding rule
c1ea8b446d0 is described below

commit c1ea8b446d00dd0123a0fad93a3e143933419a76
Author: fhygh <283452...@qq.com>
AuthorDate: Mon Apr 18 23:11:32 2022 +0800

[SPARK-37643][SQL] when charVarcharAsString is true, for char datatype 
predicate query should skip rpadding rule

### What changes were proposed in this pull request?
After the ApplyCharTypePadding rule was added, when the predicate column's data type is char and the stored value is shorter than the declared length, the value is right-padded, so the query returns an incorrect result.

### Why are the changes needed?
Fix incorrect query results when the predicate column's data type is char: when charVarcharAsString is true, the right-padding rule should be skipped.

### Does this PR introduce _any_ user-facing change?
Before this fix, users had to be careful about setting charVarcharAsString to true when querying on a char-typed predicate column.

### How was this patch tested?
Added a new unit test.

Closes #36187 from fhygh/charpredicatequery.

Authored-by: fhygh <283452...@qq.com>
Signed-off-by: Wenchen Fan 
---
 .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala   |  3 +++
 .../scala/org/apache/spark/sql/CharVarcharTestSuite.scala   | 13 +
 2 files changed, 16 insertions(+)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index bf0142bb059..d00818ba1ea 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -4202,6 +4202,9 @@ object ApplyCharTypePadding extends Rule[LogicalPlan] {
   }
 
   override def apply(plan: LogicalPlan): LogicalPlan = {
+if (SQLConf.get.charVarcharAsString) {
+  return plan
+}
 plan.resolveOperatorsUpWithPruning(_.containsAnyPattern(BINARY_COMPARISON, 
IN)) {
   case operator => operator.transformExpressionsUpWithPruning(
 _.containsAnyPattern(BINARY_COMPARISON, IN)) {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
index 6ade7a7c99e..978e3f8d36d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
@@ -100,6 +100,19 @@ trait CharVarcharTestSuite extends QueryTest with 
SQLTestUtils {
 }
   }
 
+  test("char type values should not be padded when charVarcharAsString is 
true") {
+withSQLConf(SQLConf.LEGACY_CHAR_VARCHAR_AS_STRING.key -> "true") {
+  withTable("t") {
+sql(s"CREATE TABLE t(a STRING, b CHAR(5), c CHAR(5)) USING $format 
partitioned by (c)")
+sql("INSERT INTO t VALUES ('abc', 'abc', 'abc')")
+checkAnswer(sql("SELECT b FROM t WHERE b='abc'"), Row("abc"))
+checkAnswer(sql("SELECT b FROM t WHERE b in ('abc')"), Row("abc"))
+checkAnswer(sql("SELECT c FROM t WHERE c='abc'"), Row("abc"))
+checkAnswer(sql("SELECT c FROM t WHERE c in ('abc')"), Row("abc"))
+  }
+}
+  }
+
   test("varchar type values length check and trim: partitioned columns") {
 (0 to 5).foreach { n =>
   // SPARK-34192: we need to create a a new table for each round of test 
because of


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [MINOR] Improve to update some mutable hash maps

2022-04-18 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 2a667fbff7e [MINOR] Improve to update some mutable hash maps
2a667fbff7e is described below

commit 2a667fbff7e7fd21bf69668ef78b175f60a24dba
Author: weixiuli 
AuthorDate: Mon Apr 18 09:22:24 2022 -0500

[MINOR] Improve to update some mutable hash maps

### What changes were proposed in this pull request?
Improve how some mutable hash maps are updated.

### Why are the changes needed?

Reduce some mutable hash map calls and clean up the code.
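
For context, a minimal sketch of the idiom change (the map name here is made up; the real call sites are in the diff below). For a mutable map, `m(k) += v` is compiler sugar for `m(k) = m(k) + v`, so the behaviour is unchanged:

```scala
import scala.collection.mutable

val resourcesUsed = mutable.HashMap("gpu" -> 2, "fpga" -> 1)
// Before: resourcesUsed("gpu") = resourcesUsed("gpu") + 1
resourcesUsed("gpu") += 1
assert(resourcesUsed("gpu") == 3)
```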

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing unit tests.

Closes #36179 from weixiuli/update-numBlocksInFlightPerAddress.

Authored-by: weixiuli 
Signed-off-by: Sean Owen 
---
 .../main/scala/org/apache/spark/deploy/worker/Worker.scala|  4 ++--
 .../scala/org/apache/spark/resource/ResourceAllocator.scala   |  4 ++--
 .../scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala   |  2 +-
 .../apache/spark/storage/ShuffleBlockFetcherIterator.scala| 11 +--
 .../scala/org/apache/spark/ml/classification/NaiveBayes.scala |  2 +-
 .../src/main/scala/org/apache/spark/ml/stat/Summarizer.scala  |  2 +-
 6 files changed, 12 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 2c7021bdcb9..b26a0f82d69 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -264,13 +264,13 @@ private[deploy] class Worker(
 
   private def addResourcesUsed(deltaInfo: Map[String, ResourceInformation]): 
Unit = {
 deltaInfo.foreach { case (rName, rInfo) =>
-  resourcesUsed(rName) = resourcesUsed(rName) + rInfo
+  resourcesUsed(rName) += rInfo
 }
   }
 
   private def removeResourcesUsed(deltaInfo: Map[String, 
ResourceInformation]): Unit = {
 deltaInfo.foreach { case (rName, rInfo) =>
-  resourcesUsed(rName) = resourcesUsed(rName) - rInfo
+  resourcesUsed(rName) -= rInfo
 }
   }
 
diff --git 
a/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala 
b/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala
index 7605e8c44b9..10cf0402d5f 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala
@@ -82,7 +82,7 @@ private[spark] trait ResourceAllocator {
   }
   val isAvailable = addressAvailabilityMap(address)
   if (isAvailable > 0) {
-addressAvailabilityMap(address) = addressAvailabilityMap(address) - 1
+addressAvailabilityMap(address) -= 1
   } else {
 throw new SparkException("Try to acquire an address that is not 
available. " +
   s"$resourceName address $address is not available.")
@@ -103,7 +103,7 @@ private[spark] trait ResourceAllocator {
   }
   val isAvailable = addressAvailabilityMap(address)
   if (isAvailable < slotsPerAddress) {
-addressAvailabilityMap(address) = addressAvailabilityMap(address) + 1
+addressAvailabilityMap(address) += 1
   } else {
 throw new SparkException(s"Try to release an address that is not 
assigned. $resourceName " +
   s"address $address is not assigned.")
diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala 
b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
index 230ec7efdb1..15a9ddd40e6 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
@@ -317,7 +317,7 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf) 
extends Logging {
   pushResult: PushResult): Boolean = synchronized {
 remainingBlocks -= pushResult.blockId
 bytesInFlight -= bytesPushed
-numBlocksInFlightPerAddress(address) = 
numBlocksInFlightPerAddress(address) - 1
+numBlocksInFlightPerAddress(address) -= 1
 if (remainingBlocks.isEmpty) {
   reqsInFlight -= 1
 }
diff --git 
a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
 
b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index e2fc5389091..c91aaa8ddb7 100644
--- 
a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ 
b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -763,7 +763,7 @@ final class ShuffleBlockFetcherIterator(
   shuffleMetrics.incLocalBlocksFetched(1)
   shuffleMetrics.incLocalBytesRead(buf.size)
 } else {
-  

[spark] branch master updated: [SPARK-38746][SQL][TESTS] Move the tests for `PARSE_EMPTY_STATEMENT` to QueryParsingErrorsSuite

2022-04-18 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 857acca1d17 [SPARK-38746][SQL][TESTS] Move the tests for 
`PARSE_EMPTY_STATEMENT` to QueryParsingErrorsSuite
857acca1d17 is described below

commit 857acca1d176fd0836b350eb163b0793db241f3b
Author: panbingkun 
AuthorDate: Mon Apr 18 11:05:42 2022 +0300

[SPARK-38746][SQL][TESTS] Move the tests for `PARSE_EMPTY_STATEMENT` to 
QueryParsingErrorsSuite

### What changes were proposed in this pull request?
This PR aims to move the tests for the error class `PARSE_EMPTY_STATEMENT` from ErrorParserSuite to QueryParsingErrorsSuite. It is a follow-up of SPARK-37935.
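
For context, a minimal sketch (illustrative only) of the behaviour the moved tests assert: parsing an empty or whitespace-only statement fails with the `PARSE_EMPTY_STATEMENT` error class.

```scala
import org.apache.spark.sql.catalyst.parser.ParseException

try {
  spark.sql("   ")   // empty / whitespace-only statement
} catch {
  case e: ParseException =>
    // e.g. "Syntax error, unexpected empty statement(line 1, pos 3)"
    println(e.getMessage)
}
```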

### Why are the changes needed?
To improve code maintainability.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
By running the moved tests:
```
$ build/sbt "sql/testOnly *QueryParsingErrorsSuite*"
```

Closes #36221 from panbingkun/SPARK-38746.

Authored-by: panbingkun 
Signed-off-by: Max Gekk 
---
 .../sql/catalyst/parser/ErrorParserSuite.scala |  7 
 .../spark/sql/errors/QueryParsingErrorsSuite.scala | 41 ++
 2 files changed, 41 insertions(+), 7 deletions(-)

diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ErrorParserSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ErrorParserSuite.scala
index 6fb77aa012f..aa9f096cfe2 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ErrorParserSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ErrorParserSuite.scala
@@ -99,13 +99,6 @@ class ErrorParserSuite extends AnalysisTest {
   "Syntax error at or near", "^^^")
   }
 
-  test("empty input") {
-val expectedErrMsg = 
SparkThrowableHelper.getMessage("PARSE_EMPTY_STATEMENT", Array[String]())
-intercept("", Some("PARSE_EMPTY_STATEMENT"), expectedErrMsg)
-intercept("   ", Some("PARSE_EMPTY_STATEMENT"), expectedErrMsg)
-intercept(" \n", Some("PARSE_EMPTY_STATEMENT"), expectedErrMsg)
-  }
-
   test("jargon token substitute to user-facing language") {
 // '' -> end of input
 intercept("select count(*", "PARSE_SYNTAX_ERROR",
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryParsingErrorsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryParsingErrorsSuite.scala
index bf8aa9494eb..225d4f33b41 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryParsingErrorsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryParsingErrorsSuite.scala
@@ -431,4 +431,45 @@ class QueryParsingErrorsSuite extends QueryTest with 
SharedSparkSession {
   |---^^^
   |""".stripMargin)
   }
+
+  test("PARSE_EMPTY_STATEMENT: empty input") {
+validateParsingError(
+  sqlText = "",
+  errorClass = "PARSE_EMPTY_STATEMENT",
+  sqlState = "42000",
+  message =
+"""
+  |Syntax error, unexpected empty statement(line 1, pos 0)
+  |
+  |== SQL ==
+  |
+  |^^^
+  |""".stripMargin)
+
+validateParsingError(
+  sqlText = "   ",
+  errorClass = "PARSE_EMPTY_STATEMENT",
+  sqlState = "42000",
+  message =
+s"""
+   |Syntax error, unexpected empty statement(line 1, pos 3)
+   |
+   |== SQL ==
+   |${"   "}
+   |---^^^
+   |""".stripMargin)
+
+validateParsingError(
+  sqlText = " \n",
+  errorClass = "PARSE_EMPTY_STATEMENT",
+  sqlState = "42000",
+  message =
+s"""
+   |Syntax error, unexpected empty statement(line 2, pos 0)
+   |
+   |== SQL ==
+   |${" "}
+   |^^^
+   |""".stripMargin)
+  }
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-38689][SQL] Use error classes in the compilation errors of not allowed DESC PARTITION on views

2022-04-18 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new de18ce68a08 [SPARK-38689][SQL] Use error classes in the compilation 
errors of not allowed DESC PARTITION on views
de18ce68a08 is described below

commit de18ce68a08d49f4b2e5449c3a5d86e756c891e7
Author: Tengfei Huang 
AuthorDate: Mon Apr 18 10:00:32 2022 +0300

[SPARK-38689][SQL] Use error classes in the compilation errors of not 
allowed DESC PARTITION on views

### What changes were proposed in this pull request?
Migrate the following errors in QueryCompilationErrors to the new error class:
- descPartitionNotAllowedOnTempView -> FORBIDDEN_OPERATION
- descPartitionNotAllowedOnView -> FORBIDDEN_OPERATION
- descPartitionNotAllowedOnViewError -> removed because it was never used.

### Why are the changes needed?
Port the compilation errors for DESC PARTITION on views to the new error framework.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added new unit tests.
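
For illustration, a hedged sketch of the new user-facing error (the view name is made up; the message follows the FORBIDDEN_OPERATION template added in this diff, so the exact wording may differ):

```scala
spark.sql("CREATE TEMPORARY VIEW tv AS SELECT 1 AS c")
// Throws AnalysisException with errorClass "FORBIDDEN_OPERATION", roughly:
//   The operation 'DESC PARTITION' is not allowed on the temporary view: `tv`
spark.sql("DESCRIBE TABLE tv PARTITION (c = 1)")
```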

Closes #36163 from ivoson/SPARK-38689.

Lead-authored-by: Tengfei Huang 
Co-authored-by: Huang Tengfei 
Signed-off-by: Max Gekk 
---
 core/src/main/resources/error/error-classes.json   |  3 ++
 .../spark/sql/errors/QueryCompilationErrors.scala  | 14 +++---
 .../apache/spark/sql/errors/QueryErrorsBase.scala  |  5 +++
 .../resources/sql-tests/results/describe.sql.out   |  4 +-
 .../sql/errors/QueryCompilationErrorsSuite.scala   | 50 ++
 5 files changed, 68 insertions(+), 8 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json 
b/core/src/main/resources/error/error-classes.json
index a98cd6fc211..26d75fa675e 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -51,6 +51,9 @@
   "FAILED_SET_ORIGINAL_PERMISSION_BACK" : {
 "message" : [ "Failed to set original permission %s back to the created 
path: %s. Exception: %s" ]
   },
+  "FORBIDDEN_OPERATION" : {
+"message" : [ "The operation %s is not allowed on %s: %s" ]
+  },
   "GRAPHITE_SINK_INVALID_PROTOCOL" : {
 "message" : [ "Invalid Graphite protocol: %s" ]
   },
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 07108755586..26c8cebe0d5 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -1931,11 +1931,17 @@ object QueryCompilationErrors extends QueryErrorsBase {
   }
 
   def descPartitionNotAllowedOnTempView(table: String): Throwable = {
-new AnalysisException(s"DESC PARTITION is not allowed on a temporary view: 
$table")
+new AnalysisException(
+  errorClass = "FORBIDDEN_OPERATION",
+  messageParameters =
+Array(toSQLStmt("DESC PARTITION"), "the temporary view", 
toSQLId(table)))
   }
 
   def descPartitionNotAllowedOnView(table: String): Throwable = {
-new AnalysisException(s"DESC PARTITION is not allowed on a view: $table")
+new AnalysisException(
+  errorClass = "FORBIDDEN_OPERATION",
+  messageParameters = Array(
+toSQLStmt("DESC PARTITION"), "the view", toSQLId(table)))
   }
 
   def showPartitionNotAllowedOnTableNotPartitionedError(tableIdentWithDB: 
String): Throwable = {
@@ -1971,10 +1977,6 @@ object QueryCompilationErrors extends QueryErrorsBase {
 )
   }
 
-  def descPartitionNotAllowedOnViewError(table: String): Throwable = {
-new AnalysisException(s"DESC PARTITION is not allowed on a view: $table")
-  }
-
   def showCreateTableAsSerdeNotAllowedOnSparkDataSourceTableError(
   table: TableIdentifier): Throwable = {
 new AnalysisException(
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryErrorsBase.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryErrorsBase.scala
index b7989d11986..4708b47cfb8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryErrorsBase.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryErrorsBase.scala
@@ -46,6 +46,11 @@ trait QueryErrorsBase {
 litToErrorValue(Literal.create(v, t))
   }
 
+  // Quote sql statements in error messages.
+  def toSQLStmt(text: String): String = {
+s"'$text'"
+  }
+
   def toSQLId(parts: Seq[String]): String = {
 parts.map(quoteIdentifier).mkString(".")
   }
diff --git a/sql/core/src/test/resources/sql-tests/results/describe.sql.out 
b/sql/core/src/test/resources/sql-tests/results/describe.sql.out
index 04259c0db85..c0db04fa5c8 100644
--- a/sql/core/src/test/resources/sql-tests/results/describe.sql.out
+++ 

[spark] branch master updated: [SPARK-38926][SQL] Output types in error messages in SQL style

2022-04-18 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 0d16159bfa8 [SPARK-38926][SQL] Output types in error messages in SQL 
style
0d16159bfa8 is described below

commit 0d16159bfa85ed346843e0952f37922a579c011e
Author: Max Gekk 
AuthorDate: Mon Apr 18 09:00:40 2022 +0300

[SPARK-38926][SQL] Output types in error messages in SQL style

### What changes were proposed in this pull request?
In this PR, I propose to upper-case SQL types in error messages, similar to the SQL standard. I added a new util function `toSQLType()` to the trait `QueryErrorsBase` and applied it in `Query.*Errors` (the tests in `Query.*ErrorsSuite` were modified accordingly). For example:

Before:
```sql
Cannot up cast b.`b` from decimal(38,18) to bigint.
```

After:
```sql
Cannot up cast b.`b` from DECIMAL(38,18) to BIGINT.
```
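
A minimal sketch of what the new helper returns (it delegates to `DataType.sql`, as the diff below shows; the printed values match the Before/After example above):

```scala
import org.apache.spark.sql.types.{DecimalType, LongType}

println(DecimalType(38, 18).sql)  // DECIMAL(38,18)
println(LongType.sql)             // BIGINT
```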

### Why are the changes needed?
To improve the user experience with Spark SQL. The changes highlight SQL types in error messages and make them more visible to users.

### Does this PR introduce _any_ user-facing change?
No, since the error classes haven't been released yet.

### How was this patch tested?
By running the modified test suites:
```
$ build/sbt "test:testOnly *QueryParsingErrorsSuite"
$ build/sbt "test:testOnly *QueryCompilationErrorsSuite"
$ build/sbt "test:testOnly *QueryExecutionErrorsSuite"
$ build/sbt "testOnly *CastSuite"
$ build/sbt "testOnly *AnsiCastSuiteWithAnsiModeOn"
$ build/sbt "testOnly *EncoderResolutionSuite"
$ build/sbt "test:testOnly *DatasetSuite"
$ build/sbt "test:testOnly *InsertSuite"
```

Closes #36233 from MaxGekk/error-class-toSQLType.

Authored-by: Max Gekk 
Signed-off-by: Max Gekk 
---
 .../spark/sql/errors/QueryCompilationErrors.scala  |  4 +-
 .../apache/spark/sql/errors/QueryErrorsBase.scala  |  4 ++
 .../spark/sql/errors/QueryExecutionErrors.scala| 18 +++---
 .../spark/sql/errors/QueryParsingErrors.scala  | 10 +++-
 .../catalyst/encoders/EncoderResolutionSuite.scala |  8 +--
 .../catalyst/expressions/AnsiCastSuiteBase.scala   |  4 +-
 .../spark/sql/catalyst/expressions/CastSuite.scala | 66 +++---
 .../sql-tests/results/postgreSQL/float4.sql.out|  6 +-
 .../sql-tests/results/postgreSQL/float8.sql.out|  2 +-
 .../sql-tests/results/postgreSQL/int8.sql.out  |  8 +--
 .../scala/org/apache/spark/sql/DatasetSuite.scala  |  2 +-
 .../sql/errors/QueryCompilationErrorsSuite.scala   |  4 +-
 .../sql/errors/QueryExecutionErrorsSuite.scala | 10 ++--
 .../spark/sql/errors/QueryParsingErrorsSuite.scala |  4 +-
 .../org/apache/spark/sql/sources/InsertSuite.scala |  8 +--
 15 files changed, 84 insertions(+), 74 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 6b32a08b6fd..07108755586 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -162,8 +162,8 @@ object QueryCompilationErrors extends QueryErrorsBase {
   errorClass = "CANNOT_UP_CAST_DATATYPE",
   messageParameters = Array(
 fromStr,
-from.dataType.catalogString,
-to.catalogString,
+toSQLType(from.dataType),
+toSQLType(to),
 s"The type path of the target object is:\n" + 
walkedTypePath.mkString("", "\n", "\n") +
   "You can either add an explicit cast to the input data or choose a 
higher precision " +
   "type of the field in the target object"
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryErrorsBase.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryErrorsBase.scala
index 9b18b59c33d..b7989d11986 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryErrorsBase.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryErrorsBase.scala
@@ -53,4 +53,8 @@ trait QueryErrorsBase {
   def toSQLId(parts: String): String = {
 toSQLId(parts.split("\\."))
   }
+
+  def toSQLType(t: DataType): String = {
+t.sql
+  }
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 1aef33c6cc2..79e36cb485a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -91,7 +91,7 @@ object QueryExecutionErrors extends QueryErrorsBase {
 
   def castingCauseOverflowError(t: