maropu commented on a change in pull request #26441: [SPARK-29682][SQL] Resolve conflicting references in aggregate expressions
URL: https://github.com/apache/spark/pull/26441#discussion_r344483115
##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -949,14 +949,19 @@ class Analyzer(
             if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty =>
           (oldVersion, oldVersion.copy(serializer = oldVersion.serializer.map(_.newInstance())))
-        // Handle projects that create conflicting aliases.
         case oldVersion @ Project(projectList, _)
-            if findAliases(projectList).intersect(conflictingAttributes).nonEmpty =>
Review comment:
It seems the current dedup logic implicitly assumes that conflicting attributes only appear in aliases? For example:
```
// Regular group-by cases
scala> val numsDF = Seq(1, 2, 3, 4, 5, 6).toDF("nums")
scala> val groupByDF = numsDF.groupBy("nums").agg(max(lit(0)).as("agcol"))
scala> groupByDF.join(groupByDF, "nums").explain(true)
== Parsed Logical Plan ==
'Join UsingJoin(Inner,List(nums))
:- Aggregate [nums#79], [nums#79, max(0) AS agcol#83]
:  +- Project [value#76 AS nums#79]
:     +- LocalRelation [value#76]
+- Aggregate [nums#112], [nums#112, max(0) AS agcol#110]
   +- Project [value#76 AS nums#112]
      +- LocalRelation [value#76]
== Analyzed Logical Plan ==
nums: int, agcol: int, agcol: int
Project [nums#79, agcol#83, agcol#110]
+- Join Inner, (nums#79 = nums#112)
   :- Aggregate [nums#79], [nums#79, max(0) AS agcol#83]
   :  +- Project [value#76 AS nums#79]
   :     +- LocalRelation [value#76]
   +- Aggregate [nums#112], [nums#112, max(0) AS agcol#110]
      +- Project [value#76 AS nums#112]
         +- LocalRelation [value#76]
== Optimized Logical Plan ==
...
scala> groupByDF.join(groupByDF, "nums").show
+----+-----+-----+
|nums|agcol|agcol|
+----+-----+-----+
| 1| 0| 0|
| 6| 0| 0|
| 3| 0| 0|
| 5| 0| 0|
| 4| 0| 0|
| 2| 0| 0|
+----+-----+-----+
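// (Dedup succeeds above because the conflicting attributes nums#79 and
// agcol#83 are aliases in the right side's Project/Aggregate lists, so the
// alias-handling cases rewrite them to fresh IDs: nums#112 and agcol#110.)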
// Grouping analytics cases
scala> val cubeDF = numsDF.cube("nums").agg(max(lit(0)).as("agcol"))
scala> cubeDF.join(cubeDF, "nums").show
org.apache.spark.sql.AnalysisException: Failure when resolving conflicting references in Join:
'Join Inner
:- Aggregate [nums#121, spark_grouping_id#119], [nums#121, max(0) AS agcol#118]
:  +- Expand [List(nums#79, nums#120, 0), List(nums#79, null, 1)], [nums#79, nums#121, spark_grouping_id#119]
:     +- Project [nums#79, nums#79 AS nums#120]
:        +- Project [value#76 AS nums#79]
:           +- LocalRelation [value#76]
+- Aggregate [nums#121, spark_grouping_id#119], [nums#121, max(0) AS agcol#124]
   +- Expand [List(nums#79, nums#120, 0), List(nums#79, null, 1)], [nums#79, nums#121, spark_grouping_id#119]
      +- Project [nums#79, nums#79 AS nums#120]
         +- Project [value#76 AS nums#79]
            +- LocalRelation [value#76]
Conflicting attributes: nums#121
;;
'Join Inner
:- Aggregate [nums#121, spark_grouping_id#119], [nums#121, max(0) AS agcol#118]
:  +- Expand [List(nums#79, nums#120, 0), List(nums#79, null, 1)], [nums#79, nums#121, spark_grouping_id#119]
:     +- Project [nums#79, nums#79 AS nums#120]
:        +- Project [value#76 AS nums#79]
:           +- LocalRelation [value#76]
+- Aggregate [nums#121, spark_grouping_id#119], [nums#121, max(0) AS agcol#124]
   +- Expand [List(nums#79, nums#120, 0), List(nums#79, null, 1)], [nums#79, nums#121, spark_grouping_id#119]
      +- Project [nums#79, nums#79 AS nums#120]
         +- Project [value#76 AS nums#79]
            +- LocalRelation [value#76]
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.failAnalysis(CheckAnalysis.scala:47)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.failAnalysis$(CheckAnalysis.scala:46)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:122)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1(CheckAnalysis.scala:457)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1$adapted(CheckAnalysis.scala:90)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:154)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:90)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:87)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:122)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:148)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:201)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:145)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:66)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
  at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:63)
  at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:63)
  at org.apache.spark.sql.Dataset.join(Dataset.scala:989)
  at org.apache.spark.sql.Dataset.join(Dataset.scala:960)
  at org.apache.spark.sql.Dataset.join(Dataset.scala:935)
  ... 47 elided
```
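In the grouping-analytics case the conflicting attribute `nums#121` is produced by `Expand` itself rather than by an `Alias` in a `Project`/`Aggregate` list, so none of the alias-based dedup cases match. As a rough sketch of one possible direction (untested; written in the style of the `SerializeFromObject` case in the excerpt above, reusing its `conflictingAttributes` name), the dedup could re-instance the attributes that `Expand` produces:
```
// Sketch only: re-instance just the attributes that Expand itself produces
// (its output minus the child's output), so the right side of a self-join
// gets fresh expression IDs instead of reusing nums#121 and friends.
case oldVersion @ Expand(_, output, _)
    if oldVersion.producedAttributes.intersect(conflictingAttributes).nonEmpty =>
  val produced = oldVersion.producedAttributes
  val newOutput = output.map { attr =>
    if (produced.contains(attr)) attr.newInstance() else attr
  }
  (oldVersion, oldVersion.copy(output = newOutput))
```
Leaving the child's pass-through attributes untouched matters here, since re-instancing those would break resolution against the child's output.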