szehon-ho commented on code in PR #42306:
URL: https://github.com/apache/spark/pull/42306#discussion_r1304977574
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala:
##########
@@ -144,8 +159,25 @@ case class BatchScanExec(
s"${SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key} " +
"is enabled")
- val groupedPartitions =
groupPartitions(finalPartitions.map(_.head),
- groupSplits = true).get
+ // In the case where we replicate partitions, we have grouped
+ // the partitions by the join key if they differ
+ val groupByExpressions =
Review Comment:
Done. I changed outputPartitioning to return KeyGroupedPartitioning to reflect
that.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -701,41 +705,78 @@ case class KeyGroupedShuffleSpec(
case otherSpec @ KeyGroupedShuffleSpec(otherPartitioning,
otherDistribution) =>
distribution.clustering.length == otherDistribution.clustering.length &&
numPartitions == other.numPartitions && areKeysCompatible(otherSpec) &&
-
partitioning.partitionValues.zip(otherPartitioning.partitionValues).forall {
- case (left, right) =>
- InternalRowComparableWrapper(left, partitioning.expressions)
- .equals(InternalRowComparableWrapper(right,
partitioning.expressions))
- }
+ isPartitioningCompatible(otherPartitioning)
case ShuffleSpecCollection(specs) =>
specs.exists(isCompatibleWith)
case _ => false
}
+ def isPartitioningCompatible(otherPartitioning: KeyGroupedPartitioning):
Boolean = {
+ val clusterKeySize = keyPositions.size
+ partitioning.partitionValues.zip(otherPartitioning.partitionValues)
+ .forall {
+ case (left, right) =>
+ val leftTypes = partitioning.expressions.map(_.dataType)
+ val leftVals = left.toSeq(leftTypes).take(clusterKeySize).toArray
+ val newLeft = new GenericInternalRow(leftVals)
+
+ val rightTypes = partitioning.expressions.map(_.dataType)
+ val rightVals = right.toSeq(rightTypes).take(clusterKeySize).toArray
+ val newRight = new GenericInternalRow(rightVals)
+
+ InternalRowComparableWrapper(newLeft,
partitioning.expressions.take(clusterKeySize))
+ .equals(InternalRowComparableWrapper(
+ newRight, partitioning.expressions.take(clusterKeySize)))
+ }
+ }
+
// Whether the partition keys (i.e., partition expressions) are compatible
between this and the
// `other` spec.
def areKeysCompatible(other: KeyGroupedShuffleSpec): Boolean = {
- val expressions = partitioning.expressions
- val otherExpressions = other.partitioning.expressions
-
- expressions.length == otherExpressions.length && {
- val otherKeyPositions = other.keyPositions
- keyPositions.zip(otherKeyPositions).forall { case (left, right) =>
- left.intersect(right).nonEmpty
+ partitionExpressionsCompatible(other) &&
+ KeyGroupedShuffleSpec.keyPositionsCompatible(
+ keyPositions, other.keyPositions
+ )
+ }
+
+ // Whether the partition keys (i.e., partition expressions) that also are in
the set of
+ // cluster keys are compatible between this and the 'other' spec.
Review Comment:
I think it was carried over from the existing Javadoc, but I removed it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]