dongjoon-hyun commented on code in PR #38950:
URL: https://github.com/apache/spark/pull/38950#discussion_r1042730764
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala:
##########
@@ -142,12 +145,66 @@ case class EnsureRequirements(
Some(finalCandidateSpecs.values.maxBy(_.numPartitions))
}
+ def getKeyGroupedSpec(idx: Int): ShuffleSpec = specs(idx) match {
+ case ShuffleSpecCollection(specs) => specs.head
+ case spec => spec
+ }
+
+ def populatePartitionKeys(plan: SparkPlan, keys: Seq[InternalRow]): SparkPlan =
+ plan match {
+ case scan: BatchScanExec =>
+ scan.copy(commonPartitionKeys = Some(keys))
+ case node =>
+ node.mapChildren(child => populatePartitionKeys(child, keys))
+ }
+
// Check if 1) all children are of `KeyGroupedPartitioning` and 2) they are all compatible
// with each other. If both are true, skip shuffle.
- val allCompatible = childrenIndexes.sliding(2).forall {
- case Seq(a, b) =>
- checkKeyGroupedSpec(specs(a)) && checkKeyGroupedSpec(specs(b)) &&
- specs(a).isCompatibleWith(specs(b))
+ val allCompatible = childrenIndexes.length == 2 && {
+ val left = childrenIndexes.head
+ val right = childrenIndexes(1)
+ var isCompatible: Boolean = false
+
+ if (checkKeyGroupedSpec(specs(left)) && checkKeyGroupedSpec(specs(right))) {
+ isCompatible = specs(left).isCompatibleWith(specs(right))
+
+ // If `isCompatible` is false, it could mean:
+ // 1. Partition expressions are not compatible: we have to shuffle in this case.
+ // 2. Partition expressions are compatible, but partition keys are not: in this case we
+ //    can compute a superset of partition keys and push-down to respective
+ //    data sources, which can then adjust their respective output partitioning by
+ //    filling missing partition keys with empty partitions. As a result, Spark can still
+ //    avoid shuffle.
+ if (!isCompatible && conf.v2BucketingPushPartKeysEnabled) {
+ (getKeyGroupedSpec(left), getKeyGroupedSpec(right)) match {
+ case (leftSpec: KeyGroupedShuffleSpec, rightSpec: KeyGroupedShuffleSpec) =>
+ // Check if the two children are partition-expression compatible. If so, find the
+ // common set of partition keys, and adjust the plan accordingly.
+ if (leftSpec.isExpressionsCompatible(rightSpec)) {
+ assert(leftSpec.partitioning.partitionValuesOpt.isDefined)
+ assert(rightSpec.partitioning.partitionValuesOpt.isDefined)
+
+ val leftPartKeys = leftSpec.partitioning.partitionValuesOpt.get
+ val rightPartKeys = rightSpec.partitioning.partitionValuesOpt.get
+
+ val mergedPartKeys = Utils.mergeOrdered(
+   Seq(leftPartKeys, rightPartKeys))(leftSpec.ordering).toSeq.distinct
+
+ // Now we need to push-down the common partition keys to the scan in each child
+ children = children.zipWithIndex.map {
+ case (child, idx) if childrenIndexes.contains(idx) =>
+ populatePartitionKeys(child, mergedPartKeys)
+ case (child, _) => child
+ }
+
+ isCompatible = true
+ }
+ case _ => // This case shouldn't happen
Review Comment:
Instead of the comment, can we simply add an `assert` here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]