sunchao commented on a change in pull request #35657:
URL: https://github.com/apache/spark/pull/35657#discussion_r833512565
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -569,6 +629,87 @@ case class HashShuffleSpec(
override def numPartitions: Int = partitioning.numPartitions
}
+case class KeyGroupedShuffleSpec(
+ partitioning: KeyGroupedPartitioning,
+ distribution: ClusteredDistribution) extends ShuffleSpec {
+
+ /**
+ * A sequence where each element is a set of positions of the partition
expression to the cluster
+ * keys. For instance, if cluster keys are [a, b, b] and partition
expressions are
+ * [bucket(4, a), years(b)], the result will be [(0), (1, 2)].
+ */
+ lazy val keyPositions: Seq[mutable.BitSet] = {
+ val distKeyToPos = mutable.Map.empty[Expression, mutable.BitSet]
+ distribution.clustering.zipWithIndex.foreach { case (distKey, distKeyPos)
=>
+ distKeyToPos.getOrElseUpdate(distKey.canonicalized,
mutable.BitSet.empty).add(distKeyPos)
+ }
+ partitioning.expressions.map { e =>
+ val leaves = e.collectLeaves()
+ assert(leaves.size == 1, s"Expected exactly one leaf expression from $e,
but found " +
+ s"${leaves.size}")
+ distKeyToPos.getOrElse(leaves.head.canonicalized, mutable.BitSet.empty)
+ }
+ }
+
+ private lazy val ordering: Ordering[InternalRow] =
+
RowOrdering.createNaturalAscendingOrdering(partitioning.expressions.map(_.dataType))
+
+ override def numPartitions: Int = partitioning.numPartitions
+
+ override def isCompatibleWith(other: ShuffleSpec): Boolean = other match {
+ // Here we check:
+ // 1. both distributions have the same number of clustering expressions
+ // 2. both partitioning have the same number of partitions
+ // 3. clustering expressions from both sides are compatible in terms of
number, position
+ // w.r.t the distribution keys, as well as transform.
+ // 4. the partition values, if present on both sides, are following the
same order.
Review comment:
Agreed. I can revise this section of comments to better reflect what
you said. I also realized that I need to add additional checks to make sure the
inputs of the transform functions are "cluster keys" (e.g., `Attribute`), rather
than some other function calls.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]