Github user andrewor14 commented on a diff in the pull request:
https://github.com/apache/spark/pull/8217#discussion_r37128303
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
---
@@ -1039,39 +1028,22 @@ class DAGScheduler(
// we registered these map outputs.
mapOutputTracker.registerMapOutputs(
shuffleStage.shuffleDep.shuffleId,
- shuffleStage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray,
+ shuffleStage.outputLocs.map(_.headOption.orNull).toArray,
changeEpoch = true)
clearCacheLocs()
+
+ // Some tasks had failed; let's resubmit this shuffleStage
+ // TODO: Lower-level scheduler should also deal with this
if (shuffleStage.outputLocs.contains(Nil)) {
- // Some tasks had failed; let's resubmit this shuffleStage
- // TODO: Lower-level scheduler should also deal with this
logInfo("Resubmitting " + shuffleStage + " (" +
shuffleStage.name +
") because some of its tasks had failed: " +
shuffleStage.outputLocs.zipWithIndex.filter(_._1.isEmpty)
.map(_._2).mkString(", "))
submitStage(shuffleStage)
- } else {
- val newlyRunnable = new ArrayBuffer[Stage]
- for (shuffleStage <- waitingStages) {
- logInfo("Missing parents for " + shuffleStage + ": " +
- getMissingParentStages(shuffleStage))
- }
- for (shuffleStage <- waitingStages if getMissingParentStages(shuffleStage).isEmpty)
- {
- newlyRunnable += shuffleStage
- }
- waitingStages --= newlyRunnable
- runningStages ++= newlyRunnable
- for {
- shuffleStage <- newlyRunnable.sortBy(_.id)
- jobId <- activeJobForStage(shuffleStage)
- } {
- logInfo("Submitting " + shuffleStage + " (" +
- shuffleStage.rdd + "), which is now runnable")
- submitMissingTasks(shuffleStage, jobId)
- }
--- End diff --
This is a potentially controversial change, so let me explain.
This block runs when a map stage finishes and all of its shuffle files
are present. It moves the newly runnable stages from `waitingStages` to
`runningStages` and submits them. However, we already do the same thing in
`submitWaitingStages` at the end of this method, so my understanding is that
this block is not actually needed.
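
To illustrate the reasoning, here is a minimal, self-contained sketch. It is not the actual `DAGScheduler` code: the `Stage` case class, `promoteRunnable`, and the `finished` set are hypothetical stand-ins. It only shows that a "promote every waiting stage with no missing parents" step finds nothing new when it runs a second time on the same state, which is why performing it inline and then again at the end of the method would be redundant.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for a scheduler stage; not Spark's Stage class.
final case class Stage(id: Int, parents: Seq[Int])

object WaitingStagesSketch {
  // Move every waiting stage whose parents have all finished into `running`
  // and return the promoted stages in id order.
  def promoteRunnable(
      waiting: mutable.Set[Stage],
      running: mutable.Set[Stage],
      finished: Set[Int]): Seq[Stage] = {
    val newlyRunnable =
      waiting.filter(_.parents.forall(finished.contains)).toSeq.sortBy(_.id)
    waiting --= newlyRunnable
    running ++= newlyRunnable
    newlyRunnable
  }

  def main(args: Array[String]): Unit = {
    val waiting = mutable.Set(Stage(2, Seq(1)), Stage(3, Seq(1, 2)))
    val running = mutable.Set.empty[Stage]
    // First pass promotes stage 2; stage 3 still waits on stage 2.
    val first = promoteRunnable(waiting, running, finished = Set(1))
    // A second pass on the same state has nothing left to promote.
    val second = promoteRunnable(waiting, running, finished = Set(1))
    println(s"first pass: ${first.map(_.id)}, second pass: ${second.map(_.id)}")
  }
}
```

In this sketch the second call returns an empty sequence and leaves both sets unchanged, so dropping one of the two passes does not change the end state.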
*Test coverage.* This change only affects the case where the shuffle map
stage has finished successfully, which is already covered by every existing
test in `DAGSchedulerSuite` that runs a successful shuffle.
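
Separately, the diff replaces the explicit emptiness check on `outputLocs` with `headOption.orNull`. The sketch below assumes plain `String` locations instead of Spark's map status objects, and `oldForm` / `newForm` are hypothetical helper names; it only demonstrates that the two expressions produce the same array, including `null` for empty lists.

```scala
object HeadOrNullSketch {
  // Old form from the diff: explicit emptiness check.
  def oldForm(locs: Seq[List[String]]): Array[String] =
    locs.map(list => if (list.isEmpty) null else list.head).toArray

  // New form from the diff: first element as an Option, or null if empty.
  def newForm(locs: Seq[List[String]]): Array[String] =
    locs.map(_.headOption.orNull).toArray

  def main(args: Array[String]): Unit = {
    // Assumed sample data: the middle partition has no registered output.
    val locs = Seq(List("host-a", "host-b"), Nil, List("host-c"))
    println(oldForm(locs).sameElements(newForm(locs)))
  }
}
```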