agubichev commented on code in PR #41301:
URL: https://github.com/apache/spark/pull/41301#discussion_r1267038014


##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuerySuite.scala:
##########
@@ -454,4 +460,85 @@ class DecorrelateInnerQuerySuite extends PlanTest {
             DomainJoin(Seq(x), testRelation))))
     check(innerPlan, outerPlan, correctAnswer, Seq(x <=> x))
   }
+
+  test("SPARK-43780: aggregation in subquery with correlated equi-join") {
+    // Join in the subquery is on equi-predicates, so all the correlated references can be
+    // substituted by equivalent ones from the outer query, and domain join is not needed.
+    val outerPlan = testRelation
+    val innerPlan =
+      Aggregate(
+        Seq.empty[Expression], Seq(Alias(count(Literal(1)), "a")()),
+        Project(Seq(x, y, a3, b3),
+          Join(testRelation2, testRelation3, Inner,
+            Some(And(x === a3, y === OuterReference(a))), JoinHint.NONE)))
+
+    val correctAnswer =
+      Aggregate(
+        Seq(y), Seq(Alias(count(Literal(1)), "a")(), y),
+        Project(Seq(x, y, a3, b3),
+          Join(testRelation2, testRelation3, Inner, Some(x === a3), 
JoinHint.NONE)))
+    check(innerPlan, outerPlan, correctAnswer, Seq(y === a))
+  }
+
+  test("SPARK-43780: aggregation in subquery with correlated non-equi-join") {
+    // Join in the subquery is on non-equi-predicate, so we introduce a DomainJoin.
+    val outerPlan = testRelation
+    val innerPlan =
+      Aggregate(
+        Seq.empty[Expression], Seq(Alias(count(Literal(1)), "a")()),
+        Project(Seq(x, y, a3, b3),
+          Join(testRelation2, testRelation3, Inner,
+            Some(And(x === a3, y > OuterReference(a))), JoinHint.NONE)))
+    val correctAnswer =
+      Aggregate(
+        Seq(a), Seq(Alias(count(Literal(1)), "a")(), a),
+        Project(Seq(x, y, a3, b3, a),
+          Join(
+            DomainJoin(Seq(a), testRelation2),
+            testRelation3, Inner, Some(And(x === a3, y > a)), JoinHint.NONE)))
+    check(innerPlan, outerPlan, correctAnswer, Seq(a <=> a))
+  }
+
+  test("SPARK-43780: aggregation in subquery with correlated left join") {
+    // Join in the subquery is on equi-predicates, so all the correlated references can be
+    // substituted by equivalent ones from the outer query, and domain join is not needed.
+    val outerPlan = testRelation
+    val innerPlan =
+      Aggregate(
+        Seq.empty[Expression], Seq(Alias(count(Literal(1)), "a")()),
+        Project(Seq(x, y, a3, b3),
+          Join(testRelation2, testRelation3, LeftOuter,
+            Some(And(x === a3, y === OuterReference(a))), JoinHint.NONE)))

Review Comment:
   Added.
   However, there is no DomainJoin. I don't think it matters which side 
(optional or preserved) is involved in the equivalence class; the variable can 
still be replaced without the domain join. 
   In the test that I added, the correlated conjunct is b3 == 
OuterReference(b). Because it is an outer join, after the join either b3 == b or 
b3 is NULL. Because the top-level join (introduced by the subquery) is a left 
outer join (unless narrowed), we can simply substitute OuterReference(b) with b3, 
so that the NULL values for b3 will still be present after the top-level join.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to