[jira] [Resolved] (SPARK-47176) Have a ResolveAllExpressionsUpWithPruning helper function

2024-02-26 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47176?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-47176.
-
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 45270
[https://github.com/apache/spark/pull/45270]

> Have a ResolveAllExpressionsUpWithPruning helper function
> -
>
> Key: SPARK-47176
> URL: https://issues.apache.org/jira/browse/SPARK-47176
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 4.0.0
>Reporter: Rui Wang
>Assignee: Rui Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>






(spark) branch master updated: [SPARK-47176][SQL] Have a ResolveAllExpressionsUpWithPruning helper function

2024-02-26 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 7234e35130c2 [SPARK-47176][SQL] Have a 
ResolveAllExpressionsUpWithPruning helper function
7234e35130c2 is described below

commit 7234e35130c2dc590c1ea06c07973953a365e7b3
Author: Rui Wang 
AuthorDate: Tue Feb 27 14:41:56 2024 +0800

[SPARK-47176][SQL] Have a ResolveAllExpressionsUpWithPruning helper function

### What changes were proposed in this pull request?

This PR proposes a new helper function for tree traversal: 
`resolveExpressionsUpWithPruning`. This helper traverses all expressions of a 
query plan bottom up, skipping a subtree whenever the pruning condition 
returns false.

### Why are the changes needed?

Without this helper, a developer has to combine 
`plan.resolveOperatorsUpWithPruning` with a nested `case p: LogicalPlan => 
p.transformExpressionsUpWithPruning` to achieve the same effect, as in the 
sketch below.
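
A rough before/after sketch of the usage pattern (the `rewrite` function is a 
hypothetical illustration; the pruning pattern mirrors the 
`ResolveBinaryArithmetic` change in this PR):

```scala
// Before: nest two traversals by hand inside a rule's apply().
override def apply(plan: LogicalPlan): LogicalPlan =
  plan.resolveOperatorsUpWithPruning(_.containsPattern(BINARY_ARITHMETIC), ruleId) {
    case p: LogicalPlan =>
      p.transformExpressionsUpWithPruning(_.containsPattern(BINARY_ARITHMETIC), ruleId) {
        case e: Expression => rewrite(e) // `rewrite` is a hypothetical expression rewrite
      }
  }

// After: one call expresses the same bottom-up expression traversal.
override def apply(plan: LogicalPlan): LogicalPlan =
  plan.resolveExpressionsUpWithPruning(_.containsPattern(BINARY_ARITHMETIC), ruleId) {
    case e: Expression => rewrite(e)
  }
```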

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing UT

### Was this patch authored or co-authored using generative AI tooling?

NO

Closes #45270 from amaliujia/analysis_helper.

Authored-by: Rui Wang 
Signed-off-by: Wenchen Fan 
---
 .../spark/sql/catalyst/analysis/Analyzer.scala |  7 ++-
 .../catalyst/plans/logical/AnalysisHelper.scala| 23 ++
 2 files changed, 25 insertions(+), 5 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 1fb5d00bdf39..c5b01a312664 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -382,10 +382,8 @@ class Analyzer(override val catalogManager: 
CatalogManager) extends RuleExecutor
* 2. otherwise, stays the same.
*/
   object ResolveBinaryArithmetic extends Rule[LogicalPlan] {
-override def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperatorsUpWithPruning(
-  _.containsPattern(BINARY_ARITHMETIC), ruleId) {
-  case p: LogicalPlan => p.transformExpressionsUpWithPruning(
-_.containsPattern(BINARY_ARITHMETIC), ruleId) {
+override def apply(plan: LogicalPlan): LogicalPlan =
+  
plan.resolveExpressionsUpWithPruning(_.containsPattern(BINARY_ARITHMETIC), 
ruleId) {
 case a @ Add(l, r, mode) if a.childrenResolved => (l.dataType, 
r.dataType) match {
   case (DateType, DayTimeIntervalType(DAY, DAY)) => DateAdd(l, 
ExtractANSIIntervalDays(r))
   case (DateType, _: DayTimeIntervalType) => TimeAdd(Cast(l, 
TimestampType), r)
@@ -455,7 +453,6 @@ class Analyzer(override val catalogManager: CatalogManager) 
extends RuleExecutor
   case _ => d
 }
   }
-}
   }
 
   /**
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala
index 45a20cbe3aaf..2952db46020a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala
@@ -250,6 +250,29 @@ trait AnalysisHelper extends QueryPlan[LogicalPlan] { 
self: LogicalPlan =>
 }
   }
 
+  /**
+   * Recursively transforms all expressions of all the logical nodes in the plan tree
+   * in a bottom-up manner. It applies first to all of its children and then itself (post-order),
+   * skipping nodes that have already been analyzed.
+   *
+   * @param rule   the function used to transform this node's children.
+   * @param cond   a Lambda expression to prune tree traversals. If `cond.apply` returns false
+   *   on a TreeNode T, skips processing T and its subtree; otherwise, processes
+   *   T and its subtree recursively.
+   * @param ruleId is a unique Id for `rule` to prune unnecessary tree traversals. When it is
+   *   UnknownRuleId, no pruning happens. Otherwise, if `rule` (with id `ruleId`)
+   *   has been marked as ineffective on a TreeNode T, skips processing T and its
+   *   subtree. Do not pass it if the rule is not purely functional and reads a
+   *   varying initial state for different invocations.
+   */
+  def resolveExpressionsUpWithPruning(cond: TreePatternBits => Boolean,
+      ruleId: RuleId = UnknownRuleId)(
+      rule: PartialFunction[Expression, Expression]): LogicalPlan = {
+    resolveOperatorsUpWithPruning(cond,
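
The diff above is truncated in the archive. Going by the PR description, the 
remaining body presumably just chains the two existing helpers; a minimal 
sketch (an assumption, not the committed code) would be:

```scala
    resolveOperatorsUpWithPruning(cond, ruleId) {
      case p: LogicalPlan => p.transformExpressionsUpWithPruning(cond, ruleId)(rule)
    }
  }
```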

[jira] [Resolved] (SPARK-47009) Create table with collation

2024-02-26 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-47009.
-
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 45105
[https://github.com/apache/spark/pull/45105]

> Create table with collation
> ---
>
> Key: SPARK-47009
> URL: https://issues.apache.org/jira/browse/SPARK-47009
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 4.0.0
>Reporter: Stefan Kandic
>Assignee: Stefan Kandic
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> Add support for creating table with columns containing non-default collated 
> data






[jira] [Assigned] (SPARK-47009) Create table with collation

2024-02-26 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-47009:
---

Assignee: Stefan Kandic

> Create table with collation
> ---
>
> Key: SPARK-47009
> URL: https://issues.apache.org/jira/browse/SPARK-47009
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 4.0.0
>Reporter: Stefan Kandic
>Assignee: Stefan Kandic
>Priority: Major
>  Labels: pull-request-available
>
> Add support for creating table with columns containing non-default collated 
> data






(spark) branch master updated: [SPARK-47009][SQL] Enable create table support for collation

2024-02-26 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 298134fd5e98 [SPARK-47009][SQL] Enable create table support for 
collation
298134fd5e98 is described below

commit 298134fd5e987a982f468a70454ae94f1b8565e3
Author: Stefan Kandic 
AuthorDate: Mon Feb 26 21:38:56 2024 +0800

[SPARK-47009][SQL] Enable create table support for collation

### What changes were proposed in this pull request?

Adding support for creating tables with collated columns using parquet.

We will map collated string types to a regular parquet string type. This 
means that we won't support cross-engine compatibility for now.

I will add a PR soon to fix parquet filter pushdown. At first we will 
disable it completely for collated strings, but we should look into using sort 
keys as min/max values to support pushdown later on.

### Why are the changes needed?

In order to support basic DDL operations for collations

### Does this PR introduce _any_ user-facing change?

Yes, users are now able to create tables with collated columns
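
For illustration, a minimal usage sketch based on the `SqlBaseParser.g4` change 
below (`STRING collateClause?` with `collateClause : COLLATE collationName=stringLit`); 
the collation name `'UNICODE_CI'` is an assumed example, not taken from this commit:

```scala
// Sketch: create a parquet table with a collated string column and query it.
spark.sql("CREATE TABLE people (name STRING COLLATE 'UNICODE_CI') USING parquet")
spark.sql("INSERT INTO people VALUES ('Alice')")
// Under a case-insensitive collation this comparison should also match 'Alice'.
spark.sql("SELECT * FROM people WHERE name = 'alice'").show()
```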

### How was this patch tested?

With UTs

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #45105 from stefankandic/SPARK-47009-createTableCollation.

Authored-by: Stefan Kandic 
Signed-off-by: Wenchen Fan 
---
 .../spark/sql/catalyst/parser/SqlBaseParser.g4 |   8 +-
 .../sql/catalyst/parser/DataTypeAstBuilder.scala   |  21 +++-
 .../org/apache/spark/sql/types/DataType.scala  |   6 +-
 .../org/apache/spark/sql/types/StringType.scala|   2 +-
 .../spark/sql/catalyst/expressions/Cast.scala  |  12 +--
 .../spark/sql/catalyst/expressions/hash.scala  |   2 +-
 .../spark/sql/catalyst/parser/AstBuilder.scala |   4 +-
 .../parquet/ParquetVectorUpdaterFactory.java   |   2 +-
 .../sql/execution/aggregate/HashMapGenerator.scala |   2 +-
 .../datasources/parquet/ParquetRowConverter.scala  |   2 +-
 .../parquet/ParquetSchemaConverter.scala   |   2 +-
 .../datasources/parquet/ParquetWriteSupport.scala  |   2 +-
 .../org/apache/spark/sql/CollationSuite.scala  | 119 -
 13 files changed, 161 insertions(+), 23 deletions(-)

diff --git 
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index 1109e4a7bdfc..ca01de4ffdc2 100644
--- 
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++ 
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -989,7 +989,7 @@ primaryExpression
 | CASE whenClause+ (ELSE elseExpression=expression)? END   
#searchedCase
 | CASE value=expression whenClause+ (ELSE elseExpression=expression)? END  
#simpleCase
 | name=(CAST | TRY_CAST) LEFT_PAREN expression AS dataType RIGHT_PAREN 
#cast
-| primaryExpression COLLATE stringLit  
#collate
+| primaryExpression collateClause  
#collate
 | primaryExpression DOUBLE_COLON dataType  
#castByColon
 | STRUCT LEFT_PAREN (argument+=namedExpression (COMMA 
argument+=namedExpression)*)? RIGHT_PAREN #struct
 | FIRST LEFT_PAREN expression (IGNORE NULLS)? RIGHT_PAREN  
#first
@@ -1095,6 +1095,10 @@ colPosition
 : position=FIRST | position=AFTER afterCol=errorCapturingIdentifier
 ;
 
+collateClause
+: COLLATE collationName=stringLit
+;
+
 type
 : BOOLEAN
 | TINYINT | BYTE
@@ -1105,7 +1109,7 @@ type
 | DOUBLE
 | DATE
 | TIMESTAMP | TIMESTAMP_NTZ | TIMESTAMP_LTZ
-| STRING
+| STRING collateClause?
 | CHARACTER | CHAR
 | VARCHAR
 | BINARY
diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/parser/DataTypeAstBuilder.scala
 
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/parser/DataTypeAstBuilder.scala
index 3a2e704ffe9f..0d2822e13efc 100644
--- 
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/parser/DataTypeAstBuilder.scala
+++ 
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/parser/DataTypeAstBuilder.scala
@@ -24,6 +24,7 @@ import org.antlr.v4.runtime.Token
 import org.antlr.v4.runtime.tree.ParseTree
 
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
+import org.apache.spark.sql.catalyst.util.CollationFactory
 import org.apache.spark.sql.catalyst.util.SparkParserUtils.{string, withOrigin}
 import org.apache.spark.sql.errors.QueryParsingErrors
 import org.apache.spark.sql.internal.SqlApiConf
@@ -58,8 +59,8 @@ class

[jira] [Assigned] (SPARK-45599) Percentile can produce a wrong answer if -0.0 and 0.0 are mixed in the dataset

2024-02-26 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-45599:
---

Assignee: Nicholas Chammas

> Percentile can produce a wrong answer if -0.0 and 0.0 are mixed in the dataset
> --
>
> Key: SPARK-45599
> URL: https://issues.apache.org/jira/browse/SPARK-45599
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.4.1, 1.6.3, 3.3.0, 3.2.3, 3.5.0
>Reporter: Robert Joseph Evans
>Assignee: Nicholas Chammas
>Priority: Critical
>  Labels: correctness, pull-request-available
>
> I think this actually impacts all versions that have ever supported 
> percentile and it may impact other things because the bug is in OpenHashMap.
>  
> I am really surprised that we caught this bug because everything has to hit 
> just wrong to make it happen. In python/pyspark, if you run
>  
> {code:python}
> from math import *
> from pyspark.sql.types import *
> data = [(1.779652973678931e+173,), (9.247723870123388e-295,), 
> (5.891823952773268e+98,), (inf,), (1.9042708096454302e+195,), 
> (-3.085825028509117e+74,), (-1.9569489404314425e+128,), 
> (2.0738138203216883e+201,), (inf,), (2.5212410617263588e-282,), 
> (-2.646144697462316e-35,), (-3.468683249247593e-196,), (nan,), (None,), 
> (nan,), (1.822129180806602e-245,), (5.211702553315461e-259,), (-1.0,), 
> (-5.682293414619055e+46,), (-4.585039307326895e+166,), 
> (-5.936844510098297e-82,), (-5234708055733.116,), (4920675036.053339,), 
> (None,), (4.4501477170144023e-308,), (2.176024662699802e-210,), 
> (-5.046677974902737e+132,), (-5.490780063080251e-09,), 
> (1.703824427218836e-55,), (-1.1961155424160076e+102,), 
> (1.4403274475565667e+41,), (None,), (5.4470705929955455e-86,), 
> (5.120795466142678e-215,), (-9.01991342808203e+282,), 
> (4.051866849943636e-254,), (-3588518231990.927,), (-1.8891559842111865e+63,), 
> (3.4543959813437507e-304,), (-7.590734560275502e-63,), 
> (9.376528689861087e+117,), (-2.1696969883753554e-292,), 
> (7.227411393136537e+206,), (-2.428999624265911e-293,), 
> (5.741383583382542e-14,), (-1.4882040107841963e+286,), 
> (2.1973064836362255e-159,), (0.028096279323357867,), 
> (8.475809563703283e-64,), (3.002803065141241e-139,), 
> (-1.1041009815645263e+203,), (1.8461539468514548e-225,), 
> (-5.620339412794757e-251,), (3.5103766991437114e-60,), 
> (2.4925669515657655e+165,), (3.217759099462207e+108,), 
> (-8.796717685143486e+203,), (2.037360925124577e+292,), 
> (-6.542279108216022e+206,), (-7.951172614280046e-74,), 
> (6.226527569272003e+152,), (-5.673977270111637e-84,), 
> (-1.0186016078084965e-281,), (1.7976931348623157e+308,), 
> (4.205809391029644e+137,), (-9.871721037428167e+119,), (None,), 
> (-1.6663254121185628e-256,), (1.0075153091760986e-236,), (-0.0,), (0.0,), 
> (1.7976931348623157e+308,), (4.3214483342777574e-117,), 
> (-7.973642629411105e-89,), (-1.1028137694801181e-297,), 
> (2.9000325280299273e-39,), (-1.077534929323113e-264,), 
> (-1.1847952892216515e+137,), (nan,), (7.849390806334983e+226,), 
> (-1.831402251805194e+65,), (-2.664533698035492e+203,), 
> (-2.2385155698231885e+285,), (-2.3016388448634844e-155,), 
> (-9.607772864590422e+217,), (3.437191836077251e+209,), 
> (1.9846569552093057e-137,), (-3.010452936419635e-233,), 
> (1.4309793775440402e-87,), (-2.9383643865423363e-103,), 
> (-4.696878567317712e-162,), (8.391630779050713e-135,), (nan,), 
> (-3.3885098786542755e-128,), (-4.5154178008513483e-122,), (nan,), (nan,), 
> (2.187766760184779e+306,), (7.679268835670585e+223,), 
> (6.3131466321042515e+153,), (1.779652973678931e+173,), 
> (9.247723870123388e-295,), (5.891823952773268e+98,), (inf,), 
> (1.9042708096454302e+195,), (-3.085825028509117e+74,), 
> (-1.9569489404314425e+128,), (2.0738138203216883e+201,), (inf,), 
> (2.5212410617263588e-282,), (-2.646144697462316e-35,), 
> (-3.468683249247593e-196,), (nan,), (None,), (nan,), 
> (1.822129180806602e-245,), (5.211702553315461e-259,), (-1.0,), 
> (-5.682293414619055e+46,), (-4.585039307326895e+166,), 
> (-5.936844510098297e-82,), (-5234708055733.116,), (4920675036.053339,), 
> (None,), (4.4501477170144023e-308,), (2.176024662699802e-210,), 
> (-5.046677974902737e+132,), (-5.490780063080251e-09,), 
> (1.703824427218836e-55,), (-1.1961155424160076e+102,), 
> (1.4403274475565667e+41,), (None,), (5.4470705929955455e-86,), 
> (5.120795466142678e-215,), (-9.01991342808203e+282,), 
> (4.051866849943636e-254,), (-3588518231990.927,), (-1.8891559842111865e+63,), 
> (3.4543959813437507e-304,), (-7.5907345602

[jira] [Resolved] (SPARK-45599) Percentile can produce a wrong answer if -0.0 and 0.0 are mixed in the dataset

2024-02-26 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-45599.
-
Fix Version/s: 3.5.1
   4.0.0
   Resolution: Fixed

Issue resolved by pull request 45036
[https://github.com/apache/spark/pull/45036]

> Percentile can produce a wrong answer if -0.0 and 0.0 are mixed in the dataset
> --
>
> Key: SPARK-45599
> URL: https://issues.apache.org/jira/browse/SPARK-45599
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.4.1, 1.6.3, 3.3.0, 3.2.3, 3.5.0
>Reporter: Robert Joseph Evans
>Assignee: Nicholas Chammas
>Priority: Critical
>  Labels: correctness, pull-request-available
> Fix For: 3.5.1, 4.0.0
>
>
> I think this actually impacts all versions that have ever supported 
> percentile and it may impact other things because the bug is in OpenHashMap.
>  
> I am really surprised that we caught this bug because everything has to hit 
> just wrong to make it happen. In python/pyspark, if you run
>  
> {code:python}
> from math import *
> from pyspark.sql.types import *
> data = [(1.779652973678931e+173,), (9.247723870123388e-295,), 
> (5.891823952773268e+98,), (inf,), (1.9042708096454302e+195,), 
> (-3.085825028509117e+74,), (-1.9569489404314425e+128,), 
> (2.0738138203216883e+201,), (inf,), (2.5212410617263588e-282,), 
> (-2.646144697462316e-35,), (-3.468683249247593e-196,), (nan,), (None,), 
> (nan,), (1.822129180806602e-245,), (5.211702553315461e-259,), (-1.0,), 
> (-5.682293414619055e+46,), (-4.585039307326895e+166,), 
> (-5.936844510098297e-82,), (-5234708055733.116,), (4920675036.053339,), 
> (None,), (4.4501477170144023e-308,), (2.176024662699802e-210,), 
> (-5.046677974902737e+132,), (-5.490780063080251e-09,), 
> (1.703824427218836e-55,), (-1.1961155424160076e+102,), 
> (1.4403274475565667e+41,), (None,), (5.4470705929955455e-86,), 
> (5.120795466142678e-215,), (-9.01991342808203e+282,), 
> (4.051866849943636e-254,), (-3588518231990.927,), (-1.8891559842111865e+63,), 
> (3.4543959813437507e-304,), (-7.590734560275502e-63,), 
> (9.376528689861087e+117,), (-2.1696969883753554e-292,), 
> (7.227411393136537e+206,), (-2.428999624265911e-293,), 
> (5.741383583382542e-14,), (-1.4882040107841963e+286,), 
> (2.1973064836362255e-159,), (0.028096279323357867,), 
> (8.475809563703283e-64,), (3.002803065141241e-139,), 
> (-1.1041009815645263e+203,), (1.8461539468514548e-225,), 
> (-5.620339412794757e-251,), (3.5103766991437114e-60,), 
> (2.4925669515657655e+165,), (3.217759099462207e+108,), 
> (-8.796717685143486e+203,), (2.037360925124577e+292,), 
> (-6.542279108216022e+206,), (-7.951172614280046e-74,), 
> (6.226527569272003e+152,), (-5.673977270111637e-84,), 
> (-1.0186016078084965e-281,), (1.7976931348623157e+308,), 
> (4.205809391029644e+137,), (-9.871721037428167e+119,), (None,), 
> (-1.6663254121185628e-256,), (1.0075153091760986e-236,), (-0.0,), (0.0,), 
> (1.7976931348623157e+308,), (4.3214483342777574e-117,), 
> (-7.973642629411105e-89,), (-1.1028137694801181e-297,), 
> (2.9000325280299273e-39,), (-1.077534929323113e-264,), 
> (-1.1847952892216515e+137,), (nan,), (7.849390806334983e+226,), 
> (-1.831402251805194e+65,), (-2.664533698035492e+203,), 
> (-2.2385155698231885e+285,), (-2.3016388448634844e-155,), 
> (-9.607772864590422e+217,), (3.437191836077251e+209,), 
> (1.9846569552093057e-137,), (-3.010452936419635e-233,), 
> (1.4309793775440402e-87,), (-2.9383643865423363e-103,), 
> (-4.696878567317712e-162,), (8.391630779050713e-135,), (nan,), 
> (-3.3885098786542755e-128,), (-4.5154178008513483e-122,), (nan,), (nan,), 
> (2.187766760184779e+306,), (7.679268835670585e+223,), 
> (6.3131466321042515e+153,), (1.779652973678931e+173,), 
> (9.247723870123388e-295,), (5.891823952773268e+98,), (inf,), 
> (1.9042708096454302e+195,), (-3.085825028509117e+74,), 
> (-1.9569489404314425e+128,), (2.0738138203216883e+201,), (inf,), 
> (2.5212410617263588e-282,), (-2.646144697462316e-35,), 
> (-3.468683249247593e-196,), (nan,), (None,), (nan,), 
> (1.822129180806602e-245,), (5.211702553315461e-259,), (-1.0,), 
> (-5.682293414619055e+46,), (-4.585039307326895e+166,), 
> (-5.936844510098297e-82,), (-5234708055733.116,), (4920675036.053339,), 
> (None,), (4.4501477170144023e-308,), (2.176024662699802e-210,), 
> (-5.046677974902737e+132,), (-5.490780063080251e-09,), 
> (1.703824427218836e-55,), (-1.1961155424160076e+102,), 
> (1.4403274475565667e+41,), (None,), (5.4470705929955455e-86,), 
> (5.12079

(spark) branch branch-3.5 updated: [SPARK-45599][CORE] Use object equality in OpenHashSet

2024-02-26 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 588a55d010fe [SPARK-45599][CORE] Use object equality in OpenHashSet
588a55d010fe is described below

commit 588a55d010fefda7a63cde3b616ac38728fe4cfe
Author: Nicholas Chammas 
AuthorDate: Mon Feb 26 16:03:30 2024 +0800

[SPARK-45599][CORE] Use object equality in OpenHashSet

### What changes were proposed in this pull request?

Change `OpenHashSet` to use object equality instead of cooperative equality 
when looking up keys.

### Why are the changes needed?

This brings the behavior of `OpenHashSet` more in line with the semantics 
of `java.util.HashSet`, and fixes its behavior when comparing values for which 
`equals` and `==` return different results, like 0.0/-0.0 and NaN/NaN.

For example, in certain cases where both 0.0 and -0.0 are provided as keys 
to the set, lookups of one or the other key may return the [wrong 
position][wrong] in the set. This leads to the bug described in SPARK-45599 and 
summarized in [this comment][1].

[wrong]: 
https://github.com/apache/spark/pull/45036/files#diff-894198a5fea34e5b7f07d0a4641eb09995315d5de3e0fded3743c15a3c8af405R277-R283
[1]: 
https://issues.apache.org/jira/browse/SPARK-45599?focusedCommentId=17806954=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17806954
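
A small standalone illustration of the equality gap the message refers to 
(plain Scala/Java semantics, independent of Spark):

```scala
val posZero = 0.0
val negZero = -0.0

posZero == negZero                // true:  numeric (cooperative) equality treats them as equal
posZero.equals(negZero)           // false: java.lang.Double.equals compares raw bits
Double.NaN == Double.NaN          // false: NaN is never numerically equal to itself
Double.NaN.equals(Double.NaN)     // true:  equals treats NaN as equal to itself
```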

### Does this PR introduce _any_ user-facing change?

Yes, it resolves the bug described in SPARK-45599.

`OpenHashSet` is used widely under the hood, including in:
- `OpenHashMap`, which itself backs:
- `TypedImperativeAggregate`
- aggregate functions like `percentile` and `mode`
- many algorithms in ML and MLlib
- `SQLOpenHashSet`, which backs array functions like `array_union` and 
`array_distinct`

However, the user-facing changes should be limited to the kind of edge case 
described in SPARK-45599.

### How was this patch tested?

New and existing unit tests. Of the new tests added in this PR, some simply 
validate that we have not changed existing SQL semantics, while others confirm 
that we have fixed the specific bug reported in SPARK-45599 along with any 
related incorrect behavior.

New tests failing on `master` but passing on this branch:
- [Handling 0.0 and -0.0 in 
`OpenHashSet`](https://github.com/apache/spark/pull/45036/files#diff-894198a5fea34e5b7f07d0a4641eb09995315d5de3e0fded3743c15a3c8af405R273)
- [Handling NaN in 
`OpenHashSet`](https://github.com/apache/spark/pull/45036/files#diff-894198a5fea34e5b7f07d0a4641eb09995315d5de3e0fded3743c15a3c8af405R302)
- [Handling 0.0 and -0.0 in 
`OpenHashMap`](https://github.com/apache/spark/pull/45036/files#diff-09400ec633b1f1322c5f7b39dc4e87073b0b0435b60b9cff93388053be5083b6R253)
- [Handling 0.0 and -0.0 when computing 
percentile](https://github.com/apache/spark/pull/45036/files#diff-bd3d5c79ede5675f4bf10d2efb313db893d57443d6d6d67b1f8766e6ce741271R1092)

New tests passing both on `master` and this branch:
- [Handling 0.0, -0.0, and NaN in 
`array_union`](https://github.com/apache/spark/pull/45036/files#diff-9e18a5ccf83ac94321e3a0ee8c5acf104c45734f3b35f1a0d4c15c4daa315ad5R793)
- [Handling 0.0, -0.0, and NaN in 
`array_distinct`](https://github.com/apache/spark/pull/45036/files#diff-9e18a5ccf83ac94321e3a0ee8c5acf104c45734f3b35f1a0d4c15c4daa315ad5R801)
- [Handling 0.0, -0.0, and NaN in `GROUP 
BY`](https://github.com/apache/spark/pull/45036/files#diff-496edb8b03201f078c3772ca81f7c7f80002acc11dff00b1d06d288b87855264R1107)
- [Normalizing -0 and 
-0.0](https://github.com/apache/spark/pull/45036/files#diff-4bdd04d06a2d88049dd5c8a67715c5566dd68a1c4ebffc689dc74b6b2e0b3b04R782)

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #45036 from nchammas/SPARK-45599-plus-and-minus-zero.

Authored-by: Nicholas Chammas 
Signed-off-by: Wenchen Fan 
(cherry picked from commit fcc5dbc9b6c8a8e16dc2e0854f3eebc8758a5826)
Signed-off-by: Wenchen Fan 
---
 .../apache/spark/util/collection/OpenHashSet.scala | 16 +++--
 .../spark/util/collection/OpenHashMapSuite.scala   | 30 +
 .../spark/util/collection/OpenHashSetSuite.scala   | 39 ++
 .../sql-tests/analyzer-results/ansi/array.sql.out  | 14 
 .../analyzer-results/ansi/literals.sql.out |  7 
 .../sql-tests/analyzer-results/array.sql.out   | 14 
 .../sql-tests/analyzer-results/group-by.sql.out| 19 +++
 .../sql-tests/analyzer-results/literals.sql.out|  7 
 .../src/test/resources/sql-tests/inputs/array.sql  |  4 +++
 .../test/resources/sql-tests/inputs/group-by.sql   | 15 +
 .../test/resources

(spark) branch master updated: [SPARK-45599][CORE] Use object equality in OpenHashSet

2024-02-26 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new fcc5dbc9b6c8 [SPARK-45599][CORE] Use object equality in OpenHashSet
fcc5dbc9b6c8 is described below

commit fcc5dbc9b6c8a8e16dc2e0854f3eebc8758a5826
Author: Nicholas Chammas 
AuthorDate: Mon Feb 26 16:03:30 2024 +0800

[SPARK-45599][CORE] Use object equality in OpenHashSet

### What changes were proposed in this pull request?

Change `OpenHashSet` to use object equality instead of cooperative equality 
when looking up keys.

### Why are the changes needed?

This brings the behavior of `OpenHashSet` more in line with the semantics 
of `java.util.HashSet`, and fixes its behavior when comparing values for which 
`equals` and `==` return different results, like 0.0/-0.0 and NaN/NaN.

For example, in certain cases where both 0.0 and -0.0 are provided as keys 
to the set, lookups of one or the other key may return the [wrong 
position][wrong] in the set. This leads to the bug described in SPARK-45599 and 
summarized in [this comment][1].

[wrong]: 
https://github.com/apache/spark/pull/45036/files#diff-894198a5fea34e5b7f07d0a4641eb09995315d5de3e0fded3743c15a3c8af405R277-R283
[1]: 
https://issues.apache.org/jira/browse/SPARK-45599?focusedCommentId=17806954=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17806954

### Does this PR introduce _any_ user-facing change?

Yes, it resolves the bug described in SPARK-45599.

`OpenHashSet` is used widely under the hood, including in:
- `OpenHashMap`, which itself backs:
- `TypedImperativeAggregate`
- aggregate functions like `percentile` and `mode`
- many algorithms in ML and MLlib
- `SQLOpenHashSet`, which backs array functions like `array_union` and 
`array_distinct`

However, the user-facing changes should be limited to the kind of edge case 
described in SPARK-45599.

### How was this patch tested?

New and existing unit tests. Of the new tests added in this PR, some simply 
validate that we have not changed existing SQL semantics, while others confirm 
that we have fixed the specific bug reported in SPARK-45599 along with any 
related incorrect behavior.

New tests failing on `master` but passing on this branch:
- [Handling 0.0 and -0.0 in 
`OpenHashSet`](https://github.com/apache/spark/pull/45036/files#diff-894198a5fea34e5b7f07d0a4641eb09995315d5de3e0fded3743c15a3c8af405R273)
- [Handling NaN in 
`OpenHashSet`](https://github.com/apache/spark/pull/45036/files#diff-894198a5fea34e5b7f07d0a4641eb09995315d5de3e0fded3743c15a3c8af405R302)
- [Handling 0.0 and -0.0 in 
`OpenHashMap`](https://github.com/apache/spark/pull/45036/files#diff-09400ec633b1f1322c5f7b39dc4e87073b0b0435b60b9cff93388053be5083b6R253)
- [Handling 0.0 and -0.0 when computing 
percentile](https://github.com/apache/spark/pull/45036/files#diff-bd3d5c79ede5675f4bf10d2efb313db893d57443d6d6d67b1f8766e6ce741271R1092)

New tests passing both on `master` and this branch:
- [Handling 0.0, -0.0, and NaN in 
`array_union`](https://github.com/apache/spark/pull/45036/files#diff-9e18a5ccf83ac94321e3a0ee8c5acf104c45734f3b35f1a0d4c15c4daa315ad5R793)
- [Handling 0.0, -0.0, and NaN in 
`array_distinct`](https://github.com/apache/spark/pull/45036/files#diff-9e18a5ccf83ac94321e3a0ee8c5acf104c45734f3b35f1a0d4c15c4daa315ad5R801)
- [Handling 0.0, -0.0, and NaN in `GROUP 
BY`](https://github.com/apache/spark/pull/45036/files#diff-496edb8b03201f078c3772ca81f7c7f80002acc11dff00b1d06d288b87855264R1107)
- [Normalizing -0 and 
-0.0](https://github.com/apache/spark/pull/45036/files#diff-4bdd04d06a2d88049dd5c8a67715c5566dd68a1c4ebffc689dc74b6b2e0b3b04R782)

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #45036 from nchammas/SPARK-45599-plus-and-minus-zero.

Authored-by: Nicholas Chammas 
Signed-off-by: Wenchen Fan 
---
 .../apache/spark/util/collection/OpenHashSet.scala | 16 +++--
 .../spark/util/collection/OpenHashMapSuite.scala   | 30 +
 .../spark/util/collection/OpenHashSetSuite.scala   | 39 ++
 .../sql-tests/analyzer-results/ansi/array.sql.out  | 14 
 .../analyzer-results/ansi/literals.sql.out |  7 
 .../sql-tests/analyzer-results/array.sql.out   | 14 
 .../sql-tests/analyzer-results/group-by.sql.out| 19 +++
 .../sql-tests/analyzer-results/literals.sql.out|  7 
 .../src/test/resources/sql-tests/inputs/array.sql  |  4 +++
 .../test/resources/sql-tests/inputs/group-by.sql   | 15 +
 .../test/resources/sql-tests/inputs/literals.sql   |  3 ++
 .../resources/sql-tests/results/ansi/array.sql.out | 16

[jira] [Assigned] (SPARK-47044) Add JDBC query to explain formatted command

2024-02-20 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-47044:
---

Assignee: Uros Stankovic

> Add JDBC query to explain formatted command
> ---
>
> Key: SPARK-47044
> URL: https://issues.apache.org/jira/browse/SPARK-47044
> Project: Spark
>  Issue Type: Task
>  Components: Spark Core
>Affects Versions: 4.0.0
>Reporter: Uros Stankovic
>Assignee: Uros Stankovic
>Priority: Major
>  Labels: pull-request-available
>
> Add the generated JDBC query to the EXPLAIN FORMATTED command when the physical 
> Scan node needs to access a JDBC source to create an RDD.






[jira] [Resolved] (SPARK-47044) Add JDBC query to explain formatted command

2024-02-20 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-47044.
-
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 45102
[https://github.com/apache/spark/pull/45102]

> Add JDBC query to explain formatted command
> ---
>
> Key: SPARK-47044
> URL: https://issues.apache.org/jira/browse/SPARK-47044
> Project: Spark
>  Issue Type: Task
>  Components: Spark Core
>Affects Versions: 4.0.0
>Reporter: Uros Stankovic
>Assignee: Uros Stankovic
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> Add the generated JDBC query to the EXPLAIN FORMATTED command when the physical 
> Scan node needs to access a JDBC source to create an RDD.






(spark) branch master updated: [SPARK-47044][SQL] Add executed query for JDBC external datasources to explain output

2024-02-20 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new e6a3385e27fa [SPARK-47044][SQL] Add executed query for JDBC external 
datasources to explain output
e6a3385e27fa is described below

commit e6a3385e27fa95391433ea02fa053540fe101d40
Author: Uros Stankovic 
AuthorDate: Tue Feb 20 22:03:28 2024 +0800

[SPARK-47044][SQL] Add executed query for JDBC external datasources to 
explain output

### What changes were proposed in this pull request?
Add the generated JDBC query to the EXPLAIN FORMATTED output when the physical 
Scan node needs to access a JDBC source to create its RDD.

Output of EXPLAIN FORMATTED with this change, from the newly added test:
```
== Physical Plan ==
* Project (2)
+- * Scan 
org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCScan$$anon$14349389d  (1)

(1) Scan 
org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCScan$$anon$14349389d  
[codegen id : 1]
Output [1]: [MAX(ID)#x]
Arguments: [MAX(ID)#x], [StructField(MAX(ID),IntegerType,true)], 
PushedDownOperators(Some(org.apache.spark.sql.connector.expressions.aggregate.Aggregation647d3279),None,None,None,List(),ArraySeq(ID
 IS NOT NULL, ID > 1)), JDBCRDD[0] at $anonfun$executePhase$2 at 
LexicalThreadLocal.scala:63, 
org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCScan$$anon$14349389d, 
Statistics(sizeInBytes=8.0 EiB, ColumnStat: N/A)
External engine query: SELECT MAX("ID") FROM "test"."people"  WHERE ("ID" 
IS NOT NULL) AND ("ID" > 1)

(2) Project [codegen id : 1]
Output [1]: [MAX(ID)#x AS max(id)#x]
Input [1]: [MAX(ID)#x]
```

### Why are the changes needed?
This command will allow customers to see which query text is sent to 
external JDBC sources.
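
For context, a hedged sketch of how a user might surface the new field 
(connection options are placeholders; `explain("formatted")` is the existing 
Dataset API):

```scala
import org.apache.spark.sql.functions.{col, max}

// Sketch: scan a JDBC source with a pushed-down filter and aggregate, then inspect the plan.
val people = spark.read
  .format("jdbc")
  .option("url", "jdbc:h2:mem:testdb;user=sa")   // placeholder connection settings
  .option("dbtable", "\"test\".\"people\"")
  .load()

people.filter(col("ID") > 1)
  .agg(max(col("ID")))
  .explain("formatted")   // with this change, JDBC-backed scans print an "External engine query: ..." line
```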

### Does this PR introduce _any_ user-facing change?
Yes.
Customers will see an additional field in the EXPLAIN FORMATTED output for the 
RowDataSourceScanExec node.

### How was this patch tested?
Tested with a new unit test in the JDBC V2 suite.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #45102 from urosstan-db/add-sql-query-for-external-datasources.
    
    Authored-by: Uros Stankovic 
Signed-off-by: Wenchen Fan 
---
 .../apache/spark/sql/catalyst/trees/TreeNode.scala |  8 ++--
 .../spark/sql/execution/DataSourceScanExec.scala   | 10 
 .../datasources/ExternalEngineDatasourceRDD.scala  | 26 ++
 .../sql/execution/datasources/jdbc/JDBCRDD.scala   | 56 --
 .../org/apache/spark/sql/jdbc/JDBCV2Suite.scala|  7 +++
 5 files changed, 78 insertions(+), 29 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index dbacb833ef59..10e2718da833 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -1000,12 +1000,10 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
 
 val str = if (verbose) {
   if (addSuffix) verboseStringWithSuffix(maxFields) else 
verboseString(maxFields)
+} else if (printNodeId) {
+  simpleStringWithNodeId()
 } else {
-  if (printNodeId) {
-simpleStringWithNodeId()
-  } else {
-simpleString(maxFields)
-  }
+  simpleString(maxFields)
 }
 append(prefix)
 append(str)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index ec265f4eaea4..474d65a251ba 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -127,6 +127,16 @@ case class RowDataSourceScanExec(
 }
   }
 
+  override def verboseStringWithOperatorId(): String = {
+super.verboseStringWithOperatorId() + (rdd match {
+  case externalEngineDatasourceRdd: ExternalEngineDatasourceRDD =>
+"External engine query: " +
+  externalEngineDatasourceRdd.getExternalEngineQuery +
+  System.lineSeparator()
+  case _ => ""
+})
+  }
+
   protected override def doExecute(): RDD[InternalRow] = {
 val numOutputRows = longMetric("numOutputRows")
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ExternalEngineDatasourceRDD.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ExternalEngineDatasourceRDD.scala
new file mode 100644
index ..14ca8

[jira] [Resolved] (SPARK-45789) Support DESCRIBE TABLE for clustering columns

2024-02-19 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-45789.
-
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 45077
[https://github.com/apache/spark/pull/45077]

> Support DESCRIBE TABLE for clustering columns
> -
>
> Key: SPARK-45789
> URL: https://issues.apache.org/jira/browse/SPARK-45789
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 4.0.0
>Reporter: Terry Kim
>Assignee: Terry Kim
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>







[jira] [Assigned] (SPARK-45789) Support DESCRIBE TABLE for clustering columns

2024-02-19 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-45789:
---

Assignee: Terry Kim

> Support DESCRIBE TABLE for clustering columns
> -
>
> Key: SPARK-45789
> URL: https://issues.apache.org/jira/browse/SPARK-45789
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 4.0.0
>Reporter: Terry Kim
>Assignee: Terry Kim
>Priority: Major
>  Labels: pull-request-available
>







(spark) branch master updated: [SPARK-45789][SQL] Support DESCRIBE TABLE for clustering columns

2024-02-19 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new f0f35c8b1c8f [SPARK-45789][SQL] Support DESCRIBE TABLE for clustering 
columns
f0f35c8b1c8f is described below

commit f0f35c8b1c8f3b1d7f7c2b79945eb29ffb3c8f9a
Author: Terry Kim 
AuthorDate: Tue Feb 20 14:49:49 2024 +0800

[SPARK-45789][SQL] Support DESCRIBE TABLE for clustering columns

### What changes were proposed in this pull request?
This PR proposes to add clustering column info to the output of `DESCRIBE 
TABLE`.

### Why are the changes needed?

Currently, it's not easy to retrieve clustering column info; you can only do it 
via catalog APIs.

### Does this PR introduce _any_ user-facing change?

Yes. Now, when you run `DESCRIBE TABLE` on clustered tables, you will see 
a "Clustering Information" section, as follows:

```
CREATE TABLE tbl (col1 STRING, col2 INT) using parquet CLUSTER BY (col1, 
col2);
DESC tbl;

+------------------------+---------+-------+
|col_name                |data_type|comment|
+------------------------+---------+-------+
|col1                    |string   |NULL   |
|col2                    |int      |NULL   |
|# Clustering Information|         |       |
|# col_name              |data_type|comment|
|col1                    |string   |NULL   |
|col2                    |int      |NULL   |
+------------------------+---------+-------+
```
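
As a rough illustration of what the new section enables (hypothetical snippet, 
not part of this PR), the clustering columns can now be scraped from plain 
`DESC` output:

```scala
// Sketch: collect the clustering column names by scanning for the new section markers.
val colNames = spark.sql("DESC tbl").collect().map(_.getString(0))
val clusteringCols = colNames
  .dropWhile(_ != "# Clustering Information") // section header added by this PR
  .drop(2)                                    // skip the header row and the "# col_name" row
  .takeWhile(n => n.nonEmpty && !n.startsWith("#"))
// clusteringCols == Array("col1", "col2") for the example above
```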

### How was this patch tested?

Added new unit tests.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #45077 from imback82/describe_clustered_table.

Authored-by: Terry Kim 
Signed-off-by: Wenchen Fan 
---
 .../spark/sql/catalyst/catalog/interface.scala |  8 -
 .../spark/sql/execution/command/tables.scala   | 24 +++
 .../datasources/v2/DescribeTableExec.scala | 36 --
 .../execution/command/DescribeTableSuiteBase.scala | 21 +
 4 files changed, 85 insertions(+), 4 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 0a1a40a88522..10428877ba8d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -43,7 +43,7 @@ import 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUti
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.connector.catalog.CatalogManager
-import org.apache.spark.sql.connector.expressions.{FieldReference, 
NamedReference}
+import org.apache.spark.sql.connector.expressions.{ClusterByTransform, 
FieldReference, NamedReference, Transform}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -224,6 +224,12 @@ object ClusterBySpec {
 
 ClusterBySpec(normalizedColumns)
   }
+
+  def extractClusterBySpec(transforms: Seq[Transform]): Option[ClusterBySpec] 
= {
+transforms.collectFirst {
+  case ClusterByTransform(columnNames) => ClusterBySpec(columnNames)
+}
+  }
 }
 
 /**
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 2f8fca7cfd73..fa288fd94ea9 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -642,6 +642,7 @@ case class DescribeTableCommand(
   }
 
   describePartitionInfo(metadata, result)
+  describeClusteringInfo(metadata, result)
 
   if (partitionSpec.nonEmpty) {
 // Outputs the partition-specific info for the DDL command:
@@ -667,6 +668,29 @@ case class DescribeTableCommand(
 }
   }
 
+  private def describeClusteringInfo(
+  table: CatalogTable,
+  buffer: ArrayBuffer[Row]): Unit = {
+table.clusterBySpec.foreach { clusterBySpec =>
+  append(buffer, "# Clustering Information", "", "")
+  append(buffer, s"# ${output.head.name}", output(1).name, output(2).name)
+  clusterBySpec.columnNames.map { fieldNames =>
+val nestedField = 
table.schema.findNestedField(fieldNames.fieldNames.toIndexedSeq)
+assert(nestedField.isDefined,
+  "The clustering column " +
+s"${fieldNames.fieldNames.map(quoteIfNeeded).mkString(".")

Re: [VOTE] Release Apache Spark 3.5.1 (RC2)

2024-02-19 Thread Wenchen Fan
+1, thanks for making the release!

On Sat, Feb 17, 2024 at 3:54 AM Sean Owen  wrote:

> Yeah let's get that fix in, but it seems to be a minor test-only issue, so it
> should not block the release.
>
> On Fri, Feb 16, 2024, 9:30 AM yangjie01  wrote:
>
>> Very sorry. When I was fixing SPARK-45242 (
>> https://github.com/apache/spark/pull/43594), I noticed that the
>> `Affects Version` and `Fix Version` of SPARK-45242 were both 4.0, and I
>> didn't realize that it had also been merged into branch-3.5, so I didn't
>> advocate for SPARK-45357 to be backported to branch-3.5.
>>
>>
>>
>> As far as I know, the condition to trigger this test failure is: when
>> using Maven to test the `connect` module, if  `sparkTestRelation` in
>> `SparkConnectProtoSuite` is not the first `DataFrame` to be initialized,
>> then the `id` of `sparkTestRelation` will no longer be 0. So, I think this
>> is indeed related to the order in which Maven executes the test cases in
>> the `connect` module.
>>
>>
>>
>> I have submitted a backport PR to branch-3.5, and if
>> necessary, we can merge it to fix this test issue.
>>
>>
>>
>> Jie Yang
>>
>>
>>
>> *From:* Jungtaek Lim 
>> *Date:* Friday, February 16, 2024, 22:15
>> *To:* Sean Owen, Rui Wang 
>> *Cc:* dev 
>> *Subject:* Re: [VOTE] Release Apache Spark 3.5.1 (RC2)
>>
>>
>>
>> I traced back relevant changes and got a sense of what happened.
>>
>>
>>
>> Yangjie figured out the issue. It's a tricky issue according to his comments -
>> the test depends on the order in which the test suites are executed. He said
>> it does not fail in sbt, hence the CI build couldn't catch it.
>>
>> He fixed it, but we missed that the offending commit had also been ported back
>> to 3.5, hence the fix wasn't ported back to 3.5.
>>
>>
>>
>> Surprisingly, I can't reproduce it locally even with Maven. In my attempt to
>> reproduce, SparkConnectProtoSuite was executed third, after
>> SparkConnectStreamingQueryCacheSuite and ExecuteEventsManagerSuite. Maybe it's
>> very specific to the environment, not just Maven? My env: MBP M1 Pro chip,
>> macOS 14.3.1, OpenJDK 17.0.9. I used build/mvn (Maven 3.8.8).
>>
>>
>>
>> I'm not 100% sure this is something we should fail the release for, as it's a
>> test-only issue and sounds very environment-dependent, but I'll respect your
>> call on the vote.
>>
>>
>>
>> Btw, looks like Rui also made a relevant fix (not to fix the failing test but
>> to fix other issues), but this also wasn't ported back to 3.5. @Rui Wang, do
>> you think this is a regression issue and warrants a new RC?
>>
>>
>>
>>
>>
>> On Fri, Feb 16, 2024 at 11:38 AM Sean Owen  wrote:
>>
>> Is anyone seeing this Spark Connect test failure? Then again, I have some
>> weird issue with this env that always fails 1 or 2 tests that nobody else
>> can replicate.
>>
>>
>>
>> - Test observe *** FAILED ***
>>   == FAIL: Plans do not match ===
>>   !CollectMetrics my_metric, [min(id#0) AS min_val#0, max(id#0) AS
>> max_val#0, sum(id#0) AS sum(id)#0L], 0   CollectMetrics my_metric,
>> [min(id#0) AS min_val#0, max(id#0) AS max_val#0, sum(id#0) AS sum(id)#0L],
>> 44
>>+- LocalRelation , [id#0, name#0]
>>   +- LocalRelation , [id#0, name#0]
>> (PlanTest.scala:179)
>>
>>
>>
>> On Thu, Feb 15, 2024 at 1:34 PM Jungtaek Lim <
>> kabhwan.opensou...@gmail.com> wrote:
>>
>> DISCLAIMER: RC for Apache Spark 3.5.1 starts with RC2 as I lately figured
>> out doc generation issue after tagging RC1.
>>
>>
>>
>> Please vote on releasing the following candidate as Apache Spark version
>> 3.5.1.
>>
>> The vote is open until February 18th 9AM (PST) and passes if a majority
>> +1 PMC votes are cast, with
>> a minimum of 3 +1 votes.
>>
>> [ ] +1 Release this package as Apache Spark 3.5.1
>> [ ] -1 Do not release this package because ...
>>
>> To learn more about Apache Spark, please see https://spark.apache.org/
>>
>> The tag to be voted on is v3.5.1-rc2 (commit
>> fd86f85e181fc2dc0f50a096855acf83a6cc5d9c):
>> https://github.com/apache/spark/tree/v3.5.1-rc2
>>
>> The release files, including signatures, digests, etc. can be found at:
>> https://dist.apache.org/repos/dist/dev/spark/v3.5.1-rc2-bin/

[jira] [Updated] (SPARK-47071) inline With expression if it contains special expression

2024-02-15 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47071?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan updated SPARK-47071:

Summary: inline With expression if it contains special expression  (was: 
inline With expression if it contains aggregate/window expression)

> inline With expression if it contains special expression
> 
>
> Key: SPARK-47071
> URL: https://issues.apache.org/jira/browse/SPARK-47071
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 4.0.0
>Reporter: Wenchen Fan
>Priority: Major
>







[jira] [Created] (SPARK-47071) inline With expression if it contains aggregate/window expression

2024-02-15 Thread Wenchen Fan (Jira)
Wenchen Fan created SPARK-47071:
---

 Summary: inline With expression if it contains aggregate/window 
expression
 Key: SPARK-47071
 URL: https://issues.apache.org/jira/browse/SPARK-47071
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 4.0.0
Reporter: Wenchen Fan









[jira] [Created] (SPARK-47059) attach error context for ALTER COLUMN v1 command

2024-02-15 Thread Wenchen Fan (Jira)
Wenchen Fan created SPARK-47059:
---

 Summary: attach error context for ALTER COLUMN v1 command
 Key: SPARK-47059
 URL: https://issues.apache.org/jira/browse/SPARK-47059
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 4.0.0
Reporter: Wenchen Fan









[jira] [Assigned] (SPARK-39910) DataFrameReader API cannot read files from hadoop archives (.har)

2024-02-08 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-39910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-39910:
---

Assignee: Christophe Préaud

> DataFrameReader API cannot read files from hadoop archives (.har)
> -
>
> Key: SPARK-39910
> URL: https://issues.apache.org/jira/browse/SPARK-39910
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.3, 3.1.3, 3.3.0, 3.2.2
>Reporter: Christophe Préaud
>Assignee: Christophe Préaud
>Priority: Minor
>  Labels: DataFrameReader, pull-request-available
>
> Reading a file from a Hadoop archive using the DataFrameReader API returns 
> an empty Dataset:
> {code:java}
> scala> val df = 
> spark.read.textFile("har:///user/preaudc/logs/lead/jp/2022/202207.har/20220719")
> df: org.apache.spark.sql.Dataset[String] = [value: string]
> scala> df.count
> res7: Long = 0 {code}
>  
> On the other hand, reading the same file, from the same hadoop archive, but 
> using the RDD API yields the correct result:
> {code:java}
> scala> val df = 
> sc.textFile("har:///user/preaudc/logs/lead/jp/2022/202207.har/20220719").toDF("value")
> df: org.apache.spark.sql.DataFrame = [value: string]
> scala> df.count
> res8: Long = 5589 {code}






[jira] [Resolved] (SPARK-39910) DataFrameReader API cannot read files from hadoop archives (.har)

2024-02-08 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-39910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-39910.
-
Fix Version/s: 3.5.1
   4.0.0
   Resolution: Fixed

Issue resolved by pull request 43463
[https://github.com/apache/spark/pull/43463]

> DataFrameReader API cannot read files from hadoop archives (.har)
> -
>
> Key: SPARK-39910
> URL: https://issues.apache.org/jira/browse/SPARK-39910
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.3, 3.1.3, 3.3.0, 3.2.2
>Reporter: Christophe Préaud
>Assignee: Christophe Préaud
>Priority: Minor
>  Labels: DataFrameReader, pull-request-available
> Fix For: 3.5.1, 4.0.0
>
>
> Reading a file from a Hadoop archive using the DataFrameReader API returns 
> an empty Dataset:
> {code:java}
> scala> val df = 
> spark.read.textFile("har:///user/preaudc/logs/lead/jp/2022/202207.har/20220719")
> df: org.apache.spark.sql.Dataset[String] = [value: string]
> scala> df.count
> res7: Long = 0 {code}
>  
> On the other hand, reading the same file, from the same hadoop archive, but 
> using the RDD API yields the correct result:
> {code:java}
> scala> val df = 
> sc.textFile("har:///user/preaudc/logs/lead/jp/2022/202207.har/20220719").toDF("value")
> df: org.apache.spark.sql.DataFrame = [value: string]
> scala> df.count
> res8: Long = 5589 {code}






(spark) branch branch-3.5 updated: [SPARK-39910][SQL] Delegate path qualification to filesystem during DataSource file path globbing

2024-02-08 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 7658f77a613c [SPARK-39910][SQL] Delegate path qualification to 
filesystem during DataSource file path globbing
7658f77a613c is described below

commit 7658f77a613c91364c4b6c986e1861c7bd5487db
Author: Tigran Manasyan 
AuthorDate: Thu Feb 8 20:29:09 2024 +0800

[SPARK-39910][SQL] Delegate path qualification to filesystem during 
DataSource file path globbing

In the current version, `DataSource#checkAndGlobPathIfNecessary` qualifies paths 
via `Path#makeQualified`, while `PartitioningAwareFileIndex` qualifies them via 
`FileSystem#makeQualified`. Most `FileSystem` implementations simply delegate 
to `Path#makeQualified`, but others, like `HarFileSystem`, contain fs-specific 
logic that can produce a different result. Such inconsistencies can lead to a 
situation where Spark can't find partitions of the source file, because the 
qualified paths built by `Path` and [...]
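
A short sketch of the two qualification paths being contrasted (both are 
standard Hadoop `Path`/`FileSystem` APIs; the example path is taken from the 
JIRA description above):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val hadoopConf = new Configuration()
val path = new Path("har:///user/preaudc/logs/lead/jp/2022/202207.har/20220719")
val fs = path.getFileSystem(hadoopConf)

// Old behavior in DataSource#checkAndGlobPathIfNecessary: bypasses fs-specific logic.
val qualifiedByPath = path.makeQualified(fs.getUri, fs.getWorkingDirectory)

// New behavior: let the concrete FileSystem (e.g. HarFileSystem) qualify the path itself.
val qualifiedByFs = fs.makeQualified(path)
```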

Allow users to read files from hadoop archives (.har) using DataFrameReader 
API

No

New tests were added in `DataSourceSuite` and `DataFrameReaderWriterSuite`

No

Closes #43463 from tigrulya-exe/SPARK-39910-use-fs-path-qualification.

Authored-by: Tigran Manasyan 
Signed-off-by: Wenchen Fan 
(cherry picked from commit b7edc5fac0f4e479cbc869d54a9490c553ba2613)
Signed-off-by: Wenchen Fan 
---
 dev/.rat-excludes | 1 +
 .../org/apache/spark/sql/execution/datasources/DataSource.scala   | 2 +-
 sql/core/src/test/resources/test-data/test-archive.har/_index | 2 ++
 .../src/test/resources/test-data/test-archive.har/_masterindex| 2 ++
 sql/core/src/test/resources/test-data/test-archive.har/part-0 | 3 +++
 .../apache/spark/sql/execution/datasources/DataSourceSuite.scala  | 4 
 .../org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala| 8 
 7 files changed, 21 insertions(+), 1 deletion(-)

diff --git a/dev/.rat-excludes b/dev/.rat-excludes
index 16e0e3e30c9e..6bf840cee283 100644
--- a/dev/.rat-excludes
+++ b/dev/.rat-excludes
@@ -145,3 +145,4 @@ empty.proto
 .*\.proto.bin
 LimitedInputStream.java
 TimSort.java
+.*\.har
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 94dd3bc0bd63..2e24087d507b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -722,7 +722,7 @@ object DataSource extends Logging {
 val qualifiedPaths = pathStrings.map { pathString =>
   val path = new Path(pathString)
   val fs = path.getFileSystem(hadoopConf)
-  path.makeQualified(fs.getUri, fs.getWorkingDirectory)
+  fs.makeQualified(path)
 }
 
 // Split the paths into glob and non glob paths, because we don't need to 
do an existence check
diff --git a/sql/core/src/test/resources/test-data/test-archive.har/_index 
b/sql/core/src/test/resources/test-data/test-archive.har/_index
new file mode 100644
index ..b7ae3ef9c5a4
--- /dev/null
+++ b/sql/core/src/test/resources/test-data/test-archive.har/_index
@@ -0,0 +1,2 @@
+%2F dir 1707380620211+493+tigrulya+hadoop 0 0 test.csv 
+%2Ftest.csv file part-0 0 6 1707380620197+420+tigrulya+hadoop 
diff --git 
a/sql/core/src/test/resources/test-data/test-archive.har/_masterindex 
b/sql/core/src/test/resources/test-data/test-archive.har/_masterindex
new file mode 100644
index ..4192a9597299
--- /dev/null
+++ b/sql/core/src/test/resources/test-data/test-archive.har/_masterindex
@@ -0,0 +1,2 @@
+3 
+0 1948547033 0 119 
diff --git a/sql/core/src/test/resources/test-data/test-archive.har/part-0 
b/sql/core/src/test/resources/test-data/test-archive.har/part-0
new file mode 100644
index ..01e79c32a8c9
--- /dev/null
+++ b/sql/core/src/test/resources/test-data/test-archive.har/part-0
@@ -0,0 +1,3 @@
+1
+2
+3
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala
index 06e570cb016b..90b341ae1f2c 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import java.net.URI
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
 import org.scalatest.PrivateMethodTester
@@ -214,4 +216,6 @@ class MockFileSystem exte

(spark) branch master updated: [SPARK-39910][SQL] Delegate path qualification to filesystem during DataSource file path globbing

2024-02-08 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new b7edc5fac0f4 [SPARK-39910][SQL] Delegate path qualification to 
filesystem during DataSource file path globbing
b7edc5fac0f4 is described below

commit b7edc5fac0f4e479cbc869d54a9490c553ba2613
Author: Tigran Manasyan 
AuthorDate: Thu Feb 8 20:29:09 2024 +0800

[SPARK-39910][SQL] Delegate path qualification to filesystem during 
DataSource file path globbing

### What changes were proposed in this pull request?

In the current version, `DataSource#checkAndGlobPathIfNecessary` qualifies paths 
via `Path#makeQualified`, while `PartitioningAwareFileIndex` qualifies them via 
`FileSystem#makeQualified`. Most `FileSystem` implementations simply delegate 
to `Path#makeQualified`, but others, like `HarFileSystem`, contain fs-specific 
logic that can produce a different result. Such inconsistencies can lead to a 
situation where Spark can't find partitions of the source file, because the 
qualified paths built by `Path` and [...]

### Why are the changes needed?

Allow users to read files from hadoop archives (.har) using DataFrameReader 
API
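
A minimal, hedged sketch of the user-facing effect (the archive path below is 
hypothetical, not taken from this PR): after this change, files inside a Hadoop 
archive can be read directly through the DataFrameReader API, because 
`DataSource` now lets the archive's own `HarFileSystem` qualify the paths.

```
// Assumes a SparkSession `spark` and a pre-built archive at the given path.
val df = spark.read
  .schema("id INT")
  .csv("har:///user/someuser/archives/data.har/test.csv")
df.show()
```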

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

New tests were added in `DataSourceSuite` and `DataFrameReaderWriterSuite`

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #43463 from tigrulya-exe/SPARK-39910-use-fs-path-qualification.

Authored-by: Tigran Manasyan 
Signed-off-by: Wenchen Fan 
---
 dev/.rat-excludes | 1 +
 .../org/apache/spark/sql/execution/datasources/DataSource.scala   | 2 +-
 sql/core/src/test/resources/test-data/test-archive.har/_index | 2 ++
 .../src/test/resources/test-data/test-archive.har/_masterindex| 2 ++
 sql/core/src/test/resources/test-data/test-archive.har/part-0 | 3 +++
 .../apache/spark/sql/execution/datasources/DataSourceSuite.scala  | 4 
 .../org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala| 8 
 7 files changed, 21 insertions(+), 1 deletion(-)

diff --git a/dev/.rat-excludes b/dev/.rat-excludes
index 4cf5deb81192..8bad50951a78 100644
--- a/dev/.rat-excludes
+++ b/dev/.rat-excludes
@@ -138,3 +138,4 @@ people.xml
 ui-test/package.json
 ui-test/package-lock.json
 core/src/main/resources/org/apache/spark/ui/static/package.json
+.*\.har
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 3e03fd652f18..837cfb0de092 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -760,7 +760,7 @@ object DataSource extends Logging {
 val qualifiedPaths = pathStrings.map { pathString =>
   val path = new Path(pathString)
   val fs = path.getFileSystem(hadoopConf)
-  path.makeQualified(fs.getUri, fs.getWorkingDirectory)
+  fs.makeQualified(path)
 }
 
 // Split the paths into glob and non glob paths, because we don't need to 
do an existence check
diff --git a/sql/core/src/test/resources/test-data/test-archive.har/_index 
b/sql/core/src/test/resources/test-data/test-archive.har/_index
new file mode 100644
index ..b7ae3ef9c5a4
--- /dev/null
+++ b/sql/core/src/test/resources/test-data/test-archive.har/_index
@@ -0,0 +1,2 @@
+%2F dir 1707380620211+493+tigrulya+hadoop 0 0 test.csv 
+%2Ftest.csv file part-0 0 6 1707380620197+420+tigrulya+hadoop 
diff --git 
a/sql/core/src/test/resources/test-data/test-archive.har/_masterindex 
b/sql/core/src/test/resources/test-data/test-archive.har/_masterindex
new file mode 100644
index ..4192a9597299
--- /dev/null
+++ b/sql/core/src/test/resources/test-data/test-archive.har/_masterindex
@@ -0,0 +1,2 @@
+3 
+0 1948547033 0 119 
diff --git a/sql/core/src/test/resources/test-data/test-archive.har/part-0 
b/sql/core/src/test/resources/test-data/test-archive.har/part-0
new file mode 100644
index ..01e79c32a8c9
--- /dev/null
+++ b/sql/core/src/test/resources/test-data/test-archive.har/part-0
@@ -0,0 +1,3 @@
+1
+2
+3
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala
index 06e570cb016b..90b341ae1f2c 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasour

[jira] [Resolved] (SPARK-46999) ExpressionWithUnresolvedIdentifier should include other expressions in the expression tree

2024-02-08 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-46999.
-
Fix Version/s: 4.0.0
 Assignee: Wenchen Fan
   Resolution: Fixed

> ExpressionWithUnresolvedIdentifier should include other expressions in the 
> expression tree
> --
>
> Key: SPARK-46999
> URL: https://issues.apache.org/jira/browse/SPARK-46999
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.4.0
>Reporter: Wenchen Fan
>Assignee: Wenchen Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



(spark) branch master updated: [SPARK-46999][SQL] ExpressionWithUnresolvedIdentifier should include other expressions in the expression tree

2024-02-08 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new b345b23b76c4 [SPARK-46999][SQL] ExpressionWithUnresolvedIdentifier 
should include other expressions in the expression tree
b345b23b76c4 is described below

commit b345b23b76c4f9a334a115b7397d1adcc6bef185
Author: Wenchen Fan 
AuthorDate: Thu Feb 8 19:31:49 2024 +0800

[SPARK-46999][SQL] ExpressionWithUnresolvedIdentifier should include other 
expressions in the expression tree

### What changes were proposed in this pull request?

The plan/expression tree will be expanded during analysis, due to our 
implementation of the IDENTIFIER clause: we keep a lambda function in the 
plan/expression node to lazily return a new plan/expression.

This is usually fine, but doesn't work well with query parameter binding, 
which needs to see the entire plan/expression tree and bind all parameters at 
once. The feature EXECUTE IMMEDIATE also needs to see the entire plan tree to 
determine if it's a `PosParameterizedQuery` or `NameParameterizedQuery`.

This PR fixes the problem for `ExpressionWithUnresolvedIdentifier`: the lambda 
function now only builds the expression node that needs the identifier, while 
the other expressions are kept in `ExpressionWithUnresolvedIdentifier` itself 
and passed to the lambda function later.

`PlanWithUnresolvedIdentifier` will be fixed in a similar way in another PR.

### Why are the changes needed?

To make IDENTIFIER clause work with query parameter binding

### Does this PR introduce _any_ user-facing change?

Yes, certain queries can run now while they failed before. See the test
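
A hedged sketch of the kind of query this enables (the call is the standard 
`spark.sql(sqlText, args)` overload for named parameters; the column values and 
parameter names are made up for illustration): the IDENTIFIER clause takes its 
name from a query parameter, and parameter binding can now see the whole 
expression tree.

```
// Resolve a column name supplied as a named query parameter via IDENTIFIER.
val df = spark.sql(
  "SELECT IDENTIFIER(:col) FROM VALUES (1, 'a'), (2, 'b') AS t(id, name) WHERE id > :minId",
  Map("col" -> "name", "minId" -> 1))
df.show()
```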

### How was this patch tested?

new test

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #45058 from cloud-fan/param.

Authored-by: Wenchen Fan 
Signed-off-by: Wenchen Fan 
---
 .../analysis/ResolveIdentifierClause.scala |  2 +-
 .../spark/sql/catalyst/analysis/unresolved.scala   | 22 ++
 .../spark/sql/catalyst/parser/AstBuilder.scala | 35 --
 .../sql/catalyst/analysis/AnalysisSuite.scala  |  4 +--
 .../org/apache/spark/sql/ParametersSuite.scala | 10 +++
 5 files changed, 49 insertions(+), 24 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala
index e0d3e5629ef8..ced7123dfcc1 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala
@@ -36,7 +36,7 @@ object ResolveIdentifierClause extends Rule[LogicalPlan] with 
AliasHelper with E
 case other =>
   
other.transformExpressionsWithPruning(_.containsAnyPattern(UNRESOLVED_IDENTIFIER))
 {
 case e: ExpressionWithUnresolvedIdentifier if 
e.identifierExpr.resolved =>
-  e.exprBuilder.apply(evalIdentifierExpr(e.identifierExpr))
+  e.exprBuilder.apply(evalIdentifierExpr(e.identifierExpr), 
e.otherExprs)
   }
   }
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index 63d4cfeb83fe..7a3cc4bc8e83 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -68,18 +68,30 @@ case class PlanWithUnresolvedIdentifier(
 /**
  * An expression placeholder that holds the identifier clause string 
expression. It will be
  * replaced by the actual expression with the evaluated identifier string.
+ *
+ * Note, the `exprBuilder` is a lambda and may hide other expressions from the 
expression tree. To
+ * avoid it, this placeholder has a field to hold other expressions, so that 
they can be properly
+ * transformed by catalyst rules.
  */
 case class ExpressionWithUnresolvedIdentifier(
 identifierExpr: Expression,
-exprBuilder: Seq[String] => Expression)
-  extends UnaryExpression with Unevaluable {
+otherExprs: Seq[Expression],
+exprBuilder: (Seq[String], Seq[Expression]) => Expression)
+  extends Expression with Unevaluable {
+
+  def this(identifierExpr: Expression, exprBuilder: Seq[String] => Expression) 
= {
+this(identifierExpr, Nil, (ident, _) => exprBuilder(ident))
+  }
+
   override lazy val resolved = false
-  override def child: Expression = identifierExpr
+  override def children: Seq[Expression] = identifierExpr +: otherExprs
   override def 

[jira] [Resolved] (SPARK-46993) Allow session variables in more places such as from_json for schema

2024-02-08 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-46993.
-
Fix Version/s: 4.0.0
 Assignee: Serge Rielau
   Resolution: Fixed

> Allow session variables in more places such as from_json for schema
> ---
>
> Key: SPARK-46993
> URL: https://issues.apache.org/jira/browse/SPARK-46993
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.4.2
>Reporter: Serge Rielau
>Assignee: Serge Rielau
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> It appears we do not allow session variables to provide a schema for 
> from_json().
> This is likely a generic restriction regarding constant folding.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



(spark) branch master updated: [SPARK-46993][SQL] Fix constant folding for session variables

2024-02-08 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 3d25525d876b [SPARK-46993][SQL] Fix constant folding for session 
variables
3d25525d876b is described below

commit 3d25525d876be9c5f3bd2dce917cb6bec10fb3e9
Author: Serge Rielau 
AuthorDate: Thu Feb 8 16:24:03 2024 +0800

[SPARK-46993][SQL] Fix constant folding for session variables

### What changes were proposed in this pull request?

Remove the unconditional Alias node generation when resolving a variable 
reference.

### Why are the changes needed?

An Alias that is not at the top level of an expression blocks constant 
folding.
Constant folding in turn is a requirement for variables to be usable as an 
argument to numerous functions, such as from_json().
It also has performance implications.
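
A minimal sketch of the motivating use case (the variable name and JSON literal 
are made up): a session variable supplies the schema string to `from_json()`, 
which only works if the variable reference constant-folds.

```
// Declare a session variable holding a schema string, then use it in from_json().
spark.sql("DECLARE VARIABLE json_schema STRING DEFAULT 'a INT, b STRING'")
spark.sql("""SELECT from_json('{"a": 1, "b": "x"}', json_schema) AS parsed""").show()
```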

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing regression tests in sql-session-variables.sql; a test was added to 
validate the fix.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #45059 from 
srielau/SPARK-46993-Fix-constant-folding-for-session-variables.

Authored-by: Serge Rielau 
Signed-off-by: Wenchen Fan 
---
 .../catalyst/analysis/ColumnResolutionHelper.scala | 35 
 .../analyzer-results/sql-session-variables.sql.out | 43 
 .../sql-tests/inputs/sql-session-variables.sql |  6 +++
 .../results/sql-session-variables.sql.out  | 46 ++
 4 files changed, 109 insertions(+), 21 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala
index 2472705d2f54..8ea50e2ceb65 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala
@@ -312,14 +312,35 @@ trait ColumnResolutionHelper extends Logging with 
DataTypeErrorsBase {
   }.map(e => Alias(e, nameParts.last)())
 }
 
-e.transformWithPruning(_.containsAnyPattern(UNRESOLVED_ATTRIBUTE, 
TEMP_RESOLVED_COLUMN)) {
-  case u: UnresolvedAttribute =>
-resolve(u.nameParts).getOrElse(u)
-  // Re-resolves `TempResolvedColumn` as variable references if it has 
tried to be resolved with
-  // Aggregate but failed.
-  case t: TempResolvedColumn if t.hasTried =>
-resolve(t.nameParts).getOrElse(t)
+def innerResolve(e: Expression, isTopLevel: Boolean): Expression = 
withOrigin(e.origin) {
+  if (e.resolved || !e.containsAnyPattern(UNRESOLVED_ATTRIBUTE, 
TEMP_RESOLVED_COLUMN)) return e
+  val resolved = e match {
+case u @ UnresolvedAttribute(nameParts) =>
+  val result = withPosition(u) {
+resolve(nameParts).getOrElse(u) match {
+  // We trim unnecessary alias here. Note that, we cannot trim the 
alias at top-level,
+  case Alias(child, _) if !isTopLevel => child
+  case other => other
+}
+  }
+  result
+
+// Re-resolves `TempResolvedColumn` as variable references if it has 
tried to be
+// resolved with Aggregate but failed.
+case t: TempResolvedColumn if t.hasTried => withPosition(t) {
+  resolve(t.nameParts).getOrElse(t) match {
+case _: UnresolvedAttribute => t
+case other => other
+  }
+}
+
+case _ => e.mapChildren(innerResolve(_, isTopLevel = false))
+  }
+  resolved.copyTagsFrom(e)
+  resolved
 }
+
+innerResolve(e, isTopLevel = true)
   }
 
   // Resolves `UnresolvedAttribute` to `TempResolvedColumn` via 
`plan.child.output` if plan is an
diff --git 
a/sql/core/src/test/resources/sql-tests/analyzer-results/sql-session-variables.sql.out
 
b/sql/core/src/test/resources/sql-tests/analyzer-results/sql-session-variables.sql.out
index e75c7946ef76..6a6ffe85ad59 100644
--- 
a/sql/core/src/test/resources/sql-tests/analyzer-results/sql-session-variables.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/analyzer-results/sql-session-variables.sql.out
@@ -74,7 +74,7 @@ CreateVariable defaultvalueexpression(null, null), false
 -- !query
 SELECT 'Expect: INT, NULL', typeof(var1), var1
 -- !query analysis
-Project [Expect: INT, NULL AS Expect: INT, NULL#x, 
typeof(variablereference(system.session.var1=CAST(NULL AS INT))) AS 
typeof(variablereference(system.session.var1=CAST(NULL AS INT)) AS var1)#x, 
variablereference(system.session.var1=CAST(NULL AS INT)) AS var1#x]
+Project [Expect: INT, NULL A

[jira] [Assigned] (SPARK-46922) Do not wrap runtime user-facing errors

2024-02-07 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-46922:
---

Assignee: Wenchen Fan

> Do not wrap runtime user-facing errors
> --
>
> Key: SPARK-46922
> URL: https://issues.apache.org/jira/browse/SPARK-46922
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 4.0.0
>Reporter: Wenchen Fan
>Assignee: Wenchen Fan
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-46922) Do not wrap runtime user-facing errors

2024-02-07 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-46922.
-
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 44953
[https://github.com/apache/spark/pull/44953]

> Do not wrap runtime user-facing errors
> --
>
> Key: SPARK-46922
> URL: https://issues.apache.org/jira/browse/SPARK-46922
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 4.0.0
>Reporter: Wenchen Fan
>Assignee: Wenchen Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-46999) ExpressionWithUnresolvedIdentifier should include other expressions in the expression tree

2024-02-07 Thread Wenchen Fan (Jira)
Wenchen Fan created SPARK-46999:
---

 Summary: ExpressionWithUnresolvedIdentifier should include other 
expressions in the expression tree
 Key: SPARK-46999
 URL: https://issues.apache.org/jira/browse/SPARK-46999
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.4.0
Reporter: Wenchen Fan






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



(spark) branch master updated (dc73a8d7e96e -> becc04a9b6e2)

2024-02-07 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from dc73a8d7e96e [SPARK-46526][SQL] Support LIMIT over correlated 
subqueries where predicates only reference outer table
 add becc04a9b6e2 [SPARK-46996][SQL] Allow AQE coalesce final stage in SQL 
cached plan

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/dataframe.py|  6 ++--
 .../org/apache/spark/sql/internal/SQLConf.scala|  6 ++--
 .../apache/spark/sql/execution/CacheManager.scala  | 34 +++---
 .../adaptive/InsertAdaptiveSparkPlan.scala |  5 +---
 .../org/apache/spark/sql/CachedTableSuite.scala| 33 -
 .../adaptive/AdaptiveQueryExecSuite.scala  | 17 +++
 6 files changed, 53 insertions(+), 48 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[jira] [Resolved] (SPARK-46526) Limit over certain correlated subqueries results in Nosuchelement exception

2024-02-06 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-46526.
-
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 44514
[https://github.com/apache/spark/pull/44514]

> Limit over certain correlated subqueries results in Nosuchelement exception
> ---
>
> Key: SPARK-46526
> URL: https://issues.apache.org/jira/browse/SPARK-46526
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.4.0
>Reporter: Andrey Gubichev
>Assignee: Andrey Gubichev
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> The queries that result in errors:
>  * have a LIMIT in the subquery
>  * have a predicate with correlated references that does not depend on the inner 
> query (it references exclusively the outer table).
> For example:
> {code:java}
> SELECT COUNT(DISTINCT(t1a))
> FROM t1
> WHERE t1d IN (SELECT t2d
>   FROM   t2
>   WHERE t1a IS NOT NULL
>   LIMIT 10);
>  {code}
> Here, WHERE t1a IS NOT NULL can be conceptually lifted to the join that 
> connects the inner and outer queries. 
> Currently, this query results in an error ("no such element exception").



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



(spark) branch master updated: [SPARK-46526][SQL] Support LIMIT over correlated subqueries where predicates only reference outer table

2024-02-06 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new dc73a8d7e96e [SPARK-46526][SQL] Support LIMIT over correlated 
subqueries where predicates only reference outer table
dc73a8d7e96e is described below

commit dc73a8d7e96ead55053096971c838908b7c90527
Author: Andrey Gubichev 
AuthorDate: Wed Feb 7 11:54:21 2024 +0800

[SPARK-46526][SQL] Support LIMIT over correlated subqueries where 
predicates only reference outer table

### What changes were proposed in this pull request?

The type of query that this PR addresses is the following:

```
SELECT COUNT(DISTINCT(t1a))
FROM t1
WHERE t1d IN (SELECT t2d
  FROM   t2
  WHERE t1a IS NOT NULL
  LIMIT 10);
```

Here, the predicate in the subquery `t1a IS NOT NULL` does not reference 
the inner table at all, so our standard decorrelation technique of "compute 10 
values of t2d per every value of the outer table's correlated key" does not work. 
In fact, such predicates can be lifted above the LIMIT 10. This PR achieves exactly that.
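
A hedged sketch of the rewrite idea (conceptual only, not the literal plan Spark 
produces): because the predicate is all-or-nothing per outer row, it can be 
pulled out of the LIMIT'ed subquery, here using the same t1/t2 tables as the 
example above.

```
// Conceptually equivalent form with the outer-only predicate lifted out.
spark.sql("""
  SELECT COUNT(DISTINCT(t1a))
  FROM t1
  WHERE t1a IS NOT NULL
    AND t1d IN (SELECT t2d FROM t2 LIMIT 10)
""")
```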

### Why are the changes needed?

Fixed the bug.

### Does this PR introduce _any_ user-facing change?

Some broken queries are now working.

### How was this patch tested?

Query tests.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #44514 from agubichev/SPARK-46526_limit_corr.

Authored-by: Andrey Gubichev 
Signed-off-by: Wenchen Fan 
---
 .../catalyst/optimizer/DecorrelateInnerQuery.scala | 34 +
 .../exists-subquery/exists-orderby-limit.sql.out   | 57 ++
 .../subquery/in-subquery/in-limit.sql.out  | 85 +
 .../scalar-subquery-predicate.sql.out  | 89 ++
 .../exists-subquery/exists-orderby-limit.sql   | 18 +
 .../inputs/subquery/in-subquery/in-limit.sql   | 21 +
 .../scalar-subquery/scalar-subquery-predicate.sql  | 22 ++
 .../exists-subquery/exists-orderby-limit.sql.out   | 39 ++
 .../results/subquery/in-subquery/in-limit.sql.out  | 39 ++
 .../scalar-subquery-predicate.sql.out  | 40 ++
 10 files changed, 431 insertions(+), 13 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
index eca392fd84ca..1ebf0c7b39a4 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
@@ -673,20 +673,28 @@ object DecorrelateInnerQuery extends PredicateHelper {
   decorrelate(child, parentOuterReferences, aggregated = true, 
underSetOp)
 val collectedChildOuterReferences = 
collectOuterReferencesInPlanTree(child)
 // Add outer references to the PARTITION BY clause
-val partitionFields = 
collectedChildOuterReferences.map(outerReferenceMap(_)).toSeq
-val orderByFields = replaceOuterReferences(ordering, 
outerReferenceMap)
+val partitionFields = collectedChildOuterReferences
+  .filter(outerReferenceMap.contains(_))
+  .map(outerReferenceMap(_)).toSeq
+if (partitionFields.isEmpty) {
+  // Underlying subquery has no predicates connecting inner and 
outer query.
+  // In this case, limit can be computed over the inner query 
directly.
+  (Limit(limit, newChild), joinCond, outerReferenceMap)
+} else {
+  val orderByFields = replaceOuterReferences(ordering, 
outerReferenceMap)
 
-val rowNumber = WindowExpression(RowNumber(),
-  WindowSpecDefinition(partitionFields, orderByFields,
-SpecifiedWindowFrame(RowFrame, UnboundedPreceding, 
CurrentRow)))
-val rowNumberAlias = Alias(rowNumber, "rn")()
-// Window function computes row_number() when partitioning by 
correlated references,
-// and projects all the other fields from the input.
-val window = Window(Seq(rowNumberAlias),
-  partitionFields, orderByFields, newChild)
-val filter = Filter(LessThanOrEqual(rowNumberAlias.toAttribute, 
limit), window)
-val project = Project(newChild.output, filter)
-(project, joinCond, outerReferenceMap)
+  val rowNumber = WindowExpression(RowNumber(),
+WindowSpecDefinition(partitionFields, orderByFields,
+  SpecifiedWindowFrame(RowFrame, UnboundedPreceding, 
CurrentRow))

(spark) branch master updated: [SPARK-46980][SQL][MINOR] Avoid using internal APIs in dataframe end-to-end tests

2024-02-05 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 1a577b8f8881 [SPARK-46980][SQL][MINOR] Avoid using internal APIs in 
dataframe end-to-end tests
1a577b8f8881 is described below

commit 1a577b8f88816eabd78b3924f1e159f2593209c5
Author: Mark Jarvin 
AuthorDate: Tue Feb 6 09:11:19 2024 +0800

[SPARK-46980][SQL][MINOR] Avoid using internal APIs in dataframe end-to-end 
tests

### What changes were proposed in this pull request?

Avoid using the internal `XORShiftRandom` API in tests by instead using 
public APIs to collect the random values.

### Why are the changes needed?

Testing using internal APIs introduces unnecessary coupling between the 
implementation and the test.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Ran tests using SBT

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #45034 from markj-db/test-public-api-rand.

Authored-by: Mark Jarvin 
Signed-off-by: Wenchen Fan 
---
 .../scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala   | 7 ++-
 sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala  | 4 +---
 2 files changed, 3 insertions(+), 8 deletions(-)

diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
index d6cf77572731..cbc39557ce4c 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
@@ -305,11 +305,8 @@ class DataFrameSetOperationsSuite extends QueryTest
 // When generating expected results at here, we need to follow the 
implementation of
 // Rand expression.
 def expected(df: DataFrame): Seq[Row] =
-  df.rdd.collectPartitions().zipWithIndex.flatMap {
-case (data, index) =>
-  val rng = new org.apache.spark.util.random.XORShiftRandom(7 + index)
-  data.filter(_.getInt(0) < rng.nextDouble() * 10)
-  }.toSeq
+  df.select($"i", rand(7) * 10).as[(Long, Double)].collect()
+.filter(r => r._1 < r._2).map(r => Row(r._1)).toImmutableArraySeq
 
 val union = df1.union(df2)
 checkAnswer(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 1442320e9a94..d85c7b7dfa3d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -57,7 +57,6 @@ import org.apache.spark.tags.SlowSQLTest
 import org.apache.spark.unsafe.types.CalendarInterval
 import org.apache.spark.util.ArrayImplicits._
 import org.apache.spark.util.Utils
-import org.apache.spark.util.random.XORShiftRandom
 
 @SlowSQLTest
 class DataFrameSuite extends QueryTest
@@ -1922,8 +1921,7 @@ class DataFrameSuite extends QueryTest
   test("SPARK-9083: sort with non-deterministic expressions") {
 val seed = 33
 val df = (1 to 100).map(Tuple1.apply).toDF("i").repartition(1)
-val random = new XORShiftRandom(seed)
-val expected = (1 to 100).map(_ -> 
random.nextDouble()).sortBy(_._2).map(_._1)
+val expected = df.select($"i", rand(seed)).as[(Long, 
Double)].collect().sortBy(_._2).map(_._1)
 val actual = df.sort(rand(seed)).collect().map(_.getInt(0))
 assert(expected === actual)
   }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[jira] [Resolved] (SPARK-46980) Avoid using internal APIs in dataframe end-to-end tests

2024-02-05 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-46980.
-
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 45034
[https://github.com/apache/spark/pull/45034]

> Avoid using internal APIs in dataframe end-to-end tests
> ---
>
> Key: SPARK-46980
> URL: https://issues.apache.org/jira/browse/SPARK-46980
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 4.0.0
>Reporter: Wenchen Fan
>Assignee: Mark Jarvin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-46980) Avoid using internal APIs in tests

2024-02-05 Thread Wenchen Fan (Jira)
Wenchen Fan created SPARK-46980:
---

 Summary: Avoid using internal APIs in tests
 Key: SPARK-46980
 URL: https://issues.apache.org/jira/browse/SPARK-46980
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 4.0.0
Reporter: Wenchen Fan






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-46980) Avoid using internal APIs in dataframe end-to-end tests

2024-02-05 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan updated SPARK-46980:

Summary: Avoid using internal APIs in dataframe end-to-end tests  (was: 
Avoid using internal APIs in tests)

> Avoid using internal APIs in dataframe end-to-end tests
> ---
>
> Key: SPARK-46980
> URL: https://issues.apache.org/jira/browse/SPARK-46980
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 4.0.0
>Reporter: Wenchen Fan
>Assignee: Mark Jarvin
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-46833) Using ICU library for collation tracking

2024-02-05 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-46833:
---

Assignee: Aleksandar Tomic

> Using ICU library for collation tracking
> 
>
> Key: SPARK-46833
> URL: https://issues.apache.org/jira/browse/SPARK-46833
> Project: Spark
>  Issue Type: Task
>  Components: Spark Core
>Affects Versions: 4.0.0
>Reporter: Aleksandar Tomic
>Assignee: Aleksandar Tomic
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



(spark) branch master updated: [SPARK-46833][SQL] Collations - Introducing CollationFactory which provides comparison and hashing rules for supported collations

2024-02-05 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 6ff5f396632d [SPARK-46833][SQL] Collations - Introducing 
CollationFactory which provides comparison and hashing rules for supported 
collations
6ff5f396632d is described below

commit 6ff5f396632d5b715df083ade81349206373c78c
Author: Aleksandar Tomic 
AuthorDate: Tue Feb 6 00:22:08 2024 +0800

[SPARK-46833][SQL] Collations - Introducing CollationFactory which provides 
comparison and hashing rules for supported collations

### What changes were proposed in this pull request?

This PR introduces the CollationFactory singleton class, which provides all 
collation-aware methods, for a given collation, that can be invoked against 
UTF8Strings.

For a higher-level overview of the collation track, please take a look at the 
umbrella [JIRA](https://issues.apache.org/jira/browse/SPARK-46830).

At this point, CollationFactory is still not exposed outside of tests. The 
connection between UTF8String and CollationFactory is coming in the next PR.
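
The CollationFactory source is not shown in this message, so the following is 
only a minimal ICU4J sketch (using the `icu4j` dependency this PR adds) of the 
kind of collation-aware comparison it is described as providing; it is not the 
CollationFactory API itself.

```
import com.ibm.icu.text.Collator
import java.util.Locale

// Case-insensitive comparison via collator strength (accents stay significant).
val collator = Collator.getInstance(Locale.US)
collator.setStrength(Collator.SECONDARY)
println(collator.compare("Spark", "SPARK") == 0) // true: equal under this collation
```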

### Why are the changes needed?

Please refer to umbrella JIRA ticket for collation effort.

### Does this PR introduce _any_ user-facing change?

At this point No, this is just prep for user facing changes.

### How was this patch tested?

Unit tests for CollationFactory come with this PR. The tests are basic sanity 
tests; proper testing will come as we get E2E features.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44968 from dbatomic/utf8_collation_extension.

Lead-authored-by: Aleksandar Tomic 
Co-authored-by: Aleksandar Tomic 
<150942779+dbato...@users.noreply.github.com>
Signed-off-by: Wenchen Fan 
---
 common/unsafe/pom.xml  |   6 +
 .../spark/sql/catalyst/util/CollationFactory.java  | 175 +
 .../spark/unsafe/types/CollationFactorySuite.scala | 116 ++
 .../src/main/resources/error/error-classes.json|   6 +
 .../scala/org/apache/spark/SparkException.scala|  10 ++
 dev/deps/spark-deps-hadoop-3-hive-2.3  |   1 +
 docs/sql-error-conditions.md   |   6 +
 pom.xml|   6 +
 sql/core/pom.xml   |   4 +
 9 files changed, 330 insertions(+)

diff --git a/common/unsafe/pom.xml b/common/unsafe/pom.xml
index 4d23c9149bb7..e9785ebb7ad4 100644
--- a/common/unsafe/pom.xml
+++ b/common/unsafe/pom.xml
@@ -47,6 +47,12 @@
   ${project.version}
 
 
+    <dependency>
+      <groupId>com.ibm.icu</groupId>
+      <artifactId>icu4j</artifactId>
+      <version>${icu4j.version}</version>
+    </dependency>
+
 

[jira] [Resolved] (SPARK-46833) Using ICU library for collation tracking

2024-02-05 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-46833.
-
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 44968
[https://github.com/apache/spark/pull/44968]

> Using ICU library for collation tracking
> 
>
> Key: SPARK-46833
> URL: https://issues.apache.org/jira/browse/SPARK-46833
> Project: Spark
>  Issue Type: Task
>  Components: Spark Core
>Affects Versions: 4.0.0
>Reporter: Aleksandar Tomic
>Assignee: Aleksandar Tomic
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-46946) Supporting broadcast of multiple filtering keys in DynamicPruning

2024-02-02 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-46946.
-
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 44988
[https://github.com/apache/spark/pull/44988]

> Supporting broadcast of multiple filtering keys in DynamicPruning
> -
>
> Key: SPARK-46946
> URL: https://issues.apache.org/jira/browse/SPARK-46946
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 4.0.0, 3.5.1
>Reporter: Thang Long Vu
>Assignee: Thang Long Vu
>Priority: Major
>  Labels: pull-request-available, releasenotes
> Fix For: 4.0.0
>
>
> This PR extends `DynamicPruningSubquery` to support broadcasting of multiple 
> filtering keys (instead of one as before). The majority of the PR is to 
> simply generalise singularity to plurality.
> Note: We actually do not use the multiple filtering keys 
> `DynamicPruningSubquery` in this PR, we are doing this to make supporting DPP 
> Null Safe Equality or multiple Equality predicates easier in the future.
> In Null Safe Equality JOIN, the JOIN condition `a <=> b` is transformed to 
> `Coalesce(key1, Literal(key1.dataType)) = Coalesce(key2, 
> Literal(key2.dataType)) AND IsNull(key1) = IsNull(key2)`. In order to have 
> the highest pruning efficiency, we broadcast the 2 keys `Coalesce(key, 
> Literal(key.dataType))` and `IsNull(key)` and use them to prune the other 
> side at the same time. 
> Before, the `DynamicPruningSubquery` only had one broadcasting key and we 
> only supported DPP for one `EqualTo` JOIN predicate; now we are extending the 
> subquery to multiple broadcasting keys. Please note that DPP has not been 
> supported for multiple JOIN predicates. 
> Put another way, at the moment we don't insert a DPP Filter for 
> multiple JOIN predicates at the same time; we only potentially insert a DPP 
> Filter for a given Equality JOIN predicate.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-46946) Supporting broadcast of multiple filtering keys in DynamicPruning

2024-02-02 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-46946:
---

Assignee: Thang Long Vu

> Supporting broadcast of multiple filtering keys in DynamicPruning
> -
>
> Key: SPARK-46946
> URL: https://issues.apache.org/jira/browse/SPARK-46946
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 4.0.0, 3.5.1
>Reporter: Thang Long Vu
>Assignee: Thang Long Vu
>Priority: Major
>  Labels: pull-request-available, releasenotes
>
> This PR extends `DynamicPruningSubquery` to support broadcasting of multiple 
> filtering keys (instead of one as before). The majority of the PR is to 
> simply generalise singularity to plurality.
> Note: We actually do not use the multiple filtering keys 
> `DynamicPruningSubquery` in this PR, we are doing this to make supporting DPP 
> Null Safe Equality or multiple Equality predicates easier in the future.
> In Null Safe Equality JOIN, the JOIN condition `a <=> b` is transformed to 
> `Coalesce(key1, Literal(key1.dataType)) = Coalesce(key2, 
> Literal(key2.dataType)) AND IsNull(key1) = IsNull(key2)`. In order to have 
> the highest pruning efficiency, we broadcast the 2 keys `Coalesce(key, 
> Literal(key.dataType))` and `IsNull(key)` and use them to prune the other 
> side at the same time. 
> Before, the `DynamicPruningSubquery` only had one broadcasting key and we 
> only supported DPP for one `EqualTo` JOIN predicate; now we are extending the 
> subquery to multiple broadcasting keys. Please note that DPP has not been 
> supported for multiple JOIN predicates. 
> Put another way, at the moment we don't insert a DPP Filter for 
> multiple JOIN predicates at the same time; we only potentially insert a DPP 
> Filter for a given Equality JOIN predicate.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



(spark) branch master updated: [SPARK-46946][SQL] Supporting broadcast of multiple filtering keys in DynamicPruning

2024-02-02 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 22e5938aefc7 [SPARK-46946][SQL] Supporting broadcast of multiple 
filtering keys in DynamicPruning
22e5938aefc7 is described below

commit 22e5938aefc784f50218a86e013e4c2247271072
Author: Thang Long VU 
AuthorDate: Fri Feb 2 19:55:34 2024 +0800

[SPARK-46946][SQL] Supporting broadcast of multiple filtering keys in 
DynamicPruning

### What changes were proposed in this pull request?

This PR extends `DynamicPruningSubquery` to support broadcasting of 
multiple filtering keys (instead of one as before). The majority of the PR is 
to simply generalise singularity to plurality.

**Note:** We do not actually use the multiple-filtering-keys 
`DynamicPruningSubquery` in this PR; we are doing this to make supporting DPP 
Null Safe Equality or multiple Equality predicates easier in the future.

In Null Safe Equality JOIN, the JOIN condition `a <=> b` is transformed to 
`Coalesce(key1, Literal(key1.dataType)) = Coalesce(key2, 
Literal(key2.dataType)) AND IsNull(key1) = IsNull(key2)`. In order to have the 
highest pruning efficiency, we broadcast the 2 keys `Coalesce(key, 
Literal(key.dataType))` and `IsNull(key)` and use them to prune the other side 
at the same time.

Before, the `DynamicPruningSubquery` only had one broadcasting key and we 
only supported DPP for one `EqualTo` JOIN predicate; now we are extending the 
subquery to multiple broadcasting keys. Please note that DPP has not been 
supported for multiple JOIN predicates.

Put another way, at the moment we don't insert a DPP Filter for 
multiple JOIN predicates at the same time; we only potentially insert a DPP Filter 
for a given Equality JOIN predicate.
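
A hedged illustration of the kind of join this groundwork targets (table and 
column names are made up): a null-safe equi-join whose `<=>` condition is 
rewritten internally into the Coalesce/IsNull conjunction described above, so 
pruning it effectively would need two broadcast filtering keys.

```
// Null-safe equality join; DPP over this shape is future work enabled by this PR.
val pruned = spark.sql("""
  SELECT f.*
  FROM fact_table f
  JOIN dim_table d
    ON f.part_col <=> d.part_col
  WHERE d.region = 'EU'
""")
```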

### Why are the changes needed?

To make supporting DPP Null Safe Equality or DPP multiple Equality 
predicates easier in the future.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added unit tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44988 from longvu-db/multiple-broadcast-filtering-keys.

Authored-by: Thang Long VU 
Signed-off-by: Wenchen Fan 
---
 .../sql/catalyst/expressions/DynamicPruning.scala  | 12 +--
 .../expressions/DynamicPruningSubquerySuite.scala  | 89 ++
 .../execution/SubqueryAdaptiveBroadcastExec.scala  |  2 +-
 .../sql/execution/SubqueryBroadcastExec.scala  | 37 -
 .../PlanAdaptiveDynamicPruningFilters.scala|  8 +-
 .../adaptive/PlanAdaptiveSubqueries.scala  |  4 +-
 .../dynamicpruning/PartitionPruning.scala  | 15 ++--
 .../dynamicpruning/PlanDynamicPruningFilters.scala |  9 ++-
 .../spark/sql/DynamicPartitionPruningSuite.scala   |  2 +-
 9 files changed, 138 insertions(+), 40 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/DynamicPruning.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/DynamicPruning.scala
index ec6925eaa984..cc24a982d5d8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/DynamicPruning.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/DynamicPruning.scala
@@ -37,13 +37,13 @@ trait DynamicPruning extends Predicate
  *  beneficial and so it should be executed even if it cannot reuse the 
results of the
  *  broadcast through ReuseExchange; otherwise, it will use the filter only if 
it
  *  can reuse the results of the broadcast through ReuseExchange
- * @param broadcastKeyIndex the index of the filtering key collected from the 
broadcast
+ * @param broadcastKeyIndices the indices of the filtering keys collected from 
the broadcast
  */
 case class DynamicPruningSubquery(
 pruningKey: Expression,
 buildQuery: LogicalPlan,
 buildKeys: Seq[Expression],
-broadcastKeyIndex: Int,
+broadcastKeyIndices: Seq[Int],
 onlyInBroadcast: Boolean,
 exprId: ExprId = NamedExpression.newExprId,
 hint: Option[HintInfo] = None)
@@ -67,10 +67,12 @@ case class DynamicPruningSubquery(
   buildQuery.resolved &&
   buildKeys.nonEmpty &&
   buildKeys.forall(_.resolved) &&
-  broadcastKeyIndex >= 0 &&
-  broadcastKeyIndex < buildKeys.size &&
+  broadcastKeyIndices.forall(idx => idx >= 0 && idx < buildKeys.size) &&
   buildKeys.forall(_.references.subsetOf(buildQuery.outputSet)) &&
-  pruningKey.dataType == buildKeys(broadcastKeyIndex).dataType
+  // DynamicPruningSubquery should only have a single broadcasting key 
since
+  // there are no usa

[jira] [Updated] (SPARK-46922) Do not wrap runtime user-facing errors

2024-02-01 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan updated SPARK-46922:

Summary: Do not wrap runtime user-facing errors  (was: better handling for 
runtime user errors)

> Do not wrap runtime user-facing errors
> --
>
> Key: SPARK-46922
> URL: https://issues.apache.org/jira/browse/SPARK-46922
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 4.0.0
>Reporter: Wenchen Fan
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-46951) Define retry-able errors

2024-02-01 Thread Wenchen Fan (Jira)
Wenchen Fan created SPARK-46951:
---

 Summary: Define retry-able errors
 Key: SPARK-46951
 URL: https://issues.apache.org/jira/browse/SPARK-46951
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 4.0.0
Reporter: Wenchen Fan






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



(spark) branch master updated: [SPARK-45807][SQL] Return View after calling replaceView(..)

2024-02-01 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new cedef63faf14 [SPARK-45807][SQL] Return View after calling 
replaceView(..)
cedef63faf14 is described below

commit cedef63faf14ce41ea9f4540faa4a1c18cf7cea8
Author: Eduard Tudenhoefner 
AuthorDate: Fri Feb 2 10:35:56 2024 +0800

[SPARK-45807][SQL] Return View after calling replaceView(..)

### What changes were proposed in this pull request?

Follow-up API improvements based on 
https://github.com/apache/spark/pull/43677

### Why are the changes needed?

Required for DataSourceV2 view support.

### Does this PR introduce _any_ user-facing change?

No
### How was this patch tested?

N/A

### Was this patch authored or co-authored using generative AI tooling?

N/A

Closes #44970 from nastra/SPARK-45807-return-type.

Authored-by: Eduard Tudenhoefner 
Signed-off-by: Wenchen Fan 
---
 .../java/org/apache/spark/sql/connector/catalog/ViewCatalog.java  | 8 +---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ViewCatalog.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ViewCatalog.java
index 933289cab40b..abe5fb3148d0 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ViewCatalog.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ViewCatalog.java
@@ -115,7 +115,7 @@ public interface ViewCatalog extends CatalogPlugin {
* Create a view in the catalog.
*
* @param viewInfo the info class holding all view information
-   * @return the view created
+   * @return the created view. This can be null if getting the metadata for 
the view is expensive
* @throws ViewAlreadyExistsException If a view or table already exists for 
the identifier
* @throws NoSuchNamespaceException If the identifier namespace does not 
exist (optional)
*/
@@ -129,10 +129,12 @@ public interface ViewCatalog extends CatalogPlugin {
*
* @param viewInfo the info class holding all view information
* @param orCreate create the view if it doesn't exist
+   * @return the created/replaced view. This can be null if getting the 
metadata
+   * for the view is expensive
* @throws NoSuchViewException If the view doesn't exist or is a table
* @throws NoSuchNamespaceException If the identifier namespace does not 
exist (optional)
*/
-  default void replaceView(
+  default View replaceView(
   ViewInfo viewInfo,
   boolean orCreate)
   throws NoSuchViewException, NoSuchNamespaceException {
@@ -143,7 +145,7 @@ public interface ViewCatalog extends CatalogPlugin {
 }
 
 try {
-  createView(viewInfo);
+  return createView(viewInfo);
 } catch (ViewAlreadyExistsException e) {
   throw new RuntimeException("Race condition when creating/replacing 
view", e);
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[jira] [Assigned] (SPARK-46908) Extend SELECT * support outside of select list

2024-02-01 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-46908:
---

Assignee: Serge Rielau

> Extend SELECT * support outside of select list
> --
>
> Key: SPARK-46908
> URL: https://issues.apache.org/jira/browse/SPARK-46908
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 4.0.0
>Reporter: Serge Rielau
>Assignee: Serge Rielau
>Priority: Major
>  Labels: SQL, pull-request-available
>
> Traditionally, * is confined to the select list and, there, to the top level of 
> expressions.
> Spark does, in an undocumented fashion, support * in the SELECT list for 
> function argument lists.
> Here we want to expand upon this capability by adding the WHERE clause 
> (Filter) as well as a couple more scenarios such as row value constructors 
> and the IN operator.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-46908) Extend SELECT * support outside of select list

2024-02-01 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-46908.
-
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 44938
[https://github.com/apache/spark/pull/44938]

> Extend SELECT * support outside of select list
> --
>
> Key: SPARK-46908
> URL: https://issues.apache.org/jira/browse/SPARK-46908
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 4.0.0
>Reporter: Serge Rielau
>Assignee: Serge Rielau
>Priority: Major
>  Labels: SQL, pull-request-available
> Fix For: 4.0.0
>
>
> Traditionally, * is confined to the select list and, there, to the top level of 
> expressions.
> Spark does, in an undocumented fashion, support * in the SELECT list for 
> function argument lists.
> Here we want to expand upon this capability by adding the WHERE clause 
> (Filter) as well as a couple more scenarios such as row value constructors 
> and the IN operator.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



(spark) branch master updated (53cbaeb20293 -> 49e3f5e3bad4)

2024-02-01 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 53cbaeb20293 [SPARK-46936][PS] Implement `Frame.to_feather`
 add 49e3f5e3bad4 [SPARK-46908][SQL] Support star clause in WHERE clause

No new revisions were added by this update.

Summary of changes:
 docs/sql-migration-guide.md|   1 +
 docs/sql-ref-function-invocation.md| 112 +++
 docs/sql-ref-syntax-qry-select.md  |   7 +-
 docs/sql-ref-syntax-qry-star.md|  97 +
 docs/sql-ref-syntax.md |   1 +
 docs/sql-ref.md|   1 +
 .../spark/sql/catalyst/analysis/Analyzer.scala |  10 +-
 .../spark/sql/catalyst/parser/AstBuilder.scala |   7 +-
 .../org/apache/spark/sql/internal/SQLConf.scala|  13 ++
 .../sql/catalyst/analysis/AnalysisErrorSuite.scala |   4 +-
 .../analyzer-results/selectExcept.sql.out  | 218 +
 .../subquery/in-subquery/in-group-by.sql.out   |   2 +-
 .../resources/sql-tests/inputs/selectExcept.sql|  29 +++
 .../inputs/subquery/in-subquery/in-group-by.sql|   2 +-
 .../sql-tests/results/selectExcept.sql.out | 168 
 .../subquery/in-subquery/in-group-by.sql.out   |   2 +-
 16 files changed, 666 insertions(+), 8 deletions(-)
 create mode 100644 docs/sql-ref-function-invocation.md
 create mode 100644 docs/sql-ref-syntax-qry-star.md


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[jira] [Resolved] (SPARK-46933) Add execution time metric for jdbc query

2024-02-01 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-46933.
-
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 44969
[https://github.com/apache/spark/pull/44969]

> Add execution time metric for jdbc query
> 
>
> Key: SPARK-46933
> URL: https://issues.apache.org/jira/browse/SPARK-46933
> Project: Spark
>  Issue Type: Task
>  Components: Spark Core
>Affects Versions: 3.5.1
>Reporter: Milan Stefanovic
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> Expose additional metrics when JDBCRDD is used



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



(spark) branch master updated: [SPARK-46933][SQL] Add query execution time metric to connectors which use JDBCRDD

2024-02-01 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 22586efb3cd4 [SPARK-46933][SQL] Add query execution time metric to 
connectors which use JDBCRDD
22586efb3cd4 is described below

commit 22586efb3cd4a19969d92b249a14326fba3244d2
Author: Uros Stankovic 
AuthorDate: Thu Feb 1 22:48:50 2024 +0800

[SPARK-46933][SQL] Add query execution time metric to connectors which use 
JDBCRDD

### What changes were proposed in this pull request?
This pull request adds measurement of query execution time for the external
JDBC data source.
Another change is relaxing the access modifier of the JDBCRDD class, which is
needed for adding another metric (SQL text) in a follow-up PR.

### Why are the changes needed?
Query execution time is a very important metric to have.

### Does this PR introduce _any_ user-facing change?
Users can see the query execution time in the SparkPlan graph under the node metrics tab.

### How was this patch tested?
Tested using a custom image.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #44969 from 
urosstan-db/SPARK-46933-Add-scan-metrics-to-jdbc-connector.

Lead-authored-by: Uros Stankovic 
Co-authored-by: Uros Stankovic 
<155642965+urosstan...@users.noreply.github.com>
Signed-off-by: Wenchen Fan 
---
 .../spark/sql/execution/DataSourceScanExec.scala   | 17 --
 .../datasources/DataSourceMetricsMixin.scala   | 24 
 .../sql/execution/datasources/jdbc/JDBCRDD.scala   | 26 --
 3 files changed, 63 insertions(+), 4 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 2622eadaefb3..ec265f4eaea4 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -99,6 +99,10 @@ trait DataSourceScanExec extends LeafExecNode {
   def inputRDDs(): Seq[RDD[InternalRow]]
 }
 
+object DataSourceScanExec {
+  val numOutputRowsKey = "numOutputRows"
+}
+
 /** Physical plan node for scanning data from a relation. */
 case class RowDataSourceScanExec(
 output: Seq[Attribute],
@@ -111,8 +115,17 @@ case class RowDataSourceScanExec(
 tableIdentifier: Option[TableIdentifier])
   extends DataSourceScanExec with InputRDDCodegen {
 
-  override lazy val metrics =
-Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"))
+  override lazy val metrics: Map[String, SQLMetric] = {
+val metrics = Map(
+  DataSourceScanExec.numOutputRowsKey ->
+SQLMetrics.createMetric(sparkContext, "number of output rows")
+)
+
+rdd match {
+  case rddWithDSMetrics: DataSourceMetricsMixin => metrics ++ 
rddWithDSMetrics.getMetrics
+  case _ => metrics
+}
+  }
 
   protected override def doExecute(): RDD[InternalRow] = {
 val numOutputRows = longMetric("numOutputRows")
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceMetricsMixin.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceMetricsMixin.scala
new file mode 100644
index ..6c1e5e876e99
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceMetricsMixin.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.execution.metric.SQLMetric
+
+trait DataSourceMetricsMixin {
+  def getMetrics: Seq[(String, SQLMetric)]
+}
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index 934ed9a
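
For context, a minimal sketch of how a source RDD can surface its own metrics through the new `DataSourceMetricsMixin` trait so that `RowDataSourceScanExec` picks them up; the class below is hypothetical and elides `compute()`/`getPartitions()`, while `SQLMetrics.createNanoTimingMetric` is the existing metric factory:

```
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.DataSourceMetricsMixin
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

// Hypothetical JDBCRDD-like source RDD that reports a query execution time metric.
abstract class TimedSourceRDD(sc: SparkContext)
  extends RDD[InternalRow](sc, Nil) with DataSourceMetricsMixin {

  // Nano-timing metric rendered in the SQL UI node of the scan.
  val queryExecutionTime: SQLMetric =
    SQLMetrics.createNanoTimingMetric(sc, "JDBC query execution time")

  // RowDataSourceScanExec merges these entries into its own metrics map (see the diff above).
  override def getMetrics: Seq[(String, SQLMetric)] =
    Seq("queryExecutionTime" -> queryExecutionTime)
}
```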

[jira] [Created] (SPARK-46922) better handling for runtime user errors

2024-01-30 Thread Wenchen Fan (Jira)
Wenchen Fan created SPARK-46922:
---

 Summary: better handling for runtime user errors
 Key: SPARK-46922
 URL: https://issues.apache.org/jira/browse/SPARK-46922
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 4.0.0
Reporter: Wenchen Fan






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-46905) Add dedicated class to keep column definition instead of StructField in Create/ReplaceTable command

2024-01-29 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-46905.
-
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 44935
[https://github.com/apache/spark/pull/44935]

> Add dedicated class to keep column definition instead of StructField in 
> Create/ReplaceTable command
> ---
>
> Key: SPARK-46905
> URL: https://issues.apache.org/jira/browse/SPARK-46905
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 4.0.0
>Reporter: Wenchen Fan
>Assignee: Wenchen Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-46905) Add dedicated class to keep column definition instead of StructField in Create/ReplaceTable command

2024-01-29 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-46905:
---

Assignee: Wenchen Fan

> Add dedicated class to keep column definition instead of StructField in 
> Create/ReplaceTable command
> ---
>
> Key: SPARK-46905
> URL: https://issues.apache.org/jira/browse/SPARK-46905
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 4.0.0
>Reporter: Wenchen Fan
>Assignee: Wenchen Fan
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



(spark) branch master updated: [SPARK-46905][SQL] Add dedicated class to keep column definition instead of StructField in Create/ReplaceTable command

2024-01-29 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 4da7c3d316e3 [SPARK-46905][SQL] Add dedicated class to keep column 
definition instead of StructField in Create/ReplaceTable command
4da7c3d316e3 is described below

commit 4da7c3d316e3d1340258698e841be370bd16d6fa
Author: Wenchen Fan 
AuthorDate: Tue Jan 30 15:09:15 2024 +0800

[SPARK-46905][SQL] Add dedicated class to keep column definition instead of 
StructField in Create/ReplaceTable command

### What changes were proposed in this pull request?

This is a follow-up of https://github.com/apache/spark/pull/44876 to refactor
the code and make it cleaner. The idea is to add a dedicated class for column
definitions instead of `StructField` in the Create/ReplaceTable commands. This
is more flexible and cleaner than adding an additional default column
expression to the `CreateTable` command.

### Why are the changes needed?

This is a code refactor that makes it easier to fully eliminate the need for a
fake analyzer for default value handling in the future. We should make similar
code changes to the ALTER TABLE command and the v1 CREATE TABLE command.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing tests

### Was this patch authored or co-authored using generative AI tooling?

no

Closes #44935 from cloud-fan/refactor.

Lead-authored-by: Wenchen Fan 
Co-authored-by: Wenchen Fan 
Signed-off-by: Wenchen Fan 
---
 .../src/main/resources/error/error-classes.json|   5 +
 ...conditions-invalid-default-value-error-class.md |   4 +
 .../sql/catalyst/analysis/CheckAnalysis.scala  |   3 +
 .../spark/sql/catalyst/parser/AstBuilder.scala | 106 +
 .../catalyst/plans/logical/ColumnDefinition.scala  | 164 +
 .../sql/catalyst/plans/logical/v2Commands.scala|  55 +++
 .../catalyst/util/ResolveDefaultColumnsUtil.scala  |  30 +++-
 .../sql/connector/catalog/CatalogV2Util.scala  |  26 +---
 .../spark/sql/errors/QueryCompilationErrors.scala  |  13 ++
 .../spark/sql/catalyst/parser/DDLParserSuite.scala | 148 ++-
 .../catalyst/analysis/ReplaceCharWithVarchar.scala |  12 +-
 .../catalyst/analysis/ResolveSessionCatalog.scala  |   5 +-
 .../datasources/v2/DataSourceV2Strategy.scala  |  37 +++--
 .../apache/spark/sql/internal/CatalogImpl.scala|   4 +-
 .../spark/sql/streaming/DataStreamWriter.scala |   9 +-
 .../org/apache/spark/sql/sources/InsertSuite.scala |   2 +-
 16 files changed, 393 insertions(+), 230 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json 
b/common/utils/src/main/resources/error/error-classes.json
index 64d65fd4beed..8e47490f5a61 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -1766,6 +1766,11 @@
   "which requires  type, but the statement provided a 
value of incompatible  type."
 ]
   },
+  "NOT_CONSTANT" : {
+"message" : [
+  "which is not a constant expression whose equivalent value is known 
at query planning time."
+]
+  },
   "SUBQUERY_EXPRESSION" : {
 "message" : [
   "which contains subquery expressions."
diff --git a/docs/sql-error-conditions-invalid-default-value-error-class.md 
b/docs/sql-error-conditions-invalid-default-value-error-class.md
index c73d9d5ccbbb..72a5b0db8da0 100644
--- a/docs/sql-error-conditions-invalid-default-value-error-class.md
+++ b/docs/sql-error-conditions-invalid-default-value-error-class.md
@@ -34,6 +34,10 @@ This error class has the following derived error classes:
 
 which requires `` type, but the statement provided a value of 
incompatible `` type.
 
+## NOT_CONSTANT
+
+which is not a constant expression whose equivalent value is known at query 
planning time.
+
 ## SUBQUERY_EXPRESSION
 
 which contains subquery expressions.
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 1b69e933815b..3b1663b4c54c 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -302,6 +302,9 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog with QueryErrorsB
 // general unresolved check below to throw a more tailored error 
message.
 new 
ResolveReferencesInAggregate(catalogManager).checkUnresolvedGroupByAll(operator)
 
+// 
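
To make the intent of the refactor concrete, here is an illustrative sketch of what a dedicated column-definition node buys over `StructField` (field names are assumptions; the real implementation is the `ColumnDefinition.scala` file added by this commit):

```
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.types.{DataType, StructField}

// Illustrative sketch only. Unlike StructField, which can carry a default value only as a
// string stashed in column metadata (forcing a separate "fake" analyzer run to resolve it),
// a dedicated column definition can hold the default as a real Expression in the plan, so
// the normal analyzer resolves and validates it like any other expression.
case class ColumnDefinitionSketch(
    name: String,
    dataType: DataType,
    nullable: Boolean = true,
    comment: Option[String] = None,
    defaultValue: Option[Expression] = None) {

  // Fall back to the v1 representation when only a plain schema is required.
  def toStructField: StructField = StructField(name, dataType, nullable)
}
```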

[jira] [Created] (SPARK-46905) Add dedicated class to keep column definition instead of StructField in Create/ReplaceTable command

2024-01-29 Thread Wenchen Fan (Jira)
Wenchen Fan created SPARK-46905:
---

 Summary: Add dedicated class to keep column definition instead of 
StructField in Create/ReplaceTable command
 Key: SPARK-46905
 URL: https://issues.apache.org/jira/browse/SPARK-46905
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 4.0.0
Reporter: Wenchen Fan






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



(spark) branch branch-3.5 updated: [SPARK-46590][SQL][FOLLOWUP] Update CoalesceShufflePartitions comments

2024-01-23 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 0956db6901bf [SPARK-46590][SQL][FOLLOWUP] Update 
CoalesceShufflePartitions comments
0956db6901bf is described below

commit 0956db6901bf03d2d948b23f00bcd6e74a0c251b
Author: zml1206 
AuthorDate: Wed Jan 24 15:06:55 2024 +0800

[SPARK-46590][SQL][FOLLOWUP] Update CoalesceShufflePartitions comments

### What changes were proposed in this pull request?
After #44661, in addition to Union, the children of CartesianProduct,
BroadcastHashJoin, and BroadcastNestedLoopJoin can also be coalesced
independently, so this PR updates the comments accordingly.

### Why are the changes needed?
Improve the readability and maintainability.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
N/A

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #44854 from zml1206/SPARK-46590-FOLLOWUP.

Authored-by: zml1206 
Signed-off-by: Wenchen Fan 
(cherry picked from commit fe4f8eac3efee42d53f7f24763a59c82ef03d343)
Signed-off-by: Wenchen Fan 
---
 .../sql/execution/adaptive/CoalesceShufflePartitions.scala| 11 ++-
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
index 26e5ac649dbb..db4a6b7dcf2e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
@@ -66,9 +66,9 @@ case class CoalesceShufflePartitions(session: SparkSession) 
extends AQEShuffleRe
   }
 }
 
-// Sub-plans under the Union operator can be coalesced independently, so 
we can divide them
-// into independent "coalesce groups", and all shuffle stages within each 
group have to be
-// coalesced together.
+// Sub-plans under the 
Union/CartesianProduct/BroadcastHashJoin/BroadcastNestedLoopJoin
+// operator can be coalesced independently, so we can divide them into 
independent
+// "coalesce groups", and all shuffle stages within each group have to be 
coalesced together.
 val coalesceGroups = collectCoalesceGroups(plan)
 
 // Divide minimum task parallelism among coalesce groups according to 
their data sizes.
@@ -137,8 +137,9 @@ case class CoalesceShufflePartitions(session: SparkSession) 
extends AQEShuffleRe
   }
 
   /**
-   * Gather all coalesce-able groups such that the shuffle stages in each 
child of a Union operator
-   * are in their independent groups if:
+   * Gather all coalesce-able groups such that the shuffle stages in each 
child of a
+   * Union/CartesianProduct/BroadcastHashJoin/BroadcastNestedLoopJoin operator 
are in their
+   * independent groups if:
* 1) all leaf nodes of this child are exchange stages; and
* 2) all these shuffle stages support coalescing.
*/


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch master updated: [SPARK-46590][SQL][FOLLOWUP] Update CoalesceShufflePartitions comments

2024-01-23 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new fe4f8eac3efe [SPARK-46590][SQL][FOLLOWUP] Update 
CoalesceShufflePartitions comments
fe4f8eac3efe is described below

commit fe4f8eac3efee42d53f7f24763a59c82ef03d343
Author: zml1206 
AuthorDate: Wed Jan 24 15:06:55 2024 +0800

[SPARK-46590][SQL][FOLLOWUP] Update CoalesceShufflePartitions comments

### What changes were proposed in this pull request?
After #44661, in addition to Union, the children of CartesianProduct,
BroadcastHashJoin, and BroadcastNestedLoopJoin can also be coalesced
independently, so this PR updates the comments accordingly.

### Why are the changes needed?
Improve the readability and maintainability.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
N/A

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #44854 from zml1206/SPARK-46590-FOLLOWUP.

Authored-by: zml1206 
Signed-off-by: Wenchen Fan 
---
 .../sql/execution/adaptive/CoalesceShufflePartitions.scala| 11 ++-
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
index 26e5ac649dbb..db4a6b7dcf2e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
@@ -66,9 +66,9 @@ case class CoalesceShufflePartitions(session: SparkSession) 
extends AQEShuffleRe
   }
 }
 
-// Sub-plans under the Union operator can be coalesced independently, so 
we can divide them
-// into independent "coalesce groups", and all shuffle stages within each 
group have to be
-// coalesced together.
+// Sub-plans under the 
Union/CartesianProduct/BroadcastHashJoin/BroadcastNestedLoopJoin
+// operator can be coalesced independently, so we can divide them into 
independent
+// "coalesce groups", and all shuffle stages within each group have to be 
coalesced together.
 val coalesceGroups = collectCoalesceGroups(plan)
 
 // Divide minimum task parallelism among coalesce groups according to 
their data sizes.
@@ -137,8 +137,9 @@ case class CoalesceShufflePartitions(session: SparkSession) 
extends AQEShuffleRe
   }
 
   /**
-   * Gather all coalesce-able groups such that the shuffle stages in each 
child of a Union operator
-   * are in their independent groups if:
+   * Gather all coalesce-able groups such that the shuffle stages in each 
child of a
+   * Union/CartesianProduct/BroadcastHashJoin/BroadcastNestedLoopJoin operator 
are in their
+   * independent groups if:
* 1) all leaf nodes of this child are exchange stages; and
* 2) all these shuffle stages support coalescing.
*/


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[jira] [Resolved] (SPARK-46683) Write a subquery generator that generates subqueries of different variations to increase testing coverage in this area

2024-01-23 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-46683.
-
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 44599
[https://github.com/apache/spark/pull/44599]

> Write a subquery generator that generates subqueries of different variations 
> to increase testing coverage in this area
> --
>
> Key: SPARK-46683
> URL: https://issues.apache.org/jira/browse/SPARK-46683
> Project: Spark
>  Issue Type: Test
>  Components: Optimizer, SQL
>Affects Versions: 3.5.1
>Reporter: Andy Lam
>Assignee: Andy Lam
>Priority: Major
>  Labels: correctness, pull-request-available, testing
> Fix For: 4.0.0
>
>
> There are a lot of subquery correctness issues, ranging from very old bugs to
> new ones that are being introduced by work on subquery correlation
> optimization. This is especially true in the areas of COUNT bugs and null
> behaviors.
> To increase test coverage and robustness in this area, we want to write a
> subquery generator that produces variations of subqueries, yielding SQL tests
> that also run against Postgres (from my work in SPARK-46179).
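
A toy sketch of the idea (not the actual QueryGeneratorHelper / GeneratedSubquerySuite added for this ticket; table and column names are made up): enumerate a few orthogonal dimensions and emit one SQL string per combination, then run each query against both Spark and Postgres and compare the results.

```
// Toy subquery generator: every combination of predicate kind and correlation yields one
// SQL string. Tables outer_t(k, v) and inner_t(k, v) are hypothetical.
object SubqueryVariationSketch {
  private val predicates = Seq("IN", "NOT IN", "EXISTS", "NOT EXISTS")

  def generate(): Seq[String] =
    for {
      pred       <- predicates
      correlated <- Seq(true, false)
    } yield {
      val corr = if (correlated) " WHERE inner_t.k = outer_t.k" else ""
      if (pred.endsWith("EXISTS")) {
        s"SELECT * FROM outer_t WHERE $pred (SELECT 1 FROM inner_t$corr)"
      } else {
        s"SELECT * FROM outer_t WHERE outer_t.v $pred (SELECT inner_t.v FROM inner_t$corr)"
      }
    }
}
```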



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



(spark) branch master updated (2aed25b15dba -> bc889c8c2adb)

2024-01-23 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 2aed25b15dba Revert "[SPARK-46417][SQL] Do not fail when calling 
hive.getTable and throwException is false"
 add bc889c8c2adb [SPARK-46683] Write a subquery generator that generates 
subquery permutations to increase testing coverage

No new revisions were added by this update.

Summary of changes:
 .../jdbc/querytest/GeneratedSubquerySuite.scala| 464 +
 .../apache/spark/sql/QueryGeneratorHelper.scala| 222 ++
 2 files changed, 686 insertions(+)
 create mode 100644 
connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/querytest/GeneratedSubquerySuite.scala
 create mode 100644 
sql/core/src/test/scala/org/apache/spark/sql/QueryGeneratorHelper.scala


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[jira] [Resolved] (SPARK-46763) ReplaceDeduplicateWithAggregate fails when non-grouping keys have duplicate attributes

2024-01-23 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-46763.
-
Fix Version/s: 3.4.3
   3.5.1
   4.0.0
   Resolution: Fixed

Issue resolved by pull request 44835
[https://github.com/apache/spark/pull/44835]

> ReplaceDeduplicateWithAggregate fails when non-grouping keys have duplicate 
> attributes
> --
>
> Key: SPARK-46763
> URL: https://issues.apache.org/jira/browse/SPARK-46763
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 3.5.0
>Reporter: Nikhil Sheoran
>Assignee: Nikhil Sheoran
>Priority: Major
>  Labels: pull-request-available
> Fix For: 3.4.3, 3.5.1, 4.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-46763) ReplaceDeduplicateWithAggregate fails when non-grouping keys have duplicate attributes

2024-01-23 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-46763:
---

Assignee: Nikhil Sheoran

> ReplaceDeduplicateWithAggregate fails when non-grouping keys have duplicate 
> attributes
> --
>
> Key: SPARK-46763
> URL: https://issues.apache.org/jira/browse/SPARK-46763
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 3.5.0
>Reporter: Nikhil Sheoran
>Assignee: Nikhil Sheoran
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



(spark) branch branch-3.4 updated: [SPARK-46763] Fix assertion failure in ReplaceDeduplicateWithAggregate for duplicate attributes

2024-01-23 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
 new 34f81d647552 [SPARK-46763] Fix assertion failure in 
ReplaceDeduplicateWithAggregate for duplicate attributes
34f81d647552 is described below

commit 34f81d647552ea7aceb325b6ddab58cdcb130a48
Author: Nikhil Sheoran <125331115+nikhilsheoran...@users.noreply.github.com>
AuthorDate: Tue Jan 23 17:15:30 2024 +0800

[SPARK-46763] Fix assertion failure in ReplaceDeduplicateWithAggregate for 
duplicate attributes

### What changes were proposed in this pull request?

- Updated the `ReplaceDeduplicateWithAggregate` implementation to reuse 
aliases generated for an attribute.
- Added a unit test to ensure scenarios with duplicate non-grouping keys 
are correctly optimized.

### Why are the changes needed?

- `ReplaceDeduplicateWithAggregate` replaces `Deduplicate` with an 
`Aggregate` operator with grouping expressions for the deduplication keys and 
aggregate expressions for the non-grouping keys (to preserve the output schema 
and keep the non-grouping columns).
- For non-grouping key `a#X`, it generates an aggregate expression of the 
form `first(a#X, false) AS a#Y`
- In case the non-grouping keys have a repeated attribute (with the same
name and exprId), the existing logic would generate two different aggregate
expressions, each with its own new exprId.
- This then leads to a duplicate rewrite attributes error (in
`transformUpWithNewOutput`) when transforming the remaining tree.

- For example, for the query
```
Project [a#0, b#1]
+- Deduplicate [b#1]
   +- Project [a#0, a#0, b#1]
      +- LocalRelation <empty>, [a#0, b#1]
```
the existing logic would transform it to
```
Project [a#3, b#1]
+- Aggregate [b#1], [first(a#0, false) AS a#3, first(a#0, false) AS a#5, b#1]
   +- Project [a#0, a#0, b#1]
      +- LocalRelation <empty>, [a#0, b#1]
```
with the aggregate mapping having two entries `a#0 -> a#3, a#0 -> a#5`.

The correct transformation would be
```
Project [a#3, b#1]
+- Aggregate [b#1], [first(a#0, false) AS a#3, first(a#0, false) AS a#3, b#1]
   +- Project [a#0, a#0, b#1]
      +- LocalRelation <empty>, [a#0, b#1]
```
with the aggregate mapping having only one entry `a#0 -> a#3`.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added a unit test in `ReplaceOperatorSuite`.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #44835 from nikhilsheoran-db/SPARK-46763.

Authored-by: Nikhil Sheoran 
<125331115+nikhilsheoran...@users.noreply.github.com>
Signed-off-by: Wenchen Fan 
(cherry picked from commit 715b43428913d6a631f8f9043baac751b88cb5d4)
Signed-off-by: Wenchen Fan 
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  6 -
 .../catalyst/optimizer/ReplaceOperatorSuite.scala  | 31 ++
 2 files changed, 36 insertions(+), 1 deletion(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index bc4dbb47c926..7aebf7c28f11 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -2163,11 +2163,15 @@ object ReplaceDeduplicateWithAggregate extends 
Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transformUpWithNewOutput {
 case d @ Deduplicate(keys, child) if !child.isStreaming =>
   val keyExprIds = keys.map(_.exprId)
+  val generatedAliasesMap = new mutable.HashMap[Attribute, Alias]();
   val aggCols = child.output.map { attr =>
 if (keyExprIds.contains(attr.exprId)) {
   attr
 } else {
-  Alias(new First(attr).toAggregateExpression(), attr.name)()
+  // Keep track of the generated aliases to avoid generating multiple 
aliases
+  // for the same attribute (in case the attribute is duplicated)
+  generatedAliasesMap.getOrElseUpdate(attr,
+Alias(new First(attr).toAggregateExpression(), attr.name)())
 }
   }
   // SPARK-22951: Physical aggregate operators distinguishes global 
aggregation and grouping
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
index 06fcb12acdd0..c9f0970c9f79 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuit
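
A minimal way to produce the plan shape discussed in the commit message above (illustrative only; column names are arbitrary): selecting the same column twice and then deduplicating on another column yields a Deduplicate over a Project with a repeated attribute, which is exactly the case the reused aliases now handle.

```
import org.apache.spark.sql.SparkSession

// Sketch: build Deduplicate [b] over Project [a, a, b]; before this fix, optimizing such a
// plan could hit the duplicate-rewrite-attributes error described above.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq((1, 10), (1, 10), (2, 20)).toDF("a", "b")
val deduped = df.select($"a", $"a", $"b").dropDuplicates("b")

deduped.explain(true)  // analyzed plan: Deduplicate [b] over Project [a, a, b]
deduped.show()
```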

(spark) branch branch-3.5 updated: [SPARK-46763] Fix assertion failure in ReplaceDeduplicateWithAggregate for duplicate attributes

2024-01-23 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new a559ff7bb9d3 [SPARK-46763] Fix assertion failure in 
ReplaceDeduplicateWithAggregate for duplicate attributes
a559ff7bb9d3 is described below

commit a559ff7bb9d3c34429f80760741f1bbd40696f32
Author: Nikhil Sheoran <125331115+nikhilsheoran...@users.noreply.github.com>
AuthorDate: Tue Jan 23 17:15:30 2024 +0800

[SPARK-46763] Fix assertion failure in ReplaceDeduplicateWithAggregate for 
duplicate attributes

### What changes were proposed in this pull request?

- Updated the `ReplaceDeduplicateWithAggregate` implementation to reuse 
aliases generated for an attribute.
- Added a unit test to ensure scenarios with duplicate non-grouping keys 
are correctly optimized.

### Why are the changes needed?

- `ReplaceDeduplicateWithAggregate` replaces `Deduplicate` with an 
`Aggregate` operator with grouping expressions for the deduplication keys and 
aggregate expressions for the non-grouping keys (to preserve the output schema 
and keep the non-grouping columns).
- For non-grouping key `a#X`, it generates an aggregate expression of the 
form `first(a#X, false) AS a#Y`
- In case the non-grouping keys have a repeated attribute (with the same
name and exprId), the existing logic would generate two different aggregate
expressions, each with its own new exprId.
- This then leads to a duplicate rewrite attributes error (in
`transformUpWithNewOutput`) when transforming the remaining tree.

- For example, for the query
```
Project [a#0, b#1]
+- Deduplicate [b#1]
   +- Project [a#0, a#0, b#1]
      +- LocalRelation <empty>, [a#0, b#1]
```
the existing logic would transform it to
```
Project [a#3, b#1]
+- Aggregate [b#1], [first(a#0, false) AS a#3, first(a#0, false) AS a#5, b#1]
   +- Project [a#0, a#0, b#1]
      +- LocalRelation <empty>, [a#0, b#1]
```
with the aggregate mapping having two entries `a#0 -> a#3, a#0 -> a#5`.

The correct transformation would be
```
Project [a#3, b#1]
+- Aggregate [b#1], [first(a#0, false) AS a#3, first(a#0, false) AS a#3, b#1]
   +- Project [a#0, a#0, b#1]
      +- LocalRelation <empty>, [a#0, b#1]
```
with the aggregate mapping having only one entry `a#0 -> a#3`.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added a unit test in `ReplaceOperatorSuite`.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #44835 from nikhilsheoran-db/SPARK-46763.

Authored-by: Nikhil Sheoran 
<125331115+nikhilsheoran...@users.noreply.github.com>
Signed-off-by: Wenchen Fan 
(cherry picked from commit 715b43428913d6a631f8f9043baac751b88cb5d4)
Signed-off-by: Wenchen Fan 
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  6 -
 .../catalyst/optimizer/ReplaceOperatorSuite.scala  | 31 ++
 2 files changed, 36 insertions(+), 1 deletion(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index df17840d567e..04d3eb962ed4 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -2195,11 +2195,15 @@ object ReplaceDeduplicateWithAggregate extends 
Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transformUpWithNewOutput {
 case d @ Deduplicate(keys, child) if !child.isStreaming =>
   val keyExprIds = keys.map(_.exprId)
+  val generatedAliasesMap = new mutable.HashMap[Attribute, Alias]();
   val aggCols = child.output.map { attr =>
 if (keyExprIds.contains(attr.exprId)) {
   attr
 } else {
-  Alias(new First(attr).toAggregateExpression(), attr.name)()
+  // Keep track of the generated aliases to avoid generating multiple 
aliases
+  // for the same attribute (in case the attribute is duplicated)
+  generatedAliasesMap.getOrElseUpdate(attr,
+Alias(new First(attr).toAggregateExpression(), attr.name)())
 }
   }
   // SPARK-22951: Physical aggregate operators distinguishes global 
aggregation and grouping
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
index 5d81e96a8e58..cb9577e050d0 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuit

(spark) branch master updated: [SPARK-46763] Fix assertion failure in ReplaceDeduplicateWithAggregate for duplicate attributes

2024-01-23 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 715b43428913 [SPARK-46763] Fix assertion failure in 
ReplaceDeduplicateWithAggregate for duplicate attributes
715b43428913 is described below

commit 715b43428913d6a631f8f9043baac751b88cb5d4
Author: Nikhil Sheoran <125331115+nikhilsheoran...@users.noreply.github.com>
AuthorDate: Tue Jan 23 17:15:30 2024 +0800

[SPARK-46763] Fix assertion failure in ReplaceDeduplicateWithAggregate for 
duplicate attributes

### What changes were proposed in this pull request?

- Updated the `ReplaceDeduplicateWithAggregate` implementation to reuse 
aliases generated for an attribute.
- Added a unit test to ensure scenarios with duplicate non-grouping keys 
are correctly optimized.

### Why are the changes needed?

- `ReplaceDeduplicateWithAggregate` replaces `Deduplicate` with an 
`Aggregate` operator with grouping expressions for the deduplication keys and 
aggregate expressions for the non-grouping keys (to preserve the output schema 
and keep the non-grouping columns).
- For non-grouping key `a#X`, it generates an aggregate expression of the 
form `first(a#X, false) AS a#Y`
- In case the non-grouping keys have a repeated attribute (with the same
name and exprId), the existing logic would generate two different aggregate
expressions, each with its own new exprId.
- This then leads to a duplicate rewrite attributes error (in
`transformUpWithNewOutput`) when transforming the remaining tree.

- For example, for the query
```
Project [a#0, b#1]
+- Deduplicate [b#1]
   +- Project [a#0, a#0, b#1]
      +- LocalRelation <empty>, [a#0, b#1]
```
the existing logic would transform it to
```
Project [a#3, b#1]
+- Aggregate [b#1], [first(a#0, false) AS a#3, first(a#0, false) AS a#5, b#1]
   +- Project [a#0, a#0, b#1]
      +- LocalRelation <empty>, [a#0, b#1]
```
with the aggregate mapping having two entries `a#0 -> a#3, a#0 -> a#5`.

The correct transformation would be
```
Project [a#3, b#1]
+- Aggregate [b#1], [first(a#0, false) AS a#3, first(a#0, false) AS a#3, b#1]
   +- Project [a#0, a#0, b#1]
      +- LocalRelation <empty>, [a#0, b#1]
```
with the aggregate mapping having only one entry `a#0 -> a#3`.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added a unit test in `ReplaceOperatorSuite`.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #44835 from nikhilsheoran-db/SPARK-46763.

Authored-by: Nikhil Sheoran 
<125331115+nikhilsheoran...@users.noreply.github.com>
Signed-off-by: Wenchen Fan 
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  6 -
 .../catalyst/optimizer/ReplaceOperatorSuite.scala  | 31 ++
 2 files changed, 36 insertions(+), 1 deletion(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 4186c8c1db91..46d3043df3eb 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -2192,11 +2192,15 @@ object ReplaceDeduplicateWithAggregate extends 
Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transformUpWithNewOutput {
 case d @ Deduplicate(keys, child) if !child.isStreaming =>
   val keyExprIds = keys.map(_.exprId)
+  val generatedAliasesMap = new mutable.HashMap[Attribute, Alias]();
   val aggCols = child.output.map { attr =>
 if (keyExprIds.contains(attr.exprId)) {
   attr
 } else {
-  Alias(new First(attr).toAggregateExpression(), attr.name)()
+  // Keep track of the generated aliases to avoid generating multiple 
aliases
+  // for the same attribute (in case the attribute is duplicated)
+  generatedAliasesMap.getOrElseUpdate(attr,
+Alias(new First(attr).toAggregateExpression(), attr.name)())
 }
   }
   // SPARK-22951: Physical aggregate operators distinguishes global 
aggregation and grouping
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
index 5d81e96a8e58..cb9577e050d0 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala

[jira] [Resolved] (SPARK-46590) Coalesce partition assert error after skew join optimization

2024-01-23 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46590?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-46590.
-
Fix Version/s: 3.5.1
   4.0.0
   Resolution: Fixed

Issue resolved by pull request 44661
[https://github.com/apache/spark/pull/44661]

> Coalesce partition assert error after skew join optimization
> ---
>
> Key: SPARK-46590
> URL: https://issues.apache.org/jira/browse/SPARK-46590
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.1, 3.3.3, 3.3.2, 3.4.0, 3.4.1, 3.5.0, 4.0.0, 3.5.1, 
> 3.3.4
>Reporter: Jackey Lee
>Assignee: Jackey Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 3.5.1, 4.0.0
>
> Attachments: problem.log
>
>
> Recently, when we were testing TPCDS Q71, we found that if
> `spark.sql.shuffle.partitions` and
> `spark.sql.adaptive.coalescePartitions.initialPartitionNum` are both set to
> the number of executor cores, an AssertionError may be reported in
> coalescePartition because the partitionSpecs of the joins differ after skew
> join optimization.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-46590) Coalesce partition assert error after skew join optimization

2024-01-23 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46590?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-46590:
---

Assignee: Jackey Lee

> Coalesce partition assert error after skew join optimization
> ---
>
> Key: SPARK-46590
> URL: https://issues.apache.org/jira/browse/SPARK-46590
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.1, 3.3.3, 3.3.2, 3.4.0, 3.4.1, 3.5.0, 4.0.0, 3.5.1, 
> 3.3.4
>Reporter: Jackey Lee
>Assignee: Jackey Lee
>Priority: Major
>  Labels: pull-request-available
> Attachments: problem.log
>
>
> Recently, when we were testing TPCDS Q71, we found that if
> `spark.sql.shuffle.partitions` and
> `spark.sql.adaptive.coalescePartitions.initialPartitionNum` are both set to
> the number of executor cores, an AssertionError may be reported in
> coalescePartition because the partitionSpecs of the joins differ after skew
> join optimization.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



(spark) branch branch-3.5 updated: [SPARK-46590][SQL] Fix coalesce failure with unexpected partition indices

2024-01-23 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 6403a84b6854 [SPARK-46590][SQL] Fix coalesce failure with unexpected 
partition indices
6403a84b6854 is described below

commit 6403a84b6854214a4ed7d5c0c800e877e0748964
Author: jackylee-ch 
AuthorDate: Tue Jan 23 16:10:37 2024 +0800

[SPARK-46590][SQL] Fix coalesce failure with unexpected partition indices

### What changes were proposed in this pull request?
As outlined in JIRA issue 
[SPARK-46590](https://issues.apache.org/jira/browse/SPARK-46590), when a 
broadcast join follows a union within the same stage, the 
[collectCoalesceGroups](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala#L144)
 method will indiscriminately traverse all sub-plans, aggregating them into a 
single group, which is not expected.

### Why are the changes needed?
In fact, for a broadcast join, we do not expect the broadcast exchange to have
the same partition number. Therefore, we can safely disregard the broadcast
join and continue traversing the sub-plan.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Newly added unit test, which would fail without this PR.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #44661 from 
jackylee-ch/fix_coalesce_problem_with_broadcastjoin_and_union.

Authored-by: jackylee-ch 
Signed-off-by: Wenchen Fan 
(cherry picked from commit de0c4ad3947f1188f02aaa612df8278d1c7c3ce5)
Signed-off-by: Wenchen Fan 
---
 .../adaptive/CoalesceShufflePartitions.scala   | 10 ++--
 .../execution/adaptive/ShufflePartitionsUtil.scala |  6 ++-
 .../execution/CoalesceShufflePartitionsSuite.scala | 61 ++
 .../sql/execution/ShufflePartitionsUtilSuite.scala | 31 +--
 4 files changed, 86 insertions(+), 22 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
index 34399001c726..26e5ac649dbb 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
 import org.apache.spark.sql.execution.{ShufflePartitionSpec, SparkPlan, 
UnaryExecNode, UnionExec}
 import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, 
REBALANCE_PARTITIONS_BY_COL, REBALANCE_PARTITIONS_BY_NONE, REPARTITION_BY_COL, 
ShuffleExchangeLike, ShuffleOrigin}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, 
BroadcastNestedLoopJoinExec, CartesianProductExec}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.Utils
 
@@ -146,13 +147,16 @@ case class CoalesceShufflePartitions(session: 
SparkSession) extends AQEShuffleRe
   Seq(collectShuffleStageInfos(r))
 case unary: UnaryExecNode => collectCoalesceGroups(unary.child)
 case union: UnionExec => union.children.flatMap(collectCoalesceGroups)
-// If not all leaf nodes are exchange query stages, it's not safe to 
reduce the number of
-// shuffle partitions, because we may break the assumption that all 
children of a spark plan
-// have same number of output partitions.
+case join: CartesianProductExec => 
join.children.flatMap(collectCoalesceGroups)
 // Note that, `BroadcastQueryStageExec` is a valid case:
 // If a join has been optimized from shuffled join to broadcast join, then 
the one side is
 // `BroadcastQueryStageExec` and other side is `ShuffleQueryStageExec`. It 
can coalesce the
 // shuffle side as we do not expect broadcast exchange has same partition 
number.
+case join: BroadcastHashJoinExec => 
join.children.flatMap(collectCoalesceGroups)
+case join: BroadcastNestedLoopJoinExec => 
join.children.flatMap(collectCoalesceGroups)
+// If not all leaf nodes are exchange query stages, it's not safe to 
reduce the number of
+// shuffle partitions, because we may break the assumption that all 
children of a spark plan
+// have same number of output partitions.
 case p if p.collectLeaves().forall(_.isInstanceOf[ExchangeQueryStageExec]) 
=>
   val shuffleStages = collectShuffleStageInfos(p)
   // ShuffleExchanges introduced by repartition do not support partition 
number change.
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
 
b/sql/core/src/mai

(spark) branch master updated: [SPARK-46590][SQL] Fix coalesce failure with unexpected partition indices

2024-01-23 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new de0c4ad3947f [SPARK-46590][SQL] Fix coalesce failure with unexpected 
partition indices
de0c4ad3947f is described below

commit de0c4ad3947f1188f02aaa612df8278d1c7c3ce5
Author: jackylee-ch 
AuthorDate: Tue Jan 23 16:10:37 2024 +0800

[SPARK-46590][SQL] Fix coalesce failure with unexpected partition indices

### What changes were proposed in this pull request?
As outlined in JIRA issue 
[SPARK-46590](https://issues.apache.org/jira/browse/SPARK-46590), when a 
broadcast join follows a union within the same stage, the 
[collectCoalesceGroups](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala#L144)
 method will indiscriminately traverse all sub-plans, aggregating them into a 
single group, which is not expected.

### Why are the changes needed?
In fact, for a broadcast join, we do not expect the broadcast exchange to have
the same partition number. Therefore, we can safely disregard the broadcast
join and continue traversing the sub-plan.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Newly added unit test, which would fail without this PR.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #44661 from 
jackylee-ch/fix_coalesce_problem_with_broadcastjoin_and_union.

Authored-by: jackylee-ch 
Signed-off-by: Wenchen Fan 
---
 .../adaptive/CoalesceShufflePartitions.scala   | 10 ++--
 .../execution/adaptive/ShufflePartitionsUtil.scala |  6 ++-
 .../execution/CoalesceShufflePartitionsSuite.scala | 61 ++
 .../sql/execution/ShufflePartitionsUtilSuite.scala | 31 +--
 4 files changed, 86 insertions(+), 22 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
index 34399001c726..26e5ac649dbb 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
 import org.apache.spark.sql.execution.{ShufflePartitionSpec, SparkPlan, 
UnaryExecNode, UnionExec}
 import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, 
REBALANCE_PARTITIONS_BY_COL, REBALANCE_PARTITIONS_BY_NONE, REPARTITION_BY_COL, 
ShuffleExchangeLike, ShuffleOrigin}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, 
BroadcastNestedLoopJoinExec, CartesianProductExec}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.Utils
 
@@ -146,13 +147,16 @@ case class CoalesceShufflePartitions(session: 
SparkSession) extends AQEShuffleRe
   Seq(collectShuffleStageInfos(r))
 case unary: UnaryExecNode => collectCoalesceGroups(unary.child)
 case union: UnionExec => union.children.flatMap(collectCoalesceGroups)
-// If not all leaf nodes are exchange query stages, it's not safe to 
reduce the number of
-// shuffle partitions, because we may break the assumption that all 
children of a spark plan
-// have same number of output partitions.
+case join: CartesianProductExec => 
join.children.flatMap(collectCoalesceGroups)
 // Note that, `BroadcastQueryStageExec` is a valid case:
 // If a join has been optimized from shuffled join to broadcast join, then 
the one side is
 // `BroadcastQueryStageExec` and other side is `ShuffleQueryStageExec`. It 
can coalesce the
 // shuffle side as we do not expect broadcast exchange has same partition 
number.
+case join: BroadcastHashJoinExec => 
join.children.flatMap(collectCoalesceGroups)
+case join: BroadcastNestedLoopJoinExec => 
join.children.flatMap(collectCoalesceGroups)
+// If not all leaf nodes are exchange query stages, it's not safe to 
reduce the number of
+// shuffle partitions, because we may break the assumption that all 
children of a spark plan
+// have same number of output partitions.
 case p if p.collectLeaves().forall(_.isInstanceOf[ExchangeQueryStageExec]) 
=>
   val shuffleStages = collectShuffleStageInfos(p)
   // ShuffleExchanges introduced by repartition do not support partition 
number change.
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
index dbed66683b01..9370b3d8d1d7 1006
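
A rough reproduction of the problematic query shape (data sizes and values are illustrative; the confs are existing Spark SQL settings): a union whose children are a plain shuffled aggregate and an aggregate on top of a broadcast join, with `initialPartitionNum` equal to `shuffle.partitions` as described in the JIRA.

```
import org.apache.spark.sql.SparkSession

// Sketch of the SPARK-46590 setup with illustrative data.
val spark = SparkSession.builder()
  .master("local[4]")
  .config("spark.sql.adaptive.enabled", "true")
  .config("spark.sql.shuffle.partitions", "4")
  .config("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "4")
  .getOrCreate()
import spark.implicits._

val left  = spark.range(0, 100000).select(($"id" % 101).as("k"), $"id".as("v"))
val right = spark.range(0, 100000).select(($"id" % 101).as("k"), $"id".as("w"))
val small = spark.range(0, 100).select($"id".as("k"), $"id".as("s"))

// One union child is a shuffled aggregate; the other aggregates after a broadcast join.
// With this fix, the shuffle stages of the two children form independent coalesce groups.
val q = left.groupBy("k").count()
  .union(right.join(small.hint("broadcast"), "k").groupBy("k").count())

q.collect()
```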

(spark) branch master updated: Revert "[SPARK-46219][SQL] Unwrap cast in join predicates"

2024-01-21 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 498519ee6bb4 Revert "[SPARK-46219][SQL] Unwrap cast in join predicates"
498519ee6bb4 is described below

commit 498519ee6bb4b0295d1df005175e4cbcbcb051e3
Author: Wenchen Fan 
AuthorDate: Mon Jan 22 15:30:31 2024 +0800

Revert "[SPARK-46219][SQL] Unwrap cast in join predicates"

This reverts commit 8235f1d56bf232bb713fe24ff6f2ffdaf49d2fcc.
---
 .../org/apache/spark/sql/internal/SQLConf.scala|  10 --
 .../bucketing/CoalesceBucketsInJoin.scala  |  22 +++-
 .../execution/exchange/EnsureRequirements.scala|  25 +
 ...ractJoinWithUnwrappedCastInJoinPredicates.scala | 114 -
 .../spark/sql/execution/joins/ShuffledJoin.scala   |  14 +--
 .../apache/spark/sql/execution/PlannerSuite.scala  |  74 -
 .../spark/sql/sources/BucketedReadSuite.scala  |  65 
 7 files changed, 23 insertions(+), 301 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 3dd7cf884cbe..bc4734775c77 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -564,14 +564,6 @@ object SQLConf {
   .booleanConf
   .createWithDefault(false)
 
-  val UNWRAP_CAST_IN_JOIN_CONDITION_ENABLED =
-buildConf("spark.sql.unwrapCastInJoinCondition.enabled")
-  .doc("When true, unwrap the cast in the join condition to reduce shuffle 
if they are " +
-"integral types.")
-  .version("4.0.0")
-  .booleanConf
-  .createWithDefault(true)
-
   val MAX_SINGLE_PARTITION_BYTES = 
buildConf("spark.sql.maxSinglePartitionBytes")
 .doc("The maximum number of bytes allowed for a single partition. 
Otherwise, The planner " +
   "will introduce shuffle to improve parallelism.")
@@ -5126,8 +5118,6 @@ class SQLConf extends Serializable with Logging with 
SqlApiConf {
 
   def preferSortMergeJoin: Boolean = getConf(PREFER_SORTMERGEJOIN)
 
-  def unwrapCastInJoinConditionEnabled: Boolean = 
getConf(UNWRAP_CAST_IN_JOIN_CONDITION_ENABLED)
-
   def enableRadixSort: Boolean = getConf(RADIX_SORT_ENABLED)
 
   def isParquetSchemaMergingEnabled: Boolean = 
getConf(PARQUET_SCHEMA_MERGING_ENABLED)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala
index ab0eaa044dea..d1464b4ac4ee 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala
@@ -20,7 +20,9 @@ package org.apache.spark.sql.execution.bucketing
 import scala.annotation.tailrec
 
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, 
Partitioning, PartitioningCollection}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, 
ProjectExec, SparkPlan}
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, 
BroadcastNestedLoopJoinExec, ShuffledHashJoinExec, ShuffledJoin, 
SortMergeJoinExec}
@@ -129,11 +131,27 @@ object ExtractJoinWithBuckets {
 }
   }
 
+  /**
+   * The join keys should match with expressions for output partitioning. Note 
that
+   * the ordering does not matter because it will be handled in 
`EnsureRequirements`.
+   */
+  private def satisfiesOutputPartitioning(
+  keys: Seq[Expression],
+  partitioning: Partitioning): Boolean = {
+partitioning match {
+  case HashPartitioning(exprs, _) if exprs.length == keys.length =>
+exprs.forall(e => keys.exists(_.semanticEquals(e)))
+  case PartitioningCollection(partitionings) =>
+partitionings.exists(satisfiesOutputPartitioning(keys, _))
+  case _ => false
+}
+  }
+
   private def isApplicable(j: ShuffledJoin): Boolean = {
 hasScanOperation(j.left) &&
   hasScanOperation(j.right) &&
-  j.satisfiesOutputPartitioning(j.leftKeys, j.left.outputPartitioning) &&
-  j.satisfiesOutputPartitioning(j.rightKeys, j.right.outputPartitioning)
+  satisfiesOutputPartitioning(j.leftKeys, j.left.outputPartitioning) &&
+  satisfiesOutputPartitioning(j.rightKeys, j.right.outputPartitioning)
   }
 
   private def isDi

[jira] [Updated] (SPARK-46769) Refine timestamp related schema inference

2024-01-20 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan updated SPARK-46769:

Summary: Refine timestamp related schema inference  (was: Fix inferring of 
TIMESTAMP_NTZ in CSV/JSON)

> Refine timestamp related schema inference
> -
>
> Key: SPARK-46769
> URL: https://issues.apache.org/jira/browse/SPARK-46769
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 4.0.0
>Reporter: Max Gekk
>Assignee: Max Gekk
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0, 3.5.1
>
>
> After the PR https://github.com/apache/spark/pull/43243, the TIMESTAMP_NTZ 
> type inference in CSV/JSON datasource got 2 new guards which means 
> TIMESTAMP_NTZ should be inferred either if:
> 1. the SQL config `spark.sql.legacy.timeParserPolicy` is set to `LEGACY` or
> 2. `spark.sql.timestampType` is set to `TIMESTAMP_NTZ`.
> otherwise CSV/JSON should try to infer `TIMESTAMP_LTZ`.
> Both guards are unnecessary because:
> 1. when `spark.sql.legacy.timeParserPolicy` is `LEGACY` that only means Spark 
> should use a legacy Java 7- parser: `FastDateFormat` or `SimpleDateFormat`. 
> Both parsers are applicable for parsing `TIMESTAMP_NTZ`.
> 2. when `spark.sql.timestampType` is set to `TIMESTAMP_LTZ`, it doesn't mean 
> that we should skip inferring of `TIMESTAMP_NTZ` types in CSV/JSON, and try 
> to parse the timestamp string value w/o time zone like 
> `2024-01-19T09:10:11.123` using a LTZ format **with timezone** like 
> `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`. _The last one cannot match any NTZ values for 
> sure._
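
For illustration, a minimal Scala sketch (local session; the file path and column name are made
up for this example) of how the default timestamp type drives what gets inferred for such
zone-less values:

import java.nio.file.{Files, Paths}
import org.apache.spark.sql.SparkSession

object NtzInferenceSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("ntz-inference").getOrCreate()

    // A CSV file whose single column holds a timestamp string without a zone offset.
    val path = "/tmp/ntz-demo.csv"
    Files.write(Paths.get(path), "ts\n2024-01-19T09:10:11.123\n".getBytes("UTF-8"))

    // With the default spark.sql.timestampType = TIMESTAMP_LTZ, the column should be
    // inferred as a regular TIMESTAMP (LTZ).
    spark.read.option("header", "true").option("inferSchema", "true").csv(path).printSchema()

    // With TIMESTAMP_NTZ as the session default, the same zone-less value should be
    // inferred as TIMESTAMP_NTZ instead.
    spark.conf.set("spark.sql.timestampType", "TIMESTAMP_NTZ")
    spark.read.option("header", "true").option("inferSchema", "true").csv(path).printSchema()

    spark.stop()
  }
}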



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-46769) Refine timestamp related schema inference

2024-01-20 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan updated SPARK-46769:

Description: (was: After the PR 
https://github.com/apache/spark/pull/43243, the TIMESTAMP_NTZ type inference in 
CSV/JSON datasource got 2 new guards which means TIMESTAMP_NTZ should be 
inferred either if:

1. the SQL config `spark.sql.legacy.timeParserPolicy` is set to `LEGACY` or
2. `spark.sql.timestampType` is set to `TIMESTAMP_NTZ`.

otherwise CSV/JSON should try to infer `TIMESTAMP_LTZ`.

Both guards are unnecessary because:

1. when `spark.sql.legacy.timeParserPolicy` is `LEGACY` that only means Spark 
should use a legacy Java 7- parser: `FastDateFormat` or `SimpleDateFormat`. 
Both parsers are applicable for parsing `TIMESTAMP_NTZ`. 
2. when `spark.sql.timestampType` is set to `TIMESTAMP_LTZ`, it doesn't mean 
that we should skip inferring of `TIMESTAMP_NTZ` types in CSV/JSON, and try to 
parse the timestamp string value w/o time zone like `2024-01-19T09:10:11.123` 
using a LTZ format **with timezone** like `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`. _The 
last one cannot match any NTZ values for sure._)

> Refine timestamp related schema inference
> -
>
> Key: SPARK-46769
> URL: https://issues.apache.org/jira/browse/SPARK-46769
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 4.0.0
>Reporter: Max Gekk
>Assignee: Wenchen Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0, 3.5.1
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-46769) Refine timestamp related schema inference

2024-01-20 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-46769:
---

Assignee: Wenchen Fan  (was: Max Gekk)

> Refine timestamp related schema inference
> -
>
> Key: SPARK-46769
> URL: https://issues.apache.org/jira/browse/SPARK-46769
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 4.0.0
>Reporter: Max Gekk
>Assignee: Wenchen Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0, 3.5.1
>
>
> After the PR https://github.com/apache/spark/pull/43243, the TIMESTAMP_NTZ 
> type inference in CSV/JSON datasource got 2 new guards which means 
> TIMESTAMP_NTZ should be inferred either if:
> 1. the SQL config `spark.sql.legacy.timeParserPolicy` is set to `LEGACY` or
> 2. `spark.sql.timestampType` is set to `TIMESTAMP_NTZ`.
> otherwise CSV/JSON should try to infer `TIMESTAMP_LTZ`.
> Both guards are unnecessary because:
> 1. when `spark.sql.legacy.timeParserPolicy` is `LEGACY` that only means Spark 
> should use a legacy Java 7- parser: `FastDateFormat` or `SimpleDateFormat`. 
> Both parsers are applicable for parsing `TIMESTAMP_NTZ`.
> 2. when `spark.sql.timestampType` is set to `TIMESTAMP_LTZ`, it doesn't mean 
> that we should skip inferring of `TIMESTAMP_NTZ` types in CSV/JSON, and try 
> to parse the timestamp string value w/o time zone like 
> `2024-01-19T09:10:11.123` using a LTZ format **with timezone** like 
> `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`. _The last one cannot match any NTZ values for 
> sure._



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-46769) Fix inferring of TIMESTAMP_NTZ in CSV/JSON

2024-01-20 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-46769.
-
Fix Version/s: 3.5.1
   4.0.0
   Resolution: Fixed

Issue resolved by pull request 44800
[https://github.com/apache/spark/pull/44800]

> Fix inferring of TIMESTAMP_NTZ in CSV/JSON
> --
>
> Key: SPARK-46769
> URL: https://issues.apache.org/jira/browse/SPARK-46769
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 4.0.0
>Reporter: Max Gekk
>Assignee: Max Gekk
>Priority: Major
>  Labels: pull-request-available
> Fix For: 3.5.1, 4.0.0
>
>
> After the PR https://github.com/apache/spark/pull/43243, the TIMESTAMP_NTZ 
> type inference in CSV/JSON datasource got 2 new guards which means 
> TIMESTAMP_NTZ should be inferred either if:
> 1. the SQL config `spark.sql.legacy.timeParserPolicy` is set to `LEGACY` or
> 2. `spark.sql.timestampType` is set to `TIMESTAMP_NTZ`.
> otherwise CSV/JSON should try to infer `TIMESTAMP_LTZ`.
> Both guards are unnecessary because:
> 1. when `spark.sql.legacy.timeParserPolicy` is `LEGACY` that only means Spark 
> should use a legacy Java 7- parser: `FastDateFormat` or `SimpleDateFormat`. 
> Both parsers are applicable for parsing `TIMESTAMP_NTZ`.
> 2. when `spark.sql.timestampType` is set to `TIMESTAMP_LTZ`, it doesn't mean 
> that we should skip inferring of `TIMESTAMP_NTZ` types in CSV/JSON, and try 
> to parse the timestamp string value w/o time zone like 
> `2024-01-19T09:10:11.123` using a LTZ format **with timezone** like 
> `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`. _The last one cannot match any NTZ values for 
> sure._



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



(spark) branch branch-3.5 updated: [SPARK-46769][SQL] Refine timestamp related schema inference

2024-01-20 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new c19bf01b5208 [SPARK-46769][SQL] Refine timestamp related schema 
inference
c19bf01b5208 is described below

commit c19bf01b5208bb3aad0e6264b64597e0809b1efe
Author: Wenchen Fan 
AuthorDate: Sat Jan 20 20:57:09 2024 +0800

[SPARK-46769][SQL] Refine timestamp related schema inference

This is a refinement of https://github.com/apache/spark/pull/43243 . This 
PR enforces one thing: we only infer TIMESTAMP NTZ type using NTZ parser, and 
only infer LTZ type using LTZ parser. This consistency is important to avoid 
nondeterministic behaviors.

Avoid non-deterministic behaviors. After 
https://github.com/apache/spark/pull/43243 , we can still have inconsistency if 
the LEGACY mode is enabled.

Yes for the legacy parser. Now it's more likely to infer string type 
instead of inferring timestamp type "by luck"

existing tests

no

Closes https://github.com/apache/spark/pull/44789

Closes #44800 from cloud-fan/infer.

Authored-by: Wenchen Fan 
Signed-off-by: Wenchen Fan 
(cherry picked from commit e4e40762ca41931646b8f201028b1f2298252d96)
Signed-off-by: Wenchen Fan 
---
 .../spark/sql/catalyst/csv/CSVInferSchema.scala| 18 +-
 .../spark/sql/catalyst/json/JsonInferSchema.scala  | 32 +
 .../sql/execution/datasources/csv/CSVSuite.scala   | 42 +++---
 3 files changed, 55 insertions(+), 37 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
index ec01b56f9eb7..2c27da3cf6e1 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.ExprUtils
 import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
 import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
 import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 class CSVInferSchema(val options: CSVOptions) extends Serializable {
@@ -66,6 +66,8 @@ class CSVInferSchema(val options: CSVOptions) extends 
Serializable {
   private val LENIENT_TS_FORMATTER_SUPPORTED_DATE_FORMATS = Set(
 "-MM-dd", "-M-d", "-M-dd", "-MM-d", "-MM", "-M", 
"")
 
+  private val isDefaultNTZ = SQLConf.get.timestampType == TimestampNTZType
+
   /**
* Similar to the JSON schema inference
* 1. Infer type of each row
@@ -199,14 +201,12 @@ class CSVInferSchema(val options: CSVOptions) extends 
Serializable {
   }
 
   private def tryParseTimestampNTZ(field: String): DataType = {
-// We can only parse the value as TimestampNTZType if it does not have 
zone-offset or
-// time-zone component and can be parsed with the timestamp formatter.
-// Otherwise, it is likely to be a timestamp with timezone.
-val timestampType = SQLConf.get.timestampType
-if ((SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY ||
-timestampType == TimestampNTZType) &&
-timestampNTZFormatter.parseWithoutTimeZoneOptional(field, 
false).isDefined) {
-  timestampType
+// For text-based format, it's ambiguous to infer a timestamp string 
without timezone, as it can
+// be both TIMESTAMP LTZ and NTZ. To avoid behavior changes with the new 
support of NTZ, here
+// we only try to infer NTZ if the config is set to use NTZ by default.
+if (isDefaultNTZ &&
+  timestampNTZFormatter.parseWithoutTimeZoneOptional(field, 
false).isDefined) {
+  TimestampNTZType
 } else {
   tryParseTimestamp(field)
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
index 4123c5290b6a..f6d32f39f64e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
@@ -34,6 +34,7 @@ import 
org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
 import

(spark) branch master updated: [SPARK-46769][SQL] Refine timestamp related schema inference

2024-01-20 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new e4e40762ca41 [SPARK-46769][SQL] Refine timestamp related schema 
inference
e4e40762ca41 is described below

commit e4e40762ca41931646b8f201028b1f2298252d96
Author: Wenchen Fan 
AuthorDate: Sat Jan 20 20:57:09 2024 +0800

[SPARK-46769][SQL] Refine timestamp related schema inference

### What changes were proposed in this pull request?

This is a refinement of https://github.com/apache/spark/pull/43243 . This 
PR enforces one thing: we only infer TIMESTAMP NTZ type using NTZ parser, and 
only infer LTZ type using LTZ parser. This consistency is important to avoid 
nondeterministic behaviors.

### Why are the changes needed?

Avoid non-deterministic behaviors. After 
https://github.com/apache/spark/pull/43243 , we can still have inconsistency if 
the LEGACY mode is enabled.

### Does this PR introduce _any_ user-facing change?

Yes for the legacy parser. Now it's more likely to infer string type 
instead of inferring timestamp type "by luck"

### How was this patch tested?

existing tests

### Was this patch authored or co-authored using generative AI tooling?

no

Closes https://github.com/apache/spark/pull/44789

Closes #44800 from cloud-fan/infer.

Authored-by: Wenchen Fan 
Signed-off-by: Wenchen Fan 
---
 .../spark/sql/catalyst/csv/CSVInferSchema.scala| 18 +-
 .../spark/sql/catalyst/json/JsonInferSchema.scala  | 31 
 .../sql/execution/datasources/csv/CSVSuite.scala   | 42 +++---
 3 files changed, 54 insertions(+), 37 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
index ec01b56f9eb7..2c27da3cf6e1 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.ExprUtils
 import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
 import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
 import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 class CSVInferSchema(val options: CSVOptions) extends Serializable {
@@ -66,6 +66,8 @@ class CSVInferSchema(val options: CSVOptions) extends 
Serializable {
   private val LENIENT_TS_FORMATTER_SUPPORTED_DATE_FORMATS = Set(
 "-MM-dd", "-M-d", "-M-dd", "-MM-d", "-MM", "-M", 
"")
 
+  private val isDefaultNTZ = SQLConf.get.timestampType == TimestampNTZType
+
   /**
* Similar to the JSON schema inference
* 1. Infer type of each row
@@ -199,14 +201,12 @@ class CSVInferSchema(val options: CSVOptions) extends 
Serializable {
   }
 
   private def tryParseTimestampNTZ(field: String): DataType = {
-// We can only parse the value as TimestampNTZType if it does not have 
zone-offset or
-// time-zone component and can be parsed with the timestamp formatter.
-// Otherwise, it is likely to be a timestamp with timezone.
-val timestampType = SQLConf.get.timestampType
-if ((SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY ||
-timestampType == TimestampNTZType) &&
-timestampNTZFormatter.parseWithoutTimeZoneOptional(field, 
false).isDefined) {
-  timestampType
+// For text-based format, it's ambiguous to infer a timestamp string 
without timezone, as it can
+// be both TIMESTAMP LTZ and NTZ. To avoid behavior changes with the new 
support of NTZ, here
+// we only try to infer NTZ if the config is set to use NTZ by default.
+if (isDefaultNTZ &&
+  timestampNTZFormatter.parseWithoutTimeZoneOptional(field, 
false).isDefined) {
+  TimestampNTZType
 } else {
   tryParseTimestamp(field)
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
index cfc9e5520e53..bc7038fc71d4 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
@@ -35,6 +35,7 @@ import 
org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
 import org.apache.spark.sql.errors.Qu

(spark) branch master updated: [SPARK-45827] Disallow partitioning on Variant column

2024-01-19 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 689ab0ee8ca8 [SPARK-45827] Disallow partitioning on Variant column
689ab0ee8ca8 is described below

commit 689ab0ee8ca87286db6167f672348e73116b9186
Author: cashmand 
AuthorDate: Sat Jan 20 10:45:51 2024 +0800

[SPARK-45827] Disallow partitioning on Variant column

### What changes were proposed in this pull request?

Follow-up to https://github.com/apache/spark/pull/43984: we should not 
allow partitioning on VariantType. Even though it is an atomic type, it 
represents a nested semi-structured value, so disallowing partitioning is consistent 
with our decision to not allow partitioning on nested types. Also, for now at 
least, it is not even comparable, so attempting to partition fails with a 
confusing codegen error about a method named `compare` not being declared.
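
For illustration, a minimal sketch (local session; the output path and the expected error type
are assumptions of this example, not taken from the patch) of the case that is now rejected up
front:

import org.apache.spark.sql.{AnalysisException, SparkSession}

object VariantPartitionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("variant-partition").getOrCreate()
    // A single-row DataFrame with a VARIANT column built from a JSON string.
    val df = spark.sql("""SELECT 1 AS id, parse_json('{"a": 1}') AS v""")
    try {
      // Partitioning by the VARIANT column should now be rejected with an analysis error
      // instead of failing later with the confusing codegen error mentioned above.
      df.write.partitionBy("v").mode("overwrite").parquet("/tmp/variant-partition-demo")
    } catch {
      case e: AnalysisException => println(s"rejected as expected: ${e.getMessage}")
    } finally {
      spark.stop()
    }
  }
}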

### Why are the changes needed?

Improves error message when attempting to partition on Variant, and 
explicitly forbids a case that we do not intend to support.

### Does this PR introduce _any_ user-facing change?

Improved error message if a user tries to partition on Variant.

### How was this patch tested?

Added unit test, which fails without the code change.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44742 from cashmand/SPARK-45827-no-partitioning.

Authored-by: cashmand 
Signed-off-by: Wenchen Fan 
---
 .../execution/datasources/PartitioningUtils.scala  |  2 +-
 .../spark/sql/execution/datasources/rules.scala|  5 +-
 .../scala/org/apache/spark/sql/VariantSuite.scala  | 56 ++
 3 files changed, 60 insertions(+), 3 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 9905e9af9b0b..555099da221e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -564,7 +564,7 @@ object PartitioningUtils extends SQLConfHelper {
 
 partitionColumnsSchema(schema, partitionColumns).foreach {
   field => field.dataType match {
-case _: AtomicType => // OK
+case a: AtomicType if !a.isInstanceOf[VariantType] => // OK
 case _ => throw 
QueryCompilationErrors.cannotUseDataTypeForPartitionColumnError(field)
   }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index c58815b6978e..bb2bad7a6867 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{CreateTable => 
CreateTableV1}
 import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
 import org.apache.spark.sql.sources.InsertableRelation
-import org.apache.spark.sql.types.{AtomicType, StructType}
+import org.apache.spark.sql.types.{AtomicType, StructType, VariantType}
 import org.apache.spark.sql.util.PartitioningUtils.normalizePartitionSpec
 import org.apache.spark.sql.util.SchemaUtils
 import org.apache.spark.util.ArrayImplicits._
@@ -330,7 +330,8 @@ case class PreprocessTableCreation(catalog: SessionCatalog) 
extends Rule[Logical
 }
 
 schema.filter(f => 
normalizedPartitionCols.contains(f.name)).map(_.dataType).foreach {
-  case _: AtomicType => // OK
+  // VariantType values are not comparable, so can't be used as partition 
columns.
+  case a: AtomicType if !a.isInstanceOf[VariantType] => // OK
   case other => failAnalysis(s"Cannot use ${other.catalogString} for 
partition column")
 }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala
index 98d106f05f0c..af37445c1323 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala
@@ -138,4 +138,60 @@ class VariantSuite extends QueryTest with 
SharedSparkSession {
   }
 }
   }
+
+  test("write partitioned file") {
+def verifyResult(df: DataFrame): Unit = {
+  val result = df.selectExpr("v").collect()
+.map(_.get(0).asInstanceOf[VariantVal].toString)
+.sorted
+.toSeq
+  val expected = (1 until

(spark) branch master updated: [SPARK-46707][SQL] Added throwable field to expressions to improve predicate pushdown

2024-01-18 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 62956c92cfc7 [SPARK-46707][SQL] Added throwable field to expressions 
to improve predicate pushdown
62956c92cfc7 is described below

commit 62956c92cfc74d7523328d168b6d837938cde763
Author: Kelvin Jiang 
AuthorDate: Thu Jan 18 19:25:24 2024 +0800

[SPARK-46707][SQL] Added throwable field to expressions to improve 
predicate pushdown

### What changes were proposed in this pull request?

This PR adds the field `throwable` to `Expression`. If an expression is 
marked as throwable, we will avoid pushing filters containing these expressions 
through joins, filters, and aggregations (i.e. operators that filter input).

### Why are the changes needed?

For predicate pushdown, currently it is possible that we push down a filter 
that ends up being evaluated on more rows than before it was pushed down (e.g. 
if we push the filter through a selective join). In this case, it is possible 
that we now evaluate the filter on a row that will cause a runtime error to be 
thrown, when prior to pushing this would not have happened.
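
As a hedged illustration (made-up tables and columns; `sequence` is one of the expressions
marked throwable by this change), the kind of query this protects:

import org.apache.spark.sql.SparkSession

object ThrowablePushdownSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("throwable-pushdown").getOrCreate()
    import spark.implicits._

    // Made-up data: the second fact row has step = 0, which would make sequence() throw,
    // but that row never survives the join with dim.
    Seq((1, 1L, 100L, 1L), (2, 1L, 100L, 0L))
      .toDF("id", "range_start", "range_end", "step")
      .createOrReplaceTempView("fact")
    Seq(1).toDF("id").createOrReplaceTempView("dim")

    val q = spark.sql("""
      SELECT f.id
      FROM fact f JOIN dim d ON f.id = d.id
      WHERE size(sequence(f.range_start, f.range_end, f.step)) > 10
    """)
    // If the WHERE clause were pushed below the selective join, sequence() would also run on
    // the zero-step row and fail. With Sequence marked as throwable, the filter should stay
    // above the join, so the query is expected to succeed.
    q.show()

    spark.stop()
  }
}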

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added UTs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44716 from kelvinjian-db/SPARK-46707-throwable.

Authored-by: Kelvin Jiang 
Signed-off-by: Wenchen Fan 
---
 .../sql/catalyst/expressions/Expression.scala  |  5 ++
 .../expressions/collectionOperations.scala |  3 ++
 .../spark/sql/catalyst/optimizer/Optimizer.scala   | 27 +-
 .../catalyst/optimizer/FilterPushdownSuite.scala   | 63 ++
 4 files changed, 84 insertions(+), 14 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
index 2cc813bd3055..484418f5e5a7 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
@@ -140,6 +140,11 @@ abstract class Expression extends TreeNode[Expression] {
*/
   def stateful: Boolean = false
 
+  /**
+   * Returns true if the expression could potentially throw an exception when 
evaluated.
+   */
+  lazy val throwable: Boolean = children.exists(_.throwable)
+
   /**
* Returns a copy of this expression where all stateful expressions are 
replaced with fresh
* uninitialized copies. If the expression contains no stateful expressions 
then the original
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
index 04f56eaf8c1e..5aa96dd1a6aa 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
@@ -2983,6 +2983,9 @@ case class Sequence(
 
   override def nullable: Boolean = children.exists(_.nullable)
 
+  // If step is defined, then an error will be thrown if the start and stop do 
not satisfy the step.
+  override lazy val throwable: Boolean = stepOpt.isDefined
+
   override def dataType: ArrayType = ArrayType(start.dataType, containsNull = 
false)
 
   override def checkInputDataTypes(): TypeCheckResult = {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 8fcc7c7c26b4..4186c8c1db91 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1549,10 +1549,11 @@ object CombineFilters extends Rule[LogicalPlan] with 
PredicateHelper {
 
   val applyLocally: PartialFunction[LogicalPlan, LogicalPlan] = {
 // The query execution/optimization does not guarantee the expressions are 
evaluated in order.
-// We only can combine them if and only if both are deterministic.
+// We only can combine them if and only if both are deterministic and the 
outer condition is not
+// throwable (inner can be throwable as it was going to be evaluated first 
anyways).
 case Filter(fc, nf @ Filter(nc, grandChild)) if nc.deterministic =>
-  val (combineCandidates, nonDeterministic) =
-splitConjunctivePredicates(fc).partition(_.deterministic)
+  val (combineCandidates, r

[jira] [Resolved] (SPARK-46644) Fix add in SQLMetric

2024-01-17 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-46644.
-
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 44649
[https://github.com/apache/spark/pull/44649]

> Fix add in SQLMetric
> 
>
> Key: SPARK-46644
> URL: https://issues.apache.org/jira/browse/SPARK-46644
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.5.0
>Reporter: Davin Tjong
>Assignee: Davin Tjong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> A previous refactor mistakenly used `isValid` for add. Since 
> `defaultValidValue` was always `0`, this didn't cause any correctness issues.
> What we really want to do for add (and merge) is `if (isZero) _value = 0`.
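
A minimal sketch (a simplified stand-in, not the real SQLMetric class) of the intended
add/merge semantics around the sentinel initial value:

class MetricSketch(initValue: Long = 0L) {
  // initValue is either 0 or a negative sentinel meaning "invalid"; the metric is
  // considered zero/invalid while _value still equals initValue.
  require(initValue <= 0, "initValue is either 0 or a negative sentinel")
  private var _value = initValue

  def isZero: Boolean = _value == initValue

  def add(v: Long): Unit = {
    if (isZero) _value = 0   // drop the sentinel before accumulating, instead of checking isValid
    _value += v
  }

  def merge(other: MetricSketch): Unit = {
    if (!other.isZero) {
      if (isZero) _value = 0
      _value += other.value
    }
  }

  // Invalid metrics report 0 rather than exposing the sentinel to callers.
  def value: Long = if (_value < 0) 0L else _value
}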



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



(spark) branch master updated: [SPARK-46644] Change add and merge in SQLMetric to use isZero

2024-01-17 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 89727bfa7529 [SPARK-46644] Change add and merge in SQLMetric to use 
isZero
89727bfa7529 is described below

commit 89727bfa7529aa28d85e5d9a58b21d8aa035a23f
Author: Davin Tjong 
AuthorDate: Thu Jan 18 11:01:37 2024 +0800

[SPARK-46644] Change add and merge in SQLMetric to use isZero

### What changes were proposed in this pull request?

A previous refactor mistakenly used `isValid` for add. Since 
`defaultValidValue` was always `0`, this didn't cause any correctness issues.

What we really want to do for add (and merge) is `if (isZero) _value = 0`.

Also removing `isValid` since it's redundant if `defaultValidValue` is 
always `0`.

### Why are the changes needed?

There are no correctness errors, but this is confusing and error-prone.

A negative `defaultValidValue` was intended to allow creating optional 
metrics. With the previous behavior this would incorrectly add the sentinel 
value. `defaultValidValue` is supposed to determine what value is exposed to 
the user.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Running the tests.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #44649 from davintjong-db/sql-metric-add-fix.

Authored-by: Davin Tjong 
Signed-off-by: Wenchen Fan 
---
 .../spark/sql/execution/metric/SQLMetrics.scala| 50 --
 .../sql/execution/metric/SQLMetricsSuite.scala | 11 +++--
 2 files changed, 32 insertions(+), 29 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index 8cd28f9a06a4..a246b47fe655 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -39,21 +39,21 @@ import org.apache.spark.util.AccumulatorContext.internOption
  */
 class SQLMetric(
 val metricType: String,
-initValue: Long = 0L,
-defaultValidValue: Long = 0L) extends AccumulatorV2[Long, Long] {
-  // initValue defines the initial value of the metric. defaultValidValue 
defines the lowest value
-  // considered valid. If a SQLMetric is invalid, it is set to 
defaultValidValue upon receiving any
-  // updates, and it also reports defaultValidValue as its value to avoid 
exposing it to the user
-  // programatically.
+initValue: Long = 0L) extends AccumulatorV2[Long, Long] {
+  // initValue defines the initial value of the metric. 0 is the lowest value 
considered valid.
+  // If a SQLMetric is invalid, it is set to 0 upon receiving any updates, and 
it also reports
+  // 0 as its value to avoid exposing it to the user programmatically.
   //
-  // For many SQLMetrics, we use initValue = -1 and defaultValidValue = 0 to 
indicate that the
-  // metric is by default invalid. At the end of a task, we will update the 
metric making it valid,
-  // and the invalid metrics will be filtered out when calculating min, max, 
etc. as a workaround
+  // For many SQLMetrics, we use initValue = -1 to indicate that the metric is 
by default invalid.
+  // At the end of a task, we will update the metric making it valid, and the 
invalid metrics will
+  // be filtered out when calculating min, max, etc. as a workaround
   // for SPARK-11013.
+  assert(initValue <= 0)
+  // _value will always be either initValue or non-negative.
   private var _value = initValue
 
   override def copy(): SQLMetric = {
-val newAcc = new SQLMetric(metricType, initValue, defaultValidValue)
+val newAcc = new SQLMetric(metricType, initValue)
 newAcc._value = _value
 newAcc
   }
@@ -62,8 +62,8 @@ class SQLMetric(
 
   override def merge(other: AccumulatorV2[Long, Long]): Unit = other match {
 case o: SQLMetric =>
-  if (o.isValid) {
-if (!isValid) _value = defaultValidValue
+  if (!o.isZero) {
+if (isZero) _value = 0
 _value += o.value
   }
 case _ => throw QueryExecutionErrors.cannotMergeClassWithOtherClassError(
@@ -73,28 +73,32 @@ class SQLMetric(
   // This is used to filter out metrics. Metrics with value equal to initValue 
should
   // be filtered out, since they are either invalid or safe to filter without 
changing
   // the aggregation defined in [[SQLMetrics.stringValue]].
-  // Note that we don't use defaultValidValue here since we may want to collect
-  // defaultValidValue metrics for calculating min, max, etc. See SPARK-11013.
+  // Note that we don't use 0 here since we may want to collect 0 metrics for
+  // calculating min, max,

[jira] [Assigned] (SPARK-46274) Range operator computeStats() proper long conversions

2024-01-14 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-46274:
---

Assignee: Nick Young  (was: Kelvin Jiang)

> Range operator computeStats() proper long conversions
> -
>
> Key: SPARK-46274
> URL: https://issues.apache.org/jira/browse/SPARK-46274
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.5.0
>Reporter: Kelvin Jiang
>Assignee: Nick Young
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0, 3.5.1
>
>
> Range operator's `computeStats()` function unsafely casts from `BigInt` to 
> `Long` and causes issues downstream with statistics estimation. Adds bounds 
> checking to avoid crashing.
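
A minimal sketch (illustrative helper, not the actual computeStats code) of the kind of bounds
checking described:

object RangeStatsSketch {
  // Clamp a BigInt estimate into the Long range instead of an unsafe .toLong,
  // which would silently wrap around for out-of-range values.
  def clampToLong(v: BigInt): Long =
    if (v > BigInt(Long.MaxValue)) Long.MaxValue
    else if (v < BigInt(Long.MinValue)) Long.MinValue
    else v.toLong
}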



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-45435) Document that lazy checkpoint may not be a consistent

2024-01-14 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-45435:
---

Assignee: Juliusz Sompolski

> Document that lazy checkpoint may not be a consistent
> -
>
> Key: SPARK-45435
> URL: https://issues.apache.org/jira/browse/SPARK-45435
> Project: Spark
>  Issue Type: Documentation
>  Components: Spark Core, SQL
>Affects Versions: 4.0.0
>Reporter: Juliusz Sompolski
>Assignee: Juliusz Sompolski
>Priority: Major
>  Labels: pull-request-available
>
> Some people may want to use checkpoint to get a consistent snapshot of the 
> Dataset / RDD. Warn that this is not the case with lazy checkpoint, because 
> checkpoint is computed only at the end of the first action, and the data used 
> during the first action may be different because of non-determinism and 
> retries.
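
A minimal sketch (local session, illustrative checkpoint directory) of the suggested workaround:
checkpoint eagerly when a consistent snapshot is the goal.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.rand

object EagerCheckpointSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("eager-checkpoint").getOrCreate()
    spark.sparkContext.setCheckpointDir("/tmp/checkpoint-demo")

    // rand() is non-deterministic, so a lazy checkpoint materialized during some later
    // action may not match the data that earlier actions observed.
    val df = spark.range(0, 1000).withColumn("r", rand())

    // Eager checkpoint: materialized right away, so every subsequent action sees the same rows.
    val snapshot = df.checkpoint(eager = true)
    println(snapshot.count())

    spark.stop()
  }
}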



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-45435) Document that lazy checkpoint may not be a consistent

2024-01-14 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-45435.
-
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 43247
[https://github.com/apache/spark/pull/43247]

> Document that lazy checkpoint may not be a consistent
> -
>
> Key: SPARK-45435
> URL: https://issues.apache.org/jira/browse/SPARK-45435
> Project: Spark
>  Issue Type: Documentation
>  Components: Spark Core, SQL
>Affects Versions: 4.0.0
>Reporter: Juliusz Sompolski
>Assignee: Juliusz Sompolski
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> Some people may want to use checkpoint to get a consistent snapshot of the 
> Dataset / RDD. Warn that this is not the case with lazy checkpoint, because 
> checkpoint is computed only at the end of the first action, and the data used 
> during the first action may be different because of non-determinism and 
> retries.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



(spark) branch master updated: [SPARK-45435][DOC] Document that lazy checkpoint may not be a consistent snapshot

2024-01-14 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 65d822c44d93 [SPARK-45435][DOC] Document that lazy checkpoint may not 
be a consistent snapshot
65d822c44d93 is described below

commit 65d822c44d93077b68d8738c261fcaf9288cc960
Author: Juliusz Sompolski 
AuthorDate: Mon Jan 15 14:55:39 2024 +0800

[SPARK-45435][DOC] Document that lazy checkpoint may not be a consistent 
snapshot

### What changes were proposed in this pull request?

Some may want to use checkpoint to get a consistent snapshot of the Dataset 
/ RDD. Warn that this is not the case with lazy checkpoint, because checkpoint 
is computed only at the end of the first action, and the data used during the 
first action may be different because of non-determinism and retries.

`doCheckpoint` is only called at the end of 
[SparkContext.runJob](https://github.com/apache/spark/blob/5446f548bbc8a93414f1c773a8daf714b57b7d1a/core/src/main/scala/org/apache/spark/SparkContext.scala#L2426).
 This may cause recomputation both of data of [local checkpoint 
data](https://github.com/apache/spark/blob/5446f548bbc8a93414f1c773a8daf714b57b7d1a/core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala#L54)
 and [reliable checkpoint data](https://github.com/apache/sp [...]

### Why are the changes needed?

Document a gnarly edge case.

### Does this PR introduce _any_ user-facing change?

Yes, change to documentation of public APIs.

### How was this patch tested?

Doc only change.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #43247 from juliuszsompolski/SPARK-45435-doc.

Authored-by: Juliusz Sompolski 
Signed-off-by: Wenchen Fan 
---
 core/src/main/scala/org/apache/spark/rdd/RDD.scala   | 14 ++
 .../src/main/scala/org/apache/spark/sql/Dataset.scala| 16 
 2 files changed, 30 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 9518433a7f69..d73fb1b9bc3b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1649,6 +1649,13 @@ abstract class RDD[T: ClassTag](
* RDDs will be removed. This function must be called before any job has been
* executed on this RDD. It is strongly recommended that this RDD is 
persisted in
* memory, otherwise saving it on a file will require recomputation.
+   *
+   * The data is only checkpointed when `doCheckpoint()` is called, and this 
only happens at the
+   * end of the first action execution on this RDD. The final data that is 
checkpointed after the
+   * first action may be different from the data that was used during the 
action, due to
+   * non-determinism of the underlying operation and retries. If the purpose 
of the checkpoint is
+   * to achieve saving a deterministic snapshot of the data, an eager action 
may need to be called
+   * first on the RDD to trigger the checkpoint.
*/
   def checkpoint(): Unit = RDDCheckpointData.synchronized {
 // NOTE: we use a global lock here due to complexities downstream with 
ensuring
@@ -1678,6 +1685,13 @@ abstract class RDD[T: ClassTag](
* `spark.dynamicAllocation.cachedExecutorIdleTimeout` to a high value.
*
* The checkpoint directory set through `SparkContext#setCheckpointDir` is 
not used.
+   *
+   * The data is only checkpointed when `doCheckpoint()` is called, and this 
only happens at the
+   * end of the first action execution on this RDD. The final data that is 
checkpointed after the
+   * first action may be different from the data that was used during the 
action, due to
+   * non-determinism of the underlying operation and retries. If the purpose 
of the checkpoint is
+   * to achieve saving a deterministic snapshot of the data, an eager action 
may need to be called
+   * first on the RDD to trigger the checkpoint.
*/
   def localCheckpoint(): this.type = RDDCheckpointData.synchronized {
 if (Utils.isDynamicAllocationEnabled(conf) &&
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index d792cdbcf865..0038f1a510b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -688,6 +688,14 @@ class Dataset[T] private[sql](
* plan may grow exponentially. It will be saved to files inside the 
checkpoint
* directory set with `SparkContext#setCheckpointDir`.
*
+   * @param eager Whether to checkpoint this dataframe immediately
+   *
+   * @note When checkpoint is used with eager = false, the fi

[jira] [Created] (SPARK-46700) count the last spilling for the shuffle disk spilling bytes metric

2024-01-12 Thread Wenchen Fan (Jira)
Wenchen Fan created SPARK-46700:
---

 Summary: count the last spilling for the shuffle disk spilling 
bytes metric
 Key: SPARK-46700
 URL: https://issues.apache.org/jira/browse/SPARK-46700
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 4.0.0
Reporter: Wenchen Fan






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



(spark) branch master updated: [SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts

2024-01-12 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 96f34bbeb40c [SPARK-46052][CORE] Remove function 
TaskScheduler.killAllTaskAttempts
96f34bbeb40c is described below

commit 96f34bbeb40ca39aced3596d28dcc768cd995002
Author: Yi Wu 
AuthorDate: Fri Jan 12 17:17:39 2024 +0800

[SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts

### What changes were proposed in this pull request?

This PR removes the interface `TaskScheduler.killAllTaskAttempts` and its 
implementations, and replaces it with `TaskScheduler.cancelTasks`. This PR also 
removes "abort stage" from `TaskScheduler.cancelTasks` but moves it to after the 
call of `TaskScheduler.cancelTasks` with a control flag 
`spark.scheduler.stage.legacyAbortAfterKillTasks` (`true` by default to keep 
the same behaviour for now). Because "abort stage" is not necessary while 
canceling tasks, see the comment at https://gi [...]

Besides, this PR fixes a bug where new tasks could potentially be launched after 
all the tasks in a stage attempt were killed, by marking the task set as 
zombie (i.e., `suspend()`) after the killing.

### Why are the changes needed?

Spark has two functions to kill all tasks in a Stage:
* `cancelTasks`: Not only kill all the running tasks in all the stage 
attempts but also abort all the stage attempts
*  `killAllTaskAttempts`: Only kill all the running tasks in all the stage 
attempts but won't abort the attempts.

However, there's no use case in Spark that a stage would launch new tasks 
after all its tasks get killed. So I think we can replace `killAllTaskAttempts` 
with `cancelTasks` directly.
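
For completeness, a sketch (illustrative app name; the flag is internal and defaults to true) of
opting out of the legacy abort-after-kill behaviour that this change gates:

import org.apache.spark.sql.SparkSession

object LegacyAbortFlagSketch {
  def main(args: Array[String]): Unit = {
    // The flag is internal and defaults to true (abort the stage after killing its tasks);
    // setting it to false opts into the new behaviour without the abort.
    val spark = SparkSession.builder()
      .appName("no-legacy-abort")
      .master("local[*]")
      .config("spark.scheduler.stage.legacyAbortAfterKillTasks", "false")
      .getOrCreate()
    // ... run jobs that may cancel stages ...
    spark.stop()
  }
}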

### Does this PR introduce _any_ user-facing change?
No. `TaskScheduler` is internal.

### How was this patch tested?

Pass existing tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #43954 from Ngone51/remove_killAllTaskAttempts.

Lead-authored-by: Yi Wu 
Co-authored-by: wuyi 
Signed-off-by: Wenchen Fan 
---
 .../org/apache/spark/internal/config/package.scala | 10 +
 .../org/apache/spark/scheduler/DAGScheduler.scala  | 20 +++--
 .../org/apache/spark/scheduler/TaskScheduler.scala |  6 +--
 .../apache/spark/scheduler/TaskSchedulerImpl.scala | 44 ---
 .../apache/spark/scheduler/TaskSetManager.scala| 11 +
 .../org/apache/spark/JobCancellationSuite.scala|  2 +-
 .../org/apache/spark/TempLocalSparkContext.scala   |  2 +-
 .../apache/spark/scheduler/DAGSchedulerSuite.scala | 50 +++---
 .../scheduler/ExternalClusterManagerSuite.scala|  1 -
 .../spark/scheduler/TaskSchedulerImplSuite.scala   | 35 +--
 10 files changed, 94 insertions(+), 87 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index bbd79c8b9653..7c8cfc9f208f 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -2621,6 +2621,16 @@ package object config {
   .toSequence
   .createWithDefault("org.apache.spark.sql.connect.client" :: Nil)
 
+  private[spark] val LEGACY_ABORT_STAGE_AFTER_KILL_TASKS =
+ConfigBuilder("spark.scheduler.stage.legacyAbortAfterKillTasks")
+  .doc("Whether to abort a stage after 
TaskScheduler.killAllTaskAttempts(). This is " +
+"used to restore the original behavior in case there are any 
regressions after " +
+"abort stage is removed")
+  .version("4.0.0")
+  .internal()
+  .booleanConf
+  .createWithDefault(true)
+
   private[spark] val DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION =
 
ConfigBuilder("spark.scheduler.dropTaskInfoAccumulablesOnTaskCompletion.enabled")
   .internal()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 1a51220cdf74..e728d921d290 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -38,7 +38,7 @@ import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config
-import org.apache.spark.internal.config.RDD_CACHE_VISIBILITY_TRACKING_ENABLED
+import org.apache.spark.internal.config.{LEGACY_ABORT_STAGE_AFTER_KILL_TASKS, 
RDD_CACHE_VISIBILITY_TRACKING_ENABLED}
 import org.apache.spark.internal.c

[jira] [Assigned] (SPARK-46052) Remove unnecessary TaskScheduler.killAllTaskAttempts

2024-01-12 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-46052:
---

Assignee: wuyi

> Remove unnecessary TaskScheduler.killAllTaskAttempts
> 
>
> Key: SPARK-46052
> URL: https://issues.apache.org/jira/browse/SPARK-46052
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.0.3, 3.1.3, 3.2.4, 3.3.3, 3.4.1, 3.5.0
>Reporter: wuyi
>Assignee: wuyi
>Priority: Major
>  Labels: pull-request-available
>
> Spark has two functions to kill all tasks in a Stage:
> * `cancelTasks`: Not only kill all the running tasks in all the stage 
> attempts but also abort all the stage attempts
> *  `killAllTaskAttempts`: Only kill all the running tasks in all the stage 
> attempts but won't abort the attempts.
> However, there's no use case in Spark that a stage would launch new tasks 
> after all its tasks get killed. So I think we can replace 
> `killAllTaskAttempts` with `cancelTasks` directly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-46052) Remove unnecessary TaskScheduler.killAllTaskAttempts

2024-01-12 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-46052.
-
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 43954
[https://github.com/apache/spark/pull/43954]

> Remove unnecessary TaskScheduler.killAllTaskAttempts
> 
>
> Key: SPARK-46052
> URL: https://issues.apache.org/jira/browse/SPARK-46052
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.0.3, 3.1.3, 3.2.4, 3.3.3, 3.4.1, 3.5.0
>Reporter: wuyi
>Assignee: wuyi
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> Spark has two functions to kill all tasks in a Stage:
> * `cancelTasks`: Not only kill all the running tasks in all the stage 
> attempts but also abort all the stage attempts
> *  `killAllTaskAttempts`: Only kill all the running tasks in all the stage 
> attempts but won't abort the attempts.
> However, there's no use case in Spark that a stage would launch new tasks 
> after all its tasks get killed. So I think we can replace 
> `killAllTaskAttempts` with `cancelTasks` directly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-46383) Reduce Driver Heap Usage by Reducing the Lifespan of `TaskInfo.accumulables()`

2024-01-11 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-46383:
---

Assignee: Utkarsh Agarwal

> Reduce Driver Heap Usage by Reducing the Lifespan of `TaskInfo.accumulables()`
> --
>
> Key: SPARK-46383
> URL: https://issues.apache.org/jira/browse/SPARK-46383
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 4.0.0
>Reporter: Utkarsh Agarwal
>Assignee: Utkarsh Agarwal
>Priority: Major
>  Labels: pull-request-available
> Attachments: Screenshot 2023-11-06 at 3.56.26 PM.png, screenshot-1.png
>
>
> `AccumulableInfo` is one of the top heap consumers in driver's heap dumps for 
> stages with many tasks. For a stage with a large number of tasks 
> (O(100k)), we saw 30% of the heap usage stemming from 
> `TaskInfo.accumulables()`.
> !screenshot-1.png|width=641,height=98!  
> The `TaskSetManager` today keeps around the TaskInfo objects 
> ([ref1|https://github.com/apache/spark/blob/c1ba963e64a22dea28e17b1ed954e6d03d38da1e/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L134],
>  
> [ref2|https://github.com/apache/spark/blob/c1ba963e64a22dea28e17b1ed954e6d03d38da1e/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L192]))
>  and in turn the task metrics (`AccumulableInfo`) for every task attempt 
> until the stage is completed. This means that for stages with a large number 
> of tasks, we keep metrics for all the tasks (`AccumulableInfo`) around even 
> when the task has completed and its metrics have been aggregated. Given a 
> task has a large number of metrics, stages with many tasks end up with a 
> large heap usage in the form of task metrics.
> Ideally, we should clear up a task's TaskInfo upon the task's completion, 
> thereby reducing the driver's heap usage.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-46383) Reduce Driver Heap Usage by Reducing the Lifespan of `TaskInfo.accumulables()`

2024-01-11 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-46383.
-
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 44321
[https://github.com/apache/spark/pull/44321]

> Reduce Driver Heap Usage by Reducing the Lifespan of `TaskInfo.accumulables()`
> --
>
> Key: SPARK-46383
> URL: https://issues.apache.org/jira/browse/SPARK-46383
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 4.0.0
>Reporter: Utkarsh Agarwal
>Assignee: Utkarsh Agarwal
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
> Attachments: Screenshot 2023-11-06 at 3.56.26 PM.png, screenshot-1.png
>
>
> `AccumulableInfo` is one of the top heap consumers in driver's heap dumps for 
> stages with many tasks. For a stage with a large number of tasks 
> (O(100k)), we saw 30% of the heap usage stemming from 
> `TaskInfo.accumulables()`.
> !screenshot-1.png|width=641,height=98!  
> The `TaskSetManager` today keeps around the TaskInfo objects 
> ([ref1|https://github.com/apache/spark/blob/c1ba963e64a22dea28e17b1ed954e6d03d38da1e/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L134],
>  
> [ref2|https://github.com/apache/spark/blob/c1ba963e64a22dea28e17b1ed954e6d03d38da1e/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L192]))
>  and in turn the task metrics (`AccumulableInfo`) for every task attempt 
> until the stage is completed. This means that for stages with a large number 
> of tasks, we keep metrics for all the tasks (`AccumulableInfo`) around even 
> when the task has completed and its metrics have been aggregated. Given a 
> task has a large number of metrics, stages with many tasks end up with a 
> large heap usage in the form of task metrics.
> Ideally, we should clear up a task's TaskInfo upon the task's completion, 
> thereby reducing the driver's heap usage.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



(spark) branch master updated: [SPARK-46383] Reduce Driver Heap Usage by Reducing the Lifespan of `TaskInfo.accumulables()`

2024-01-11 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 28da1d853477 [SPARK-46383] Reduce Driver Heap Usage by Reducing the 
Lifespan of `TaskInfo.accumulables()`
28da1d853477 is described below

commit 28da1d853477b306774798d8aa738901221fb804
Author: Utkarsh 
AuthorDate: Fri Jan 12 10:28:22 2024 +0800

[SPARK-46383] Reduce Driver Heap Usage by Reducing the Lifespan of 
`TaskInfo.accumulables()`

### What changes were proposed in this pull request?
`AccumulableInfo` is one of the top heap consumers in driver's heap dumps 
for stages with many tasks. For a stage with a large number of tasks 
(**_O(100k)_**), we saw **30%** of the heap usage stemming from 
`TaskInfo.accumulables()`.


![image](https://github.com/apache/spark/assets/10495099/13ef5d07-abfc-47fd-81b6-705f599db011)

The `TaskSetManager` today keeps around the TaskInfo objects 
([ref1](https://github.com/apache/spark/blob/c1ba963e64a22dea28e17b1ed954e6d03d38da1e/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L134),
 
[ref2](https://github.com/apache/spark/blob/c1ba963e64a22dea28e17b1ed954e6d03d38da1e/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L192)))
 and in turn the task metrics (`AccumulableInfo`) for every task attempt until 
the stage is completed. This [...]

This PR is an opt-in change (disabled by default) to reduce the driver's 
heap usage for stages with many tasks by no longer referencing the task metrics 
of completed tasks. Once a task is completed in `TaskSetManager`, we no longer 
keep its metrics around. Upon task completion, we clone the `TaskInfo` object 
and empty out the metrics for the clone. The cloned `TaskInfo` is retained by 
the `TaskSetManager` while the original `TaskInfo` object with the metrics is 
sent over to the `DAGSc [...]

### Config to gate changes
The changes in the PR are guarded with the Spark conf 
`spark.scheduler.dropTaskInfoAccumulablesOnTaskCompletion.enabled` which can be 
used for rollback or staged rollouts.
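
A sketch (illustrative app name and job) of opting in, since the behaviour is disabled by
default:

import org.apache.spark.sql.SparkSession

object DropTaskInfoAccumulablesSketch {
  def main(args: Array[String]): Unit = {
    // Opt in to dropping per-task accumulables on completion to cut driver heap usage for
    // stages with very many tasks; disabled by default (see the note on Resubmitted tasks).
    val spark = SparkSession.builder()
      .appName("drop-taskinfo-accumulables")
      .master("local[*]")
      .config("spark.scheduler.dropTaskInfoAccumulablesOnTaskCompletion.enabled", "true")
      .getOrCreate()
    spark.range(0, 1000000).count()
    spark.stop()
  }
}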

### Why are the changes disabled by default?
The PR introduces a breaking change wherein the `TaskInfo.accumulables()` 
are empty for `Resubmitted` tasks upon the loss of an executor. Read 
https://github.com/apache/spark/pull/44321#pullrequestreview-1785137821 for 
details.

### Why are the changes needed?

Reduce driver's heap usage, especially for stages with many tasks

## Benchmarking
On a cluster running a scan stage with 100k tasks, the TaskSetManager's 
heap usage dropped from 1.1 GB to 37 MB. This  **reduced the total driver's 
heap usage by 38%**, down to 2 GB from 3.5 GB.

**BEFORE**


![image](https://github.com/databricks/runtime/assets/10495099/7c1599f3-3587-48a1-b019-84115b1bb90d)
**WITH FIX**
https://github.com/databricks/runtime/assets/10495099/b85129c8-dc10-4ee2-898d-61c8e7449616

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added new tests and did benchmarking on a cluster.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Github Copilot

Closes #44321 from utkarsh39/SPARK-46383.

Authored-by: Utkarsh 
Signed-off-by: Wenchen Fan 
---
 .../org/apache/spark/internal/config/package.scala | 10 +++
 .../org/apache/spark/scheduler/TaskInfo.scala  | 10 ++-
 .../apache/spark/scheduler/TaskSetManager.scala| 71 +++---
 .../spark/scheduler/SparkListenerSuite.scala   | 35 +++
 .../spark/scheduler/TaskSetManagerSuite.scala  | 51 
 5 files changed, 169 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 2823b7cdb602..bbd79c8b9653 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -2620,4 +2620,14 @@ package object config {
   .stringConf
   .toSequence
   .createWithDefault("org.apache.spark.sql.connect.client" :: Nil)
+
+  private[spark] val DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION =
+    ConfigBuilder("spark.scheduler.dropTaskInfoAccumulablesOnTaskCompletion.enabled")
+      .internal()
+      .doc("If true, the task info accumulables will be cleared upon task completion in " +
+        "TaskSetManager. This reduces the heap usage of the driver by only referencing the " +
+        "task info accumulables for the active tasks and not for completed tasks.")
+      .version("4.0.0")
+      .booleanConf
+      .c

[jira] [Resolved] (SPARK-46640) RemoveRedundantAliases does not account for SubqueryExpression when removing aliases

2024-01-11 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-46640.
-
Fix Version/s: 3.5.1
   4.0.0
   Resolution: Fixed

Issue resolved by pull request 44645
[https://github.com/apache/spark/pull/44645]

> RemoveRedundantAliases does not account for SubqueryExpression when removing 
> aliases
> 
>
> Key: SPARK-46640
> URL: https://issues.apache.org/jira/browse/SPARK-46640
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 4.0.0
>Reporter: Nikhil Sheoran
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 3.5.1, 4.0.0
>
>
> `RemoveRedundantAliases` does not take into account the outer attributes of a 
> `SubqueryExpression` when removing aliases, potentially removing them if it 
> thinks they are redundant.
> This can cause scenarios where a subquery expression has conditions like `a#x 
> = a#x`, i.e. both the attribute names and the expression ID(s) are the same. 
> This can then lead to a conflicting expression ID(s) error.
> In `RemoveRedundantAliases`, we have an excluded AttributeSet argument 
> denoting the references for which we should not remove aliases. For a query 
> with a subquery expression, adding the references of this subquery to the 
> excluded set prevents such a rewrite from happening.
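
A minimal sketch of the excluded-set idea described above; it mirrors the patch 
quoted later in this thread and assumes the Catalyst classes it imports 
(`AttributeSet`, `SubqueryExpression`, `LogicalPlan`):
```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeSet, SubqueryExpression}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

object SubqueryRefs {
  // References of every subquery expression hosted by this plan node; adding them to
  // RemoveRedundantAliases' `excluded` set keeps their aliases from being removed.
  def forPlan(plan: LogicalPlan): AttributeSet =
    AttributeSet.fromAttributeSets(plan.expressions.collect {
      case e: SubqueryExpression => e.references
    })
}
```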



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-46640) RemoveRedundantAliases does not account for SubqueryExpression when removing aliases

2024-01-11 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-46640:
---

Assignee: Nikhil Sheoran

> RemoveRedundantAliases does not account for SubqueryExpression when removing 
> aliases
> 
>
> Key: SPARK-46640
> URL: https://issues.apache.org/jira/browse/SPARK-46640
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 4.0.0
>Reporter: Nikhil Sheoran
>Assignee: Nikhil Sheoran
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 4.0.0, 3.5.1
>
>
> `RemoveRedundantAliases` does not take into account the outer attributes of a 
> `SubqueryExpression` when removing aliases, potentially removing them if it 
> thinks they are redundant.
> This can cause scenarios where a subquery expression has conditions like `a#x 
> = a#x`, i.e. both the attribute names and the expression ID(s) are the same. 
> This can then lead to a conflicting expression ID(s) error.
> In `RemoveRedundantAliases`, we have an excluded AttributeSet argument 
> denoting the references for which we should not remove aliases. For a query 
> with a subquery expression, adding the references of this subquery to the 
> excluded set prevents such a rewrite from happening.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



(spark) branch branch-3.5 updated: [SPARK-46640][SQL] Fix RemoveRedundantAlias by excluding subquery attributes

2024-01-11 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 8a0f64274f44 [SPARK-46640][SQL] Fix RemoveRedundantAlias by excluding 
subquery attributes
8a0f64274f44 is described below

commit 8a0f64274f44dd17a3e1f034c9f1f20a61ff0549
Author: Nikhil Sheoran <125331115+nikhilsheoran...@users.noreply.github.com>
AuthorDate: Fri Jan 12 10:20:49 2024 +0800

[SPARK-46640][SQL] Fix RemoveRedundantAlias by excluding subquery attributes

### What changes were proposed in this pull request?
- In `RemoveRedundantAliases`, we have an `excluded` AttributeSet argument 
denoting the references for which we should not remove aliases. For a query 
with subquery expressions, adding the attributes referenced by the subquery to 
the `excluded` set prevents rewrites that might remove presumably redundant 
aliases. (Changes in RemoveRedundantAlias)
- Added a configuration flag to disable this fix, if not needed.
- Added a unit test with Filter exists subquery expression to show how the 
alias would have been removed.

### Why are the changes needed?
- `RemoveRedundantAliases` does not take into account the outer attributes 
of a `SubqueryExpression` when considering redundant aliases, potentially 
removing them if it thinks they are redundant.
- This can cause scenarios where a subquery expression has conditions like 
`a#x = a#x` i.e. both the attribute names and the expression ID(s) are the 
same. This can then lead to conflicting expression ID(s) error.
- For example, in the query below, `RemoveRedundantAliases` would remove the 
alias `a#0 AS a#1` and replace `a#1` with `a#0` in the Filter exists subquery 
expression, which would create an issue if the subquery expression had an 
attribute with reference `a#0` (possible when different scan relation instances 
share the same attribute ID(s); Ref: #40662)
```
Filter exists [a#1 && (a#1 = b#2)]
:  +- LocalRelation <empty>, [b#2]
+- Project [a#0 AS a#1]
   +- LocalRelation <empty>, [a#0]
```
becomes
```
Filter exists [a#0 && (a#0 = b#2)]
:  +- LocalRelation <empty>, [b#2]
+- LocalRelation <empty>, [a#0]
```
- The changes are needed to fix this bug.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
- Added a unit test with Filter exists subquery expression to show how the 
alias would have been removed.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #44645 from nikhilsheoran-db/SPARK-46640.

Authored-by: Nikhil Sheoran 
<125331115+nikhilsheoran...@users.noreply.github.com>
Signed-off-by: Wenchen Fan 
(cherry picked from commit bbeb8d7417bafa09ad5202347175a47b3217e27f)
Signed-off-by: Wenchen Fan 
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala   | 12 +-
 .../org/apache/spark/sql/internal/SQLConf.scala|  9 
 .../RemoveRedundantAliasAndProjectSuite.scala  | 48 ++
 3 files changed, 68 insertions(+), 1 deletion(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index ec5f00d34cd8..df17840d567e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -576,10 +576,20 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
 }
 
   case _ =>
+val subQueryAttributes = if (conf.getConf(SQLConf
+  .EXCLUDE_SUBQUERY_EXP_REFS_FROM_REMOVE_REDUNDANT_ALIASES)) {
+  // Collect the references for all the subquery expressions in the plan.
+  AttributeSet.fromAttributeSets(plan.expressions.collect {
+case e: SubqueryExpression => e.references
+  })
+} else {
+  AttributeSet.empty
+}
+
 // Remove redundant aliases in the subtree(s).
 val currentNextAttrPairs = mutable.Buffer.empty[(Attribute, Attribute)]
 val newNode = plan.mapChildren { child =>
-  val newChild = removeRedundantAliases(child, excluded)
+  val newChild = removeRedundantAliases(child, excluded ++ subQueryAttributes)
   currentNextAttrPairs ++= createAttributeMapping(child, newChild)
   newChild
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index de4a89667aff..2e41374035c8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -4352,6 +4352,15 @@ object SQLConf {
   .checkValue(_ >= 0, "The threshold of cached local relations must not be negative")
   .createWithDefault(64 * 1024 * 1024)
 
+  val EXCLUDE_SUBQUERY_EXP_RE

(spark) branch master updated: [SPARK-46640][SQL] Fix RemoveRedundantAlias by excluding subquery attributes

2024-01-11 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new bbeb8d7417ba [SPARK-46640][SQL] Fix RemoveRedundantAlias by excluding 
subquery attributes
bbeb8d7417ba is described below

commit bbeb8d7417bafa09ad5202347175a47b3217e27f
Author: Nikhil Sheoran <125331115+nikhilsheoran...@users.noreply.github.com>
AuthorDate: Fri Jan 12 10:20:49 2024 +0800

[SPARK-46640][SQL] Fix RemoveRedundantAlias by excluding subquery attributes

### What changes were proposed in this pull request?
- In `RemoveRedundantAliases`, we have an `excluded` AttributeSet argument 
denoting the references for which we should not remove aliases. For a query 
with subquery expressions, adding the attributes referenced by the subquery to 
the `excluded` set prevents rewrites that might remove presumably redundant 
aliases. (Changes in RemoveRedundantAlias)
- Added a configuration flag to disable this fix, if not needed.
- Added a unit test with Filter exists subquery expression to show how the 
alias would have been removed.

### Why are the changes needed?
- `RemoveRedundantAliases` does not take into account the outer attributes 
of a `SubqueryExpression` when considering redundant aliases, potentially 
removing them if it thinks they are redundant.
- This can cause scenarios where a subquery expression has conditions like 
`a#x = a#x` i.e. both the attribute names and the expression ID(s) are the 
same. This can then lead to conflicting expression ID(s) error.
- For example, in the query below, `RemoveRedundantAliases` would remove the 
alias `a#0 AS a#1` and replace `a#1` with `a#0` in the Filter exists subquery 
expression, which would create an issue if the subquery expression had an 
attribute with reference `a#0` (possible when different scan relation instances 
share the same attribute ID(s); Ref: #40662). A runnable sketch of this query 
shape follows this list.
```
Filter exists [a#1 && (a#1 = b#2)]
:  +- LocalRelation <empty>, [b#2]
+- Project [a#0 AS a#1]
   +- LocalRelation <empty>, [a#0]
```
becomes
```
Filter exists [a#0 && (a#0 = b#2)]
:  +- LocalRelation <empty>, [b#2]
+- LocalRelation <empty>, [a#0]
```
- The changes are needed to fix this bug.
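
A minimal sketch of the query shape described above, assuming a hypothetical 
local session and view names (`t1`, `t2`); it only materializes a plan with an 
aliased column referenced from a correlated EXISTS predicate, and is not a 
guaranteed reproduction of the conflicting-ID error, which also depends on plan 
details such as reused scan relation instances:
```scala
import org.apache.spark.sql.SparkSession

// Hypothetical session and temp views, used only to produce the plan shape above.
val spark = SparkSession.builder().master("local[*]").appName("alias-shape").getOrCreate()
spark.range(10).selectExpr("id AS a").createOrReplaceTempView("t1")
spark.range(10).selectExpr("id AS b").createOrReplaceTempView("t2")

// A correlated EXISTS predicate that reaches the outer column only through an alias.
spark.sql(
  """SELECT a1
    |FROM (SELECT a AS a1 FROM t1) AS s
    |WHERE EXISTS (SELECT 1 FROM t2 WHERE s.a1 = t2.b)
    |""".stripMargin).explain(true)
```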

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
- Added a unit test with Filter exists subquery expression to show how the 
alias would have been removed.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #44645 from nikhilsheoran-db/SPARK-46640.

Authored-by: Nikhil Sheoran 
<125331115+nikhilsheoran...@users.noreply.github.com>
Signed-off-by: Wenchen Fan 
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala   | 12 +-
 .../org/apache/spark/sql/internal/SQLConf.scala|  9 
 .../RemoveRedundantAliasAndProjectSuite.scala  | 48 ++
 3 files changed, 68 insertions(+), 1 deletion(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 61791b35df85..8fcc7c7c26b4 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -584,10 +584,20 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
 }
 
   case _ =>
+val subQueryAttributes = if (conf.getConf(SQLConf
+  .EXCLUDE_SUBQUERY_EXP_REFS_FROM_REMOVE_REDUNDANT_ALIASES)) {
+  // Collect the references for all the subquery expressions in the plan.
+  AttributeSet.fromAttributeSets(plan.expressions.collect {
+case e: SubqueryExpression => e.references
+  })
+} else {
+  AttributeSet.empty
+}
+
 // Remove redundant aliases in the subtree(s).
 val currentNextAttrPairs = mutable.Buffer.empty[(Attribute, Attribute)]
 val newNode = plan.mapChildren { child =>
-  val newChild = removeRedundantAliases(child, excluded)
+  val newChild = removeRedundantAliases(child, excluded ++ subQueryAttributes)
   currentNextAttrPairs ++= createAttributeMapping(child, newChild)
   newChild
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 1928e74363cb..743a2e20c885 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -4513,6 +4513,15 @@ object SQLConf {
   .booleanConf
   

Re: [VOTE] SPIP: Structured Streaming - Arbitrary State API v2

2024-01-10 Thread Wenchen Fan
+1

On Thu, Jan 11, 2024 at 9:32 AM L. C. Hsieh  wrote:

> +1
>
> On Wed, Jan 10, 2024 at 9:06 AM Bhuwan Sahni
>  wrote:
>
>> +1. This is a good addition.
>>
>> 
>> *Bhuwan Sahni*
>> Staff Software Engineer
>>
>> bhuwan.sa...@databricks.com
>> 500 108th Ave. NE
>> Bellevue, WA 98004
>> USA
>>
>>
>> On Wed, Jan 10, 2024 at 9:00 AM Burak Yavuz  wrote:
>>
>>> +1. Excited to see more stateful workloads with Structured Streaming!
>>>
>>>
>>> Best,
>>> Burak
>>>
>>> On Wed, Jan 10, 2024 at 8:21 AM Praveen Gattu
>>>  wrote:
>>>
 +1. This brings Structured Streaming a good solution for
 customers wanting to build stateful stream processing applications.

 On Wed, Jan 10, 2024 at 7:30 AM Bartosz Konieczny <
 bartkoniec...@gmail.com> wrote:

> +1 :)
>
> On Wed, Jan 10, 2024 at 9:57 AM Shixiong Zhu 
> wrote:
>
>> +1 (binding)
>>
>> Best Regards,
>> Shixiong Zhu
>>
>>
>> On Tue, Jan 9, 2024 at 6:47 PM 刘唯  wrote:
>>
>>> This is a good addition! +1
>>>
>>> Raghu Angadi  于2024年1月9日周二
>>> 13:17写道:
>>>
 +1. This is a major improvement to the state API.

 Raghu.

 On Tue, Jan 9, 2024 at 1:42 AM Mich Talebzadeh <
 mich.talebza...@gmail.com> wrote:

> +1 for me as well
>
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility
> for any loss, damage or destruction of data or any other property 
> which may
> arise from relying on this email's technical content is explicitly
> disclaimed. The author will in no case be liable for any monetary 
> damages
> arising from such loss, damage or destruction.
>
>
>
>
> On Tue, 9 Jan 2024 at 03:24, Anish Shrigondekar
>  wrote:
>
>> Thanks Jungtaek for creating the Vote thread.
>>
>> +1 (non-binding) from my side too.
>>
>> Thanks,
>> Anish
>>
>> On Tue, Jan 9, 2024 at 6:09 AM Jungtaek Lim <
>> kabhwan.opensou...@gmail.com> wrote:
>>
>>> Starting with my +1 (non-binding). Thanks!
>>>
>>> On Tue, Jan 9, 2024 at 9:37 AM Jungtaek Lim <
>>> kabhwan.opensou...@gmail.com> wrote:
>>>
 Hi all,

 I'd like to start the vote for SPIP: Structured Streaming -
 Arbitrary State API v2.

 References:

- JIRA ticket

- SPIP doc

 
- Discussion thread

 

 Please vote on the SPIP for the next 72 hours:

 [ ] +1: Accept the proposal as an official SPIP
 [ ] +0
 [ ] -1: I don’t think this is a good idea because …

 Thanks!
 Jungtaek Lim (HeartSaVioR)

>>>
>
> --
> Bartosz Konieczny
> freelance data engineer
> https://www.waitingforcode.com
> https://github.com/bartosz25/
> https://twitter.com/waitingforcode
>
>


(spark) branch master updated: [SPARK-46442][SQL] DS V2 supports push down PERCENTILE_CONT and PERCENTILE_DISC

2024-01-09 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 85b504d64701 [SPARK-46442][SQL] DS V2 supports push down 
PERCENTILE_CONT and PERCENTILE_DISC
85b504d64701 is described below

commit 85b504d64701ca470b946841ca5b2b4e129293c1
Author: Jiaan Geng 
AuthorDate: Wed Jan 10 12:24:24 2024 +0800

[SPARK-46442][SQL] DS V2 supports push down PERCENTILE_CONT and 
PERCENTILE_DISC

### What changes were proposed in this pull request?
This PR translates the aggregate functions `PERCENTILE_CONT` and 
`PERCENTILE_DISC` for pushdown.

- This PR adds `Expression[] orderingWithinGroups` into 
`GeneralAggregateFunc`, so that the DS V2 pushdown framework can compile the 
`WITHIN GROUP (ORDER BY ...)` clause easily (an illustrative query follows this 
list).

- This PR also splits `visitInverseDistributionFunction` from 
`visitAggregateFunction`, so that the DS V2 pushdown framework can generate the 
`WITHIN GROUP (ORDER BY ...)` syntax easily.

- This PR also fixes a bug where `JdbcUtils` could not handle the precision and 
scale of decimals returned from JDBC.
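
An illustrative query of the shape this enables, under stated assumptions: the 
catalog, table, and column names (`h2.test.employee`, `dept`, `salary`) are 
hypothetical, and whether the aggregate is actually pushed down also depends on 
the JDBC dialect advertising support for `PERCENTILE_CONT`/`PERCENTILE_DISC`:
```scala
import org.apache.spark.sql.SparkSession

// Assumes a DS V2 JDBC catalog named `h2` has already been configured.
val spark = SparkSession.builder().master("local[*]").appName("percentile-pushdown").getOrCreate()

spark.sql(
  """SELECT dept,
    |       percentile_cont(0.5)  WITHIN GROUP (ORDER BY salary) AS median_salary,
    |       percentile_disc(0.25) WITHIN GROUP (ORDER BY salary) AS salary_p25
    |FROM h2.test.employee
    |GROUP BY dept
    |""".stripMargin).explain(true)
```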

### Why are the changes needed?
So that DS V2 can push down `PERCENTILE_CONT` and `PERCENTILE_DISC`.

### Does this PR introduce _any_ user-facing change?
'No'.
New feature.

### How was this patch tested?
New test cases.

### Was this patch authored or co-authored using generative AI tooling?
'No'.

Closes #44397 from beliefer/SPARK-46442.

Lead-authored-by: Jiaan Geng 
Co-authored-by: beliefer 
Signed-off-by: Wenchen Fan 
---
 .../aggregate/GeneralAggregateFunc.java| 21 +++-
 .../sql/connector/util/V2ExpressionSQLBuilder.java | 21 +++-
 .../sql/catalyst/util/V2ExpressionBuilder.scala| 20 +--
 .../org/apache/spark/sql/jdbc/H2Dialect.scala  | 15 +-
 .../org/apache/spark/sql/jdbc/JdbcDialects.scala   | 17 +-
 .../org/apache/spark/sql/jdbc/JDBCV2Suite.scala| 62 --
 6 files changed, 132 insertions(+), 24 deletions(-)

diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/aggregate/GeneralAggregateFunc.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/aggregate/GeneralAggregateFunc.java
index 4d787eaf9644..d287288ba33f 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/aggregate/GeneralAggregateFunc.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/aggregate/GeneralAggregateFunc.java
@@ -21,6 +21,7 @@ import java.util.Arrays;
 
 import org.apache.spark.annotation.Evolving;
 import org.apache.spark.sql.connector.expressions.Expression;
+import org.apache.spark.sql.connector.expressions.SortValue;
 import org.apache.spark.sql.internal.connector.ExpressionWithToString;
 
 /**
@@ -41,7 +42,9 @@ import 
org.apache.spark.sql.internal.connector.ExpressionWithToString;
  *  REGR_R2(input1, input2) Since 3.4.0
  *  REGR_SLOPE(input1, input2) Since 3.4.0
  *  REGR_SXY(input1, input2) Since 3.4.0
- *  MODE(input1[, inverse]) Since 4.0.0
+ *  MODE() WITHIN (ORDER BY input1 [ASC|DESC]) Since 4.0.0
+ *  PERCENTILE_CONT(input1) WITHIN (ORDER BY input2 [ASC|DESC]) Since 4.0.0
+ *  PERCENTILE_DISC(input1) WITHIN (ORDER BY input2 [ASC|DESC]) Since 4.0.0
  * 
  *
  * @since 3.3.0
@@ -51,11 +54,21 @@ public final class GeneralAggregateFunc extends 
ExpressionWithToString implement
   private final String name;
   private final boolean isDistinct;
   private final Expression[] children;
+  private final SortValue[] orderingWithinGroups;
 
   public GeneralAggregateFunc(String name, boolean isDistinct, Expression[] children) {
 this.name = name;
 this.isDistinct = isDistinct;
 this.children = children;
+this.orderingWithinGroups = new SortValue[]{};
+  }
+
+  public GeneralAggregateFunc(
+  String name, boolean isDistinct, Expression[] children, SortValue[] orderingWithinGroups) {
+this.name = name;
+this.isDistinct = isDistinct;
+this.children = children;
+this.orderingWithinGroups = orderingWithinGroups;
   }
 
   public String name() { return name; }
@@ -64,6 +77,8 @@ public final class GeneralAggregateFunc extends 
ExpressionWithToString implement
   @Override
   public Expression[] children() { return children; }
 
+  public SortValue[] orderingWithinGroups() { return orderingWithinGroups; }
+
   @Override
   public boolean equals(Object o) {
 if (this == o) return true;
@@ -73,7 +88,8 @@ public final class GeneralAggregateFunc extends 
ExpressionWithToString implement
 
 if (isDistinct != that.isDistinct) return false;
 if (!name.equals(that.name)) return false;
-return Arrays.equals(children, that.children);
+if (!Arrays.equals(children, that.children)) return false;
+return Arrays.equals

[jira] [Created] (SPARK-46634) literal validation should not drill down to null fields

2024-01-09 Thread Wenchen Fan (Jira)
Wenchen Fan created SPARK-46634:
---

 Summary: literal validation should not drill down to null fields
 Key: SPARK-46634
 URL: https://issues.apache.org/jira/browse/SPARK-46634
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 4.0.0
Reporter: Wenchen Fan






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-46331) Removing CodeGenFallback trait from subset of datetime and spark version functions

2024-01-09 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-46331:
---

Assignee: Aleksandar Tomic

> Removing CodeGenFallback trait from subset of datetime and spark version 
> functions
> --
>
> Key: SPARK-46331
> URL: https://issues.apache.org/jira/browse/SPARK-46331
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 4.0.0
>Reporter: Aleksandar Tomic
>Assignee: Aleksandar Tomic
>Priority: Major
>  Labels: pull-request-available
>
> This change moves us further in the direction of removing CodegenFallback and 
> instead using RuntimeReplaceable with StaticInvoke, which will directly insert 
> the provided code.
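
A self-contained toy sketch of the pattern described above, using made-up class 
names rather than Spark's real Catalyst API: a codegen-fallback style expression 
can only be evaluated interpretively, while a runtime-replaceable expression 
rewrites itself into a node that invokes a static helper directly.
```scala
// Toy model only; `Expr`, `StaticInvokeExpr`, etc. are illustrative stand-ins.
object VersionUtils {
  def currentVersion(): String = "4.0.0" // stand-in for a static helper the generated code calls
}

sealed trait Expr { def eval(): Any }

// Old style: no code generation; always falls back to interpreted evaluation.
final case class CodegenFallbackVersion() extends Expr {
  def eval(): Any = VersionUtils.currentVersion()
}

// New style: the expression is just a pointer to a replacement that the code
// generator understands and can turn into a direct static call.
final case class StaticInvokeExpr(target: () => Any) extends Expr {
  def eval(): Any = target()
}

final case class RuntimeReplaceableVersion() extends Expr {
  val replacement: Expr = StaticInvokeExpr(() => VersionUtils.currentVersion())
  def eval(): Any = replacement.eval()
}
```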



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-46331) Removing CodeGenFallback trait from subset of datetime and spark version functions

2024-01-09 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-46331.
-
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 44261
[https://github.com/apache/spark/pull/44261]

> Removing CodeGenFallback trait from subset of datetime and spark version 
> functions
> --
>
> Key: SPARK-46331
> URL: https://issues.apache.org/jira/browse/SPARK-46331
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 4.0.0
>Reporter: Aleksandar Tomic
>Assignee: Aleksandar Tomic
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> This change moves us further in the direction of removing CodegenFallback and 
> instead using RuntimeReplaceable with StaticInvoke, which will directly insert 
> the provided code.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org


