allisonwang-db commented on code in PR #42705:
URL: https://github.com/apache/spark/pull/42705#discussion_r1323305701


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala:
##########
@@ -1405,6 +1405,11 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
           failOnInvalidOuterReference(g)
           checkPlan(g.child, aggregated, canContainOuter)
 
+        // Correlated subquery can have a LIMIT clause
+        case l@Limit(_, input) =>

Review Comment:
   ```suggestion
           case l @ Limit(_, input) =>
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala:
##########
@@ -655,6 +655,39 @@ object DecorrelateInnerQuery extends PredicateHelper {
             val newProject = Project(newProjectList ++ referencesToAdd, newChild)
             (newProject, joinCond, outerReferenceMap)
 
+          case Limit(limit, input) =>
+            // LIMIT K (with potential ORDER BY) is decorrelated by computing K rows per every
+            // domain value via a row_number() window function. For example, for a subquery
+            // (SELECT T2.a FROM T2 WHERE T2.b = OuterReference(x) ORDER BY T2.c LIMIT 3)
+            // -- we need to get top 3 values of T2.a (ordering by T2.c) for every value of x.
+            // Following our general decorrelation procedure, 'x' is then replaced by T2.b, so the
+            // subquery is decorrelated as:
+            // SELECT * FROM (

Review Comment:
   Great explanation here!
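
For anyone following along, the "K rows per domain value" idea in the quoted comment can be illustrated with plain Scala collections. This is only a rough sketch and not the PR's implementation: `T2Row`, `topKPerDomainValue`, and the sample data are hypothetical names made up for illustration, and grouping followed by sort-and-take stands in for filtering on `row_number() OVER (PARTITION BY b ORDER BY c) <= K`.

```scala
// Hypothetical stand-in for rows of T2 (columns a, b, c from the quoted comment).
final case class T2Row(a: Int, b: Int, c: Int)

// Assumption: emulates keeping rows whose row_number(), partitioned by b and
// ordered by c, is at most k. Each distinct b plays the role of a domain value
// that the outer reference x could take.
def topKPerDomainValue(rows: Seq[T2Row], k: Int): Map[Int, Seq[T2Row]] =
  rows.groupBy(_.b).map { case (b, group) =>
    b -> group.sortBy(_.c).take(k)
  }

// Made-up sample data: four rows for b = 1, one row for b = 2.
val t2 = Seq(
  T2Row(a = 10, b = 1, c = 5),
  T2Row(a = 11, b = 1, c = 2),
  T2Row(a = 12, b = 1, c = 9),
  T2Row(a = 13, b = 1, c = 1),
  T2Row(a = 20, b = 2, c = 7)
)

// LIMIT 3 applied independently for every value of b.
val perB = topKPerDomainValue(t2, k = 3)
```

With this data, `perB(1)` keeps the three rows with the smallest `c` for `b = 1` and drops the row with `c = 9`, while `perB(2)` keeps its single row. Joining the outer query against such a result on `x = b` then yields at most K inner rows per outer row, which is what the correlated `LIMIT` asked for.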



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuerySuite.scala:
##########
@@ -59,6 +59,25 @@ class DecorrelateInnerQuerySuite extends PlanTest {
     joinCond.zip(conditions).foreach(e => compareExpressions(e._1, e._2))
   }
 
+  private def check(
+                     outputPlan: LogicalPlan,
+                     joinCond: Seq[Expression],
+                     correctAnswer: LogicalPlan,
+                     conditions: Seq[Expression]): Unit = {
+    assert(!hasOuterReferences(outputPlan))

Review Comment:
   nit: indent
   ```suggestion
     private def check(
         outputPlan: LogicalPlan,
         joinCond: Seq[Expression],
         correctAnswer: LogicalPlan,
         conditions: Seq[Expression]): Unit = {
       assert(!hasOuterReferences(outputPlan))
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

