Github user andrewor14 commented on the pull request:

    https://github.com/apache/spark/pull/8511#issuecomment-136094580
  
    @davies I think the changes here fix the problem, but I find it a little 
arbitrary that `ZippedPartitionsRDD` needs to change. If we later decide to 
implement SMJ using some other operators, this will silently fail. I wonder if 
we should make the changes in SMJ instead, e.g.:
    
    ```scala
    // In SortMergeJoin
    protected override def doExecute(): RDD[InternalRow] = {
      ...
      // This argument doesn't currently exist. You'll need to add it to
      // MapPartitionsWithPrepareRDD.
      def preparePartition(rdd: RDD[...]): Unit = {
        rdd.getNarrowAncestors.collect {
          case ancestor: MapPartitionsWithPrepareRDD[...] => ancestor.prepare()
        }
      }

      def executePartition(...): Iterator[...] = {
        // Just return the parent iterator (essentially a no-op)
      }

      val zipped = left.execute().zipPartitions(right.execute()) { ... }
      new MapPartitionsWithPrepareRDD[...](
        zipped, preparePartition, executePartition, preservesPartitioning = true)
    }
    ```
    This essentially forces SMJ to call the `prepare()` method of the ancestor 
RDDs before executing. We might need to change `getNarrowAncestors` to maintain 
an ordering (right now it's arbitrary). Will something like this work?
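
    To make the traversal pattern concrete, here is a minimal, Spark-free 
sketch in plain Scala. The `Node`, `Preparable`, and `allAncestors` names are 
hypothetical stand-ins for `RDD`, `MapPartitionsWithPrepareRDD`, and 
`getNarrowAncestors`; the point is just that `collect` with a partial function 
invokes `prepare()` on exactly the matching ancestors:

    ```scala
    // Hypothetical stand-ins for the RDD lineage graph.
    sealed trait Node { def ancestors: Seq[Node] = Nil }
    case class Plain(override val ancestors: Seq[Node]) extends Node
    case class Preparable(name: String,
        override val ancestors: Seq[Node]) extends Node {
      var prepared = false
      def prepare(): Unit = { prepared = true }
    }

    object Demo extends App {
      // Depth-first walk of all ancestors (ordering arbitrary, as in Spark).
      def allAncestors(n: Node): Seq[Node] =
        n.ancestors.flatMap(a => a +: allAncestors(a))

      val left   = Preparable("left-sorter", Nil)
      val right  = Preparable("right-sorter", Nil)
      val zipped = Plain(Seq(left, right))

      // Mirrors `rdd.getNarrowAncestors.collect { case a: ... => a.prepare() }`
      allAncestors(zipped).collect { case p: Preparable => p.prepare() }

      assert(left.prepared && right.prepared)
    }
    ```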

