[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r799920430

## File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ##
@@ -3671,7 +3671,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     completeShuffleMapStageSuccessfully(0, 0, parts)
     val shuffleStage = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
-    assert(!shuffleStage.shuffleDep.shuffleMergeEnabled)
+    assert(shuffleStage.shuffleDep.mergerLocs.isEmpty)

Review comment:
You are right, my bad.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r797940716

## File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ##
@@ -1821,7 +1834,7 @@ private[spark] class DAGScheduler(
       }
       if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
-        if (!shuffleStage.shuffleDep.shuffleMergeFinalized &&
+        if (!shuffleStage.shuffleDep.isShuffleMergeFinalizedMarked &&
           shuffleStage.shuffleDep.getMergerLocs.nonEmpty) {
           checkAndScheduleShuffleMergeFinalize(shuffleStage)
         } else {

Review comment:
Sounds good, we can add it to `handleShuffleMergeFinalized` as well: that does look like a good place to introduce it.
[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r797940406

## File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ##
@@ -2313,7 +2326,7 @@ private[spark] class DAGScheduler(
     // Register merge statuses if the stage is still running and shuffle merge is not finalized yet.
     // TODO: SPARK-35549: Currently merge statuses results which come after shuffle merge
     // TODO: is finalized is not registered.
-    if (runningStages.contains(stage) && !stage.shuffleDep.shuffleMergeFinalized) {
+    if (runningStages.contains(stage) && !stage.shuffleDep.isShuffleMergeFinalizedMarked) {

Review comment:
Agree with your analysis on both @venkata91
[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r797717323

## File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ##
@@ -3671,7 +3671,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     completeShuffleMapStageSuccessfully(0, 0, parts)
     val shuffleStage = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
-    assert(!shuffleStage.shuffleDep.shuffleMergeEnabled)
+    assert(shuffleStage.shuffleDep.mergerLocs.isEmpty)

Review comment:
Looks like this comment was missed ?
[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r796929787

## File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ##
@@ -1821,7 +1834,7 @@ private[spark] class DAGScheduler(
       }
       if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
-        if (!shuffleStage.shuffleDep.shuffleMergeFinalized &&
+        if (!shuffleStage.shuffleDep.isShuffleMergeFinalizedMarked &&
           shuffleStage.shuffleDep.getMergerLocs.nonEmpty) {
           checkAndScheduleShuffleMergeFinalize(shuffleStage)
         } else {

Review comment:
In `processShuffleMapStageCompletion`, add something like
```
if (!shuffleStage.isIndeterminate && shuffleStage.shuffleDep.shuffleMergeEnabled) {
  shuffleStage.shuffleDep.setShuffleMergeAllowed(false)
}
```
to ensure we don't retry merge for determinate stages?

Strictly speaking, this is not related to this PR, so I am fine with doing it in a follow up PR as well to keep the scope contained (we can file a jira in that case).
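A minimal sketch of the suggestion above, not Spark's actual code. `Dep` and `Stage` are hypothetical stand-ins for `ShuffleDependency` and `ShuffleMapStage`, `onStageCompleted` stands in for the relevant part of `processShuffleMapStageCompletion`, and the definition of `shuffleMergeEnabled` used here is an assumption. It only illustrates the intended effect: once a determinate stage completes with merge enabled, merge is no longer allowed for a retry, while an indeterminate stage is left untouched.

```scala
// Hypothetical stand-in for ShuffleDependency; not Spark's real class.
final class Dep(private var allowed: Boolean, val mergerLocs: Seq[String]) {
  def shuffleMergeAllowed: Boolean = allowed
  def setShuffleMergeAllowed(value: Boolean): Unit = { allowed = value }
  // Assumed definition: enabled only if merge is allowed and mergers are assigned.
  def shuffleMergeEnabled: Boolean = allowed && mergerLocs.nonEmpty
}

// Hypothetical stand-in for ShuffleMapStage.
final case class Stage(isIndeterminate: Boolean, shuffleDep: Dep)

object DeterminateRetrySketch {
  // Mirrors the snippet in the review comment: disable merge for a
  // determinate stage that completed with merge enabled.
  def onStageCompleted(stage: Stage): Unit = {
    if (!stage.isIndeterminate && stage.shuffleDep.shuffleMergeEnabled) {
      stage.shuffleDep.setShuffleMergeAllowed(false)
    }
  }

  def main(args: Array[String]): Unit = {
    val determinate = Stage(isIndeterminate = false, new Dep(true, Seq("host1")))
    val indeterminate = Stage(isIndeterminate = true, new Dep(true, Seq("host1")))
    onStageCompleted(determinate)
    onStageCompleted(indeterminate)
    println(s"determinate allowed after completion: ${determinate.shuffleDep.shuffleMergeAllowed}")
    println(s"indeterminate allowed after completion: ${indeterminate.shuffleDep.shuffleMergeAllowed}")
  }
}
```

In this sketch a retry of the determinate stage would see `shuffleMergeEnabled == false` even though merger locations are still recorded, which is the behavior the comment asks for.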
[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r796929787

## File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ##
@@ -1821,7 +1834,7 @@ private[spark] class DAGScheduler(
       }
       if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
-        if (!shuffleStage.shuffleDep.shuffleMergeFinalized &&
+        if (!shuffleStage.shuffleDep.isShuffleMergeFinalizedMarked &&
           shuffleStage.shuffleDep.getMergerLocs.nonEmpty) {
           checkAndScheduleShuffleMergeFinalize(shuffleStage)
         } else {

Review comment:
In `processShuffleMapStageCompletion`, add something like
```
if (!shuffleStage.isIndeterminate && shuffleStage.shuffleDep.shuffleMergeEnabled) {
  shuffleStage.shuffleDep.setShuffleMergeAllowed(false)
}
```
to ensure we don't retry merge for determinate stages?

Strictly speaking, this is not related to this PR, so I am fine with doing it in a follow up PR as well to keep the scope contained (we can file a jira in that case).

## File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ##
@@ -3671,7 +3671,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     completeShuffleMapStageSuccessfully(0, 0, parts)
     val shuffleStage = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
-    assert(!shuffleStage.shuffleDep.shuffleMergeEnabled)
+    assert(shuffleStage.shuffleDep.mergerLocs.isEmpty)

Review comment:
Looks like this was missed?

## File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ##
@@ -4147,7 +4146,206 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     assert(finalizeTask2.delay == 0 && finalizeTask2.registerMergeResults)
   }
-  /**
+  test("SPARK-34826: Adaptively fetch shuffle mergers") {
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 2)
+    DAGSchedulerSuite.clearMergerLocs()
+    DAGSchedulerSuite.addMergerLocs(Seq("host1"))
+    val parts = 2
+
+    val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
+
+    // Submit a reduce job that depends which will create a map stage
+    submit(reduceRdd, (0 until parts).toArray)
+
+    runEvent(makeCompletionEvent(
+      taskSets(0).tasks(0), Success, makeMapStatus("hostA", parts),
+      Seq.empty, Array.empty, createFakeTaskInfoWithId(0)))
+
+    val shuffleStage1 = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+    assert(!shuffleStage1.shuffleDep.shuffleMergeEnabled)
+    assert(mapOutputTracker.getShufflePushMergerLocations(0).isEmpty)
+
+    DAGSchedulerSuite.addMergerLocs(Seq("host2", "host3"))
+
+    // host2 executor added event to trigger registering of shuffle merger locations
+    // as shuffle mergers are tracked separately for test
+    runEvent(ExecutorAdded("exec2", "host2"))
+
+    // Check if new shuffle merger locations are available for push or not
+    assert(mapOutputTracker.getShufflePushMergerLocations(0).size == 2)
+    assert(shuffleStage1.shuffleDep.getMergerLocs.size == 2)
+
+    // Complete remaining tasks in ShuffleMapStage 0
+    runEvent(makeCompletionEvent(taskSets(0).tasks(1), Success,
+      makeMapStatus("host1", parts), Seq.empty, Array.empty, createFakeTaskInfoWithId(1)))
+
+    completeNextResultStageWithSuccess(1, 0)
+    assert(results === Map(0 -> 42, 1 -> 42))
+
+    results.clear()
+    assertDataStructuresEmpty()
+  }
+
+  test("SPARK-34826: Adaptively fetch shuffle mergers with stage retry") {
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 2)
+    DAGSchedulerSuite.clearMergerLocs()
+    DAGSchedulerSuite.addMergerLocs(Seq("host1"))
+    val parts = 2
+
+    val shuffleMapRdd1 = new MyRDD(sc, parts, Nil)
+    val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new HashPartitioner(parts))
+    val shuffleMapRdd2 = new MyRDD(sc, parts, Nil)
+    val shuffleDep2 = new ShuffleDependency(shuffleMapRdd2, new HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep1, shuffleDep2),
+      tracker = mapOutputTracker)

Review comment:
Let us keep them separate, and do it with the other jira if we decide to take that path.

## File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ##
@@ -2313,7 +2326,7 @@ private[spark] class DAGScheduler(
     // Register merge statuses if the stage is still running and shuffle merge is not finalized yet.
     // TODO: SPARK-35549: Currently merge statuses results which come
[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r796060753

## File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ##
@@ -3613,8 +3613,8 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     val shuffleStage2 = scheduler.stageIdToStage(1).asInstanceOf[ShuffleMapStage]
     assert(shuffleStage2.shuffleDep.getMergerLocs.nonEmpty)
-    assert(shuffleStage2.shuffleDep.shuffleMergeFinalized)
-    assert(shuffleStage1.shuffleDep.shuffleMergeFinalized)
+    assert(shuffleStage2.shuffleDep.isShuffleMergeFinalizedMarked)
+    assert(shuffleStage1.shuffleDep.isShuffleMergeFinalizedMarked)

Review comment:
I am slightly on the fence for some of these test reverts (`shuffleMergeFinalized` -> `isShuffleMergeFinalizedMarked`), since we are doing a stronger check now. So I am fine with leaving them as they are in this PR as well.
[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r796058549

## File path: core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala ##
@@ -131,7 +131,7 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
       metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
     val baseShuffleHandle = handle.asInstanceOf[BaseShuffleHandle[K, _, C]]
     val (blocksByAddress, canEnableBatchFetch) =
-      if (baseShuffleHandle.dependency.shuffleMergeEnabled) {
+      if (baseShuffleHandle.dependency.isShuffleMergeFinalizedMarked) {

Review comment:
Makes sense
[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r794998416

## File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ##
@@ -3686,14 +3686,13 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     completeNextStageWithFetchFailure(3, 0, shuffleDep)
     scheduler.resubmitFailedStages()
-    // Make sure shuffle merge is disabled for the retry
     val stage2 = scheduler.stageIdToStage(2).asInstanceOf[ShuffleMapStage]
-    assert(!stage2.shuffleDep.shuffleMergeEnabled)
+    assert(stage2.shuffleDep.shuffleMergeEnabled)

Review comment:
I was thinking more about this ... should we always mark DETERMINATE stages with `setShuffleMergeAllowed(false)` when they complete? It makes things cleaner, and in line with expectations, whether or not mergers were used for the first execution. I don't feel very strongly about it though ... Thoughts?
[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r794997274

## File path: core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala ##
@@ -855,7 +855,7 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
     rpcEnv.shutdown()
   }
-  test("SPARK-37023: Avoid fetching merge status when shuffleMergeEnabled is false") {
+  test("SPARK-37023: Avoid fetching merge status when isShuffleMergeFinalizedMarked is false") {

Review comment:
nit: isShuffleMergeFinalizedMarked -> useMergeResult ? That is what we are actually testing here (this test went through some changes as the code evolved, but it looks like the test name was missed).

## File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ##
@@ -4051,8 +4050,8 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     runEvent(StageCancelled(0, Option("Explicit cancel check")))
     scheduler.handleShuffleMergeFinalized(shuffleStage1, shuffleStage1.shuffleDep.shuffleMergeId)
-    assert(shuffleStage1.shuffleDep.shuffleMergeEnabled)
-    assert(!shuffleStage1.shuffleDep.shuffleMergeFinalized)
+    assert(shuffleStage1.shuffleDep.mergerLocs.nonEmpty)
+    assert(!shuffleStage1.shuffleDep.isShuffleMergeFinalizedMarked)

Review comment:
nit: revert ?

## File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ##
@@ -4147,7 +4146,206 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     assert(finalizeTask2.delay == 0 && finalizeTask2.registerMergeResults)
   }
-  /**
+  test("SPARK-34826: Adaptively fetch shuffle mergers") {
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 2)
+    DAGSchedulerSuite.clearMergerLocs()
+    DAGSchedulerSuite.addMergerLocs(Seq("host1"))
+    val parts = 2
+
+    val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
+
+    // Submit a reduce job that depends which will create a map stage
+    submit(reduceRdd, (0 until parts).toArray)
+
+    runEvent(makeCompletionEvent(
+      taskSets(0).tasks(0), Success, makeMapStatus("hostA", parts),
+      Seq.empty, Array.empty, createFakeTaskInfoWithId(0)))
+
+    val shuffleStage1 = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+    assert(!shuffleStage1.shuffleDep.shuffleMergeEnabled)
+    assert(mapOutputTracker.getShufflePushMergerLocations(0).isEmpty)
+
+    DAGSchedulerSuite.addMergerLocs(Seq("host2", "host3"))
+
+    // host2 executor added event to trigger registering of shuffle merger locations
+    // as shuffle mergers are tracked separately for test
+    runEvent(ExecutorAdded("exec2", "host2"))
+
+    // Check if new shuffle merger locations are available for push or not
+    assert(mapOutputTracker.getShufflePushMergerLocations(0).size == 2)
+    assert(shuffleStage1.shuffleDep.getMergerLocs.size == 2)
+
+    // Complete remaining tasks in ShuffleMapStage 0
+    runEvent(makeCompletionEvent(taskSets(0).tasks(1), Success,
+      makeMapStatus("host1", parts), Seq.empty, Array.empty, createFakeTaskInfoWithId(1)))
+
+    completeNextResultStageWithSuccess(1, 0)
+    assert(results === Map(0 -> 42, 1 -> 42))
+
+    results.clear()
+    assertDataStructuresEmpty()
+  }
+
+  test("SPARK-34826: Adaptively fetch shuffle mergers with stage retry") {
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 2)
+    DAGSchedulerSuite.clearMergerLocs()
+    DAGSchedulerSuite.addMergerLocs(Seq("host1"))
+    val parts = 2
+
+    val shuffleMapRdd1 = new MyRDD(sc, parts, Nil)
+    val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new HashPartitioner(parts))
+    val shuffleMapRdd2 = new MyRDD(sc, parts, Nil)
+    val shuffleDep2 = new ShuffleDependency(shuffleMapRdd2, new HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep1, shuffleDep2),
+      tracker = mapOutputTracker)

Review comment:
Based on discussion [above](https://github.com/apache/spark/pull/34122/files#r790376895), we might need to end up merging this test with the next ("SPARK-34826: Adaptively fetch shuffle mergers with stage retry for indeterminate stage").

## File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ##
@@ -1281,6 +1341,33 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
     }
   }
+  override def getShufflePushMergerLocations(shuffleId: Int): Seq[BlockManagerId] = {
+    shufflePushMergerLocations.getOrElse(shuffleId, getMergerLocations(shuffleId))
+  }
+
+  private def getMergerLocations(shuffleId: Int): Seq[BlockManagerId] =
[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r794718066

## File path: core/src/main/scala/org/apache/spark/Dependency.scala ##
@@ -145,20 +148,26 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
   }
   /**
-   * Returns true if push-based shuffle is disabled for this stage or empty RDD,
-   * or if the shuffle merge for this stage is finalized, i.e. the shuffle merge
-   * results for all partitions are available.
+   * Returns true if the RDD is an empty RDD or if the shuffle merge for this shuffle is
+   * finalized.
    */
   def shuffleMergeFinalized: Boolean = {
-    // Empty RDD won't be computed therefore shuffle merge finalized should be true by default.
-    if (shuffleMergeEnabled && numPartitions > 0) {
-      _shuffleMergedFinalized
+    _shuffleMergedFinalized
+  }

Review comment:
Thinking more, we are changing the semantics of `shuffleMergeFinalized` here, which was publicly exposed. What used to be `shuffleMergeFinalized` is now becoming `isShuffleMergeFinalizedIfEnabled` - so the behavior is exposed by another method. We should preserve the earlier semantics for the method ...

For this PR, that would mean:
1) Rename `shuffleMergeFinalized` as `isShuffleMergeFinalized` (a private[spark] method).
2) Rename `isShuffleMergeFinalizedIfEnabled` to `shuffleMergeFinalized` - which will preserve the earlier behavior as well.

Thoughts?
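The two-step rename proposed above can be sketched as follows. This is an illustration under stated assumptions, not the code in `Dependency.scala`: `MergeStateSketch` is a hypothetical class, `markShuffleMergeFinalized` is an illustrative name for whatever sets the flag, and the `else true` branch is inferred from the removed doc comment in the hunk ("Returns true if push-based shuffle is disabled for this stage or empty RDD").

```scala
// Hypothetical stand-in for the merge-finalization state of a ShuffleDependency.
final class MergeStateSketch(val shuffleMergeEnabled: Boolean, val numPartitions: Int) {
  private var finalizedFlag = false

  // Illustrative setter; the real name and visibility are not taken from the PR.
  def markShuffleMergeFinalized(): Unit = { finalizedFlag = true }

  // Step 1 of the proposal: the raw flag under an internal-only name
  // (would be private[spark] in Spark).
  def isShuffleMergeFinalized: Boolean = finalizedFlag

  // Step 2 of the proposal: the public name keeps its earlier semantics.
  // Disabled merge or an empty RDD counts as finalized.
  def shuffleMergeFinalized: Boolean = {
    if (shuffleMergeEnabled && numPartitions > 0) finalizedFlag else true
  }
}

object MergeStateSketch {
  def main(args: Array[String]): Unit = {
    val disabled = new MergeStateSketch(shuffleMergeEnabled = false, numPartitions = 4)
    val enabled = new MergeStateSketch(shuffleMergeEnabled = true, numPartitions = 4)
    println(s"disabled: raw=${disabled.isShuffleMergeFinalized} public=${disabled.shuffleMergeFinalized}")
    println(s"enabled before finalize: raw=${enabled.isShuffleMergeFinalized} public=${enabled.shuffleMergeFinalized}")
    enabled.markShuffleMergeFinalized()
    println(s"enabled after finalize: raw=${enabled.isShuffleMergeFinalized} public=${enabled.shuffleMergeFinalized}")
  }
}
```

The point of the split is that callers of the public method see no behavior change, while scheduler-internal code can check the stricter raw flag.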
[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r794700609

## File path: core/src/main/scala/org/apache/spark/Dependency.scala ##
@@ -160,14 +160,14 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    * this shuffle is finalized.
    */
   def isShuffleMergeFinalizedIfEnabled: Boolean = {
-    if (mergerLocs.nonEmpty) {
+    if (shuffleMergeEnabled) {
       shuffleMergeFinalized
     } else {
       true
     }
   }
-  def newShuffleMergeState(): Unit = {
+  private[spark] def newShuffleMergeState(): Unit = {

Review comment:
This was in 3.2, let us move this change to a separate jira
[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r794698873

## File path: core/src/main/scala/org/apache/spark/Dependency.scala ##
@@ -114,15 +114,6 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
   def shuffleMergeAllowed : Boolean = _shuffleMergeAllowed
-  // By default, shuffle merge is enabled for ShuffleDependency if shuffleMergeAllowed
-  private[this] var _shuffleMergeEnabled = shuffleMergeAllowed
-
-  private[spark] def setShuffleMergeEnabled(shuffleMergeEnabled: Boolean): Unit = {
-    _shuffleMergeEnabled = shuffleMergeEnabled
-  }
-
-  def shuffleMergeEnabled : Boolean = _shuffleMergeEnabled

Review comment:
@venkata91 For the changes from master which are not in 3.2 or older, let us fix them here (methods/variables which should be `private[spark]` but ended up getting exposed), and open a jira for the others as a follow up.
[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r794072767

## File path: core/src/main/scala/org/apache/spark/Dependency.scala ##
@@ -114,15 +114,6 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
   def shuffleMergeAllowed : Boolean = _shuffleMergeAllowed
-  // By default, shuffle merge is enabled for ShuffleDependency if shuffleMergeAllowed
-  private[this] var _shuffleMergeEnabled = shuffleMergeAllowed
-
-  private[spark] def setShuffleMergeEnabled(shuffleMergeEnabled: Boolean): Unit = {
-    _shuffleMergeEnabled = shuffleMergeEnabled
-  }
-
-  def shuffleMergeEnabled : Boolean = _shuffleMergeEnabled

Review comment:
+CC @tgravescs, @Ngone51
[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r794072564

## File path: core/src/main/scala/org/apache/spark/Dependency.scala ##
@@ -114,15 +114,6 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
   def shuffleMergeAllowed : Boolean = _shuffleMergeAllowed
-  // By default, shuffle merge is enabled for ShuffleDependency if shuffleMergeAllowed
-  private[this] var _shuffleMergeEnabled = shuffleMergeAllowed
-
-  private[spark] def setShuffleMergeEnabled(shuffleMergeEnabled: Boolean): Unit = {
-    _shuffleMergeEnabled = shuffleMergeEnabled
-  }
-
-  def shuffleMergeEnabled : Boolean = _shuffleMergeEnabled

Review comment:
Good call on `incPushCompleted`. Both `setMergerLocs` and `newShuffleMergeState` are in released versions, which makes it more difficult to make the change.
[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle
mridulm commented on a change in pull request #34122: URL: https://github.com/apache/spark/pull/34122#discussion_r793836705 ## File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ## @@ -2608,8 +2608,12 @@ private[spark] class DAGScheduler( // isPushBasedShuffleEnabled and shuffleMergers need to be updated at the end. stage match { case s: ShuffleMapStage => - stage.latestInfo.setPushBasedShuffleEnabled(s.shuffleDep.getMergerLocs.nonEmpty) -stage.latestInfo.setShuffleMergerCount(s.shuffleDep.getMergerLocs.size) +val isPushShuffleEnabled = + s.shuffleDep.shuffleMergeAllowed && s.shuffleDep.getMergerLocs.nonEmpty Review comment: When will we have the case where mergerLocs is non empty, but merge is not allowed ? Is this for retry of DETERMINATE stage ? If yes, do we want to change definition of `def shuffleMergeEnabled` to be `shuffleMergeAllowed && mergerLocs.nonEmpty` ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle
mridulm commented on a change in pull request #34122: URL: https://github.com/apache/spark/pull/34122#discussion_r793837336 ## File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ## @@ -3686,14 +3686,13 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti completeNextStageWithFetchFailure(3, 0, shuffleDep) scheduler.resubmitFailedStages() -// Make sure shuffle merge is disabled for the retry val stage2 = scheduler.stageIdToStage(2).asInstanceOf[ShuffleMapStage] -assert(!stage2.shuffleDep.shuffleMergeEnabled) +assert(stage2.shuffleDep.shuffleMergeEnabled) Review comment: Thanks for clarifying. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle
mridulm commented on a change in pull request #34122: URL: https://github.com/apache/spark/pull/34122#discussion_r793834972 ## File path: core/src/main/scala/org/apache/spark/Dependency.scala ## @@ -202,7 +204,7 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( // Only used by DAGScheduler to coordinate shuffle merge finalization @transient private[this] var finalizeTask: Option[ScheduledFuture[_]] = None - def getFinalizeTask: Option[ScheduledFuture[_]] = finalizeTask + private[spark] def getFinalizeTask: Option[ScheduledFuture[_]] = finalizeTask def setFinalizeTask(task: ScheduledFuture[_]): Unit = { Review comment: Make the setter `private[spark]` as well -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle
mridulm commented on a change in pull request #34122: URL: https://github.com/apache/spark/pull/34122#discussion_r793834393 ## File path: core/src/main/scala/org/apache/spark/Dependency.scala ## @@ -114,15 +114,6 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( def shuffleMergeAllowed : Boolean = _shuffleMergeAllowed - // By default, shuffle merge is enabled for ShuffleDependency if shuffleMergeAllowed - private[this] var _shuffleMergeEnabled = shuffleMergeAllowed - - private[spark] def setShuffleMergeEnabled(shuffleMergeEnabled: Boolean): Unit = { -_shuffleMergeEnabled = shuffleMergeEnabled - } - - def shuffleMergeEnabled : Boolean = _shuffleMergeEnabled Review comment: > In that case, should we also make shuffleMergeAllowed as well private[spark]. For now it is better to make everything private[spark] related to push-based shuffle right? Thoughts? `shuffleMergeAllowed` is similar to `shuffleMergeEnabled` - and has value as a developer api, which developers can query for - the setters dont. FinalizeTask on other hand is a purely internal impl detail. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle
mridulm commented on a change in pull request #34122: URL: https://github.com/apache/spark/pull/34122#discussion_r793833288 ## File path: core/src/main/scala/org/apache/spark/Dependency.scala ## @@ -114,15 +114,6 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( def shuffleMergeAllowed : Boolean = _shuffleMergeAllowed - // By default, shuffle merge is enabled for ShuffleDependency if shuffleMergeAllowed - private[this] var _shuffleMergeEnabled = shuffleMergeAllowed - - private[spark] def setShuffleMergeEnabled(shuffleMergeEnabled: Boolean): Unit = { -_shuffleMergeEnabled = shuffleMergeEnabled - } - - def shuffleMergeEnabled : Boolean = _shuffleMergeEnabled Review comment: > We are using mergerLocs.isEmpty in other places as well. If we use shuffleMergeEnabled (mergerLocs.nonEmpty) and mergerLocs.isEmpty|nonEmpty in few other places, wouldn't it be confusing? Yes, we should not blindly replace - only when the intent is to check if shuffle merge is enabled; though I would expect a empty/nonEmpty check against mergerLocs to pretty much be a proxy for is shuffle merge enabled. For example, the changes in the tests - the earlier intent was to check if merge was enabled; which got replaced with checks against mergerLocs -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle
mridulm commented on a change in pull request #34122: URL: https://github.com/apache/spark/pull/34122#discussion_r792766502 ## File path: core/src/main/scala/org/apache/spark/Dependency.scala ## @@ -114,15 +114,6 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( def shuffleMergeAllowed : Boolean = _shuffleMergeAllowed - // By default, shuffle merge is enabled for ShuffleDependency if shuffleMergeAllowed - private[this] var _shuffleMergeEnabled = shuffleMergeAllowed - - private[spark] def setShuffleMergeEnabled(shuffleMergeEnabled: Boolean): Unit = { -_shuffleMergeEnabled = shuffleMergeEnabled - } - - def shuffleMergeEnabled : Boolean = _shuffleMergeEnabled Review comment: Let us keep this method and make it `def shuffleMergeEnabled = mergerLocs.nonEmpty`. What I meant was: let us remove the boolean `_shuffleMergeEnabled`, so that the rest of the code can depend on this as state and does not need to inspect `mergerLocs` directly. Also, this is part of the public API, so we can't remove it :-) Speaking of which, can you make `getFinalizeTask`/`setFinalizeTask` `private[spark]` as well? (`ShuffleDependency` is a developer API and we can't leak impl details.)
[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle
mridulm commented on a change in pull request #34122: URL: https://github.com/apache/spark/pull/34122#discussion_r792766502 ## File path: core/src/main/scala/org/apache/spark/Dependency.scala ## @@ -114,15 +114,6 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( def shuffleMergeAllowed : Boolean = _shuffleMergeAllowed - // By default, shuffle merge is enabled for ShuffleDependency if shuffleMergeAllowed - private[this] var _shuffleMergeEnabled = shuffleMergeAllowed - - private[spark] def setShuffleMergeEnabled(shuffleMergeEnabled: Boolean): Unit = { -_shuffleMergeEnabled = shuffleMergeEnabled - } - - def shuffleMergeEnabled : Boolean = _shuffleMergeEnabled Review comment: Let us keep this method - and make it `def shuffleMergeEnabled = mergerLocs.nonEmpty` What I meant was, let us remove the boolean `_shuffleMergeEnabled` So that rest of the code can depend on this as a state and not need to inspect the mergerLocs directly ## File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ## @@ -3579,7 +3579,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti submit(reduceRdd, (0 until parts).toArray) completeShuffleMapStageSuccessfully(0, 0, parts) Review comment: Once we reintroduce the `shuffleMergeEnabled` method, all changes to this file (DAGSchedulerSuite) related to mergerLocs.isEmpty == !shuffleMergeEnabled (and reverse) can be reverted (for the latest [diff](https://github.com/apache/spark/pull/34122/commits/3b52bed7ea4bbdb127a4711201d8c8268ab06650#)) ## File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ## @@ -4172,7 +4172,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti // host2 executor added event to trigger registering of shuffle merger locations // as shuffle mergers are tracked separately for test -runEvent(ExecutorAdded("host2", "host2")) +runEvent(ExecutorAdded("host2", "exec2")) Review comment: Order is reverse 
- `ExecutorAdded(execId: String, host: String)` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
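The argument-order point above can be illustrated with a minimal sketch. The case class below is a hypothetical mirror of the `(execId, host)` shape quoted in the comment, not Spark's own event class; with two `String` parameters, named arguments make a swap visible at the call site.

```scala
// Hypothetical stand-in mirroring the quoted shape: (execId: String, host: String).
final case class ExecutorAdded(execId: String, host: String)

object ExecutorAddedExample {
  // Positional call: the executor id comes first, the host second.
  val positional: ExecutorAdded = ExecutorAdded("exec2", "host2")

  // Named arguments state the intent explicitly, so the order can no longer be mixed up.
  val named: ExecutorAdded = ExecutorAdded(host = "host2", execId = "exec2")
}
```

Both values describe the same event; the named form is only a readability aid for tests.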
[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle
mridulm commented on a change in pull request #34122: URL: https://github.com/apache/spark/pull/34122#discussion_r792761973 ## File path: core/src/main/scala/org/apache/spark/Dependency.scala ## @@ -135,6 +144,7 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( def shuffleMergeId: Int = _shuffleMergeId def setMergerLocs(mergerLocs: Seq[BlockManagerId]): Unit = { +assert(shuffleMergeEnabled || shuffleMergeAllowed) Review comment: Yes, when we want it to be disabled (like with retry of DETERMINATE stage which was already finalized), we must set merge allowed to false. It is cleaner. Note - `def isShuffleMergeEnabled` or `def shuffleMergeEnabled` pointing to `mergerLocs.nonEmpty` is fine (and preferred to using mergerLocs.) - we just dont need the state boolean. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
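A minimal sketch of the suggestion above, assuming a simplified stand-in for `ShuffleDependency`: the class name `MergeState` and the use of `String` hosts instead of `BlockManagerId` are illustrative assumptions. "Enabled" is derived from `mergerLocs` rather than stored in a separate boolean, and `setMergerLocs` asserts that merge is allowed.

```scala
// Hypothetical, simplified model of the merge-related state; not Spark's ShuffleDependency.
final class MergeState(initiallyAllowed: Boolean) {
  private var _shuffleMergeAllowed: Boolean = initiallyAllowed
  private var mergerLocs: Seq[String] = Nil

  def shuffleMergeAllowed: Boolean = _shuffleMergeAllowed

  def setShuffleMergeAllowed(allowed: Boolean): Unit = {
    _shuffleMergeAllowed = allowed
  }

  // Derived state: there is no second boolean that could drift out of sync.
  def shuffleMergeEnabled: Boolean = mergerLocs.nonEmpty

  def setMergerLocs(locs: Seq[String]): Unit = {
    // "Allowed" is the superset condition, so it is the only one checked here.
    assert(shuffleMergeAllowed, "merger locations set while shuffle merge is not allowed")
    mergerLocs = locs
  }

  def getMergerLocs: Seq[String] = mergerLocs
}
```

In this sketch, disabling merge for a retry means calling `setShuffleMergeAllowed(false)` before any mergers are assigned; how the real class handles mergers that are already set is not modeled.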
[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle
mridulm commented on a change in pull request #34122: URL: https://github.com/apache/spark/pull/34122#discussion_r790361031 ## File path: core/src/main/scala/org/apache/spark/Dependency.scala ## @@ -135,6 +144,7 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( def shuffleMergeId: Int = _shuffleMergeId def setMergerLocs(mergerLocs: Seq[BlockManagerId]): Unit = { +assert(shuffleMergeEnabled || shuffleMergeAllowed) Review comment: `shuffleMergeAllowed` is a superset - we dont need `shuffleMergeEnabled` here (and actually, is getting set after `setMergerLocs` in this PR). Also, see comment [above](https://github.com/apache/spark/pull/34122/files#r785389352) on whether we need the variable (`_shuffleMergeEnabled `) at all. ## File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ## @@ -3686,14 +3686,13 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti completeNextStageWithFetchFailure(3, 0, shuffleDep) scheduler.resubmitFailedStages() -// Make sure shuffle merge is disabled for the retry val stage2 = scheduler.stageIdToStage(2).asInstanceOf[ShuffleMapStage] -assert(!stage2.shuffleDep.shuffleMergeEnabled) +assert(stage2.shuffleDep.shuffleMergeEnabled) Review comment: Why did this behavior change ? I would expect it to be the same before/after this PR. For DETERMINATE stage, we should not have merge enabled for the retry if the previous attempt did finalize. 
## File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ## @@ -4147,7 +4146,210 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti assert(finalizeTask2.delay == 0 && finalizeTask2.registerMergeResults) } -/** + test("SPARK-34826: Adaptively fetch shuffle mergers") { +initPushBasedShuffleConfs(conf) +conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 2) +DAGSchedulerSuite.clearMergerLocs() +DAGSchedulerSuite.addMergerLocs(Seq("host1")) +val parts = 2 + +val shuffleMapRdd = new MyRDD(sc, parts, Nil) +val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts)) +val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker) + +// Submit a reduce job that depends which will create a map stage +submit(reduceRdd, (0 until parts).toArray) + +runEvent(makeCompletionEvent( + taskSets(0).tasks(0), Success, makeMapStatus("hostA", parts), + Seq.empty, Array.empty, createFakeTaskInfoWithId(0))) + +val shuffleStage1 = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage] +assert(shuffleStage1.shuffleDep.getMergerLocs.isEmpty) +assert(mapOutputTracker.getShufflePushMergerLocations(0).isEmpty) + +DAGSchedulerSuite.addMergerLocs(Seq("host2", "host3")) + +// host2 executor added event to trigger registering of shuffle merger locations +// as shuffle mergers are tracked separately for test +runEvent(ExecutorAdded("host2", "host2")) Review comment: nit: Use 'exec' instead of 'host' as prefix for executor id ## File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ## @@ -1176,6 +1223,9 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr // instantiate a serializer. See the followup to SPARK-36705 for more details. 
private lazy val fetchMergeResult = Utils.isPushBasedShuffleEnabled(conf, isDriver = false) + // Exposed for testing + val shufflePushMergerLocations = new ConcurrentHashMap[Int, Seq[BlockManagerId]]().asScala Review comment: Sounds good, this looks fine. ## File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ## @@ -1369,24 +1370,34 @@ private[spark] class DAGScheduler( * locations for block push/merge by getting the historical locations of past executors. */ private def prepareShuffleServicesForShuffleMapStage(stage: ShuffleMapStage): Unit = { -assert(stage.shuffleDep.shuffleMergeEnabled && !stage.shuffleDep.shuffleMergeFinalized) +assert(stage.shuffleDep.shuffleMergeAllowed && !stage.shuffleDep.shuffleMergeFinalized) if (stage.shuffleDep.getMergerLocs.isEmpty) { - val mergerLocs = sc.schedulerBackend.getShufflePushMergerLocations( -stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId) - if (mergerLocs.nonEmpty) { -stage.shuffleDep.setMergerLocs(mergerLocs) -logInfo(s"Push-based shuffle enabled for $stage (${stage.name}) with" + - s" ${stage.shuffleDep.getMergerLocs.size} merger locations") - -logDebug("List of shuffle push merger locations " + - s"${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}") - } else { -stage.shuffleDep.setShuffleMergeEnabled(false) -
[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle
mridulm commented on a change in pull request #34122: URL: https://github.com/apache/spark/pull/34122#discussion_r785388090 ## File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ## @@ -1176,6 +1223,9 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr // instantiate a serializer. See the followup to SPARK-36705 for more details. private lazy val fetchMergeResult = Utils.isPushBasedShuffleEnabled(conf, isDriver = false) + // Exposed for testing + val shufflePushMergerLocations = new ConcurrentHashMap[Int, Seq[BlockManagerId]]().asScala Review comment: This needs to account for shuffle merge id as well, to ensure the merger locations are consistent. Possible flow is: a) attempt 0 has enough mergers, and so executors cache the value for attempt 0 mergers. b) one or more hosts get added to the exclude list before attempt 1. c) attempt 1 starts, with merge allowed = true, but merge enabled = false, so tasks start without mergers populated in the dependency (merge might get enabled as more executors are added). d) executors which ran attempt 0 will use mergers from the previous attempt, while new executors will use mergers from attempt 1.
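One way to avoid the stale-mergers flow described above can be sketched as follows. This is an illustration under stated assumptions, not the change made in the PR: `CachedMergers`, `MergeIdAwareCache`, and the `fetch` callback are hypothetical names, and hosts are plain strings. Each cached entry is tagged with the shuffle merge id it was fetched for, and an entry from an older attempt is ignored.

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical cache entry: merger hosts tagged with the shuffle merge id they belong to.
final case class CachedMergers(shuffleMergeId: Int, hosts: Seq[String])

// Hypothetical executor-side cache; `fetch` stands in for asking the driver.
final class MergeIdAwareCache(fetch: (Int, Int) => Seq[String]) {
  private val cache = TrieMap.empty[Int, CachedMergers]

  def get(shuffleId: Int, shuffleMergeId: Int): Seq[String] =
    cache.get(shuffleId) match {
      // Reuse the cached value only if it was fetched for the same attempt.
      case Some(entry) if entry.shuffleMergeId == shuffleMergeId => entry.hosts
      case _ =>
        val hosts = fetch(shuffleId, shuffleMergeId)
        if (hosts.nonEmpty) {
          cache.put(shuffleId, CachedMergers(shuffleMergeId, hosts))
        } else {
          // Drop any entry left over from a previous attempt.
          cache.remove(shuffleId)
        }
        hosts
    }
}
```

With this shape, an executor that ran attempt 0 and a new executor both resolve attempt 1 to the same merger list, because the attempt 0 entry no longer matches.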
[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle
mridulm commented on a change in pull request #34122: URL: https://github.com/apache/spark/pull/34122#discussion_r785387856 ## File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ## @@ -1281,6 +1331,22 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr } } + override def getShufflePushMergerLocations(shuffleId: Int): Seq[BlockManagerId] = { +shufflePushMergerLocations.getOrElse(shuffleId, getMergerLocations(shuffleId)) + } + + private def getMergerLocations(shuffleId: Int): Seq[BlockManagerId] = { +fetchingLock.withLock(shuffleId) { + val mergers = askTracker[Seq[BlockManagerId]](GetShufflePushMergerLocations(shuffleId)) Review comment: Check whether `shufflePushMergerLocations(shuffleId)` is nonEmpty before fetching it again, in case of concurrent updates before we acquire the lock for shuffleId (see `getStatuses` or other uses of `fetchingLock` for an example).
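The re-check requested above is the usual "check again after taking the lock" pattern. A minimal sketch, assuming a per-shuffle lock object in place of Spark's `fetchingLock` and a hypothetical `fetchFromDriver` callback in place of `askTracker`:

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical stand-in for the worker-side cache; not MapOutputTrackerWorker itself.
final class MergerLocationCache(fetchFromDriver: Int => Seq[String]) {
  private val cache = TrieMap.empty[Int, Seq[String]]
  private val locks = TrieMap.empty[Int, AnyRef]

  def get(shuffleId: Int): Seq[String] =
    cache.getOrElse(shuffleId, fetchUnderLock(shuffleId))

  private def fetchUnderLock(shuffleId: Int): Seq[String] = {
    val lock = locks.getOrElseUpdate(shuffleId, new Object)
    lock.synchronized {
      // Re-check: another thread may have filled the entry while this one waited.
      cache.get(shuffleId) match {
        case Some(locs) => locs
        case None =>
          val fetched = fetchFromDriver(shuffleId)
          // Only cache a non-empty answer so a later call can retry.
          if (fetched.nonEmpty) cache.put(shuffleId, fetched)
          fetched
      }
    }
  }
}
```

Without the second lookup inside the lock, two tasks that miss the cache at the same time would each issue a fetch for the same shuffle.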
[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle
mridulm commented on a change in pull request #34122: URL: https://github.com/apache/spark/pull/34122#discussion_r785389352 ## File path: core/src/main/scala/org/apache/spark/Dependency.scala ## @@ -106,7 +106,17 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( // By default, shuffle merge is enabled for ShuffleDependency if push based shuffle // is enabled - private[this] var _shuffleMergeEnabled = canShuffleMergeBeEnabled() + private[this] var _shuffleMergeAllowed = canShuffleMergeBeEnabled() + + private[spark] def setShuffleMergeAllowed(shuffleMergeAllowed: Boolean): Unit = { +_shuffleMergeAllowed = shuffleMergeAllowed + } + + def shuffleMergeAllowed : Boolean = _shuffleMergeAllowed + + // By default, shuffle merge is enabled for ShuffleDependency if push based shuffle + // is enabled + private[this] var _shuffleMergeEnabled = shuffleMergeAllowed private[spark] def setShuffleMergeEnabled(shuffleMergeEnabled: Boolean): Unit = { _shuffleMergeEnabled = shuffleMergeEnabled Review comment: With all the changes we have made, do we need this boolean ? Or will `mergerLocs.nonEmpty` not do ? (`def shuffleMergeEnabled : Boolean = mergerLocs.nonEmpty`) Also, in `setShuffleMergeEnabled ` (if we are keeping it) or in `setMergerLocs`, add`assert (! shuffleMergeEnabled || shuffleMergeAllowed)` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle
mridulm commented on a change in pull request #34122: URL: https://github.com/apache/spark/pull/34122#discussion_r785387151 ## File path: core/src/main/scala/org/apache/spark/Dependency.scala ## @@ -145,19 +155,30 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( } /** - * Returns true if push-based shuffle is disabled for this stage or empty RDD, - * or if the shuffle merge for this stage is finalized, i.e. the shuffle merge - * results for all partitions are available. + * Returns true if the RDD is an empty RDD or if the shuffle merge for this shuffle is + * finalized. */ - def shuffleMergeFinalized: Boolean = { + def isShuffleMergeFinalized: Boolean = { // Empty RDD won't be computed therefore shuffle merge finalized should be true by default. -if (shuffleMergeEnabled && numPartitions > 0) { +if (numPartitions > 0) { Review comment: Since `canShuffleMergeBeEnabled` already checks for `numPartition > 0`, do we need this check here ? ## File path: core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala ## @@ -910,4 +910,32 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext { rpcEnv.shutdown() slaveRpcEnv.shutdown() } + + test("SPARK-34826: Adaptive shuffle mergers") { +val newConf = new SparkConf +newConf.set("spark.shuffle.push.based.enabled", "true") +newConf.set("spark.shuffle.service.enabled", "true") + +// needs TorrentBroadcast so need a SparkContext +withSpark(new SparkContext("local", "MapOutputTrackerSuite", newConf)) { sc => + val masterTracker = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster] + val rpcEnv = sc.env.rpcEnv + val masterEndpoint = new MapOutputTrackerMasterEndpoint(rpcEnv, masterTracker, newConf) + rpcEnv.stop(masterTracker.trackerEndpoint) + rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME, masterEndpoint) + + val slaveTracker = new MapOutputTrackerWorker(newConf) + slaveTracker.trackerEndpoint = +rpcEnv.setupEndpointRef(rpcEnv.address, MapOutputTracker.ENDPOINT_NAME) + + 
masterTracker.registerShuffle(20, 100, 100) + slaveTracker.updateEpoch(masterTracker.getEpoch) + val mergerLocs = (1 to 10).map(x => BlockManagerId(s"exec-$x", s"host-$x", 7337)) + masterTracker.registerShufflePushMergerLocations(20, mergerLocs) + + assert(slaveTracker.getShufflePushMergerLocations(20).size == 10) + slaveTracker.unregisterShuffle(20) + assert(slaveTracker.shufflePushMergerLocations.isEmpty) Review comment: Note: We would need to enhance this test with the additional case mentioned above for `shufflePushMergerLocations`. ## File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ## @@ -4147,7 +4146,128 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti assert(finalizeTask2.delay == 0 && finalizeTask2.registerMergeResults) } -/** + test("SPARK-34826: Adaptively fetch shuffle mergers") { +initPushBasedShuffleConfs(conf) +conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 6) +DAGSchedulerSuite.clearMergerLocs +DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5")) +val parts = 7 + +val shuffleMapRdd = new MyRDD(sc, parts, Nil) +val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts)) +val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker) + +// Submit a reduce job that depends which will create a map stage +submit(reduceRdd, (0 until parts).toArray) + +runEvent(makeCompletionEvent( + taskSets(0).tasks(0), Success, makeMapStatus("hostA", parts), + Seq.empty, Array.empty, createFakeTaskInfoWithId(0))) + +val shuffleStage1 = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage] +assert(shuffleStage1.shuffleDep.getMergerLocs.isEmpty) +assert(mapOutputTracker.getShufflePushMergerLocations(0).isEmpty) + +DAGSchedulerSuite.addMergerLocs(Seq("host6", "host7", "host8")) + +runEvent(makeCompletionEvent( + taskSets(0).tasks(1), Success, makeMapStatus("hostA", parts), + Seq.empty, Array.empty, 
createFakeTaskInfoWithId(1))) + +// Dummy executor added event to trigger registering of shuffle merger locations +// as shuffle mergers are tracked separately for test +runEvent(ExecutorAdded("dummy", "dummy")) Review comment: Let us make the host valid (say `host6` for example: from the new set of hosts) - in case this code evolves in future. ## File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ## @@ -1281,6 +1331,22 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr } } + override def getShufflePushMergerLocations(shuffleId: Int): Seq[BlockManagerId] = { +
[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle
mridulm commented on a change in pull request #34122: URL: https://github.com/apache/spark/pull/34122#discussion_r780010060 ## File path: core/src/main/scala/org/apache/spark/Dependency.scala ## @@ -144,12 +144,16 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( _shuffleMergedFinalized = true } + def shuffleMergeFinalized: Boolean = { Review comment: Let us split the state into two variables, since one boolean was earlier representing two fundamentally different things: * `shuffleMergeAllowed` * `shuffleMergeEnabled` If `shuffleMergeAllowed` is `false`, all push based shuffle paths are ignored; it is set statically at the driver. It defaults to `canShuffleMergeBeEnabled`, but can be explicitly disabled: for example, when missing tasks are being computed for a determinate shuffle after it has been finalized. It would be set at the driver before the stage starts, and executors can depend on it to enable push based shuffle paths. `shuffleMergeEnabled` can be set to `true` only if `shuffleMergeAllowed` is `true` and additional conditions are met - for example, sufficient mergers are available. Earlier, this was determined statically when the stage started; with this PR, it can become `true` adaptively after the stage starts. `shuffleMergedFinalized` can remain as it was defined earlier. Thoughts ? +CC @Ngone51, @otterc
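The two-flag split proposed in the comment above could look roughly like the sketch below. It is only an illustration, not the code in the PR: `MergeState`, `minMergers`, `disallowMerge` and `setMergerLocs` are hypothetical names, and plain strings stand in for merger locations.

```scala
// Hypothetical, simplified stand-in for the merge-related state on ShuffleDependency.
// The flag names follow the review comment; everything else is assumed for illustration.
final class MergeState(canShuffleMergeBeEnabled: Boolean, minMergers: Int) {
  // Static flag: decided on the driver before the stage starts.
  private var allowed: Boolean = canShuffleMergeBeEnabled
  // Adaptive part: filled in once merger locations become known.
  private var mergerLocs: Seq[String] = Nil

  def shuffleMergeAllowed: Boolean = allowed

  // Enabled only when merge is allowed AND enough mergers are available.
  def shuffleMergeEnabled: Boolean = allowed && mergerLocs.size >= minMergers

  // Explicit opt-out, e.g. when recomputing missing tasks of a determinate
  // shuffle after it has been finalized.
  def disallowMerge(): Unit = { allowed = false }

  // Merger locations are recorded only while merge is still allowed.
  def setMergerLocs(locs: Seq[String]): Unit = { if (allowed) mergerLocs = locs }
}
```

With this shape, `shuffleMergeEnabled` can flip from `false` to `true` after the stage starts, while `shuffleMergeAllowed` only ever moves from `true` to `false`.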
[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle
mridulm commented on a change in pull request #34122: URL: https://github.com/apache/spark/pull/34122#discussion_r779886670 ## File path: core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala ## @@ -39,7 +39,9 @@ class StageInfo( val taskMetrics: TaskMetrics = null, private[spark] val taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty, private[spark] val shuffleDepId: Option[Int] = None, -val resourceProfileId: Int) { +val resourceProfileId: Int, +private[spark] var isPushBasedShuffleEnabled: Boolean = false, +private[spark] var shuffleMergerCount: Int = 0) { Review comment: But they are `private[spark]` with no consumers within spark right now, no ?
[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle
mridulm commented on a change in pull request #34122: URL: https://github.com/apache/spark/pull/34122#discussion_r779886030 ## File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ## @@ -2487,6 +2501,21 @@ private[spark] class DAGScheduler( executorFailureEpoch -= execId } shuffleFileLostEpoch -= execId + +if (pushBasedShuffleEnabled) { Review comment: Ideally, we want to go into this `if` block only if a new host is added (since if executor is on existing/known hosts, number of mergers is not increasing) - more specifically, a new block manager in a new host is added (in `BlockManagerMasterEndpoint`). That is why I was thinking about doing it there ...
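The "only react when a new host appears" idea from the comment above can be sketched as follows. This is an assumption-laden illustration: `MergerHostTracker` and its callback are hypothetical, and Spark's `BlockManagerMasterEndpoint` is not modeled here.

```scala
import scala.collection.mutable

// Hypothetical tracker: fires the callback only the first time a host is seen,
// since executors on already-known hosts do not increase the number of mergers.
final class MergerHostTracker(onNewHost: String => Unit) {
  private val knownHosts = mutable.HashSet.empty[String]

  // Returns true only when the executor brings a previously unseen host.
  // execId is kept in the signature to mirror an executor-added event.
  def executorAdded(execId: String, host: String): Boolean = {
    // mutable.Set#add returns false when the element was already present.
    val isNewHost = knownHosts.add(host)
    if (isNewHost) onNewHost(host)
    isNewHost
  }
}
```

The callback would be the place to re-check whether enough merger locations are now available; a second executor on the same host does not trigger it.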
[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle
mridulm commented on a change in pull request #34122: URL: https://github.com/apache/spark/pull/34122#discussion_r779287873 ## File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ## @@ -1281,6 +1333,22 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr } } + override def getShufflePushMergerLocations(shuffleId: Int): Seq[BlockManagerId] = { + shufflePushMergerLocations.get(shuffleId).getOrElse(getMergerLocations(shuffleId)) + } + + private def getMergerLocations(shuffleId: Int): Seq[BlockManagerId] = { +fetchingLock.withLock(shuffleId) { Review comment: Any issues with reusing `fetchingLock` here ? +CC @Ngone51 in case there are concerns. ## File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ## @@ -1281,6 +1333,22 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr } } + override def getShufflePushMergerLocations(shuffleId: Int): Seq[BlockManagerId] = { + shufflePushMergerLocations.get(shuffleId).getOrElse(getMergerLocations(shuffleId)) Review comment: `shufflePushMergerLocations.getOrElse` here ? ## File path: core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala ## @@ -39,7 +39,9 @@ class StageInfo( val taskMetrics: TaskMetrics = null, private[spark] val taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty, private[spark] val shuffleDepId: Option[Int] = None, -val resourceProfileId: Int) { +val resourceProfileId: Int, +private[spark] var isPushBasedShuffleEnabled: Boolean = false, +private[spark] var shuffleMergerCount: Int = 0) { Review comment: Why do we need these ? 
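For the `shufflePushMergerLocations.getOrElse` suggestion on `getShufflePushMergerLocations` above, a minimal sketch of why the two forms behave the same is shown below. `MergerLookup`, `cache`, `fetch` and `fetchCount` are hypothetical stand-ins, and strings replace `BlockManagerId`.

```scala
import scala.collection.mutable

object MergerLookup {
  // Hypothetical cache keyed by shuffle id.
  val cache = mutable.HashMap.empty[Int, Seq[String]]
  var fetchCount = 0

  // Stand-in for a remote fetch from the driver (assumed to be expensive).
  def fetch(shuffleId: Int): Seq[String] = {
    fetchCount += 1
    Seq(s"merger-for-$shuffleId")
  }

  // Two-step form used in the diff: Option first, then a default.
  def viaOption(shuffleId: Int): Seq[String] =
    cache.get(shuffleId).getOrElse(fetch(shuffleId))

  // Form suggested in the review. The default argument is by-name,
  // so fetch runs only when the key is missing.
  def viaGetOrElse(shuffleId: Int): Seq[String] =
    cache.getOrElse(shuffleId, fetch(shuffleId))
}
```

Neither form stores the fetched value back into the map; in this sketch that would remain the job of the fetching code path.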
## File path: core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala ## @@ -59,13 +60,26 @@ private[spark] class ShuffleWriteProcessor extends Serializable with Logging { rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) val mapStatus = writer.stop(success = true) if (mapStatus.isDefined) { +val isPushBasedShuffleEnabled = Utils.isPushBasedShuffleEnabled(SparkEnv.get.conf, + isDriver = SparkContext.DRIVER_IDENTIFIER == SparkEnv.get.executorId) +// Check if sufficient shuffle mergers are available now for the ShuffleMapTask to push +if (isPushBasedShuffleEnabled && dep.getMergerLocs.isEmpty && !dep.shuffleMergeFinalized) { Review comment: Review note: Here we are depending on the fact that a retry of a determinate stage will already be merge finalized, while an indeterminate stage won't be - and so the `!dep.shuffleMergeFinalized` check takes care of not trying to fetch mergers/enable push based shuffle for the retry of a determinate stage. ## File path: core/src/main/scala/org/apache/spark/Dependency.scala ## @@ -144,12 +144,16 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( _shuffleMergedFinalized = true } + def shuffleMergeFinalized: Boolean = { +_shuffleMergedFinalized + } + /** * Returns true if push-based shuffle is disabled for this stage or empty RDD, * or if the shuffle merge for this stage is finalized, i.e. the shuffle merge * results for all partitions are available. */ - def shuffleMergeFinalized: Boolean = { + def isShuffleMergeOutputsAvailable: Boolean = { Review comment: The method name and its semantics are a bit confusing (even though we document the expectation). We are returning `true` here when push based shuffle is disabled, whereas a reader would expect `false` from a method with this name. How about renaming the new `shuffleMergeFinalized` to `isShuffleMergeFinalizeEnabled` and making this method `isShuffleMergeFinalizeCompleted` ?
(Or something similar and descriptive) ## File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ## @@ -1371,19 +1371,33 @@ private[spark] class DAGScheduler( private def prepareShuffleServicesForShuffleMapStage(stage: ShuffleMapStage): Unit = { assert(stage.shuffleDep.shuffleMergeEnabled && !stage.shuffleDep.shuffleMergeFinalized) if (stage.shuffleDep.getMergerLocs.isEmpty) { + getAndSetShufflePushMergerLocations(stage) +} + +if (stage.shuffleDep.shuffleMergeEnabled) { + logInfo(("Shuffle merge enabled before starting the stage for %s (%s) with %d" + +" merger locations").format(stage, stage.name, stage.shuffleDep.getMergerLocs.size)) +} else { + logInfo(("Shuffle merge disabled for %s (%s), but can get enabled later" + +" adaptively once enough mergers are available").format(stage, stage.name)) Review comment: Review note: This method is invoked only for: * Initial stage attempt for a shuffle. * All attempts for an indeterminate stage (since prev outputs are discarded) * Only first attempt