cloud-fan commented on a change in pull request #32781:
URL: https://github.com/apache/spark/pull/32781#discussion_r646410590



##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
##########
@@ -1351,6 +1351,45 @@ object RepartitionByExpression {
   }
 }
 
+/**
+ * This method repartitions data using [[Expression]]s into 
`numShufflePartitions` defined in
+ * `SQLConf`, and could be coalesced by AQE. Usually used to merge small files.
+ */
+case class CoalescePartitions(
+    partitionExpressions: Seq[Expression],
+    child: LogicalPlan) extends RepartitionOperation {
+
+  override val numPartitions = conf.numShufflePartitions
+
+  override val partitioning: Partitioning = {
+    val (sortOrder, nonSortOrder) = 
partitionExpressions.partition(_.isInstanceOf[SortOrder])
+
+    require(sortOrder.isEmpty || nonSortOrder.isEmpty,
+      s"${getClass.getSimpleName} expects that either all its 
`partitionExpressions` are of type " +
+        "`SortOrder`, which means `RangePartitioning`, or none of them are 
`SortOrder`, which " +
+        "means `HashPartitioning`. In this case we have:" +
+        s"""
+           |SortOrder: $sortOrder
+           |NonSortOrder: $nonSortOrder
+       """.stripMargin)
+
+    if (numPartitions == 1) {
+      SinglePartition
+    } else if (sortOrder.nonEmpty) {
+      RangePartitioning(sortOrder.map(_.asInstanceOf[SortOrder]), 
numPartitions)

Review comment:
       That's too far away, and we can probably just use the `Repartition` node.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to