cloud-fan commented on a change in pull request #35657:
URL: https://github.com/apache/spark/pull/35657#discussion_r832348410
##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -569,6 +629,87 @@ case class HashShuffleSpec(
override def numPartitions: Int = partitioning.numPartitions
}
+case class KeyGroupedShuffleSpec(
+ partitioning: KeyGroupedPartitioning,
+ distribution: ClusteredDistribution) extends ShuffleSpec {
+
+ /**
+ * A sequence where each element is a set of positions of the partition expression to the cluster
+ * keys. For instance, if cluster keys are [a, b, b] and partition expressions are
+ * [bucket(4, a), years(b)], the result will be [(0), (1, 2)].
+ */
+ lazy val keyPositions: Seq[mutable.BitSet] = {
+ val distKeyToPos = mutable.Map.empty[Expression, mutable.BitSet]
+ distribution.clustering.zipWithIndex.foreach { case (distKey, distKeyPos) =>
+ distKeyToPos.getOrElseUpdate(distKey.canonicalized, mutable.BitSet.empty).add(distKeyPos)
+ }
+ partitioning.expressions.map { e =>
+ val leaves = e.collectLeaves()
+ assert(leaves.size == 1, s"Expected exactly one leaf expression from $e, but found " +
+ s"${leaves.size}")
+ distKeyToPos.getOrElse(leaves.head.canonicalized, mutable.BitSet.empty)
+ }
+ }
+
+ private lazy val ordering: Ordering[InternalRow] =
+ RowOrdering.createNaturalAscendingOrdering(partitioning.expressions.map(_.dataType))
+
+ override def numPartitions: Int = partitioning.numPartitions
+
+ override def isCompatibleWith(other: ShuffleSpec): Boolean = other match {
+ // Here we check:
+ // 1. both distributions have the same number of clustering expressions
+ // 2. both partitionings have the same number of partitions
+ // 3. clustering expressions from both sides are compatible in terms of number, position
+ // w.r.t the distribution keys, as well as transform.
+ // 4. the partition values, if present on both sides, are following the same order.
Review comment:
I'm thinking about the theory behind the compatibility check here. It is very complicated, and I think we need some restrictions to disallow cases like partitioning by `f1(a + b) * f2(c)` or `f1(a + f2(b))`.
Let's say that each partition key must be either a cluster key or a transform function applied to one or more cluster keys. Two key-grouped partitionings are then compatible iff each pair of their corresponding partition keys is compatible, which means one of the following holds:
1. The partition keys on both sides are cluster keys at the same position.
2. The partition keys on both sides are the same transform function, and the function inputs point to cluster keys at the same positions.
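The per-key rule above can be sketched in plain Scala. This is a simplified, hypothetical model, not Spark's API: `PartitionKey`, `ClusterKeyRef`, `TransformKey`, `Side` and `KeyGroupedCompat` are illustrative names, expressions are plain strings rather than Catalyst expressions, and transforms are compared by function name only (so a bucket count such as the `4` in `bucket(4, a)` would have to be folded into that name). Reading "at the same positions" as "the position sets overlap" is an assumption, chosen so that duplicate cluster keys such as `[a, b, b]` still work. The sketch covers only the key check, not the partition-count and partition-value checks in the diff.

```scala
// Hypothetical, simplified model of the proposed restriction: a partition key is
// either a bare cluster key or a transform whose inputs are all cluster keys.
// Expressions are plain strings here, not Catalyst expressions.
sealed trait PartitionKey
// A bare cluster key, e.g. `a`.
final case class ClusterKeyRef(name: String) extends PartitionKey
// A transform over one or more cluster keys, e.g. `years(b)`.
final case class TransformKey(function: String, inputs: Seq[String]) extends PartitionKey

// One side of the join: its clustering keys and its partition keys.
final case class Side(clustering: Seq[String], partitionKeys: Seq[PartitionKey]) {
  // Each cluster key mapped to every position it occupies in the clustering,
  // e.g. [a, b, b] gives a -> {0}, b -> {1, 2} (like `keyPositions` in the diff).
  val positions: Map[String, Set[Int]] =
    clustering.zipWithIndex.groupBy(_._1).map { case (k, occ) => k -> occ.map(_._2).toSet }
}

object KeyGroupedCompat {
  private def inputs(key: PartitionKey): Seq[String] = key match {
    case ClusterKeyRef(n)     => Seq(n)
    case TransformKey(_, ins) => ins
  }

  // Both keys are bare cluster keys, or the same transform applied to the
  // same (non-zero) number of inputs.
  private def sameShape(l: PartitionKey, r: PartitionKey): Boolean = (l, r) match {
    case (ClusterKeyRef(_), ClusterKeyRef(_)) => true
    case (TransformKey(f1, i1), TransformKey(f2, i2)) =>
      f1 == f2 && i1.nonEmpty && i1.size == i2.size
    case _ => false
  }

  // Every input must be a cluster key on its own side, and each pair of inputs
  // must share at least one position in the two clusterings (assumed reading).
  private def samePositions(l: PartitionKey, ls: Side, r: PartitionKey, rs: Side): Boolean =
    inputs(l).zip(inputs(r)).forall { case (li, ri) =>
      (ls.positions.get(li), rs.positions.get(ri)) match {
        case (Some(lp), Some(rp)) => lp.intersect(rp).nonEmpty
        case _                    => false // e.g. `a + b` is not a cluster key
      }
    }

  // Same number of clustering keys and partition keys, and every pair of
  // partition keys at the same index is compatible.
  def isCompatible(left: Side, right: Side): Boolean =
    left.clustering.size == right.clustering.size &&
      left.partitionKeys.size == right.partitionKeys.size &&
      left.partitionKeys.zip(right.partitionKeys).forall { case (l, r) =>
        sameShape(l, r) && samePositions(l, left, r, right)
      }
}
```

For example, with clusterings `[a, b, b]` and `[x, y, y]`, `years(b)` and `years(y)` are compatible because both inputs occupy positions `{1, 2}`, whereas a key built from `a + b` is rejected outright because its input is not a cluster key.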