szehon-ho commented on PR #46255: URL: https://github.com/apache/spark/pull/46255#issuecomment-2080381160
Some implementation notes.

SPARK-41471 works by giving the ShuffleExchangeExec side of the join a KeyGroupedPartitioning, which is created from the other side's KeyGroupedShuffleSpec as a clone of it (with the other side's partition expressions and values). That way both sides of the join have KeyGroupedPartitioning and SPJ can work.

Code changes:
- Remove the check in KeyGroupedShuffleSpec::canCreatePartitioning that allows only AttributeReference, and add support for TransformExpression.
- Implement TransformExpression.eval() by reusing the code from V2ExpressionUtils. This allows the ShuffleExchangeExec to evaluate the partition key with transform expressions for each row.

Some fixes:
- Normalize the valueMap key type in KeyGroupedPartitioner to use a specific Seq implementation class. Previously the partitioner's map was initialized with keys as Vector, but then looked up with keys as ArraySeq, and these seem to have different hash codes, so lookups would always create new entries with new partition ids.
- Add support in V2ExpressionUtils for Scala 'static' invoke() methods on ScalarFunctions (currently only Java static invoke() methods are supported). This was needed, for example, for our test Scala YearsTransform.
- Change the test YearsTransform to have the same logic as InMemoryBaseTable. This was pointed out in the [SPARK-41471](https://github.com/apache/spark/pull/42194) PR.

Limitations:
- This feature is disabled if partiallyClustered is enabled. Partially clustered mode implies the partitioned side of the join has multiple partitions with the same value and does not group them; it is not clear at the moment how the partitioner on the shuffle side can handle that.
- This feature is disabled if allowJoinKeysLessThanPartitionKeys is enabled and the partitions are transform expressions. The allowJoinKeysLessThanPartitionKeys feature works by 'grouping' the BatchScanExec's partitions again by join keys.
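To illustrate the kind of per-row evaluation this enables, here is a minimal, hypothetical sketch (not the actual Spark code) of a "years" transform in the spirit of the test YearsTransform aligned with InMemoryBaseTable: the partition key is the number of whole years between the Unix epoch and a date stored as days-since-epoch, exposed through a static invoke() entry point in the style ScalarFunctions can use. The class name and exact logic here are assumptions for illustration only.

```java
import java.time.LocalDate;
import java.time.Period;

// Hypothetical sketch: derive a partition key from a row value, the way
// TransformExpression.eval() lets ShuffleExchangeExec compute the key for
// each input row. Assumes the "years" transform means whole years between
// the Unix epoch and the given date (days-since-epoch encoding).
final class YearsTransformSketch {
    private static final LocalDate EPOCH = LocalDate.ofEpochDay(0);

    // Static "invoke" style entry point, as ScalarFunctions can expose.
    static int invoke(int daysSinceEpoch) {
        return Period.between(EPOCH, LocalDate.ofEpochDay(daysSinceEpoch)).getYears();
    }
}
```

With this shape, the shuffle side can hash rows into the same key-grouped partitions the scan side reports, since both compute the same transform of the partition column.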
If enabled along with this feature, a failure happens when checking that both sides of the join (the ShuffleExchangeExec and the partitioned BatchScanExec side) have the same number of partitions. This actually works in the first optimizer pass, as the ShuffleExchangeExec's KeyGroupedPartitioning is created as a clone of the other side's (including partition values). But after that there is a 'grouping' phase triggered here:

```
// Now we need to push-down the common partition information to the scan in each child
newLeft = populateCommonPartitionInfo(left, mergedPartValues, leftSpec.joinKeyPositions,
  leftReducers, applyPartialClustering, replicateLeftSide)
newRight = populateCommonPartitionInfo(right, mergedPartValues, rightSpec.joinKeyPositions,
  rightReducers, applyPartialClustering, replicateRightSide)
```

This updates the number of partitions on the BatchScanExec after the grouping by join key, but it does not update the ShuffleExchangeExec's number of partitions. Hence the error in the subsequent optimizer pass:

```
requirement failed: PartitioningCollection requires all of its partitionings have the same numPartitions.
java.lang.IllegalArgumentException: requirement failed: PartitioningCollection requires all of its partitionings have the same numPartitions.
  at scala.Predef$.require(Predef.scala:337)
  at org.apache.spark.sql.catalyst.plans.physical.PartitioningCollection.<init>(partitioning.scala:550)
  at org.apache.spark.sql.execution.joins.ShuffledJoin.outputPartitioning(ShuffledJoin.scala:49)
  at org.apache.spark.sql.execution.joins.ShuffledJoin.outputPartitioning$(ShuffledJoin.scala:47)
  at org.apache.spark.sql.execution.joins.SortMergeJoinExec.outputPartitioning(SortMergeJoinExec.scala:39)
  at org.apache.spark.sql.execution.exchange.EnsureRequirements.$anonfun$ensureDistributionAndOrdering$1(EnsureRequirements.scala:66)
  at scala.collection.immutable.Vector1.map(Vector.scala:2140)
  at scala.collection.immutable.Vector1.map(Vector.scala:385)
  at org.apache.spark.sql.execution.exchange.EnsureRequirements.org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering(EnsureRequirements.scala:65)
  at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$1.applyOrElse(EnsureRequirements.scala:657)
  at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$1.applyOrElse(EnsureRequirements.scala:632)
```

This can be reproduced by removing this check and running the relevant unit test added in this PR. It needs more investigation before it can be enabled in a follow-up PR.
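To make the mismatch concrete, here is a small, hypothetical sketch (not Spark code; the method and class names are invented for illustration) of why grouping by a subset of key positions shrinks the partition count: partitions keyed by two columns collapse when regrouped by only the join-key position, while a shuffle side cloned from the pre-grouping partitioning would still report the original count.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch of the 'grouping' step that
// allowJoinKeysLessThanPartitionKeys applies to the scan side: partition
// values are regrouped by the join key positions only, which can reduce
// the number of partitions on that side.
final class GroupByJoinKeys {
    static Map<List<Object>, List<List<Object>>> group(
            List<List<Object>> partitionValues, int[] joinKeyPositions) {
        return partitionValues.stream().collect(Collectors.groupingBy(row -> {
            List<Object> key = new ArrayList<>();
            for (int pos : joinKeyPositions) {
                key.add(row.get(pos));
            }
            return key;
        }));
    }
}
```

For example, four partitions keyed by (year, bucket) — (2020, 0), (2020, 1), (2021, 0), (2021, 1) — regrouped by position 0 alone collapse to two groups, so a side still reporting four partitions would trip the "same numPartitions" requirement above.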
