[GitHub] [spark] cloud-fan commented on a change in pull request #32932: [SPARK-35786][SQL] Add a new operator to distingush if AQE can optimize safely
cloud-fan commented on a change in pull request #32932:
URL: https://github.com/apache/spark/pull/32932#discussion_r657606907

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala ##
@@ -92,10 +92,17 @@ case object REPARTITION_BY_COL extends ShuffleOrigin
 // a certain partition number. Spark can't optimize it.
 case object REPARTITION_BY_NUM extends ShuffleOrigin
-// Indicates that the shuffle operator was added by the user-specified repartition operator. Spark
-// firstly tries to coalesce partitions, if it cannot be coalesced, then use the local shuffle
-// reader.
-case object REPARTITION_BY_NONE extends ShuffleOrigin
+// Indicates that the rebalance operator was added by the user-specified repartition operator.

Review comment:
same for https://github.com/apache/spark/pull/32932/files#diff-7ecb26a3ce3210f09ab99080d3313aaaf7f6ffa4bda4f80207a686dd812d3a1bR100

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #32932: [SPARK-35786][SQL] Add a new operator to distingush if AQE can optimize safely
cloud-fan commented on a change in pull request #32932:
URL: https://github.com/apache/spark/pull/32932#discussion_r657606784

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala ##
@@ -92,10 +92,17 @@ case object REPARTITION_BY_COL extends ShuffleOrigin
 // a certain partition number. Spark can't optimize it.
 case object REPARTITION_BY_NUM extends ShuffleOrigin
-// Indicates that the shuffle operator was added by the user-specified repartition operator. Spark
-// firstly tries to coalesce partitions, if it cannot be coalesced, then use the local shuffle
-// reader.
-case object REPARTITION_BY_NONE extends ShuffleOrigin
+// Indicates that the rebalance operator was added by the user-specified repartition operator.

Review comment:
the shuffle operator was added by the user-specified rebalance operator.
[GitHub] [spark] cloud-fan commented on a change in pull request #32932: [SPARK-35786][SQL] Add a new operator to distingush if AQE can optimize safely
cloud-fan commented on a change in pull request #32932:
URL: https://github.com/apache/spark/pull/32932#discussion_r657606603

## File path: docs/sql-performance-tuning.md ##
@@ -228,6 +228,8 @@ The "REPARTITION_BY_RANGE" hint must have column names and a partition number is
 SELECT /*+ REPARTITION */ * FROM t
 SELECT /*+ REPARTITION_BY_RANGE(c) */ * FROM t
 SELECT /*+ REPARTITION_BY_RANGE(3, c) */ * FROM t
+SELECT /*+ REBALANCE_PARTITIONS */ * FROM t

Review comment:
SGTM. The logical plan name can still be `RebalancePartitions` though.
[GitHub] [spark] cloud-fan commented on a change in pull request #32932: [SPARK-35786][SQL] Add a new operator to distingush if AQE can optimize safely
cloud-fan commented on a change in pull request #32932:
URL: https://github.com/apache/spark/pull/32932#discussion_r657210898

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala ##
@@ -93,9 +93,16 @@ case object REPARTITION_BY_COL extends ShuffleOrigin
 case object REPARTITION_BY_NUM extends ShuffleOrigin
 // Indicates that the shuffle operator was added by the user-specified repartition operator. Spark
-// firstly tries to coalesce partitions, if it cannot be coalesced, then use the local shuffle
-// reader.
-case object REPARTITION_BY_NONE extends ShuffleOrigin
+// will try to rebalance partitions that make per-partition size not too small and not too big,
+// if can not rebalance partitions then use the local shuffle reader.
+case object REBALANCE_PARTITIONS_BY_NONE extends ShuffleOrigin
+
+// Indicates that the shuffle operator was added by the user-specified repartition operator with
+// columns. Spark will try to rebalance partitions that make per-partition size not too small and
+// not too big.
+// Different from `REBALANCE_PARTITIONS_BY_NONE`, this operator also try its best to partition the

Review comment:
let's say: `Local shuffle reader cannot be used for it as the output needs to be partitioned by the given columns.`
[GitHub] [spark] cloud-fan commented on a change in pull request #32932: [SPARK-35786][SQL] Add a new operator to distingush if AQE can optimize safely
cloud-fan commented on a change in pull request #32932:
URL: https://github.com/apache/spark/pull/32932#discussion_r657209487

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala ##
@@ -93,9 +93,16 @@ case object REPARTITION_BY_COL extends ShuffleOrigin
 case object REPARTITION_BY_NUM extends ShuffleOrigin
 // Indicates that the shuffle operator was added by the user-specified repartition operator. Spark
-// firstly tries to coalesce partitions, if it cannot be coalesced, then use the local shuffle
-// reader.
-case object REPARTITION_BY_NONE extends ShuffleOrigin
+// will try to rebalance partitions that make per-partition size not too small and not too big,
+// if can not rebalance partitions then use the local shuffle reader.
+case object REBALANCE_PARTITIONS_BY_NONE extends ShuffleOrigin
+
+// Indicates that the shuffle operator was added by the user-specified repartition operator with

Review comment:
ditto, rebalance operator
[GitHub] [spark] cloud-fan commented on a change in pull request #32932: [SPARK-35786][SQL] Add a new operator to distingush if AQE can optimize safely
cloud-fan commented on a change in pull request #32932:
URL: https://github.com/apache/spark/pull/32932#discussion_r657209115

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala ##
@@ -93,9 +93,16 @@ case object REPARTITION_BY_COL extends ShuffleOrigin
 case object REPARTITION_BY_NUM extends ShuffleOrigin
 // Indicates that the shuffle operator was added by the user-specified repartition operator. Spark
-// firstly tries to coalesce partitions, if it cannot be coalesced, then use the local shuffle
-// reader.
-case object REPARTITION_BY_NONE extends ShuffleOrigin
+// will try to rebalance partitions that make per-partition size not too small and not too big,
+// if can not rebalance partitions then use the local shuffle reader.

Review comment:
> if can not rebalance partitions then use the local shuffle reader.

This is incorrect. Local shuffle reader can also balance the partitions. Let's say
```
Local shuffle reader will be used if possible to reduce network traffic.
```
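The distinction these review comments draw between the two rebalance origins can be sketched as a small standalone Scala model. This is only an illustration and does not depend on Spark: `RebalanceOrigin`, `RebalanceOriginSketch` and `localShuffleReaderAllowed` are hypothetical names, and `REBALANCE_PARTITIONS_BY_COL` is an assumed name for the with-columns origin, which the thread never spells out.

```scala
// Standalone model of the two rebalance shuffle origins discussed in the review.
// RebalanceOrigin and localShuffleReaderAllowed are hypothetical names; the real
// origins extend ShuffleOrigin in ShuffleExchangeExec.scala.
sealed trait RebalanceOrigin

// Rebalance without columns: per the review, the local shuffle reader
// will be used if possible to reduce network traffic.
case object REBALANCE_PARTITIONS_BY_NONE extends RebalanceOrigin

// Rebalance with columns (assumed name): the output needs to be partitioned
// by the given columns, so the local shuffle reader cannot be used.
case object REBALANCE_PARTITIONS_BY_COL extends RebalanceOrigin

object RebalanceOriginSketch {
  // Encodes only the rule stated in the review comments for these two origins.
  def localShuffleReaderAllowed(origin: RebalanceOrigin): Boolean = origin match {
    case REBALANCE_PARTITIONS_BY_NONE => true
    case REBALANCE_PARTITIONS_BY_COL  => false
  }
}
```

Keeping the two cases as separate origins lets each AQE rule declare which ones it supports, instead of inspecting the partitioning of the shuffle.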
[GitHub] [spark] cloud-fan commented on a change in pull request #32932: [SPARK-35786][SQL] Add a new operator to distingush if AQE can optimize safely
cloud-fan commented on a change in pull request #32932:
URL: https://github.com/apache/spark/pull/32932#discussion_r657207151

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala ##
@@ -93,9 +93,16 @@ case object REPARTITION_BY_COL extends ShuffleOrigin
 case object REPARTITION_BY_NUM extends ShuffleOrigin
 // Indicates that the shuffle operator was added by the user-specified repartition operator. Spark

Review comment:
repartition operator -> rebalance operator
[GitHub] [spark] cloud-fan commented on a change in pull request #32932: [SPARK-35786][SQL] Add a new operator to distingush if AQE can optimize safely
cloud-fan commented on a change in pull request #32932:
URL: https://github.com/apache/spark/pull/32932#discussion_r657101198

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala ##
@@ -95,7 +95,11 @@ case object REPARTITION_BY_NUM extends ShuffleOrigin
 // Indicates that the shuffle operator was added by the user-specified repartition operator. Spark
 // firstly tries to coalesce partitions, if it cannot be coalesced, then use the local shuffle
 // reader.
-case object REPARTITION_BY_NONE extends ShuffleOrigin
+case object REBALANCE_PARTITIONS_BY_NONE extends ShuffleOrigin
+
+// Indicates that the shuffle operator was not guaranteed the output partitioning so Spark
+// can try to optimize the partition number in AQE framework.

Review comment:
please highlight the difference between this one and `REBALANCE_PARTITIONS_BY_NONE`
[GitHub] [spark] cloud-fan commented on a change in pull request #32932: [SPARK-35786][SQL] Add a new operator to distingush if AQE can optimize safely
cloud-fan commented on a change in pull request #32932:
URL: https://github.com/apache/spark/pull/32932#discussion_r657100802

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala ##
@@ -95,7 +95,11 @@ case object REPARTITION_BY_NUM extends ShuffleOrigin
 // Indicates that the shuffle operator was added by the user-specified repartition operator. Spark
 // firstly tries to coalesce partitions, if it cannot be coalesced, then use the local shuffle
 // reader.
-case object REPARTITION_BY_NONE extends ShuffleOrigin
+case object REBALANCE_PARTITIONS_BY_NONE extends ShuffleOrigin

Review comment:
please update the doc for it
[GitHub] [spark] cloud-fan commented on a change in pull request #32932: [SPARK-35786][SQL] Add a new operator to distingush if AQE can optimize safely
cloud-fan commented on a change in pull request #32932:
URL: https://github.com/apache/spark/pull/32932#discussion_r657099143

## File path: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala ##
@@ -295,4 +296,28 @@ class ResolveHintsSuite extends AnalysisTest {
         caseSensitive = true)
     }
   }
+
+  test("SPARK-35786: Support optimize repartition by expression in AQE") {
+    checkAnalysisWithoutViewWrapper(
+      UnresolvedHint("REBALANCE_PARTITIONS", Seq(UnresolvedAttribute("a")), table("TaBlE")),
+      RebalancePartitions(Seq(AttributeReference("a", IntegerType)()), testRelation))
+
+    checkAnalysisWithoutViewWrapper(
+      UnresolvedHint("REBALANCE_PARTITIONS", Seq.empty, table("TaBlE")),
+      RebalancePartitions(Seq.empty, testRelation))
+
+    withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+      checkAnalysisWithoutViewWrapper(
+        UnresolvedHint("REBALANCE_PARTITIONS", Seq(UnresolvedAttribute("a")), table("TaBlE")),
+        testRelation)
+
+      checkAnalysisWithoutViewWrapper(
+        UnresolvedHint("REBALANCE_PARTITIONS", Seq.empty, table("TaBlE")),
+        testRelation)
+    }
+
+    assertAnalysisError(
+      UnresolvedHint("REBALANCE_PARTITIONS", Seq(Literal(1)), table("TaBlE")),

Review comment:
not related to this PR, I'm wondering if we should support `date(ts)` as the partition expression, instead of plain columns. cc @sunchao
[GitHub] [spark] cloud-fan commented on a change in pull request #32932: [SPARK-35786][SQL] Add a new operator to distingush if AQE can optimize safely
cloud-fan commented on a change in pull request #32932:
URL: https://github.com/apache/spark/pull/32932#discussion_r657082306

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ##
@@ -1351,6 +1351,31 @@ object RepartitionByExpression {
   }
 }
+/**
+ * This operator used to rebalance the query result output partitions, so that every partition
+ * is of a reasonable size (not too small and not too big). It can take column names as parameters,
+ * and try its best to partition the query result by these columns. If there are skews, Spark will
+ * split the skewed partitions, to make these partitions not too big. This operator is useful when
+ * you need to write the result of this query to a table, to avoid too small/big files.
+ *
+ * Note that, only AQE is enabled does the operator make sense.

Review comment:
Note that, this operator only makes sense when AQE is enabled.
[GitHub] [spark] cloud-fan commented on a change in pull request #32932: [SPARK-35786][SQL] Add a new operator to distingush if AQE can optimize safely
cloud-fan commented on a change in pull request #32932:
URL: https://github.com/apache/spark/pull/32932#discussion_r657081440

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ##
@@ -1351,6 +1351,31 @@ object RepartitionByExpression {
   }
 }
+/**
+ * This operator used to rebalance the query result output partitions, so that every partition
+ * is of a reasonable size (not too small and not too big). It can take column names as parameters,
+ * and try its best to partition the query result by these columns. If there are skews, Spark will
+ * split the skewed partitions, to make these partitions not too big. This operator is useful when
+ * you need to write the result of this query to a table, to avoid too small/big files.

Review comment:
`this query` -> `` `child` ``
[GitHub] [spark] cloud-fan commented on a change in pull request #32932: [SPARK-35786][SQL] Add a new operator to distingush if AQE can optimize safely
cloud-fan commented on a change in pull request #32932:
URL: https://github.com/apache/spark/pull/32932#discussion_r657080928

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ##
@@ -1351,6 +1351,31 @@ object RepartitionByExpression {
   }
 }
+/**
+ * This operator used to rebalance the query result output partitions, so that every partition
+ * is of a reasonable size (not too small and not too big). It can take column names as parameters,
+ * and try its best to partition the query result by these columns. If there are skews, Spark will

Review comment:
``It also try its best to partition the child output by `partitionExpressions` ``
[GitHub] [spark] cloud-fan commented on a change in pull request #32932: [SPARK-35786][SQL] Add a new operator to distingush if AQE can optimize safely
cloud-fan commented on a change in pull request #32932:
URL: https://github.com/apache/spark/pull/32932#discussion_r657080120

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ##
@@ -1351,6 +1351,31 @@ object RepartitionByExpression {
   }
 }
+/**
+ * This operator used to rebalance the query result output partitions, so that every partition

Review comment:
`query result output partitions` -> ``output partitions of the given `child` ``
[GitHub] [spark] cloud-fan commented on a change in pull request #32932: [SPARK-35786][SQL] Add a new operator to distingush if AQE can optimize safely
cloud-fan commented on a change in pull request #32932:
URL: https://github.com/apache/spark/pull/32932#discussion_r657077411

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ##
@@ -1351,6 +1351,31 @@ object RepartitionByExpression {
   }
 }
+/**
+ * This operator used to rebalance the query result output partitions, so that every partition

Review comment:
`is used to`
[GitHub] [spark] cloud-fan commented on a change in pull request #32932: [SPARK-35786][SQL] Add a new operator to distingush if AQE can optimize safely
cloud-fan commented on a change in pull request #32932:
URL: https://github.com/apache/spark/pull/32932#discussion_r657076883

## File path: docs/sql-ref-syntax-qry-select-hints.md ##
@@ -51,6 +51,15 @@ specified, multiple nodes are inserted into the logical plan, but the leftmost h
   The `REPARTITION_BY_RANGE` hint can be used to repartition to the specified number of partitions using the specified partitioning expressions. It takes column names and an optional partition number as parameters.
+* **REBALANCE_PARTITIONS**
+
+  The `REBALANCE_PARTITIONS` hint can be used to rebalance the query result output partitions, so that

Review comment:
nit: the other hint docs on this page are just one line, which may be more standard in markdown. Let's follow it and remove \n here.
[GitHub] [spark] cloud-fan commented on a change in pull request #32932: [SPARK-35786][SQL] Add a new operator to distingush if AQE can optimize safely
cloud-fan commented on a change in pull request #32932:
URL: https://github.com/apache/spark/pull/32932#discussion_r656937937

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala ##
@@ -30,7 +30,7 @@ import org.apache.spark.sql.internal.SQLConf
 case class CoalesceShufflePartitions(session: SparkSession) extends CustomShuffleReaderRule {
   override val supportedShuffleOrigins: Seq[ShuffleOrigin] =
-    Seq(ENSURE_REQUIREMENTS, REPARTITION_BY_COL, REPARTITION_BY_NONE)
+    Seq(ENSURE_REQUIREMENTS, REPARTITION_BY_NONE, REPARTITION_BY_COL, REBALANCE_PARTITIONS)

Review comment:
ah, SGTM! We can probably replace `REPARTITION_BY_NONE` with `REBALANCE_PARTITIONS_BY_NONE`
[GitHub] [spark] cloud-fan commented on a change in pull request #32932: [SPARK-35786][SQL] Add a new operator to distingush if AQE can optimize safely
cloud-fan commented on a change in pull request #32932:
URL: https://github.com/apache/spark/pull/32932#discussion_r656925697

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala ##
@@ -30,7 +30,7 @@ import org.apache.spark.sql.internal.SQLConf
 case class CoalesceShufflePartitions(session: SparkSession) extends CustomShuffleReaderRule {
   override val supportedShuffleOrigins: Seq[ShuffleOrigin] =
-    Seq(ENSURE_REQUIREMENTS, REPARTITION_BY_COL, REPARTITION_BY_NONE)
+    Seq(ENSURE_REQUIREMENTS, REPARTITION_BY_NONE, REPARTITION_BY_COL, REBALANCE_PARTITIONS)

Review comment:
Can we update `OptimizeLocalShuffleReader` a bit more to not optimize `HashPartitioning`? Ideally `REBALANCE_PARTITIONS` without parameters should be able to apply local shuffle reader.
[GitHub] [spark] cloud-fan commented on a change in pull request #32932: [SPARK-35786][SQL] Add a new operator to distingush if AQE can optimize safely
cloud-fan commented on a change in pull request #32932:
URL: https://github.com/apache/spark/pull/32932#discussion_r656924190

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala ##
@@ -248,6 +249,22 @@ object ResolveHints {
       }
     }
+    private def createRebalance(hint: UnresolvedHint): LogicalPlan = {
+      hint.parameters match {
+        case partitionExprs @ Seq(_*) =>
+          val sortOrders = partitionExprs.filter(_.isInstanceOf[SortOrder])
+          if (sortOrders.nonEmpty) {
+            throw QueryCompilationErrors.invalidRepartitionExpressionsError(sortOrders)

Review comment:
do we need this check? we only allow `UnresolvedAttribute`
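The behaviour the `ResolveHintsSuite` test in this thread expects from the rebalance hint can be sketched with toy types. This is a minimal standalone model, not the PR's implementation: `Expr`, `Attr`, `Lit`, `Plan`, `Relation`, `Hint`, `Rebalance` and `RebalanceHintSketch` are hypothetical stand-ins for the Catalyst classes, the error is returned as a `Left` instead of being thrown, and validating parameters before checking the AQE flag is an assumption.

```scala
// Toy stand-ins for Catalyst classes; none of these are the real Spark types.
sealed trait Expr
final case class Attr(name: String) extends Expr // plays the role of UnresolvedAttribute
final case class Lit(value: Int) extends Expr    // plays the role of Literal

sealed trait Plan
final case class Relation(name: String) extends Plan
final case class Hint(name: String, params: Seq[Expr], child: Plan) extends Plan
final case class Rebalance(partitionExprs: Seq[Expr], child: Plan) extends Plan

object RebalanceHintSketch {
  // Hypothetical resolver mirroring the cases exercised by the test hunk:
  //  - only column references are accepted as parameters,
  //  - with AQE disabled the hint is dropped and the child is returned,
  //  - otherwise the child is wrapped in a rebalance node.
  def createRebalance(hint: Hint, aqeEnabled: Boolean): Either[String, Plan] = {
    val invalid = hint.params.filterNot(_.isInstanceOf[Attr])
    if (invalid.nonEmpty) {
      Left(s"${hint.name} hint parameters should be column names, got: ${invalid.mkString(", ")}")
    } else if (!aqeEnabled) {
      Right(hint.child)
    } else {
      Right(Rebalance(hint.params, hint.child))
    }
  }
}
```

Because every parameter must already be a column reference, a separate check for sort orders adds nothing in this model, which is the point of the review question above.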
[GitHub] [spark] cloud-fan commented on a change in pull request #32932: [SPARK-35786][SQL] Add a new operator to distingush if AQE can optimize safely
cloud-fan commented on a change in pull request #32932:
URL: https://github.com/apache/spark/pull/32932#discussion_r656877211

## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala ##
@@ -1788,17 +1788,20 @@ class AdaptiveQueryExecSuite
   test("SPARK-35650: Coalesce number of partitions by AEQ") {
     withSQLConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") {
-      val query = "SELECT /*+ REPARTITION */ * FROM testData"
-      val (_, adaptivePlan) = runAdaptiveAndVerifyResult(query)
-      collect(adaptivePlan) {
-        case r: CustomShuffleReaderExec => r
-      } match {
-        case Seq(customShuffleReader) =>
-          assert(customShuffleReader.partitionSpecs.size === 1)
-          assert(!customShuffleReader.isLocalReader)
-        case _ =>
-          fail("There should be a CustomShuffleReaderExec")
-      }
+      Seq("REPARTITION", "REBALANCE_PARTITIONS", "REBALANCE_PARTITIONS(key)")

Review comment:
can we exclude `REBALANCE_PARTITIONS`? I think we should be able to apply local shuffle reader for it, in the future.
[GitHub] [spark] cloud-fan commented on a change in pull request #32932: [SPARK-35786][SQL] Add a new operator to distingush if AQE can optimize safely
cloud-fan commented on a change in pull request #32932:
URL: https://github.com/apache/spark/pull/32932#discussion_r656875976

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala ##
@@ -30,7 +30,7 @@ import org.apache.spark.sql.internal.SQLConf
 case class CoalesceShufflePartitions(session: SparkSession) extends CustomShuffleReaderRule {
   override val supportedShuffleOrigins: Seq[ShuffleOrigin] =
-    Seq(ENSURE_REQUIREMENTS, REPARTITION_BY_COL, REPARTITION_BY_NONE)
+    Seq(ENSURE_REQUIREMENTS, REPARTITION_BY_NONE, REPARTITION_BY_COL, REBALANCE_PARTITIONS)

Review comment:
Do we need to update `OptimizeLocalShuffleReader`?
[GitHub] [spark] cloud-fan commented on a change in pull request #32932: [SPARK-35786][SQL] Add a new operator to distingush if AQE can optimize safely
cloud-fan commented on a change in pull request #32932:
URL: https://github.com/apache/spark/pull/32932#discussion_r656875209

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ##
@@ -1351,6 +1351,28 @@ object RepartitionByExpression {
   }
 }
+/**
+ * This operator does not guarantee the output partitioning, because the partition number will be

Review comment:
let's explain what it does. We can copy something from https://github.com/apache/spark/pull/32932/files#diff-4df7a54d6e4cfdbb5b68f2d2bcb083c2c692514b53c8fd7d61c24363061bb51cR56
[GitHub] [spark] cloud-fan commented on a change in pull request #32932: [SPARK-35786][SQL] Add a new operator to distingush if AQE can optimize safely
cloud-fan commented on a change in pull request #32932:
URL: https://github.com/apache/spark/pull/32932#discussion_r656874146

## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala ##
@@ -1788,17 +1788,20 @@ class AdaptiveQueryExecSuite
   test("SPARK-35650: Coalesce number of partitions by AEQ") {
     withSQLConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") {

Review comment:
not related to this PR: can we set the config to disable partition coalesce directly? Tuning the min num looks weird.
[GitHub] [spark] cloud-fan commented on a change in pull request #32932: [SPARK-35786][SQL] Add a new operator to distingush if AQE can optimize safely
cloud-fan commented on a change in pull request #32932:
URL: https://github.com/apache/spark/pull/32932#discussion_r656872765

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ##
@@ -1351,6 +1351,28 @@ object RepartitionByExpression {
   }
 }
+/**
+ * This operator does not guarantee the output partitioning, because the partition number will be
+ * optimized by AQE.
+ */
+case class RebalancePartitions(
+    partitionExpressions: Seq[Expression],
+    child: LogicalPlan) extends UnaryNode {
+  override def maxRows: Option[Long] = child.maxRows
+  override def output: Seq[Attribute] = child.output
+
+  lazy val numPartitions: Int = conf.numShufflePartitions

Review comment:
can't we just call `conf.numShufflePartitions` when needed?
[GitHub] [spark] cloud-fan commented on a change in pull request #32932: [SPARK-35786][SQL] Add a new operator to distingush if AQE can optimize safely
cloud-fan commented on a change in pull request #32932: URL: https://github.com/apache/spark/pull/32932#discussion_r656871920 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala ## @@ -175,19 +175,20 @@ object ResolveHints { */ object ResolveCoalesceHints extends Rule[LogicalPlan] { -val COALESCE_HINT_NAMES: Set[String] = Set("COALESCE", "REPARTITION", "REPARTITION_BY_RANGE") +val COALESCE_HINT_NAMES: Set[String] = + Set("COALESCE", "REPARTITION", "REPARTITION_BY_RANGE", "REBALANCE_PARTITIONS") /** * This function handles hints for "COALESCE" and "REPARTITION". * The "COALESCE" hint only has a partition number as a parameter. The "REPARTITION" hint * has a partition number, columns, or both of them as parameters. */ private def createRepartition( -shuffle: Boolean, hint: UnresolvedHint): LogicalPlan = { +shuffle: Boolean, hint: UnresolvedHint, adaptive: Boolean = false): LogicalPlan = { Review comment: `REPARTITION_BY_RANGE` also has an individual method. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #32932: [SPARK-35786][SQL] Add a new operator to distingush if AQE can optimize safely
cloud-fan commented on a change in pull request #32932: URL: https://github.com/apache/spark/pull/32932#discussion_r656871464 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala ## @@ -175,19 +175,20 @@ object ResolveHints { */ object ResolveCoalesceHints extends Rule[LogicalPlan] { -val COALESCE_HINT_NAMES: Set[String] = Set("COALESCE", "REPARTITION", "REPARTITION_BY_RANGE") +val COALESCE_HINT_NAMES: Set[String] = + Set("COALESCE", "REPARTITION", "REPARTITION_BY_RANGE", "REBALANCE_PARTITIONS") /** * This function handles hints for "COALESCE" and "REPARTITION". * The "COALESCE" hint only has a partition number as a parameter. The "REPARTITION" hint * has a partition number, columns, or both of them as parameters. */ private def createRepartition( -shuffle: Boolean, hint: UnresolvedHint): LogicalPlan = { +shuffle: Boolean, hint: UnresolvedHint, adaptive: Boolean = false): LogicalPlan = { Review comment: Actually, can we create a new method `def createRebalance`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
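Editor's sketch of the suggestion above: instead of threading a boolean flag through `createRepartition`, the rebalance hint could get its own small method, so each hint validates only the parameters it accepts. The following is a self-contained toy model and not Spark code: `Plan`, `Relation`, `Repartition`, `Rebalance`, `Hint` and `resolve` are hypothetical stand-ins for the Catalyst classes, and only the method names `createRepartition` and `createRebalance` come from the review thread.

```scala
// Hypothetical, simplified model of hint resolution; none of these types are Spark's.
object HintDispatchSketch {
  sealed trait Plan
  case class Relation(name: String) extends Plan
  case class Repartition(numPartitions: Option[Int], columns: Seq[String], child: Plan) extends Plan
  case class Rebalance(columns: Seq[String], child: Plan) extends Plan

  // A hint carries a name, raw parameters, and the plan it is attached to.
  case class Hint(name: String, params: Seq[Any], child: Plan)

  // REPARTITION accepts an optional partition number plus column names.
  private def createRepartition(hint: Hint): Plan = {
    val num = hint.params.collectFirst { case n: Int => n }
    val cols = hint.params.collect { case c: String => c }
    Repartition(num, cols, hint.child)
  }

  // The rebalance hint accepts column names only, so it has its own validation
  // and no boolean flag is needed on createRepartition.
  private def createRebalance(hint: Hint): Plan = {
    val invalid = hint.params.filterNot(_.isInstanceOf[String])
    require(invalid.isEmpty,
      s"${hint.name} only accepts column names, got: ${invalid.mkString(", ")}")
    Rebalance(hint.params.map(_.asInstanceOf[String]), hint.child)
  }

  // Dispatch on the (case-insensitive) hint name; unknown hints are dropped.
  def resolve(hint: Hint): Plan = hint.name.toUpperCase(java.util.Locale.ROOT) match {
    case "REPARTITION"          => createRepartition(hint)
    case "REBALANCE_PARTITIONS" => createRebalance(hint)
    case _                      => hint.child
  }
}
```

In this model, `resolve(Hint("REBALANCE_PARTITIONS", Seq("c"), Relation("t")))` produces a `Rebalance` node, while passing a number to the same hint fails validation inside `createRebalance` without touching the repartition path.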
[GitHub] [spark] cloud-fan commented on a change in pull request #32932: [SPARK-35786][SQL] Add a new operator to distingush if AQE can optimize safely
cloud-fan commented on a change in pull request #32932: URL: https://github.com/apache/spark/pull/32932#discussion_r656869016 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala ## @@ -175,19 +175,20 @@ object ResolveHints { */ object ResolveCoalesceHints extends Rule[LogicalPlan] { -val COALESCE_HINT_NAMES: Set[String] = Set("COALESCE", "REPARTITION", "REPARTITION_BY_RANGE") +val COALESCE_HINT_NAMES: Set[String] = + Set("COALESCE", "REPARTITION", "REPARTITION_BY_RANGE", "REBALANCE_PARTITIONS") /** * This function handles hints for "COALESCE" and "REPARTITION". * The "COALESCE" hint only has a partition number as a parameter. The "REPARTITION" hint * has a partition number, columns, or both of them as parameters. */ private def createRepartition( -shuffle: Boolean, hint: UnresolvedHint): LogicalPlan = { +shuffle: Boolean, hint: UnresolvedHint, adaptive: Boolean = false): LogicalPlan = { Review comment: nit: `adaptive` -> `isRebalance` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #32932: [SPARK-35786][SQL] Add a new operator to distingush if AQE can optimize safely
cloud-fan commented on a change in pull request #32932: URL: https://github.com/apache/spark/pull/32932#discussion_r656836290 ## File path: docs/sql-ref-syntax-qry-select-hints.md ## @@ -51,6 +51,10 @@ specified, multiple nodes are inserted into the logical plan, but the leftmost h The `REPARTITION_BY_RANGE` hint can be used to repartition to the specified number of partitions using the specified partitioning expressions. It takes column names and an optional partition number as parameters. +* **REPARTITION_BY_AQE** Review comment: Or fallback to `repartition(col)`? I don't have a strong preference -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #32932: [SPARK-35786][SQL] Add a new operator to distingush if AQE can optimize safely
cloud-fan commented on a change in pull request #32932: URL: https://github.com/apache/spark/pull/32932#discussion_r656799496 ## File path: docs/sql-ref-syntax-qry-select-hints.md ## @@ -51,6 +51,10 @@ specified, multiple nodes are inserted into the logical plan, but the leftmost h The `REPARTITION_BY_RANGE` hint can be used to repartition to the specified number of partitions using the specified partitioning expressions. It takes column names and an optional partition number as parameters. +* **REPARTITION_BY_AQE** Review comment: The doc here is pretty bad... ``` **REBALANCE_PARTITIONS** The `REBALANCE_PARTITIONS` hint can be used to rebalance the query result output partitions, so that every partition is of a reasonable size (not too small and not too big). It can take column names as parameters, and try its best to partition the query result by these columns. This is a best-effort: if there are skews, Spark will split the skewed partitions, to make these partitions not too big. This hint is useful when you need to write the result of this query to a table, to avoid too small/big files. This hint is ignored if AQE is not enabled. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
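Editor's sketch of the best-effort behaviour described in the proposed doc text above (partitions that are "not too small and not too big"). This is a toy model, not how AQE is implemented: partitions are represented only by their byte sizes, and `rebalance` and `targetSize` are hypothetical names introduced for illustration. Oversized partitions are split into near-equal chunks first, then adjacent small partitions are merged while the merged size stays within the target.

```scala
// Hypothetical model, not Spark code: a partition is just its size in bytes.
object RebalanceSketch {
  /** Split partitions larger than `targetSize`, then merge adjacent small ones. */
  def rebalance(sizes: Seq[Long], targetSize: Long): Seq[Long] = {
    require(targetSize > 0, "targetSize must be positive")

    // Step 1: split each skewed partition into ceil(size / targetSize) near-equal chunks.
    val split = sizes.flatMap { s =>
      if (s <= targetSize) Seq(s)
      else {
        val n = ((s + targetSize - 1) / targetSize).toInt
        val base = s / n
        val rem = (s % n).toInt
        Seq.tabulate(n)(i => if (i < rem) base + 1 else base)
      }
    }

    // Step 2: greedily coalesce adjacent partitions while the sum fits in targetSize.
    split.foldLeft(Vector.empty[Long]) { (acc, s) =>
      if (acc.nonEmpty && acc.last + s <= targetSize) acc.init :+ (acc.last + s)
      else acc :+ s
    }
  }
}
```

The total size is preserved and no output partition exceeds `targetSize`; a trailing small partition can still remain when it has no neighbour it fits with, which matches the "best effort" wording of the doc.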
[GitHub] [spark] cloud-fan commented on a change in pull request #32932: [SPARK-35786][SQL] Add a new operator to distingush if AQE can optimize safely
cloud-fan commented on a change in pull request #32932: URL: https://github.com/apache/spark/pull/32932#discussion_r656799496 ## File path: docs/sql-ref-syntax-qry-select-hints.md ## @@ -51,6 +51,10 @@ specified, multiple nodes are inserted into the logical plan, but the leftmost h The `REPARTITION_BY_RANGE` hint can be used to repartition to the specified number of partitions using the specified partitioning expressions. It takes column names and an optional partition number as parameters. +* **REPARTITION_BY_AQE** Review comment: The doc here is pretty bad... ``` **REBALANCE_PARTITIONS** The `REBALANCE_PARTITIONS` hint can be used to rebalance the query result output partitions, so that every partition is of a reasonable size (not too small and not too big). It can take column names as parameters, and try its best to partition the query result by these columns. This is a best-effort: if there are skews, Spark will split the skewed partitions, to make these partitions not too big. This hint is ignored if AQE is not enabled. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #32932: [SPARK-35786][SQL] Add a new operator to distingush if AQE can optimize safely
cloud-fan commented on a change in pull request #32932: URL: https://github.com/apache/spark/pull/32932#discussion_r656771644 ## File path: docs/sql-performance-tuning.md ## @@ -228,6 +228,8 @@ The "REPARTITION_BY_RANGE" hint must have column names and a partition number is SELECT /*+ REPARTITION */ * FROM t SELECT /*+ REPARTITION_BY_RANGE(c) */ * FROM t SELECT /*+ REPARTITION_BY_RANGE(3, c) */ * FROM t +SELECT /*+ REPARTITION_BY_AQE */ * FROM t Review comment: I like `REBALANCE_PARTITIONS` most, as this is a partition-level thing, not row-level. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #32932: [SPARK-35786][SQL] Add a new operator to distingush if AQE can optimize safely
cloud-fan commented on a change in pull request #32932: URL: https://github.com/apache/spark/pull/32932#discussion_r656760225 ## File path: docs/sql-performance-tuning.md ## @@ -228,6 +228,8 @@ The "REPARTITION_BY_RANGE" hint must have column names and a partition number is SELECT /*+ REPARTITION */ * FROM t SELECT /*+ REPARTITION_BY_RANGE(c) */ * FROM t SELECT /*+ REPARTITION_BY_RANGE(3, c) */ * FROM t +SELECT /*+ REPARTITION_BY_AQE */ * FROM t Review comment: or just `REBALANCE_OUTPUT`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #32932: [SPARK-35786][SQL] Add a new operator to distingush if AQE can optimize safely
cloud-fan commented on a change in pull request #32932: URL: https://github.com/apache/spark/pull/32932#discussion_r656741164 ## File path: docs/sql-performance-tuning.md ## @@ -228,6 +228,8 @@ The "REPARTITION_BY_RANGE" hint must have column names and a partition number is SELECT /*+ REPARTITION */ * FROM t SELECT /*+ REPARTITION_BY_RANGE(c) */ * FROM t SELECT /*+ REPARTITION_BY_RANGE(3, c) */ * FROM t +SELECT /*+ REPARTITION_BY_AQE */ * FROM t Review comment: Other repartition hints can also be optimized by AQE, so I think this name is not precise enough. The key point here is the user intention. To optimize for data writing, we don't need a specific number of partitions, and we don't need a strict output partitioning (like partition by a column). We only need to make the output evenly distributed and partitioned by some columns as far as we can (best effort). How about `REBALANCE_OUTPUT_PARTITIONS`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #32932: [SPARK-35786][SQL] Add a new operator to distingush if AQE can optimize safely
cloud-fan commented on a change in pull request #32932: URL: https://github.com/apache/spark/pull/32932#discussion_r656249153 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ## @@ -1351,6 +1351,26 @@ object RepartitionByExpression { } } +/** + * This operator does not guarantee the output partitioning, because the partition number will be + * optimized by AQE. + */ +case class AdaptiveRepartition( +partitionExpressions: Seq[Expression], +child: LogicalPlan) extends RepartitionOperation { Review comment: does it need to extend `RepartitionOperation`? We don't expect any optimizer rule to touch it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #32932: [SPARK-35786][SQL] Add a new operator to distingush if AQE can optimize safely
cloud-fan commented on a change in pull request #32932: URL: https://github.com/apache/spark/pull/32932#discussion_r656246748 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala ## @@ -196,11 +197,18 @@ object ResolveHints { if (invalidParams.nonEmpty) { throw QueryCompilationErrors.invalidHintParameterError(hintName, invalidParams) } -RepartitionByExpression( - partitionExprs.map(_.asInstanceOf[Expression]), hint.child, numPartitions) +if (adaptive) { + AdaptiveRepartition(partitionExprs.map(_.asInstanceOf[Expression]), hint.child) Review comment: shall we reuse `RepartitionByExpression` and add a boolean flag to it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #32932: [SPARK-35786][SQL] Add a new operator to distingush if AQE can optimize safely
cloud-fan commented on a change in pull request #32932: URL: https://github.com/apache/spark/pull/32932#discussion_r656246044 ## File path: docs/sql-performance-tuning.md ## @@ -228,6 +228,8 @@ The "REPARTITION_BY_RANGE" hint must have column names and a partition number is SELECT /*+ REPARTITION */ * FROM t SELECT /*+ REPARTITION_BY_RANGE(c) */ * FROM t SELECT /*+ REPARTITION_BY_RANGE(3, c) */ * FROM t +SELECT /*+ ADAPTIVE_REPARTITION */ * FROM t +SELECT /*+ ADAPTIVE_REPARTITION(c) */ * FROM t Review comment: I'm not good at naming, let's get more opinions. cc @dongjoon-hyun @HyukjinKwon @maropu @viirya Basically this hint means the user wants to add a shuffle right before data writing, to redistribute data and avoid small files/skewed partitions. It's different from normal repartition as this allows more aggressive optimization from AQE. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org