sunchao commented on a change in pull request #35657:
URL: https://github.com/apache/spark/pull/35657#discussion_r839911535



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -574,6 +634,80 @@ case class HashShuffleSpec(
   override def numPartitions: Int = partitioning.numPartitions
 }
 
+case class KeyGroupedShuffleSpec(
+    partitioning: KeyGroupedPartitioning,
+    distribution: ClusteredDistribution) extends ShuffleSpec {
+
+  /**
+   * A sequence where each element is a set of positions of the partition expression to the cluster
+   * keys. For instance, if cluster keys are [a, b, b] and partition expressions are
+   * [bucket(4, a), years(b)], the result will be [(0), (1, 2)].
+   *
+   * Note that we only allow each partition expression to contain a single partition key.
+   * Therefore the mapping here is very similar to that from `HashShuffleSpec`.
+   */
+  lazy val keyPositions: Seq[mutable.BitSet] = {
+    val distKeyToPos = mutable.Map.empty[Expression, mutable.BitSet]
+    distribution.clustering.zipWithIndex.foreach { case (distKey, distKeyPos) =>
+      distKeyToPos.getOrElseUpdate(distKey.canonicalized, mutable.BitSet.empty).add(distKeyPos)
+    }
+    partitioning.expressions.map { e =>
+      val leaves = e.collectLeaves()
+      assert(leaves.size == 1, s"Expected exactly one child from $e, but found ${leaves.size}")
+      distKeyToPos.getOrElse(leaves.head.canonicalized, mutable.BitSet.empty)
+    }
+  }
+
+  private lazy val ordering: Ordering[InternalRow] =
+    RowOrdering.createNaturalAscendingOrdering(partitioning.expressions.map(_.dataType))
+
+  override def numPartitions: Int = partitioning.numPartitions
+
+  override def isCompatibleWith(other: ShuffleSpec): Boolean = other match {
+    // Here we check:
+    //  1. both distributions have the same number of clustering keys
+    //  2. both partitioning have the same number of partitions
+    //  3. partition expressions from both sides are compatible, which means:
+    //    3.1 both sides have the same number of partition expressions
+    //    3.2 for each pair of partition expressions at the same index, the corresponding
+    //        partition keys must share overlapping positions in their respective clustering keys.
+    //    3.3 each pair of partition expressions at the same index must share compatible
+    //        transform functions.
+    //  4. the partition values, if present on both sides, are following the same order.
+    case otherSpec @ KeyGroupedShuffleSpec(otherPartitioning, otherDistribution) =>
+      val expressions = partitioning.expressions
+      val otherExpressions = otherPartitioning.expressions
+
+      distribution.clustering.length == otherDistribution.clustering.length &&
+          numPartitions == other.numPartitions &&
+          expressions.length == otherExpressions.length && {
+            val otherKeyPositions = otherSpec.keyPositions
+            keyPositions.zip(otherKeyPositions).forall { case (left, right) =>
+              left.intersect(right).nonEmpty

Review comment:
       In the case of literals, Spark expects the clustering keys to contain literals as well, and thus generates the key position mapping accordingly (in fact, `EnsureRequirementsSuite` only uses literals for testing).
   
   I feel that removing the bucket parameter from `TransformExpression` would complicate things a bit and would need to be handled as a special case in several places: we would then have an expression with two children while the others have only one (so far we only allow a single child in a transform expression). The `numBuckets` literal is also special in that it doesn't have any mapping in the key positions, so we'd need to change the logic for checking transform expression compatibility.
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



