sunchao commented on a change in pull request #35657:
URL: https://github.com/apache/spark/pull/35657#discussion_r838898280



##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -569,6 +629,87 @@ case class HashShuffleSpec(
   override def numPartitions: Int = partitioning.numPartitions
 }
 
+case class KeyGroupedShuffleSpec(
+    partitioning: KeyGroupedPartitioning,
+    distribution: ClusteredDistribution) extends ShuffleSpec {
+
+  /**
+   * A sequence where each element is a set of positions of the partition 
expression to the cluster
+   * keys. For instance, if cluster keys are [a, b, b] and partition 
expressions are
+   * [bucket(4, a), years(b)], the result will be [(0), (1, 2)].
+   */
+  lazy val keyPositions: Seq[mutable.BitSet] = {
+    val distKeyToPos = mutable.Map.empty[Expression, mutable.BitSet]
+    distribution.clustering.zipWithIndex.foreach { case (distKey, distKeyPos) 
=>
+      distKeyToPos.getOrElseUpdate(distKey.canonicalized, 
mutable.BitSet.empty).add(distKeyPos)
+    }
+    partitioning.expressions.map { e =>
+      val leaves = e.collectLeaves()
+      assert(leaves.size == 1, s"Expected exactly one leaf expression from $e, 
but found " +
+          s"${leaves.size}")
+      distKeyToPos.getOrElse(leaves.head.canonicalized, mutable.BitSet.empty)
+    }
+  }
+
+  private lazy val ordering: Ordering[InternalRow] =
+    
RowOrdering.createNaturalAscendingOrdering(partitioning.expressions.map(_.dataType))
+
+  override def numPartitions: Int = partitioning.numPartitions
+
+  override def isCompatibleWith(other: ShuffleSpec): Boolean = other match {
+    // Here we check:
+    //  1. both distributions have the same number of clustering expressions
+    //  2. both partitioning have the same number of partitions
+    //  3. clustering expressions from both sides are compatible in terms of 
number, position
+    //      w.r.t the distribution keys, as well as transform.
+    //  4. the partition values, if present on both sides, are following the 
same order.
+    case otherSpec @ KeyGroupedShuffleSpec(otherPartitioning, 
otherDistribution) =>
+      distribution.clustering.length == otherDistribution.clustering.length &&
+        numPartitions == other.numPartitions &&
+          isClusteringCompatibleWith(otherSpec) &&
+            
partitioning.partitionValuesOpt.zip(otherPartitioning.partitionValuesOpt).forall
 {
+              case (left, right) => left.zip(right).forall { case (l, r) =>
+                ordering.compare(l, r) == 0
+              }
+            }
+    case ShuffleSpecCollection(specs) =>
+      specs.exists(isCompatibleWith)
+    case _ => false
+  }
+
+  /**
+   * Check if the clustering expressions from this spec and `other` are 
compatible. In
+   * particular, whether each pair of expressions have compatible transform 
expressions.
+   *
+   * @param other the partitioning from the other side
+   * @return true if the clustering expressions are compatible, false otherwise
+   */
+  private def isClusteringCompatibleWith(other: KeyGroupedShuffleSpec): 
Boolean = {

Review comment:
       I can't think of a good name for this method... so let me just inline it, since it's only used in a single place.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to