szehon-ho commented on PR #46255:
URL: https://github.com/apache/spark/pull/46255#issuecomment-2080381160

   Some implementation notes.  SPARK-41471 works by giving the ShuffleExchangeExec side of the join a KeyGroupedPartitioning, created from the other side's KeyGroupedShuffleSpec as a clone of it (with the other side's partition expressions and values).  That way both sides of the join have a KeyGroupedPartitioning and SPJ can work.
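
   As a rough, self-contained sketch of that idea (hypothetical names, not Spark's actual classes): once the shuffle side knows the scan side's partition values, it can route each row to the partition id the scan side already assigned to that row's partition-key value, so both sides end up co-partitioned.

   ```java
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   // Hypothetical stand-in for a key-grouped partitioner: rows whose
   // partition-key value matches a scan-side partition are routed to that
   // partition's id, mirroring the scan side's layout.
   public class KeyGroupedPartitionerSketch<K> {
       private final Map<K, Integer> valueToPartition = new HashMap<>();

       // partitionValues: the scan side's partition values, in partition order.
       public KeyGroupedPartitionerSketch(List<K> partitionValues) {
           for (int i = 0; i < partitionValues.size(); i++) {
               valueToPartition.put(partitionValues.get(i), i);
           }
       }

       public int getPartition(K key) {
           return valueToPartition.getOrDefault(key, 0);
       }
   }
   ```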
   
   Code changes:
   - Remove the check in KeyGroupedShuffleSpec::canCreatePartitioning that allows only AttributeReference, and add support for TransformExpression.
   - Implement TransformExpression.eval() by reusing the code from V2ExpressionUtils.  This allows the ShuffleExchangeExec to evaluate the partition key, including transform expressions, for each row.
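
   A hedged, self-contained illustration of what per-row evaluation buys (not the actual TransformExpression code; the transform shown is analogous to the test YearsTransform, taken here as whole years between the Unix epoch and the date): with the transform's function invokable directly, the shuffle can derive a partition key from every row's input value.

   ```java
   import java.time.LocalDate;
   import java.time.temporal.ChronoUnit;
   import java.util.function.Function;

   // Hypothetical stand-in for an eval()-able transform expression.
   public class TransformEvalSketch {
       // A "years" transform: whole years between the Unix epoch and the date.
       static final Function<LocalDate, Long> YEARS =
           date -> ChronoUnit.YEARS.between(LocalDate.EPOCH, date);

       // The shuffle side applies the bound function to a row's input
       // column to get that row's partition-key value.
       static long evalPartitionKey(LocalDate rowValue) {
           return YEARS.apply(rowValue);
       }
   }
   ```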
   
   Some fixes:
   - Normalize the valueMap key type in KeyGroupedPartitioner to a single concrete Seq implementation class.  Previously the partitioner's map was initialized with keys as Vector but then probed with keys as ArraySeq; these seem to have different hash codes, so lookups always missed and created new entries with new partition ids.
   - Add support in V2ExpressionUtils for Scala 'static' invoke() methods of ScalarFunctions (currently only the Java static invoke() method is supported).  This was needed, for example, for the test Scala YearsTransform.
   - Change the test YearsTransform to use the same logic as InMemoryBaseTable.  This was pointed out in the [SPARK-41471](https://github.com/apache/spark/pull/42194) PR.
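
   The key-type mismatch behind the first fix is easiest to see with an analogous pitfall, sketched here in Java (the actual issue was Scala's Vector vs. ArraySeq; Java arrays hash by identity, which produces the same symptom): lookups silently miss unless every key is normalized to one representation.

   ```java
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   public class KeyNormalizationSketch {
       // Java arrays hash by identity, so two equal-content array keys
       // never find each other in a HashMap.
       static boolean arrayKeyLookupHits() {
           Map<int[], Integer> m = new HashMap<>();
           m.put(new int[]{1}, 0);
           return m.containsKey(new int[]{1});
       }

       // Normalizing every key to one representation (here, List) makes
       // the lookup hit, mirroring the fix of pinning the valueMap key
       // to a single Seq implementation class.
       static boolean normalizedKeyLookupHits() {
           Map<List<Integer>, Integer> m = new HashMap<>();
           m.put(List.of(1), 0);
           return m.containsKey(List.of(1));
       }
   }
   ```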
   
   Limitations:
   - This feature is disabled if partiallyClustered is enabled.  Partially clustered means the partitioned side of the join has multiple partitions with the same value and does not group them; it is not yet clear how the partitioner on the shuffle side can handle that.
   - This feature is disabled if allowJoinKeysLessThanPartitionKeys is enabled and the partitions are transform expressions.  The allowJoinKeysLessThanPartitionKeys feature works by 'grouping' the BatchScanExec's partitions again by join keys.  If it is enabled along with this feature, a failure happens when checking that both sides of the join (the ShuffleExchangeExec and the partitioned BatchScanExec side) have the same number of partitions.  This actually works in the first optimizer pass, as the ShuffleExchangeExec's KeyGroupedPartitioning is created as a clone of the other side (including partition values).  But after that, a 'grouping' phase is triggered here:
   
   ```
        // Now we need to push-down the common partition information to the scan in each child
        newLeft = populateCommonPartitionInfo(left, mergedPartValues, leftSpec.joinKeyPositions,
          leftReducers, applyPartialClustering, replicateLeftSide)
        newRight = populateCommonPartitionInfo(right, mergedPartValues, rightSpec.joinKeyPositions,
          rightReducers, applyPartialClustering, replicateRightSide)
   ```
   This updates the number of partitions on the BatchScanExec after the grouping by join key, but it does not update the ShuffleExchangeExec's number of partitions.  Hence the error in the subsequent optimizer pass:
   ```
   requirement failed: PartitioningCollection requires all of its partitionings have the same numPartitions.
   java.lang.IllegalArgumentException: requirement failed: PartitioningCollection requires all of its partitionings have the same numPartitions.
        at scala.Predef$.require(Predef.scala:337)
        at org.apache.spark.sql.catalyst.plans.physical.PartitioningCollection.<init>(partitioning.scala:550)
        at org.apache.spark.sql.execution.joins.ShuffledJoin.outputPartitioning(ShuffledJoin.scala:49)
        at org.apache.spark.sql.execution.joins.ShuffledJoin.outputPartitioning$(ShuffledJoin.scala:47)
        at org.apache.spark.sql.execution.joins.SortMergeJoinExec.outputPartitioning(SortMergeJoinExec.scala:39)
        at org.apache.spark.sql.execution.exchange.EnsureRequirements.$anonfun$ensureDistributionAndOrdering$1(EnsureRequirements.scala:66)
        at scala.collection.immutable.Vector1.map(Vector.scala:2140)
        at scala.collection.immutable.Vector1.map(Vector.scala:385)
        at org.apache.spark.sql.execution.exchange.EnsureRequirements.org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering(EnsureRequirements.scala:65)
        at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$1.applyOrElse(EnsureRequirements.scala:657)
        at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$1.applyOrElse(EnsureRequirements.scala:632)
   ```
   This can be reproduced by removing this check and running the relevant unit test added in this PR.  Enabling the two features together needs more investigation in a follow-up PR.
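
   The failing requirement above amounts to a simple invariant; a minimal sketch of the check that trips (hypothetical names, not Spark's actual code):

   ```java
   import java.util.List;

   public class PartitioningInvariantSketch {
       // Mirrors the require() behind the error: every partitioning in a
       // PartitioningCollection must report the same numPartitions, or
       // construction fails.
       static void requireSameNumPartitions(List<Integer> numPartitions) {
           if (numPartitions.stream().distinct().count() > 1) {
               throw new IllegalArgumentException("requirement failed: "
                   + "PartitioningCollection requires all of its partitionings "
                   + "have the same numPartitions.");
           }
       }
   }
   ```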

