szehon-ho commented on code in PR #56477: URL: https://github.com/apache/spark/pull/56477#discussion_r3431967290
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CoalesceHintUtils.scala: ########## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import java.util.Locale + +import org.apache.spark.sql.catalyst.expressions.{Ascending, ByteLiteral, Expression, IntegerLiteral, ShortLiteral, SortOrder, StringLiteral} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Repartition, RepartitionByExpression, UnresolvedHint} +import org.apache.spark.sql.errors.QueryCompilationErrors + +/** + * Helper functions used to build the logical plans for the "COALESCE", "REPARTITION", + * "REPARTITION_BY_RANGE" and "REBALANCE" hints. Review Comment: Minor: this scaladoc lists "REBALANCE" among the hints handled here, but `createRebalance` was intentionally left in `ResolveCoalesceHints` and isn't part of this object. Only the shared `getNumOfPartitions` / `validateParameters` helpers are used by the rebalance path. Either drop "REBALANCE" from this list, or move `createRebalance` here so the doc matches the contents. 
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala: ########## @@ -174,103 +174,12 @@ object ResolveHints { * COALESCE Hint accepts names "COALESCE", "REPARTITION", "REPARTITION_BY_RANGE" and "REBALANCE". */ object ResolveCoalesceHints extends Rule[LogicalPlan] { - private def getNumOfPartitions(hint: UnresolvedHint): (Option[Int], Seq[Expression]) = { - hint.parameters match { - case Seq(ByteLiteral(numPartitions), _*) => - (Some(numPartitions.toInt), hint.parameters.tail) - case Seq(ShortLiteral(numPartitions), _*) => - (Some(numPartitions.toInt), hint.parameters.tail) - case Seq(IntegerLiteral(numPartitions), _*) => (Some(numPartitions), hint.parameters.tail) - case _ => (None, hint.parameters) - } - } - - private def validateParameters(hint: String, parms: Seq[Expression]): Unit = { - val invalidParams = parms.filter(!_.isInstanceOf[UnresolvedAttribute]) - if (invalidParams.nonEmpty) { - val hintName = hint.toUpperCase(Locale.ROOT) - throw QueryCompilationErrors.invalidHintParameterError(hintName, invalidParams) - } - } - - /** - * This function handles hints for "COALESCE" and "REPARTITION". - * The "COALESCE" hint only has a partition number as a parameter. The "REPARTITION" hint - * has a partition number, columns, or both of them as parameters. 
- */ - private def createRepartition(shuffle: Boolean, hint: UnresolvedHint): LogicalPlan = { - - def createRepartitionByExpression( - numPartitions: Option[Int], partitionExprs: Seq[Expression]): RepartitionByExpression = { - val sortOrders = partitionExprs.filter(_.isInstanceOf[SortOrder]) - if (sortOrders.nonEmpty) { - throw QueryCompilationErrors.invalidRepartitionExpressionsError(sortOrders) - } - validateParameters(hint.name, partitionExprs) - RepartitionByExpression(partitionExprs, hint.child, numPartitions) - } - - getNumOfPartitions(hint) match { - case (Some(numPartitions), partitionExprs) if partitionExprs.isEmpty => - Repartition(numPartitions, shuffle, hint.child) - // The "COALESCE" hint (shuffle = false) must have a partition number only - case _ if !shuffle => - throw QueryCompilationErrors.invalidCoalesceHintParameterError( - hint.name.toUpperCase(Locale.ROOT)) - case (Some(numPartitions), partitionExprs) => - createRepartitionByExpression(Some(numPartitions), partitionExprs) - case (None, partitionExprs) => - createRepartitionByExpression(None, partitionExprs) - } - } - - /** - * This function handles hints for "REPARTITION_BY_RANGE". - * The "REPARTITION_BY_RANGE" hint must have column names and a partition number is optional. 
- */ - private def createRepartitionByRange(hint: UnresolvedHint): RepartitionByExpression = { - def createRepartitionByExpression( - numPartitions: Option[Int], partitionExprs: Seq[Expression]): RepartitionByExpression = { - validateParameters(hint.name, partitionExprs) - val sortOrder = partitionExprs.map { - case expr: SortOrder => expr - case expr: Expression => SortOrder(expr, Ascending) - } - RepartitionByExpression(sortOrder, hint.child, numPartitions) - } - - getNumOfPartitions(hint) match { - case (Some(numPartitions), partitionExprs) => - createRepartitionByExpression(Some(numPartitions), partitionExprs) - case (None, partitionExprs) => - createRepartitionByExpression(None, partitionExprs) - } - } + import CoalesceHintUtils._ private def createRebalance(hint: UnresolvedHint): LogicalPlan = { - def createRebalancePartitions( - partitionExprs: Seq[Expression], - initialNumPartitions: Option[Int]): RebalancePartitions = { - validateParameters(hint.name, partitionExprs) - RebalancePartitions(partitionExprs, hint.child, initialNumPartitions) - } - - getNumOfPartitions(hint) match { - case (Some(numPartitions), partitionExprs) => - createRebalancePartitions(partitionExprs, Some(numPartitions)) - case (None, partitionExprs) => - createRebalancePartitions(partitionExprs, None) - } - } - - private def transformStringToAttribute(hint: UnresolvedHint): UnresolvedHint = { - // for all the coalesce hints, it's safe to transform the string literal to an attribute as - // all the parameters should be column names. 
- val parameters = hint.parameters.map { - case StringLiteral(name) => UnresolvedAttribute(name) - case e => e - } - hint.copy(parameters = parameters) + val (numPartitionsOption, partitionExprs) = getNumOfPartitions(hint) + validateParameters(hint.name, partitionExprs) + RebalancePartitions(partitionExprs, hint.child, numPartitionsOption) Review Comment: After this refactor `createRebalance` is a stateless 3-liner that only depends on `UnresolvedHint`, the shared helpers, and the OSS `RebalancePartitions` plan node — i.e. it now meets the exact same criteria as the helpers moved into `CoalesceHintUtils`. The PR description justifies keeping it here because the rule "retains its only state dependency, `conf.adaptiveExecutionEnabled`," but that AQE gating actually lives in `apply()` (`case "REBALANCE" if conf.adaptiveExecutionEnabled`), not in `createRebalance`. For consistency, consider moving `createRebalance` into `CoalesceHintUtils` as well (or, if there is a different reason to keep it in the rule, update the PR description to state that reason accurately). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
