venkata91 commented on a change in pull request #30691:
URL: https://github.com/apache/spark/pull/30691#discussion_r626224298
##########
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##########
@@ -3393,6 +3406,271 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
assert(rprofsE === Set())
}
+ private def initPushBasedShuffleConfs(conf: SparkConf) = {
+ conf.set(config.SHUFFLE_SERVICE_ENABLED, true)
+ conf.set(config.PUSH_BASED_SHUFFLE_ENABLED, true)
+ conf.set("spark.master", "pushbasedshuffleclustermanager")
+ }
+
+ test("SPARK-32920: shuffle merge finalization") {
+ initPushBasedShuffleConfs(conf)
+ DAGSchedulerSuite.clearMergerLocs
+ DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5"))
+ val parts = 2
+ val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+ val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
+ val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
+
+ // Submit a reduce job that depends on this shuffle, which will create a map stage
+ submit(reduceRdd, (0 until parts).toArray)
+ completeShuffleMapStageSuccessfully(0, 0, parts)
+ assert(mapOutputTracker.getNumAvailableMergeResults(shuffleDep.shuffleId) == parts)
+ completeNextResultStageWithSuccess(1, 0)
+ assert(results === Map(0 -> 42, 1 -> 42))
+ results.clear()
+ assertDataStructuresEmpty()
+ }
+
+ test("SPARK-32920: merger locations not empty") {
+ initPushBasedShuffleConfs(conf)
+ conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 3)
+ DAGSchedulerSuite.clearMergerLocs
+ DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5"))
+ val parts = 2
+
+ val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+ val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
+ val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
+
+ // Submit a reduce job that depends on this shuffle, which will create a map stage
+ submit(reduceRdd, (0 until parts).toArray)
+ completeShuffleMapStageSuccessfully(0, 0, parts)
+ val shuffleStage = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+ assert(shuffleStage.shuffleDep.getMergerLocs.nonEmpty)
+
+ assert(mapOutputTracker.getNumAvailableMergeResults(shuffleDep.shuffleId) == parts)
+ completeNextResultStageWithSuccess(1, 0)
+ assert(results === Map(0 -> 42, 1 -> 42))
+
+ results.clear()
+ assertDataStructuresEmpty()
+ }
+
+ test("SPARK-32920: merger locations reuse from shuffle dependency") {
+ initPushBasedShuffleConfs(conf)
+ conf.set(config.SHUFFLE_MERGER_MAX_RETAINED_LOCATIONS, 3)
+ DAGSchedulerSuite.clearMergerLocs
+ DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5"))
+ val parts = 2
+
+ val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+ val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
+ val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
+ submit(reduceRdd, Array(0, 1))
+
+ completeShuffleMapStageSuccessfully(0, 0, parts)
+ assert(shuffleDep.getMergerLocs.nonEmpty)
+ val mergerLocs = shuffleDep.getMergerLocs
+ completeNextResultStageWithSuccess(1, 0)
+
+ // submit another job w/ the shared dependency, and have a fetch failure
+ val reduce2 = new MyRDD(sc, 2, List(shuffleDep))
+ submit(reduce2, Array(0, 1))
+ // Note that the stage numbering here is only b/c the shared dependency produces a new, skipped
+ // stage. If instead it reused the existing stage, then this would be stage 2
+ completeNextStageWithFetchFailure(3, 0, shuffleDep)
+ scheduler.resubmitFailedStages()
+
+ assert(scheduler.runningStages.nonEmpty)
+ assert(scheduler.stageIdToStage(2)
+ .asInstanceOf[ShuffleMapStage].shuffleDep.getMergerLocs.nonEmpty)
+ val newMergerLocs = scheduler.stageIdToStage(2)
+ .asInstanceOf[ShuffleMapStage].shuffleDep.getMergerLocs
+
+ // Check that the same merger locs are reused for the new stage with the shared shuffle dependency
+ assert(mergerLocs.zip(newMergerLocs).forall(x => x._1.host == x._2.host))
+ completeShuffleMapStageSuccessfully(2, 0, 2)
+ completeNextResultStageWithSuccess(3, 1, idx => idx + 1234)
+ assert(results === Map(0 -> 1234, 1 -> 1235))
+
+ assertDataStructuresEmpty()
+ }
+
+ test("SPARK-32920: Disable shuffle merge due to not enough mergers
available") {
+ initPushBasedShuffleConfs(conf)
+ conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 6)
+ DAGSchedulerSuite.clearMergerLocs
+ DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5"))
+ val parts = 7
+
+ val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+ val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
+ val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
+
+ // Submit a reduce job that depends on this shuffle, which will create a map stage
+ submit(reduceRdd, (0 until parts).toArray)
+ completeShuffleMapStageSuccessfully(0, 0, parts)
+ val shuffleStage = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+ assert(shuffleStage.shuffleDep.mergerLocs.isEmpty)
+
+ completeNextResultStageWithSuccess(1, 0)
+ assert(results === Map(2 -> 42, 5 -> 42, 4 -> 42, 1 -> 42, 3 -> 42, 6 -> 42, 0 -> 42))
+
+ results.clear()
+ assertDataStructuresEmpty()
+ }
+
+ test("SPARK-32920: Ensure child stage should not start before all the" +
+ " parent stages are completed with shuffle merge finalized for all the
parent stages") {
Review comment:
Checked `DAGSchedulerSuite` as well as `HealthTrackerSuite`; those seem to use 2-space indents. Do you see 4 spaces anywhere else?
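For reference, a minimal sketch of the 2-space continuation indent I'm referring to (the suite and test below are purely illustrative, not code from this PR):
```scala
import org.scalatest.funsuite.AnyFunSuite

// Hypothetical stand-alone suite, used only to illustrate the indentation
// convention discussed above; it is not part of this PR.
class IndentStyleExampleSuite extends AnyFunSuite {

  // When a test title is split across lines, the continuation line is
  // indented 2 spaces relative to `test`, and the body uses the usual
  // 2-space indent as well.
  test("example of a long test title that is split across" +
    " two lines with string concatenation") {
    val parts = 2
    assert(parts === 2)
  }
}
```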
##########
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##########
@@ -3393,6 +3406,271 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
assert(rprofsE === Set())
}
+ private def initPushBasedShuffleConfs(conf: SparkConf) = {
+ conf.set(config.SHUFFLE_SERVICE_ENABLED, true)
+ conf.set(config.PUSH_BASED_SHUFFLE_ENABLED, true)
+ conf.set("spark.master", "pushbasedshuffleclustermanager")
+ }
+
+ test("SPARK-32920: shuffle merge finalization") {
+ initPushBasedShuffleConfs(conf)
+ DAGSchedulerSuite.clearMergerLocs
+ DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5"))
+ val parts = 2
+ val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+ val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
+ val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
+
+ // Submit a reduce job that depends on this shuffle, which will create a map stage
+ submit(reduceRdd, (0 until parts).toArray)
+ completeShuffleMapStageSuccessfully(0, 0, parts)
+ assert(mapOutputTracker.getNumAvailableMergeResults(shuffleDep.shuffleId) == parts)
+ completeNextResultStageWithSuccess(1, 0)
+ assert(results === Map(0 -> 42, 1 -> 42))
+ results.clear()
+ assertDataStructuresEmpty()
+ }
+
+ test("SPARK-32920: merger locations not empty") {
+ initPushBasedShuffleConfs(conf)
+ conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 3)
+ DAGSchedulerSuite.clearMergerLocs
+ DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5"))
+ val parts = 2
+
+ val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+ val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
+ val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
+
+ // Submit a reduce job that depends on this shuffle, which will create a map stage
+ submit(reduceRdd, (0 until parts).toArray)
+ completeShuffleMapStageSuccessfully(0, 0, parts)
+ val shuffleStage = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+ assert(shuffleStage.shuffleDep.getMergerLocs.nonEmpty)
+
+ assert(mapOutputTracker.getNumAvailableMergeResults(shuffleDep.shuffleId) == parts)
+ completeNextResultStageWithSuccess(1, 0)
+ assert(results === Map(0 -> 42, 1 -> 42))
+
+ results.clear()
+ assertDataStructuresEmpty()
+ }
+
+ test("SPARK-32920: merger locations reuse from shuffle dependency") {
+ initPushBasedShuffleConfs(conf)
+ conf.set(config.SHUFFLE_MERGER_MAX_RETAINED_LOCATIONS, 3)
+ DAGSchedulerSuite.clearMergerLocs
+ DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5"))
+ val parts = 2
+
+ val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+ val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
+ val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
+ submit(reduceRdd, Array(0, 1))
+
+ completeShuffleMapStageSuccessfully(0, 0, parts)
+ assert(shuffleDep.getMergerLocs.nonEmpty)
+ val mergerLocs = shuffleDep.getMergerLocs
+ completeNextResultStageWithSuccess(1, 0)
+
+ // submit another job w/ the shared dependency, and have a fetch failure
+ val reduce2 = new MyRDD(sc, 2, List(shuffleDep))
+ submit(reduce2, Array(0, 1))
+ // Note that the stage numbering here is only b/c the shared dependency produces a new, skipped
+ // stage. If instead it reused the existing stage, then this would be stage 2
+ completeNextStageWithFetchFailure(3, 0, shuffleDep)
+ scheduler.resubmitFailedStages()
+
+ assert(scheduler.runningStages.nonEmpty)
+ assert(scheduler.stageIdToStage(2)
+ .asInstanceOf[ShuffleMapStage].shuffleDep.getMergerLocs.nonEmpty)
+ val newMergerLocs = scheduler.stageIdToStage(2)
+ .asInstanceOf[ShuffleMapStage].shuffleDep.getMergerLocs
+
+ // Check that the same merger locs are reused for the new stage with the shared shuffle dependency
+ assert(mergerLocs.zip(newMergerLocs).forall(x => x._1.host == x._2.host))
+ completeShuffleMapStageSuccessfully(2, 0, 2)
+ completeNextResultStageWithSuccess(3, 1, idx => idx + 1234)
+ assert(results === Map(0 -> 1234, 1 -> 1235))
+
+ assertDataStructuresEmpty()
+ }
+
+ test("SPARK-32920: Disable shuffle merge due to not enough mergers
available") {
+ initPushBasedShuffleConfs(conf)
+ conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 6)
+ DAGSchedulerSuite.clearMergerLocs
+ DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5"))
+ val parts = 7
+
+ val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+ val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
+ val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
+
+ // Submit a reduce job that depends on this shuffle, which will create a map stage
+ submit(reduceRdd, (0 until parts).toArray)
+ completeShuffleMapStageSuccessfully(0, 0, parts)
+ val shuffleStage = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+ assert(shuffleStage.shuffleDep.mergerLocs.isEmpty)
+
+ completeNextResultStageWithSuccess(1, 0)
+ assert(results === Map(2 -> 42, 5 -> 42, 4 -> 42, 1 -> 42, 3 -> 42, 6 -> 42, 0 -> 42))
+
+ results.clear()
+ assertDataStructuresEmpty()
+ }
+
+ test("SPARK-32920: Ensure child stage should not start before all the" +
+ " parent stages are completed with shuffle merge finalized for all the
parent stages") {
+ initPushBasedShuffleConfs(conf)
+ DAGSchedulerSuite.clearMergerLocs
+ DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5"))
+ val parts = 1
+ val shuffleMapRdd1 = new MyRDD(sc, parts, Nil)
+ val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new HashPartitioner(parts))
+
+ val shuffleMapRdd2 = new MyRDD(sc, parts, Nil)
+ val shuffleDep2 = new ShuffleDependency(shuffleMapRdd2, new HashPartitioner(parts))
+
+ val reduceRdd = new MyRDD(sc, parts, List(shuffleDep1, shuffleDep2), tracker = mapOutputTracker)
+
+ // Submit a reduce job
+ submit(reduceRdd, (0 until parts).toArray)
+
+ complete(taskSets(0), Seq((Success, makeMapStatus("hostA", 1))))
+ val shuffleStage1 = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+ assert(shuffleStage1.shuffleDep.getMergerLocs.nonEmpty)
+
+ complete(taskSets(1), Seq((Success, makeMapStatus("hostA", 1))))
+ val shuffleStage2 = scheduler.stageIdToStage(1).asInstanceOf[ShuffleMapStage]
+ assert(shuffleStage2.shuffleDep.getMergerLocs.nonEmpty)
+
+ assert(shuffleStage2.isMergeFinalized)
+ assert(shuffleStage1.isMergeFinalized)
+ assert(mapOutputTracker.getNumAvailableMergeResults(shuffleDep1.shuffleId) == parts)
+ assert(mapOutputTracker.getNumAvailableMergeResults(shuffleDep2.shuffleId) == parts)
+
+ completeNextResultStageWithSuccess(2, 0)
+ assert(results === Map(0 -> 42))
+ results.clear()
+ assertDataStructuresEmpty()
+ }
+
+ test("SPARK-32920: Reused ShuffleDependency with Shuffle Merge disabled for
the corresponding" +
+ " ShuffleDependency should not cause DAGScheduler to hang") {
+ initPushBasedShuffleConfs(conf)
+ conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 10)
+ DAGSchedulerSuite.clearMergerLocs
+ DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5"))
+ val parts = 20
+
+ val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+ val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
+ val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
+ val partitions = (0 until parts).toArray
+ submit(reduceRdd, partitions)
+
+ completeShuffleMapStageSuccessfully(0, 0, parts)
+ val shuffleStage = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+ assert(shuffleStage.shuffleDep.mergerLocs.isEmpty)
+
+ completeNextResultStageWithSuccess(1, 0)
+ val reduce2 = new MyRDD(sc, parts, List(shuffleDep))
+ submit(reduce2, partitions)
+ // Stage 2 should not be executed as it should reuse the already computed shuffle output
+ assert(scheduler.stageIdToStage(2).latestInfo.taskMetrics == null)
+ completeNextResultStageWithSuccess(3, 0, idx => idx + 1234)
+
+ val expected = (0 until parts).map(idx => (idx, idx + 1234))
+ assert(results === expected.toMap)
+
+ assertDataStructuresEmpty()
+ }
+
+ test("SPARK-32920: Reused ShuffleDependency with Shuffle Merge disabled for
the corresponding" +
+ " ShuffleDependency with shuffle data loss should recompute missing
partitions") {
Review comment:
Same as above.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]