Github user markhamstra commented on the pull request:
https://github.com/apache/spark/pull/12655#issuecomment-215235472
What I'm actually trying to look at right now is something very similar to
https://github.com/apache/spark/pull/8923, but pushing the check for the prior
existence of a Stage in shuffleToMapStage down into newOrUsedShuffleStage:
```scala
private def newOrUsedShuffleStage(
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {
  val rdd = shuffleDep.rdd
  val numTasks = rdd.partitions.length
  shuffleToMapStage.getOrElse(shuffleDep.shuffleId, {
    val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, firstJobId, rdd.creationSite)
    if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
      val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
      val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
      (0 until locs.length).foreach { i =>
        if (locs(i) ne null) {
          // locs(i) will be null if missing
          stage.addOutputLoc(i, locs(i))
        }
      }
    } else {
      // Kind of ugly: need to register RDDs with the cache and map output tracker here
      // since we can't do it in the RDD constructor because # of partitions is unknown
      logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
      mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
    }
    stage
  })
}
```
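As a side note, the snippet above hinges on `getOrElse` with a by-name default: the stage-building block only runs when `shuffleToMapStage` has no entry for the shuffle id. A minimal, self-contained sketch of that idiom (with hypothetical names; not Spark code) also shows the one subtlety worth keeping in mind, namely that `getOrElse` does not insert the computed default into the map, whereas `getOrElseUpdate` does:

```scala
import scala.collection.mutable

object GetOrElseDemo {
  def main(args: Array[String]): Unit = {
    // Stand-in for shuffleToMapStage: shuffle id -> stage name.
    val stages = mutable.HashMap[Int, String](1 -> "existingStage")
    var built = 0

    // By-name second argument: the block is evaluated only on a miss.
    def lookup(id: Int): String =
      stages.getOrElse(id, { built += 1; s"newStage-$id" })

    assert(lookup(1) == "existingStage") // hit: default block never runs
    assert(built == 0)
    assert(lookup(2) == "newStage-2")    // miss: default block runs
    assert(built == 1)

    // getOrElse leaves the map unchanged; getOrElseUpdate also inserts.
    assert(!stages.contains(2))
    stages.getOrElseUpdate(2, s"newStage-$2")
    assert(stages.contains(2))
  }
}
```

So if the intent is to cache the freshly built stage under its shuffle id, the caller (or the block itself) still has to put it into `shuffleToMapStage`; `getOrElse` alone won't.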