JoshRosen commented on code in PR #48661:
URL: https://github.com/apache/spark/pull/48661#discussion_r1819930126
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/InsertSortForLimitAndOffset.scala:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.catalyst.expressions.SortOrder
+import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec, ShuffleQueryStageExec}
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * When LIMIT/OFFSET is the root node, Spark plans it as CollectLimitExec which preserves the data
+ * ordering. However, when OFFSET/LIMIT is not the root node, Spark uses GlobalLimitExec which
+ * shuffles all the data into one partition and then gets a slice of it. Unfortunately, the shuffle
+ * reader fetches shuffle blocks in a random order and can not preserve the data ordering, which
+ * violates the requirement of LIMIT/OFFSET.
+ *
+ * This rule inserts an extra local sort before LIMIT/OFFSET to preserve the data ordering.
+ * TODO: add a order preserving mode in the shuffle reader.
+ */
+object InsertSortForLimitAndOffset extends Rule[SparkPlan] {
+  override def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.getConf(SQLConf.ORDERING_AWARE_LIMIT_OFFSET)) return plan
+
+    plan transform {
+      case l @ GlobalLimitExec(
+          _,
+          SinglePartitionShuffleWithGlobalOrdering(ordering),
+          _) =>
+        val newChild = SortExec(ordering, global = false, child = l.child)
+        l.withNewChildren(Seq(newChild))
+    }
+  }
+
+  object SinglePartitionShuffleWithGlobalOrdering {
+    def unapply(plan: SparkPlan): Option[Seq[SortOrder]] = plan match {
+      case ShuffleExchangeExec(SinglePartition, GlobalOrdering(ordering), _, _) => Some(ordering)
+      case p: AQEShuffleReadExec => unapply(p.child)
+      case p: ShuffleQueryStageExec => unapply(p.plan)
+      case _ => None
+    }
+  }
+
+  object GlobalOrdering {
+    def unapply(plan: SparkPlan): Option[Seq[SortOrder]] = plan match {
+      case p: SortExec if p.global => Some(p.sortOrder)
+      case p: LocalLimitExec => unapply(p.child)
+      case p: WholeStageCodegenExec => unapply(p.child)

Review Comment:
(Mostly a note to self, but I'm writing it out to publicly state my understanding and confirm that it's correct.) The child of a WholeStageCodegenExec is the top-level node of the plan being codegened, so really we're just stripping off an intermediate "wrapper" node here so that we can match on `LocalLimitExec` and `SortExec`. This is _not_ implementing a generalized notion of "global order preservation", i.e. it's not adding handling to recognize that ordering is (or could be) preserved in cases like `GlobalLimitExec(Project(LocalLimitExec(Sort(...))))`. Doing that would be hard: we'd need to handle both the codegen and non-codegen cases, and would probably need more complicated logic in the actual rewriting step.
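To make the wrapper-stripping point above concrete, here is a minimal toy model of the two extractors and the rewrite. Everything in it is hypothetical: `Scan`, `Sort`, `LocalLimit`, `GlobalLimit`, `Project`, `Codegen` (standing in for `WholeStageCodegenExec`), `SinglePartitionShuffle` (standing in for `ShuffleExchangeExec(SinglePartition, ...)`) and `InsertSortSketch` are simplified stand-ins, not Spark's classes. It also omits the `AQEShuffleReadExec`/`ShuffleQueryStageExec` cases and rewrites only the root node instead of using `plan transform`.

```scala
// Hypothetical, heavily simplified stand-ins for Spark physical operators (not Spark's classes).
sealed trait Plan
final case class Scan(table: String) extends Plan
final case class Sort(order: Seq[String], global: Boolean, child: Plan) extends Plan
final case class LocalLimit(n: Int, child: Plan) extends Plan
final case class GlobalLimit(n: Int, child: Plan) extends Plan
final case class Project(columns: Seq[String], child: Plan) extends Plan
// Plays the role of WholeStageCodegenExec: a wrapper whose child is the codegened subtree's root.
final case class Codegen(child: Plan) extends Plan
// Plays the role of ShuffleExchangeExec(SinglePartition, ...).
final case class SinglePartitionShuffle(child: Plan) extends Plan

object GlobalOrdering {
  // Returns the sort order if `plan` is a global sort, looking through only LocalLimit and
  // Codegen wrappers. Any other node (e.g. Project) ends the search with None.
  def unapply(plan: Plan): Option[Seq[String]] = plan match {
    case Sort(order, true, _) => Some(order)
    case LocalLimit(_, child) => unapply(child)
    case Codegen(child)       => unapply(child)
    case _                    => None
  }
}

object SinglePartitionShuffleWithGlobalOrdering {
  // Matches a single-partition shuffle whose input is recognized as globally ordered.
  def unapply(plan: Plan): Option[Seq[String]] = plan match {
    case SinglePartitionShuffle(GlobalOrdering(order)) => Some(order)
    case _                                             => None
  }
}

object InsertSortSketch {
  // Rewrites only the root node (the real rule uses `plan transform` to visit every node):
  // GlobalLimit(shuffle) becomes GlobalLimit(local Sort(shuffle)) when the shuffle's input
  // is recognized as globally ordered; anything else is returned unchanged.
  def rewrite(plan: Plan): Plan = plan match {
    case GlobalLimit(n, shuffle @ SinglePartitionShuffleWithGlobalOrdering(order)) =>
      GlobalLimit(n, Sort(order, global = false, child = shuffle))
    case other => other
  }
}
```

In this model, `GlobalLimit(5, SinglePartitionShuffle(Codegen(LocalLimit(5, Sort(Seq("a"), global = true, child = Scan("t"))))))` gets a local sort inserted above the shuffle, while the same plan with a `Project` between the shuffle and the `LocalLimit` comes back unchanged, which mirrors the non-generalized behavior described in the comment.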
If we tried to generalize this, we'd also run into questions about our intended/supported semantics: per https://dba.stackexchange.com/questions/82930/database-implementations-of-order-by-in-a-subquery, https://stackoverflow.com/questions/47171039/tsql-order-by-clause-in-a-cte-expression, and other sources, not all databases make guarantees about the behavior of `ORDER BY` in subqueries or CTEs, and I think those are the only ways we could use SQL to obtain such plan structures. However, the DataFrame API throws in a bit of a monkey wrench because it can construct those plan shapes directly, without using subqueries or CTEs. We _de facto_ have certain behaviors there, but I don't think we've formally specified or guaranteed them. Given this, I think it makes sense to first tackle the regular top-level `ORDER BY` semantics and defer any more generalized considerations to follow-ups.
