cloud-fan commented on a change in pull request #35657:
URL: https://github.com/apache/spark/pull/35657#discussion_r832348410



##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -569,6 +629,87 @@ case class HashShuffleSpec(
   override def numPartitions: Int = partitioning.numPartitions
 }
 
+case class KeyGroupedShuffleSpec(
+    partitioning: KeyGroupedPartitioning,
+    distribution: ClusteredDistribution) extends ShuffleSpec {
+
+  /**
+   * A sequence where each element is a set of positions of the partition 
expression to the cluster
+   * keys. For instance, if cluster keys are [a, b, b] and partition 
expressions are
+   * [bucket(4, a), years(b)], the result will be [(0), (1, 2)].
+   */
+  lazy val keyPositions: Seq[mutable.BitSet] = {
+    val distKeyToPos = mutable.Map.empty[Expression, mutable.BitSet]
+    distribution.clustering.zipWithIndex.foreach { case (distKey, distKeyPos) 
=>
+      distKeyToPos.getOrElseUpdate(distKey.canonicalized, 
mutable.BitSet.empty).add(distKeyPos)
+    }
+    partitioning.expressions.map { e =>
+      val leaves = e.collectLeaves()
+      assert(leaves.size == 1, s"Expected exactly one leaf expression from $e, 
but found " +
+          s"${leaves.size}")
+      distKeyToPos.getOrElse(leaves.head.canonicalized, mutable.BitSet.empty)
+    }
+  }
+
+  private lazy val ordering: Ordering[InternalRow] =
+    
RowOrdering.createNaturalAscendingOrdering(partitioning.expressions.map(_.dataType))
+
+  override def numPartitions: Int = partitioning.numPartitions
+
+  override def isCompatibleWith(other: ShuffleSpec): Boolean = other match {
+    // Here we check:
+    //  1. both distributions have the same number of clustering expressions
+    //  2. both partitioning have the same number of partitions
+    //  3. clustering expressions from both sides are compatible in terms of 
number, position
+    //      w.r.t the distribution keys, as well as transform.
+    //  4. the partition values, if present on both sides, are following the 
same order.

Review comment:
       I'm thinking about the theory of compatibility check here. This is very 
complicated and I think we must have some restrictions, to disallow cases like 
partition by `f1(a + b) * f2(c)` or `f1(a + f2(b))`.
   
   Let's say that the partitioning keys must be either a cluster key, or a 
transform function with one or more cluster keys. Two key grouped partitionings 
are compatible iff each of their partition key is compatible, which means:
   1. The partition keys of both sides are cluster keys at the same position
   2. The partition keys of both sides are the same transform function and the 
function inputs point to the cluster keys at the same positions.
   3. The value of each partition at the same index from both sides are the 
same.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to