sunchao commented on a change in pull request #35657:
URL: https://github.com/apache/spark/pull/35657#discussion_r839911535
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -574,6 +634,80 @@ case class HashShuffleSpec(
override def numPartitions: Int = partitioning.numPartitions
}
+case class KeyGroupedShuffleSpec(
+ partitioning: KeyGroupedPartitioning,
+ distribution: ClusteredDistribution) extends ShuffleSpec {
+
+  /**
+   * A sequence where each element is the set of positions in the clustering keys that the
+   * corresponding partition expression maps to. For instance, if the clustering keys are
+   * [a, b, b] and the partition expressions are [bucket(4, a), years(b)], the result will be
+   * [(0), (1, 2)].
+   *
+   * Note that each partition expression is only allowed to contain a single partition key.
+   * Therefore the mapping here is very similar to that of `HashShuffleSpec`.
+   */
+  lazy val keyPositions: Seq[mutable.BitSet] = {
+    val distKeyToPos = mutable.Map.empty[Expression, mutable.BitSet]
+    distribution.clustering.zipWithIndex.foreach { case (distKey, distKeyPos) =>
+      distKeyToPos.getOrElseUpdate(distKey.canonicalized, mutable.BitSet.empty).add(distKeyPos)
+    }
+    partitioning.expressions.map { e =>
+      val leaves = e.collectLeaves()
+      assert(leaves.size == 1, s"Expected exactly one child from $e, but found ${leaves.size}")
+      distKeyToPos.getOrElse(leaves.head.canonicalized, mutable.BitSet.empty)
+    }
+  }
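The mapping above can be sketched in isolation. In this simplified model, expressions are plain strings standing in for canonicalized Catalyst expressions, and `keyPositions` is a free-standing function rather than a member of the spec (both are assumptions for illustration, not the actual Spark API):

```scala
import scala.collection.mutable

object KeyPositionsSketch {
  // For each partition-expression leaf, collect the positions at which it
  // appears among the clustering keys. Keys with no match get an empty set.
  def keyPositions(clustering: Seq[String], partitionLeaves: Seq[String]): Seq[mutable.BitSet] = {
    val distKeyToPos = mutable.Map.empty[String, mutable.BitSet]
    clustering.zipWithIndex.foreach { case (key, pos) =>
      distKeyToPos.getOrElseUpdate(key, mutable.BitSet.empty).add(pos)
    }
    partitionLeaves.map(leaf => distKeyToPos.getOrElse(leaf, mutable.BitSet.empty))
  }

  def main(args: Array[String]): Unit = {
    // Clustering keys [a, b, b]; the leaves of bucket(4, a) and years(b) are [a, b].
    println(keyPositions(Seq("a", "b", "b"), Seq("a", "b")))
    // prints: List(BitSet(0), BitSet(1, 2))
  }
}
```

This matches the example in the scaladoc: `a` maps to position 0, while `b` maps to positions 1 and 2.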
+
+  private lazy val ordering: Ordering[InternalRow] =
+    RowOrdering.createNaturalAscendingOrdering(partitioning.expressions.map(_.dataType))
+
+ override def numPartitions: Int = partitioning.numPartitions
+
+  override def isCompatibleWith(other: ShuffleSpec): Boolean = other match {
+    // Here we check:
+    //  1. both distributions have the same number of clustering keys
+    //  2. both partitionings have the same number of partitions
+    //  3. partition expressions from both sides are compatible, which means:
+    //    3.1 both sides have the same number of partition expressions
+    //    3.2 for each pair of partition expressions at the same index, the corresponding
+    //        partition keys must share overlapping positions in their respective clustering keys
+    //    3.3 each pair of partition expressions at the same index must share compatible
+    //        transform functions
+    //  4. the partition values, if present on both sides, follow the same order
+    case otherSpec @ KeyGroupedShuffleSpec(otherPartitioning, otherDistribution) =>
+      val expressions = partitioning.expressions
+      val otherExpressions = otherPartitioning.expressions
+
+      distribution.clustering.length == otherDistribution.clustering.length &&
+        numPartitions == other.numPartitions &&
+        expressions.length == otherExpressions.length && {
+          val otherKeyPositions = otherSpec.keyPositions
+          keyPositions.zip(otherKeyPositions).forall { case (left, right) =>
+            left.intersect(right).nonEmpty
Review comment:
       In the case of literals, Spark expects the clustering keys to also contain literals, and generates the key position mapping accordingly (in fact, `EnsureRequirementsSuite` only uses literals for testing).
       I feel removing the bucket parameter from `TransformExpression` would complicate things a bit and would need to be handled as a special case in several places: we'd then have an expression with two children while the others only have one (we currently allow only a single child in a transform expression), and the `numBuckets` literal is special in that it doesn't have any mapping in the key positions, so we'd also need to change the logic for checking transform expression compatibility.
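
The overlapping-positions check from condition 3.2 in the diff above can be sketched on its own. The key position sequences for the two sides are modeled directly as `BitSet`s here (names and the free-standing helper are illustrative, not part of the Spark codebase):

```scala
import scala.collection.mutable.BitSet

object CompatibilitySketch {
  // Each pair of partition expressions at the same index must map to at least
  // one common position in their respective clustering keys.
  def positionsOverlap(left: Seq[BitSet], right: Seq[BitSet]): Boolean =
    left.length == right.length &&
      left.zip(right).forall { case (l, r) => l.intersect(r).nonEmpty }

  def main(args: Array[String]): Unit = {
    // Left side: positions from clustering keys [a, b, b]; right side: [a, c, b].
    val leftPositions  = Seq(BitSet(0), BitSet(1, 2))
    val rightPositions = Seq(BitSet(0), BitSet(2))
    println(positionsOverlap(leftPositions, rightPositions))  // true: {0}∩{0} and {1,2}∩{2} are nonempty
    println(positionsOverlap(Seq(BitSet(0)), Seq(BitSet(1)))) // false: no shared position
  }
}
```

A pair with disjoint position sets means the two sides cluster that expression on different keys, so co-partitioning cannot be assumed.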
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]