sunchao commented on code in PR #39633:
URL: https://github.com/apache/spark/pull/39633#discussion_r1097903665
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala:
##########
@@ -132,12 +140,66 @@ case class BatchScanExec(
outputPartitioning match {
case p: KeyGroupedPartitioning =>
- val partitionMapping = finalPartitions.map(s =>
- s.head.asInstanceOf[HasPartitionKey].partitionKey() -> s).toMap
- finalPartitions = p.partitionValues.map { partValue =>
- // Use empty partition for those partition values that are not
present
- partitionMapping.getOrElse(partValue, Seq.empty)
+ // This means the input partitions are not grouped by partition
values. We'll need to
+ // check `groupByPartitionValues` and decide whether to group and
replicate splits within
+ // a partition.
Review Comment:
you're right - moved.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala:
##########
@@ -411,6 +336,235 @@ case class EnsureRequirements(
}
}
+ /**
+ * Checks whether two children, `left` and `right`, of a join operator have
compatible
+ * `KeyGroupedPartitioning`, and can benefit from storage-partitioned join.
+ *
+ * Returns the updated new children if the check is successful, otherwise
`None`.
+ */
+ private def checkKeyGroupCompatible(
+ parent: SparkPlan,
+ left: SparkPlan,
+ right: SparkPlan,
+ requiredChildDistribution: Seq[Distribution]): Option[Seq[SparkPlan]] = {
+ parent match {
+ case smj: SortMergeJoinExec =>
+ checkKeyGroupCompatible(left, right, smj.joinType,
requiredChildDistribution)
+ case sj: ShuffledHashJoinExec =>
+ checkKeyGroupCompatible(left, right, sj.joinType,
requiredChildDistribution)
+ case _ =>
+ None
+ }
+ }
+
+ private def checkKeyGroupCompatible(
+ left: SparkPlan,
+ right: SparkPlan,
+ joinType: JoinType,
+ requiredChildDistribution: Seq[Distribution]): Option[Seq[SparkPlan]] = {
+ assert(requiredChildDistribution.length == 2)
+
+ var newLeft = left
+ var newRight = right
+
+ val specs = Seq(left, right).zip(requiredChildDistribution).map { case (p,
d) =>
+ if (!d.isInstanceOf[ClusteredDistribution]) return None
+ val cd = d.asInstanceOf[ClusteredDistribution]
+ val specOpt = createKeyGroupedShuffleSpec(p.outputPartitioning, cd)
+ if (specOpt.isEmpty) return None
+ specOpt.get
+ }
+
+ val leftSpec = specs.head
+ val rightSpec = specs(1)
+
+ var isCompatible = false
+ if (!conf.v2BucketingPushPartValuesEnabled) {
+ isCompatible = leftSpec.isCompatibleWith(rightSpec)
+ } else {
+ logInfo("Pushing common partition values for storage-partitioned join")
+ isCompatible = leftSpec.areKeysCompatible(rightSpec)
+
+ // Partition expressions are compatible. Regardless of whether partition
values
+ // match from both sides of children, we can calculate a superset
of
+ // partition values and push-down to respective data sources so they can
adjust
+ // their output partitioning by filling missing partition keys with empty
+ // partitions. As a result, we can still avoid shuffle.
+ //
+ // For instance, if two sides of a join have partition expressions
+ // `day(a)` and `day(b)` respectively
+ // (the join query could be `SELECT ... FROM t1 JOIN t2 on t1.a =
t2.b`), but
+ // with different partition values:
+ // `day(a)`: [0, 1]
+ // `day(b)`: [1, 2, 3]
+ // Following the case 2 above, we don't have to shuffle both sides, but
instead can
+ // just push the common set of partition values: `[0, 1, 2, 3]` down to
the two data
+ // sources.
+ if (isCompatible) {
+ val leftPartValues = leftSpec.partitioning.partitionValues
+ val rightPartValues = rightSpec.partitioning.partitionValues
+
+ logInfo(
+ s"""
+ |Left side # of partitions: ${leftPartValues.size}
+ |Right side # of partitions: ${rightPartValues.size}
+ |""".stripMargin)
+
+ var mergedPartValues = Utils.mergeOrdered(
+ Seq(leftPartValues, rightPartValues))(leftSpec.ordering)
+ .toSeq
+ .distinct
+ .map(v => (v, 1))
+
+ logInfo(s"After merging, there are ${mergedPartValues.size}
partitions")
+
+ var replicateLeftSide = false
+ var replicateRightSide = false
+ var applyPartialClustering = false
+
+ // This means we allow partitions that are not clustered on their
values,
+ // that is, multiple partitions with the same partition value. In the
+ // following, we calculate how many partitions each distinct
partition
+ // value has, and pushdown the information to scans, so they can
adjust their
+ // final input partitions respectively.
+ if (conf.v2BucketingPartiallyClusteredDistributionEnabled) {
+ logInfo("Calculating partially clustered distribution for " +
+ "storage-partitioned join")
+
+ val canReplicateLeft = canReplicateLeftSide(joinType)
+ val canReplicateRight = canReplicateRightSide(joinType)
Review Comment:
+1. Added comments for this.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]