dtenedor commented on code in PR #56477:
URL: https://github.com/apache/spark/pull/56477#discussion_r3463112267


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala:
##########
@@ -174,103 +174,12 @@ object ResolveHints {
    * COALESCE Hint accepts names "COALESCE", "REPARTITION", 
"REPARTITION_BY_RANGE" and "REBALANCE".
    */
   object ResolveCoalesceHints extends Rule[LogicalPlan] {
-    private def getNumOfPartitions(hint: UnresolvedHint): (Option[Int], 
Seq[Expression]) = {
-      hint.parameters match {
-        case Seq(ByteLiteral(numPartitions), _*) =>
-          (Some(numPartitions.toInt), hint.parameters.tail)
-        case Seq(ShortLiteral(numPartitions), _*) =>
-          (Some(numPartitions.toInt), hint.parameters.tail)
-        case Seq(IntegerLiteral(numPartitions), _*) => (Some(numPartitions), 
hint.parameters.tail)
-        case _ => (None, hint.parameters)
-      }
-    }
-
-    private def validateParameters(hint: String, parms: Seq[Expression]): Unit 
= {
-      val invalidParams = parms.filter(!_.isInstanceOf[UnresolvedAttribute])
-      if (invalidParams.nonEmpty) {
-        val hintName = hint.toUpperCase(Locale.ROOT)
-        throw QueryCompilationErrors.invalidHintParameterError(hintName, 
invalidParams)
-      }
-    }
-
-    /**
-     * This function handles hints for "COALESCE" and "REPARTITION".
-     * The "COALESCE" hint only has a partition number as a parameter. The 
"REPARTITION" hint
-     * has a partition number, columns, or both of them as parameters.
-     */
-    private def createRepartition(shuffle: Boolean, hint: UnresolvedHint): 
LogicalPlan = {
-
-      def createRepartitionByExpression(
-          numPartitions: Option[Int], partitionExprs: Seq[Expression]): 
RepartitionByExpression = {
-        val sortOrders = partitionExprs.filter(_.isInstanceOf[SortOrder])
-        if (sortOrders.nonEmpty) {
-          throw 
QueryCompilationErrors.invalidRepartitionExpressionsError(sortOrders)
-        }
-        validateParameters(hint.name, partitionExprs)
-        RepartitionByExpression(partitionExprs, hint.child, numPartitions)
-      }
-
-      getNumOfPartitions(hint) match {
-        case (Some(numPartitions), partitionExprs) if partitionExprs.isEmpty =>
-          Repartition(numPartitions, shuffle, hint.child)
-        // The "COALESCE" hint (shuffle = false) must have a partition number 
only
-        case _ if !shuffle =>
-          throw QueryCompilationErrors.invalidCoalesceHintParameterError(
-            hint.name.toUpperCase(Locale.ROOT))
-        case (Some(numPartitions), partitionExprs) =>
-          createRepartitionByExpression(Some(numPartitions), partitionExprs)
-        case (None, partitionExprs) =>
-          createRepartitionByExpression(None, partitionExprs)
-      }
-    }
-
-    /**
-     * This function handles hints for "REPARTITION_BY_RANGE".
-     * The "REPARTITION_BY_RANGE" hint must have column names and a partition 
number is optional.
-     */
-    private def createRepartitionByRange(hint: UnresolvedHint): 
RepartitionByExpression = {
-      def createRepartitionByExpression(
-          numPartitions: Option[Int], partitionExprs: Seq[Expression]): 
RepartitionByExpression = {
-        validateParameters(hint.name, partitionExprs)
-        val sortOrder = partitionExprs.map {
-          case expr: SortOrder => expr
-          case expr: Expression => SortOrder(expr, Ascending)
-        }
-        RepartitionByExpression(sortOrder, hint.child, numPartitions)
-      }
-
-      getNumOfPartitions(hint) match {
-        case (Some(numPartitions), partitionExprs) =>
-          createRepartitionByExpression(Some(numPartitions), partitionExprs)
-        case (None, partitionExprs) =>
-          createRepartitionByExpression(None, partitionExprs)
-      }
-    }
+    import CoalesceHintUtils._
 
     private def createRebalance(hint: UnresolvedHint): LogicalPlan = {
-      def createRebalancePartitions(
-          partitionExprs: Seq[Expression],
-          initialNumPartitions: Option[Int]): RebalancePartitions = {
-        validateParameters(hint.name, partitionExprs)
-        RebalancePartitions(partitionExprs, hint.child, initialNumPartitions)
-      }
-
-      getNumOfPartitions(hint) match {
-        case (Some(numPartitions), partitionExprs) =>
-          createRebalancePartitions(partitionExprs, Some(numPartitions))
-        case (None, partitionExprs) =>
-          createRebalancePartitions(partitionExprs, None)
-      }
-    }
-
-    private def transformStringToAttribute(hint: UnresolvedHint): 
UnresolvedHint = {
-      // for all the coalesce hints, it's safe to transform the string literal 
to an attribute as
-      // all the parameters should be column names.
-      val parameters = hint.parameters.map {
-        case StringLiteral(name) => UnresolvedAttribute(name)
-        case e => e
-      }
-      hint.copy(parameters = parameters)
+      val (numPartitionsOption, partitionExprs) = getNumOfPartitions(hint)
+      validateParameters(hint.name, partitionExprs)
+      RebalancePartitions(partitionExprs, hint.child, numPartitionsOption)

Review Comment:
   Sounds good, I moved "REBALANCE" as suggested.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to