otterc commented on code in PR #36601:
URL: https://github.com/apache/spark/pull/36601#discussion_r881028865
##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -1885,6 +1885,16 @@ private[spark] class DAGScheduler(
mapOutputTracker.
  unregisterMergeResult(shuffleId, reduceId, bmAddress, Option(mapIndex))
}
+ } else {
+ // Unregister the merge result of <shuffleId, reduceId> if there is a
+ // FetchFailed event and it is not a MetadataFetchFailedException,
+ // which is signified by bmAddress being null
Review Comment:
Nit: this can fit in 2 lines.
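For example, a condensed two-line version might read (a sketch that keeps the original wording):

```scala
// Unregister the merge result of <shuffleId, reduceId> if there is a FetchFailed
// event and it is not a MetadataFetchFailedException (signified by a null bmAddress)
```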
##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2449,7 +2459,12 @@ private[spark] class DAGScheduler(
val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
logDebug(s"Considering removal of executor $execId; " +
s"fileLost: $fileLost, currentEpoch: $currentEpoch")
- if (!executorFailureEpoch.contains(execId) || executorFailureEpoch(execId) < currentEpoch) {
+ // Check if the execId is a shuffle push merger.
+ // We do not remove the executor if it is,
+ // and only remove the outputs on the host.
Review Comment:
Nit: This can fit in 2 lines.
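For instance, one possible two-line form:

```scala
// Check if the execId is a shuffle push merger. We do not remove the
// executor if it is, and only remove the outputs on the host.
```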
##########
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:
##########
@@ -4342,6 +4342,95 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
assertDataStructuresEmpty()
}
+ test("SPARK-38987: corrupted shuffle block FetchFailure should unregister
merge result") {
+ initPushBasedShuffleConfs(conf)
+ DAGSchedulerSuite.clearMergerLocs()
+ DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4",
"host5"))
+
+ scheduler = new MyDAGScheduler(
+ sc,
+ taskScheduler,
+ sc.listenerBus,
+ mapOutputTracker,
+ blockManagerMaster,
+ sc.env,
+ shuffleMergeFinalize = false,
+ shuffleMergeRegister = false)
+ dagEventProcessLoopTester = new DAGSchedulerEventProcessLoopTester(scheduler)
+
+ val parts = 2
+ val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+ val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
+ val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
+
+ // Submit a reduce job that depends on this shuffle, which will create a map stage
+ submit(reduceRdd, (0 until parts).toArray)
+
+ val shuffleMapStage = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+ scheduler.handleRegisterMergeStatuses(shuffleMapStage,
+ Seq((0, makeMergeStatus("hostA", shuffleDep.shuffleMergeId))))
+ scheduler.handleShuffleMergeFinalized(shuffleMapStage,
+ shuffleMapStage.shuffleDep.shuffleMergeId)
+ scheduler.handleRegisterMergeStatuses(shuffleMapStage,
+ Seq((1, makeMergeStatus("hostA", shuffleDep.shuffleMergeId))))
+
+ assert(mapOutputTracker.getNumAvailableMergeResults(shuffleDep.shuffleId) == 1)
+
+ // Complete shuffle map stage with FetchFailed on hostA
+ complete(taskSets(0), taskSets(0).tasks.zipWithIndex.map {
+ case (task, _) =>
+ (FetchFailed(
+ makeBlockManagerId("hostA"),
+ shuffleDep.shuffleId, -1L, -1, 0, "corruption fetch failure"), null)
+ }.toSeq)
+ assert(mapOutputTracker.getNumAvailableMergeResults(shuffleDep.shuffleId) == 0)
+ }
+
+ test("SPARK-38987: All shuffle outputs for a shuffle push" +
Review Comment:
Nit: could you modify the test name to indicate that this should only happen when the config unRegisterOutputOnHost is true?
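For example, one possible rename (a sketch based on the config quoted above):

```scala
test("SPARK-38987: All shuffle outputs for a shuffle push merger executor should be " +
  "cleaned up on a fetch failure when unRegisterOutputOnHost is true") {
```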
##########
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:
##########
@@ -4342,6 +4342,95 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
assertDataStructuresEmpty()
}
+ test("SPARK-38987: corrupted shuffle block FetchFailure should unregister
merge result") {
+ initPushBasedShuffleConfs(conf)
+ DAGSchedulerSuite.clearMergerLocs()
+ DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4",
"host5"))
+
+ scheduler = new MyDAGScheduler(
+ sc,
+ taskScheduler,
+ sc.listenerBus,
+ mapOutputTracker,
+ blockManagerMaster,
+ sc.env,
+ shuffleMergeFinalize = false,
+ shuffleMergeRegister = false)
+ dagEventProcessLoopTester = new DAGSchedulerEventProcessLoopTester(scheduler)
+
+ val parts = 2
+ val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+ val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
+ val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
+
+ // Submit a reduce job that depends on this shuffle, which will create a map stage
+ submit(reduceRdd, (0 until parts).toArray)
+
+ val shuffleMapStage = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+ scheduler.handleRegisterMergeStatuses(shuffleMapStage,
+ Seq((0, makeMergeStatus("hostA", shuffleDep.shuffleMergeId))))
+ scheduler.handleShuffleMergeFinalized(shuffleMapStage,
+ shuffleMapStage.shuffleDep.shuffleMergeId)
+ scheduler.handleRegisterMergeStatuses(shuffleMapStage,
+ Seq((1, makeMergeStatus("hostA", shuffleDep.shuffleMergeId))))
+
+ assert(mapOutputTracker.getNumAvailableMergeResults(shuffleDep.shuffleId) == 1)
+
+ // Complete shuffle map stage with FetchFailed on hostA
+ complete(taskSets(0), taskSets(0).tasks.zipWithIndex.map {
+ case (task, _) =>
+ (FetchFailed(
+ makeBlockManagerId("hostA"),
+ shuffleDep.shuffleId, -1L, -1, 0, "corruption fetch failure"), null)
+ }.toSeq)
+ assert(mapOutputTracker.getNumAvailableMergeResults(shuffleDep.shuffleId) == 0)
+ }
+
+ test("SPARK-38987: All shuffle outputs for a shuffle push" +
+ " merger executor should be cleaned up on a fetch failure") {
+ conf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
+ conf.set("spark.files.fetchFailure.unRegisterOutputOnHost", "true")
+
+ val shuffleMapRdd = new MyRDD(sc, 3, Nil)
+ val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(3))
+ val shuffleId = shuffleDep.shuffleId
+ val reduceRdd = new MyRDD(sc, 3, List(shuffleDep), tracker = mapOutputTracker)
+
+ submit(reduceRdd, Array(0, 1, 2))
+ // Map stage completes successfully,
+ // two tasks are run on an executor on hostA and one on an executor on hostB
+ completeShuffleMapStageSuccessfully(0, 0, 3, Seq("hostA", "hostA", "hostB"))
+ // Now the executor on hostA is lost
+ runEvent(ExecutorLost(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER,
+ ExecutorExited(-100, false, "Container marked as failed")))
+
+ // The shuffle push merger executor should not be removed and the shuffle files should not be unregistered
+ verify(blockManagerMaster, times(0)).removeExecutor(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER)
+ verify(mapOutputTracker,
+ times(0)).removeOutputsOnExecutor(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER)
+
+ // Now a fetch failure from the lost executor occurs
+ complete(taskSets(1), Seq(
+ (FetchFailed(BlockManagerId(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER, "hostA", 12345),
+ shuffleId, 0L, 0, 0, "ignored"), null)
+ ))
+
+ // Verify that we are not removing the executor,
+ // and that we are only removing the outputs on the host
+ verify(blockManagerMaster, times(0)).removeExecutor(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER)
+ verify(mapOutputTracker,
+ times(1)).removeOutputsOnHost("hostA")
+
+ // Shuffle files for shuffle-push-merger executor should be lost
Review Comment:
What do you mean by this? There isn't any executor `shuffle-push-merger`. We
should just assert here that all the mapStatus and mergeStatus on the host are
removed
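A minimal sketch of such assertions, assuming the MapOutputTrackerMaster helpers getNumAvailableOutputs and getNumAvailableMergeResults are usable here (two of the three map outputs in this test were registered on hostA):

```scala
// Only the map output on hostB should survive removeOutputsOnHost("hostA")
assert(mapOutputTracker.getNumAvailableOutputs(shuffleId) == 1)
// Any merge result registered on hostA should be gone
assert(mapOutputTracker.getNumAvailableMergeResults(shuffleId) == 0)
```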
##########
core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala:
##########
@@ -1786,4 +1786,32 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
ShuffleBlockId(0, 5, 2), ShuffleBlockId(0, 6, 2)))
}
+ test("SPARK-38987: failure to fetch corrupted shuffle block chunk should " +
+ "throw a FetchFailedException when corruption detection is turned off") {
Review Comment:
Nit: change this to `when early detection is unable to catch corruption`, because it may be possible that the block is corrupt but the first maxBytesInFlight/3 bytes are not.
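With that change, the test header could read:

```scala
test("SPARK-38987: failure to fetch corrupted shuffle block chunk should " +
  "throw a FetchFailedException when early detection is unable to catch corruption") {
```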
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]