wangyum commented on a change in pull request #32781:
URL: https://github.com/apache/spark/pull/32781#discussion_r646407033
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
##########
@@ -1351,6 +1351,45 @@ object RepartitionByExpression {
}
}
+/**
+ * This method repartitions data using [[Expression]]s into
`numShufflePartitions` defined in
+ * `SQLConf`, and could be coalesced by AQE. Usually used to merge small files.
+ */
+case class CoalescePartitions(
+ partitionExpressions: Seq[Expression],
+ child: LogicalPlan) extends RepartitionOperation {
+
+ override val numPartitions = conf.numShufflePartitions
+
+ override val partitioning: Partitioning = {
+ val (sortOrder, nonSortOrder) =
partitionExpressions.partition(_.isInstanceOf[SortOrder])
+
+ require(sortOrder.isEmpty || nonSortOrder.isEmpty,
+ s"${getClass.getSimpleName} expects that either all its
`partitionExpressions` are of type " +
+ "`SortOrder`, which means `RangePartitioning`, or none of them are
`SortOrder`, which " +
+ "means `HashPartitioning`. In this case we have:" +
+ s"""
+ |SortOrder: $sortOrder
+ |NonSortOrder: $nonSortOrder
+ """.stripMargin)
+
+ if (numPartitions == 1) {
+ SinglePartition
+ } else if (sortOrder.nonEmpty) {
+ RangePartitioning(sortOrder.map(_.asInstanceOf[SortOrder]),
numPartitions)
Review comment:
Yes. Maybe we can support `SORTED BY` clause in `CREATE TABLE` statement
in future.
https://issues.apache.org/jira/browse/SPARK-31470
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]