JoshRosen commented on code in PR #48661:
URL: https://github.com/apache/spark/pull/48661#discussion_r1819902550


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/InsertSortForLimitAndOffset.scala:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.catalyst.expressions.SortOrder
+import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec, ShuffleQueryStageExec}
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * When LIMIT/OFFSET is the root node, Spark plans it as CollectLimitExec which preserves the data
+ * ordering. However, when OFFSET/LIMIT is not the root node, Spark uses GlobalLimitExec which
+ * shuffles all the data into one partition and then gets a slice of it. Unfortunately, the shuffle
+ * reader fetches shuffle blocks in a random order and cannot preserve the data ordering, which
+ * violates the requirement of LIMIT/OFFSET.
+ *
+ * This rule inserts an extra local sort before LIMIT/OFFSET to preserve the data ordering.
+ * TODO: add an order-preserving mode in the shuffle reader.
+ */
+object InsertSortForLimitAndOffset extends Rule[SparkPlan] {
+  override def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.getConf(SQLConf.ORDERING_AWARE_LIMIT_OFFSET)) return plan
+
+    plan transform {
+      case l @ GlobalLimitExec(
+          _,
+          SinglePartitionShuffleWithGlobalOrdering(ordering),
+          _) =>
+        val newChild = SortExec(ordering, global = false, child = l.child)

Review Comment:
   The intuition that this local sort could instead be a more optimized _local_
top-K is sound, but I think we currently lack a suitable optimized
implementation for it: the priority queue currently used in `takeOrdered`
doesn't support spilling, so naively using it could risk OOMs for large limits.
   
   A full local sort may be slower, but at least it supports spilling.
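
   To make the spilling point concrete, below is a minimal Scala sketch of sort-with-spilling in the external-merge-sort style: the input is cut into bounded buffers, each buffer is sorted into a run, and the runs are k-way merged. `ExternalSortSketch`, `externalSort`, and `maxInMemory` are hypothetical names, and the runs are kept as in-memory `Vector`s standing in for spill files, so this shows the shape of the algorithm rather than Spark's actual sorter.

```scala
import scala.collection.mutable

object ExternalSortSketch {
  // Hypothetical sketch of sort-with-spilling (not Spark's actual sorter):
  // each buffer of at most `maxInMemory` records is sorted into a run. Here the
  // runs are in-memory Vectors standing in for spill files on disk.
  def externalSort[T](items: Iterator[T], maxInMemory: Int)(implicit ord: Ordering[T]): Iterator[T] = {
    require(maxInMemory > 0, "maxInMemory must be positive")
    val runs: Vector[Vector[T]] =
      items.grouped(maxInMemory).map(chunk => chunk.sorted(ord).toVector).toVector

    // k-way merge: a min-heap of (head value, run index, position in run).
    // Scala's PriorityQueue dequeues the maximum, so the ordering is reversed.
    val byValue: Ordering[(T, Int, Int)] = Ordering.by[(T, Int, Int), T](_._1)(ord)
    val heap = mutable.PriorityQueue.empty[(T, Int, Int)](byValue.reverse)
    runs.zipWithIndex.foreach { case (run, i) =>
      if (run.nonEmpty) heap.enqueue((run(0), i, 0))
    }

    new Iterator[T] {
      def hasNext: Boolean = heap.nonEmpty
      def next(): T = {
        val (value, runIdx, pos) = heap.dequeue()
        val run = runs(runIdx)
        // Refill the heap from the run that just produced the smallest value.
        if (pos + 1 < run.length) heap.enqueue((run(pos + 1), runIdx, pos + 1))
        value
      }
    }
  }
}
```

   For example, `ExternalSortSketch.externalSort(Iterator(9, 3, 7, 1, 8, 2, 6), maxInMemory = 3).toList` yields `List(1, 2, 3, 6, 7, 8, 9)` from three runs. When the runs really live on disk, the merge phase only needs one buffered head per run in memory, which is why a sort can handle inputs larger than memory while a heap sized by a large LIMIT cannot.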
   
   As a future optimization, we could consider implementing a dedicated
`LocalTopK` physical operator: internally, that operator could use a heap /
priority queue with a sort fallback, a sorting + spilling technique optimized
for top-K, or any of a number of other techniques described in the existing
literature.
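
   A rough Scala sketch of the "heap with sort fallback" option, assuming the strategy is picked up front by comparing K against a memory threshold. `LocalTopKSketch`, `localTopK`, and `maxHeapSize` are hypothetical; the fallback uses an in-memory `sorted` where a real operator would use a spillable sort, and other fallback policies (for example, switching when the heap's memory footprint exceeds a budget) are equally plausible.

```scala
import scala.collection.mutable

object LocalTopKSketch {
  // Hypothetical local top-K: a bounded max-heap when K is small enough to keep
  // in memory, otherwise a full sort followed by take(k). In a real operator the
  // fallback would be a spillable sort rather than an in-memory `sorted`.
  def localTopK[T](items: Iterator[T], k: Int, maxHeapSize: Int)(implicit ord: Ordering[T]): Seq[T] = {
    require(k >= 0, "k must be non-negative")
    if (k == 0) Seq.empty
    else if (k <= maxHeapSize) {
      // Max-heap under `ord`: the head is the largest of the k rows kept so far.
      val heap = mutable.PriorityQueue.empty[T](ord)
      items.foreach { x =>
        if (heap.size < k) heap.enqueue(x)
        else if (ord.lt(x, heap.head)) {
          heap.dequeue() // evict the current largest kept row
          heap.enqueue(x)
        }
      }
      heap.toSeq.sorted(ord)
    } else {
      items.toVector.sorted(ord).take(k)
    }
  }
}
```

   For instance, `LocalTopKSketch.localTopK(Iterator(5, 1, 4, 2, 3), k = 2, maxHeapSize = 10)` takes the heap path and returns the two smallest values, 1 and 2, while the same call with `maxHeapSize = 1` takes the sort path and returns the same result.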
   
   Such an operator might have other advantages, too, such as allowing local- 
and global-top-K phases to be re-ordered w.r.t. other operations. The current 
`TakeOrderedAndProject` operator is fairly monolithic and combines multiple 
phases into a single "jumbo" operator, precluding certain optimizations.
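
   The split into local and global phases relies on a simple property: each of the K smallest rows overall is also among the K smallest rows of the partition it came from, so taking top-K per partition and then top-K of the survivors gives the same result (up to ties) as a single global top-K. The hypothetical Scala sketch below (`TwoPhaseTopKSketch` and its methods are illustrative names) shows that decomposition with plain `sorted.take(k)` in both phases; it does not model `TakeOrderedAndProject` or any re-ordering with other operators.

```scala
object TwoPhaseTopKSketch {
  // Local phase: each partition independently keeps only its own k smallest rows.
  def localPhase[T](partition: Seq[T], k: Int)(implicit ord: Ordering[T]): Seq[T] =
    partition.sorted(ord).take(k)

  // Global phase: the k smallest rows overall survive every local phase, so
  // merging the (at most numPartitions * k) survivors and taking k suffices.
  def globalPhase[T](locals: Seq[Seq[T]], k: Int)(implicit ord: Ordering[T]): Seq[T] =
    locals.flatten.sorted(ord).take(k)

  def twoPhaseTopK[T](partitions: Seq[Seq[T]], k: Int)(implicit ord: Ordering[T]): Seq[T] =
    globalPhase(partitions.map(p => localPhase(p, k)), k)
}
```

   With partitions `Seq(9, 2, 7)`, `Seq(4, 1)`, and `Seq(8, 3, 6, 5)` and `k = 3`, the local phase keeps at most three rows per partition and the global phase returns 1, 2, 3, the same as sorting all rows and taking three. Because each phase is a self-contained step, an optimizer could in principle place them independently, which a single monolithic operator does not allow.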
   
   For now, though, I think we should stick with the use of the existing 
physical `Sort` operator and leave local top-K to possible future work.
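
   For reference, the effect of the rule quoted in the diff can be sketched in a few lines of Scala: rows from globally ordered partitions all arrive in the single output partition, but in an arbitrary block order, so sorting them locally by the same ordering before applying OFFSET and LIMIT recovers the correct slice. `LocalSortBeforeLimitSketch`, `simulateShuffleRead`, and `offsetThenLimit` are hypothetical; the random block permutation only stands in for the shuffle reader's fetch order, and `offsetThenLimit` uses SQL semantics (skip `offset` rows, then return at most `limit`) rather than `GlobalLimitExec`'s actual parameters.

```scala
import scala.util.Random

object LocalSortBeforeLimitSketch {
  // Stand-in for a shuffle read into a single partition: every block arrives,
  // but in an arbitrary order (here a seeded random permutation of the blocks).
  def simulateShuffleRead[T](blocks: Seq[Seq[T]], seed: Long): Seq[T] =
    new Random(seed).shuffle(blocks).flatten

  // Mirrors the idea of the rule: sort locally by the child's ordering (the
  // role played by the non-global SortExec), then skip `offset` rows and keep
  // at most `limit` rows from the now globally ordered data.
  def offsetThenLimit[T](rows: Seq[T], offset: Int, limit: Int)(implicit ord: Ordering[T]): Seq[T] = {
    require(offset >= 0 && limit >= 0, "offset and limit must be non-negative")
    rows.sorted(ord).slice(offset, offset + limit)
  }
}
```

   With range-partitioned blocks `Seq(1, 2, 3)`, `Seq(4, 5, 6)`, and `Seq(7, 8, 9)`, `offsetThenLimit(simulateShuffleRead(blocks, seed), offset = 2, limit = 3)` returns 3, 4, 5 for any seed, whereas slicing the unsorted read directly would depend on the block fetch order.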



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

