cloud-fan commented on code in PR #40662:
URL: https://github.com/apache/spark/pull/40662#discussion_r1159770234
##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala:
##########
@@ -1438,4 +1438,66 @@ class AnalysisSuite extends AnalysisTest with Matchers {
).analyze
)
}
+
+ test("SPARK-43030: deduplicate relations in CTE relation definitions") {
+ val join = testRelation.as("left").join(testRelation.as("right"))
+ val cteDef = CTERelationDef(join)
+ val cteRef = CTERelationRef(cteDef.id, false, Nil)
+
+ withClue("flat CTE") {
+ val plan = WithCTE(cteRef.select($"left.a"), Seq(cteDef)).analyze
+ val relations = plan.collect {
+ case r: LocalRelation => r
+ }
+ assert(relations.length == 2)
+ assert(relations.map(_.output).distinct.length == 2)
+ }
+
+ withClue("nested CTE") {
+ val cteDef2 = CTERelationDef(WithCTE(cteRef.join(testRelation),
Seq(cteDef)))
+ val cteRef2 = CTERelationRef(cteDef2.id, false, Nil)
+ val plan = WithCTE(cteRef2, Seq(cteDef2)).analyze
+ val relations = plan.collect {
+ case r: LocalRelation => r
+ }
+ assert(relations.length == 3)
+ assert(relations.map(_.output).distinct.length == 3)
+ }
+ }
+
+ test("SPARK-43030: deduplicate CTE relation references") {
+ val cteDef = CTERelationDef(testRelation.select($"a"))
+ val cteRef = CTERelationRef(cteDef.id, false, Nil)
+
+ withClue("single reference") {
+ val plan = WithCTE(cteRef.where($"a" > 1), Seq(cteDef)).analyze
+ val refs = plan.collect {
+ case r: CTERelationRef => r
+ }
+ // Only one CTE ref, no need to deduplicate
+ assert(refs.length == 1)
+ assert(refs(0).output == testRelation.output.take(1))
+ }
+
+ withClue("two references") {
+ val plan = WithCTE(cteRef.join(cteRef), Seq(cteDef)).analyze
+ val refs = plan.collect {
+ case r: CTERelationRef => r
+ }
+ assert(refs.length == 2)
+ assert(refs.map(_.output).distinct.length == 2)
+ }
+
+ withClue("references in both CTE relation definition and main query") {
+ val cteDef2 = CTERelationDef(cteRef.where($"a" > 2))
+ val cteRef2 = CTERelationRef(cteDef2.id, false, Nil)
+ val plan = WithCTE(cteRef.union(cteRef2), Seq(cteDef, cteDef2)).analyze
+ val refs = plan.collect {
+ case r: CTERelationRef => r
+ }
+ assert(refs.length == 3)
+ assert(refs.map(_.cteId).distinct.length == 2)
+ assert(refs.map(_.output).distinct.length == 3)
Review Comment:
It's kind of unknow what can go wrong if we don't deduplicate leaf
relations. The original motivation is to fix rule `CostBasedJoinReorder ` :
https://github.com/apache/spark/commit/f05b940749e87d17da2e6141f7cc15170c054822
, but other rules may hit similar issues.
##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala:
##########
@@ -1438,4 +1438,66 @@ class AnalysisSuite extends AnalysisTest with Matchers {
).analyze
)
}
+
+ test("SPARK-43030: deduplicate relations in CTE relation definitions") {
+ val join = testRelation.as("left").join(testRelation.as("right"))
+ val cteDef = CTERelationDef(join)
+ val cteRef = CTERelationRef(cteDef.id, false, Nil)
+
+ withClue("flat CTE") {
+ val plan = WithCTE(cteRef.select($"left.a"), Seq(cteDef)).analyze
+ val relations = plan.collect {
+ case r: LocalRelation => r
+ }
+ assert(relations.length == 2)
+ assert(relations.map(_.output).distinct.length == 2)
+ }
+
+ withClue("nested CTE") {
+ val cteDef2 = CTERelationDef(WithCTE(cteRef.join(testRelation),
Seq(cteDef)))
+ val cteRef2 = CTERelationRef(cteDef2.id, false, Nil)
+ val plan = WithCTE(cteRef2, Seq(cteDef2)).analyze
+ val relations = plan.collect {
+ case r: LocalRelation => r
+ }
+ assert(relations.length == 3)
+ assert(relations.map(_.output).distinct.length == 3)
+ }
+ }
+
+ test("SPARK-43030: deduplicate CTE relation references") {
+ val cteDef = CTERelationDef(testRelation.select($"a"))
+ val cteRef = CTERelationRef(cteDef.id, false, Nil)
+
+ withClue("single reference") {
+ val plan = WithCTE(cteRef.where($"a" > 1), Seq(cteDef)).analyze
+ val refs = plan.collect {
+ case r: CTERelationRef => r
+ }
+ // Only one CTE ref, no need to deduplicate
+ assert(refs.length == 1)
+ assert(refs(0).output == testRelation.output.take(1))
+ }
+
+ withClue("two references") {
+ val plan = WithCTE(cteRef.join(cteRef), Seq(cteDef)).analyze
+ val refs = plan.collect {
+ case r: CTERelationRef => r
+ }
+ assert(refs.length == 2)
+ assert(refs.map(_.output).distinct.length == 2)
+ }
+
+ withClue("references in both CTE relation definition and main query") {
+ val cteDef2 = CTERelationDef(cteRef.where($"a" > 2))
+ val cteRef2 = CTERelationRef(cteDef2.id, false, Nil)
+ val plan = WithCTE(cteRef.union(cteRef2), Seq(cteDef, cteDef2)).analyze
+ val refs = plan.collect {
+ case r: CTERelationRef => r
+ }
+ assert(refs.length == 3)
+ assert(refs.map(_.cteId).distinct.length == 2)
+ assert(refs.map(_.output).distinct.length == 3)
Review Comment:
It's kind of unknown what can go wrong if we don't deduplicate leaf
relations. The original motivation is to fix rule `CostBasedJoinReorder ` :
https://github.com/apache/spark/commit/f05b940749e87d17da2e6141f7cc15170c054822
, but other rules may hit similar issues.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]