Github user JoshRosen commented on the issue:

    https://github.com/apache/spark/pull/14733
  
    I believe the test failure happens because `InMemoryTableScanExec`'s `outputPartitioning` says that it has `n` partitions, yet the RDD it produces may have fewer than `n` partitions (for example, after partition pruning):
    
    ```
       // The cached version does not change the outputPartitioning of the original SparkPlan.
       override def outputPartitioning: Partitioning = relation.child.outputPartitioning
    ```
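The declared-versus-actual mismatch can be modeled in plain Scala without Spark. In this sketch, `CachedScanSketch`, `declaredNumPartitions`, and `prunedIds` are hypothetical names, not Spark APIs. Pruning is assumed to drop whole partitions while the declared count still comes from the original plan.

```scala
// Hypothetical model (not a Spark API): a cached scan whose declared partition
// count comes from the original plan, while pruned partitions are dropped from
// what it actually produces.
final case class CachedScanSketch(declaredNumPartitions: Int, prunedIds: Set[Int]) {
  // Ids of the partitions the scan really yields after pruning.
  def producedPartitionIds: List[Int] =
    (0 until declaredNumPartitions).filterNot(prunedIds.contains).toList

  // True when the declared count overstates the real number of partitions.
  def isInconsistent: Boolean = producedPartitionIds.size != declaredNumPartitions
}

object CachedScanDemo {
  def main(args: Array[String]): Unit = {
    val scan = CachedScanSketch(declaredNumPartitions = 4, prunedIds = Set(1, 3))
    println(scan.producedPartitionIds) // List(0, 2)
    println(scan.isInconsistent)       // true
  }
}
```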
    
    I think this is breaking `SortMergeJoinExec`, which assumes that both sides of the join have the same number of partitions when it zips them together:
    
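    ```
    left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
      // zipPartitions pairs partitions by index, so both inputs must have
      // the same number of partitions.
      val boundCondition: (InternalRow) => Boolean = {
        condition.map { cond =>
          newPredicate(cond, left.output ++ right.output)
        }.getOrElse {
          (r: InternalRow) => true
        }
      }
      // ... (rest of the join elided)
    }
    ```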
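The failure mode can be illustrated with a simplified, Spark-free model of `zipPartitions`: partition `i` of one input is paired with partition `i` of the other, so the two inputs must have equal partition counts. Spark's own zipped-partitions RDDs perform a similar check and throw an `IllegalArgumentException`; `ZipSketch` is a hypothetical stand-in in which each partition is just a `Seq`.

```scala
// Simplified, Spark-free model of zipPartitions: partition i of the left input
// is paired with partition i of the right input, so the counts must match.
object ZipSketch {
  def zipPartitions[A, B, C](left: Seq[Seq[A]], right: Seq[Seq[B]])(
      f: (Iterator[A], Iterator[B]) => Iterator[C]): Seq[List[C]] = {
    // Analogous to Spark's check; require throws IllegalArgumentException.
    require(left.size == right.size,
      s"Can't zip inputs with unequal numbers of partitions: ${left.size} vs ${right.size}")
    left.zip(right).map { case (l, r) => f(l.iterator, r.iterator).toList }
  }

  def main(args: Array[String]): Unit = {
    val left  = Seq(Seq(1, 2), Seq(3)) // two partitions
    val right = Seq(Seq("a"))          // one partition, e.g. after pruning
    try zipPartitions(left, right)((l, r) => l.zip(r))
    catch { case e: IllegalArgumentException => println(e.getMessage) }
  }
}
```

Zipping a two-partition left side with a right side pruned down to one partition fails the `require` check rather than silently misaligning rows.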
    
    One possible fix would be to have `InMemoryTableScanExec.outputPartitioning` copy the original plan's partitioning and just adjust the number of partitions, but that could introduce performance regressions for certain types of joins. Take hash joins: even if both sides are partitioned with the same hash function and modulus, Spark won't recognize them as co-partitioned because their declared numbers of partitions differ, so it may add a shuffle that wasn't needed. In other words, you might get lucky and join two co-partitioned cached datasets, only to have that co-partitioning go unused because partition pruning altered the declared partitioning.
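A toy model of why adjusting only the partition count defeats co-partitioning. `HashPartitioningSketch` is a hypothetical simplification of a hash partitioning (a key list plus a partition count, with rows placed by a non-negative modulus of the key hash). It assumes, as the comment describes, that the planner treats two sides as co-partitioned only when both the keys and the declared partition counts match.

```scala
// Hypothetical simplification of a hash partitioning: a row whose key hashes
// to h lands in partition (h mod numPartitions), kept non-negative.
final case class HashPartitioningSketch(keys: Seq[String], numPartitions: Int) {
  def partitionFor(keyHash: Int): Int = {
    val m = keyHash % numPartitions
    if (m < 0) m + numPartitions else m
  }

  // A planner that compares declared partitionings only treats two sides as
  // co-partitioned when the keys AND the partition counts match.
  def compatibleWith(other: HashPartitioningSketch): Boolean =
    keys == other.keys && numPartitions == other.numPartitions
}

object CoPartitioningDemo {
  def main(args: Array[String]): Unit = {
    val left  = HashPartitioningSketch(Seq("id"), 4)
    val right = HashPartitioningSketch(Seq("id"), 4)
    println(left.compatibleWith(right)) // true: no shuffle needed

    // Hypothetical fix: same scheme, but the declared count is reduced to the
    // 3 partitions that survived pruning on the right side.
    val rightAfterPruning = right.copy(numPartitions = 3)
    println(left.compatibleWith(rightAfterPruning)) // false: planner would shuffle

    // The surviving rows were still placed by hash mod 4 on both sides.
    println(left.partitionFor(7)) // 3
  }
}
```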
    
    It might be possible to work around this with a custom, SQL-specific version of `ZippedPartitionsRDD` that understands missing partitions and zips by partition ID (handling the gaps), but this will be tricky to get right for outer joins, where you still need to produce output for partitions whose counterparts on the other side were pruned.
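A rough sketch of the gap-aware zip idea, assuming each side is represented as a map from surviving partition ID to that partition's rows. `GapAwareZipSketch`, `zipByPartitionId`, and `needsBothSides` are hypothetical names; a real implementation would work on RDD partitions rather than in-memory maps. Missing IDs are treated as empty, which is valid only because a pruned partition is known to contain no matching rows.

```scala
// Hypothetical gap-aware zip: each side maps partition id -> rows for the
// partitions that survived pruning; a missing id is treated as empty.
object GapAwareZipSketch {
  def zipByPartitionId[A, B, C](
      numPartitions: Int,
      left: Map[Int, Seq[A]],
      right: Map[Int, Seq[B]],
      needsBothSides: Boolean // true for inner joins, false for outer joins
  )(f: (Iterator[A], Iterator[B]) => Iterator[C]): Map[Int, List[C]] =
    (0 until numPartitions).flatMap { id =>
      (left.get(id), right.get(id)) match {
        // Pruned on both sides: nothing to produce.
        case (None, None) => None
        // Inner join: a pruned side means no matches, so skip the partition.
        case (l, r) if needsBothSides && (l.isEmpty || r.isEmpty) => None
        // Otherwise run the join, feeding an empty iterator for a pruned side.
        case (l, r) =>
          val out = f(l.getOrElse(Nil).iterator, r.getOrElse(Nil).iterator)
          Some(id -> out.toList)
      }
    }.toMap
}
```

For an inner join, a partition whose counterpart was pruned can be skipped outright; for an outer join it still has to be processed against an empty iterator, which is the case the comment flags as tricky.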

