GitHub user andrewor14 commented on the pull request:
https://github.com/apache/spark/pull/8511#issuecomment-136094580
@davies I think the changes here fix the problem, but it seems a little
arbitrary that `ZippedPartitionsRDD` needs to change. If we later implement
SMJ with some other operators, this will silently fail again. I wonder if we
should make the changes in SMJ instead, e.g.:
```
// In SortMergeJoin
protected override def doExecute(): RDD[InternalRow] = {
  ...
  // This argument doesn't currently exist. You'll need to add it to
  // MapPartitionsWithPrepareRDD.
  def preparePartition(rdd: RDD[...]): Unit = {
    rdd.getNarrowAncestors.collect {
      case ancestor: MapPartitionsWithPrepareRDD[...] => ancestor.prepare()
    }
  }
  def executePartition(...): Iterator[...] = {
    // Just return the parent iterator (essentially a no-op)
  }
  val zipped = left.execute().zipPartitions(right.execute()) { ... }
  new MapPartitionsWithPrepareRDD[...](
    zipped, preparePartition, executePartition, preservesPartitioning = true)
}
```
This essentially forces SMJ to call the `prepare()` method of its ancestor
RDDs before executing. We might also need to change `getNarrowAncestors` to
return ancestors in a deterministic order (right now the ordering is
arbitrary). Will something like this work?
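For concreteness, here is a minimal sketch of what such an RDD might look
like. Everything below is hypothetical (the class, its `prepare()` method,
and the once-only guard), and the guard is a deliberate simplification tied
to this RDD object's lifetime; a production version would want explicit
per-task-attempt state:
```
import scala.reflect.ClassTag

import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical: an RDD whose preparation step can be triggered eagerly by a
// descendant operator (e.g. SMJ) before any partition is computed.
private[spark] class MapPartitionsWithPrepareRDD[U: ClassTag, T: ClassTag](
    prev: RDD[T],
    preparePartition: () => Unit,
    executePartition: (TaskContext, Int, Iterator[T]) => Iterator[U],
    preservesPartitioning: Boolean = false)
  extends RDD[U](prev) {

  // Simplification: guards against double preparation on this RDD object;
  // a real implementation would want explicit per-task-attempt state.
  @transient private var prepared = false

  // Called by a descendant before execution; idempotent.
  def prepare(): Unit = if (!prepared) {
    preparePartition()
    prepared = true
  }

  override val partitioner =
    if (preservesPartitioning) firstParent[T].partitioner else None

  override def getPartitions: Array[Partition] = firstParent[T].partitions

  override def compute(split: Partition, context: TaskContext): Iterator[U] = {
    prepare() // no-op if a descendant already prepared us
    executePartition(context, split.index, firstParent[T].iterator(split, context))
  }
}
```
With something along these lines, `executePartition` in SMJ can stay a
pass-through, and `preparePartition` just walks `getNarrowAncestors` to
prepare each ancestor eagerly.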