cloud-fan commented on code in PR #42705:
URL: https://github.com/apache/spark/pull/42705#discussion_r1323967912


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala:
##########
@@ -655,6 +655,39 @@ object DecorrelateInnerQuery extends PredicateHelper {
             val newProject = Project(newProjectList ++ referencesToAdd, 
newChild)
             (newProject, joinCond, outerReferenceMap)
 
+          case Limit(limit, input) =>
+            // LIMIT K (with potential ORDER BY) is decorrelated by computing 
K rows per every
+            // domain value via a row_number() window function. For example, 
for a subquery
+            // (SELECT T2.a FROM T2 WHERE T2.b = OuterReference(x) ORDER BY 
T2.c LIMIT 3)
+            // -- we need to get top 3 values of T2.a (ordering by T2.c) for 
every value of x.
+            // Following our general decorrelation procedure, 'x' is then 
replaced by T2.b, so the
+            // subquery is decorrelated as:
+            // SELECT * FROM (
+            //   SELECT T2.a, row_number() OVER (PARTITION BY T2.b ORDER BY 
T2.c) AS rn FROM T2)
+            // WHERE rn <= 3
+            val (child, ordering) = input match {
+              case Sort(order, _, child) => (child, order)
+              case _ => (input, Seq())
+            }
+            val (newChild, joinCond, outerReferenceMap) =
+              decorrelate(child, parentOuterReferences, aggregated = true, 
underSetOp)
+            val collectedChildOuterReferences = 
collectOuterReferencesInPlanTree(child)
+            // Add outer references to the PARTITION BY clause
+            val partitionFields = 
collectedChildOuterReferences.map(outerReferenceMap(_)).toSeq
+            val orderByFields = replaceOuterReferences(ordering, 
outerReferenceMap)
+
+            val rowNumber = WindowExpression(RowNumber(),
+              WindowSpecDefinition(partitionFields, orderByFields,
+                SpecifiedWindowFrame(RowFrame, UnboundedPreceding, 
CurrentRow)))
+            val rowNumberAlias = Alias(rowNumber, "rn_" + 
NamedExpression.newExprId.id)()

Review Comment:
   I'm not sure putting the id in the name is useful. We may refresh attribute 
ids in a query plan, and the id in name will be confusing. Shall we just use 
`rn`? The EXPLAIN result will print the Alias expr id, so having the id in the 
name is not useful.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to