[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

2022-02-04 Thread GitBox


mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r799920430



##
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##
@@ -3671,7 +3671,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
 
 completeShuffleMapStageSuccessfully(0, 0, parts)
 val shuffleStage = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
-assert(!shuffleStage.shuffleDep.shuffleMergeEnabled)
+assert(shuffleStage.shuffleDep.mergerLocs.isEmpty)

Review comment:
   You are right, my bad.







[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

2022-02-02 Thread GitBox


mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r797940716



##
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##
@@ -1821,7 +1834,7 @@ private[spark] class DAGScheduler(
 }
 
 if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
-  if (!shuffleStage.shuffleDep.shuffleMergeFinalized &&
+  if (!shuffleStage.shuffleDep.isShuffleMergeFinalizedMarked &&
 shuffleStage.shuffleDep.getMergerLocs.nonEmpty) {
 checkAndScheduleShuffleMergeFinalize(shuffleStage)
   } else {

Review comment:
   Sounds good, we can add it to `handleShuffleMergeFinalized` as well: 
that does look like a good place to introduce it.







[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

2022-02-02 Thread GitBox


mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r797940406



##
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##
@@ -2313,7 +2326,7 @@ private[spark] class DAGScheduler(
 // Register merge statuses if the stage is still running and shuffle merge is not finalized yet.
 // TODO: SPARK-35549: Currently merge statuses results which come after shuffle merge
 // TODO: is finalized is not registered.
-if (runningStages.contains(stage) && !stage.shuffleDep.shuffleMergeFinalized) {
+if (runningStages.contains(stage) && !stage.shuffleDep.isShuffleMergeFinalizedMarked) {

Review comment:
   Agree with your analysis on both points, @venkata91.







[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

2022-02-02 Thread GitBox


mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r797717323



##
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##
@@ -3671,7 +3671,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
 
 completeShuffleMapStageSuccessfully(0, 0, parts)
 val shuffleStage = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
-assert(!shuffleStage.shuffleDep.shuffleMergeEnabled)
+assert(shuffleStage.shuffleDep.mergerLocs.isEmpty)

Review comment:
   Looks like this comment was missed?







[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

2022-02-02 Thread GitBox


mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r796929787



##
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##
@@ -1821,7 +1834,7 @@ private[spark] class DAGScheduler(
 }
 
 if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
-  if (!shuffleStage.shuffleDep.shuffleMergeFinalized &&
+  if (!shuffleStage.shuffleDep.isShuffleMergeFinalizedMarked &&
 shuffleStage.shuffleDep.getMergerLocs.nonEmpty) {
 checkAndScheduleShuffleMergeFinalize(shuffleStage)
   } else {

Review comment:
   In `processShuffleMapStageCompletion`, add something like
   ```scala
   if (!shuffleStage.isIndeterminate && shuffleStage.shuffleDep.shuffleMergeEnabled) {
     shuffleStage.shuffleDep.setShuffleMergeAllowed(false)
   }
   ```
   to ensure we don't retry merge for determinate stages?
   
   This is strictly not related to this PR, so I am fine with doing it in a follow-up PR as well to keep the scope contained (we can file a jira in that case).







[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

2022-02-02 Thread GitBox


mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r796929787



##
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##
@@ -1821,7 +1834,7 @@ private[spark] class DAGScheduler(
 }
 
 if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
-  if (!shuffleStage.shuffleDep.shuffleMergeFinalized &&
+  if (!shuffleStage.shuffleDep.isShuffleMergeFinalizedMarked &&
 shuffleStage.shuffleDep.getMergerLocs.nonEmpty) {
 checkAndScheduleShuffleMergeFinalize(shuffleStage)
   } else {

Review comment:
   In `processShuffleMapStageCompletion`, add something like
   ```scala
   if (!shuffleStage.isIndeterminate && shuffleStage.shuffleDep.shuffleMergeEnabled) {
     shuffleStage.shuffleDep.setShuffleMergeAllowed(false)
   }
   ```
   to ensure we don't retry merge for determinate stages?
   
   This is strictly not related to this PR, so I am fine with doing it in a follow-up PR as well to keep the scope contained (we can file a jira in that case).

##
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##
@@ -3671,7 +3671,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
 
 completeShuffleMapStageSuccessfully(0, 0, parts)
 val shuffleStage = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
-assert(!shuffleStage.shuffleDep.shuffleMergeEnabled)
+assert(shuffleStage.shuffleDep.mergerLocs.isEmpty)

Review comment:
   Looks like this was missed?

##
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##
@@ -4147,7 +4146,206 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
 assert(finalizeTask2.delay == 0 && finalizeTask2.registerMergeResults)
   }
 
-/**
+  test("SPARK-34826: Adaptively fetch shuffle mergers") {
+initPushBasedShuffleConfs(conf)
+conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 2)
+DAGSchedulerSuite.clearMergerLocs()
+DAGSchedulerSuite.addMergerLocs(Seq("host1"))
+val parts = 2
+
+val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
+val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
+
+// Submit a reduce job that depends on the shuffle, which will create a map stage
+submit(reduceRdd, (0 until parts).toArray)
+
+runEvent(makeCompletionEvent(
+  taskSets(0).tasks(0), Success, makeMapStatus("hostA", parts),
+  Seq.empty, Array.empty, createFakeTaskInfoWithId(0)))
+
+val shuffleStage1 = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+assert(!shuffleStage1.shuffleDep.shuffleMergeEnabled)
+assert(mapOutputTracker.getShufflePushMergerLocations(0).isEmpty)
+
+DAGSchedulerSuite.addMergerLocs(Seq("host2", "host3"))
+
+// host2 executor added event to trigger registering of shuffle merger locations
+// as shuffle mergers are tracked separately for test
+runEvent(ExecutorAdded("exec2", "host2"))
+
+// Check if new shuffle merger locations are available for push or not
+assert(mapOutputTracker.getShufflePushMergerLocations(0).size == 2)
+assert(shuffleStage1.shuffleDep.getMergerLocs.size == 2)
+
+// Complete remaining tasks in ShuffleMapStage 0
+runEvent(makeCompletionEvent(taskSets(0).tasks(1), Success,
+  makeMapStatus("host1", parts), Seq.empty, Array.empty, createFakeTaskInfoWithId(1)))
+
+completeNextResultStageWithSuccess(1, 0)
+assert(results === Map(0 -> 42, 1 -> 42))
+
+results.clear()
+assertDataStructuresEmpty()
+  }
+
+  test("SPARK-34826: Adaptively fetch shuffle mergers with stage retry") {
+initPushBasedShuffleConfs(conf)
+conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 2)
+DAGSchedulerSuite.clearMergerLocs()
+DAGSchedulerSuite.addMergerLocs(Seq("host1"))
+val parts = 2
+
+val shuffleMapRdd1 = new MyRDD(sc, parts, Nil)
+val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new HashPartitioner(parts))
+val shuffleMapRdd2 = new MyRDD(sc, parts, Nil)
+val shuffleDep2 = new ShuffleDependency(shuffleMapRdd2, new HashPartitioner(parts))
+val reduceRdd = new MyRDD(sc, parts, List(shuffleDep1, shuffleDep2),
+  tracker = mapOutputTracker)

Review comment:
   Let us keep them separate, and do it with the other jira if we decide to 
take that path.

##
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##
@@ -2313,7 +2326,7 @@ private[spark] class DAGScheduler(
 // Register merge statuses if the stage is still running and shuffle merge is not finalized yet.
 // TODO: SPARK-35549: Currently merge statuses results which come

[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

2022-01-31 Thread GitBox


mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r796060753



##
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##
@@ -3613,8 +3613,8 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
 val shuffleStage2 = scheduler.stageIdToStage(1).asInstanceOf[ShuffleMapStage]
 assert(shuffleStage2.shuffleDep.getMergerLocs.nonEmpty)
 
-assert(shuffleStage2.shuffleDep.shuffleMergeFinalized)
-assert(shuffleStage1.shuffleDep.shuffleMergeFinalized)
+assert(shuffleStage2.shuffleDep.isShuffleMergeFinalizedMarked)
+assert(shuffleStage1.shuffleDep.isShuffleMergeFinalizedMarked)

Review comment:
   I am slightly on the fence for some of these test reverts 
(`shuffleMergeFinalized` -> `isShuffleMergeFinalizedMarked`), since we are 
doing a stronger check now.
   So I am fine with leaving them as they are in this PR as well.







[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

2022-01-31 Thread GitBox


mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r796060753



##
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##
@@ -3613,8 +3613,8 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
 val shuffleStage2 = scheduler.stageIdToStage(1).asInstanceOf[ShuffleMapStage]
 assert(shuffleStage2.shuffleDep.getMergerLocs.nonEmpty)
 
-assert(shuffleStage2.shuffleDep.shuffleMergeFinalized)
-assert(shuffleStage1.shuffleDep.shuffleMergeFinalized)
+assert(shuffleStage2.shuffleDep.isShuffleMergeFinalizedMarked)
+assert(shuffleStage1.shuffleDep.isShuffleMergeFinalizedMarked)

Review comment:
   I am slightly on the fence for some of these test reverts 
(`shuffleMergeFinalized` -> `isShuffleMergeFinalizedMarked`), since we are 
doing a stronger check now.
   So I am fine with leaving them as-is as well.







[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

2022-01-31 Thread GitBox


mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r796060753



##
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##
@@ -3613,8 +3613,8 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
 val shuffleStage2 = scheduler.stageIdToStage(1).asInstanceOf[ShuffleMapStage]
 assert(shuffleStage2.shuffleDep.getMergerLocs.nonEmpty)
 
-assert(shuffleStage2.shuffleDep.shuffleMergeFinalized)
-assert(shuffleStage1.shuffleDep.shuffleMergeFinalized)
+assert(shuffleStage2.shuffleDep.isShuffleMergeFinalizedMarked)
+assert(shuffleStage1.shuffleDep.isShuffleMergeFinalizedMarked)

Review comment:
   I am slightly on the fence for some of these test reverts, since we are 
doing a stronger check now.
   So I am fine with leaving them as-is as well.







[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

2022-01-31 Thread GitBox


mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r796058549



##
File path: core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
##
@@ -131,7 +131,7 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
   metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
 val baseShuffleHandle = handle.asInstanceOf[BaseShuffleHandle[K, _, C]]
 val (blocksByAddress, canEnableBatchFetch) =
-  if (baseShuffleHandle.dependency.shuffleMergeEnabled) {
+  if (baseShuffleHandle.dependency.isShuffleMergeFinalizedMarked) {

Review comment:
   Makes sense







[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

2022-01-28 Thread GitBox


mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r794998416



##
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##
@@ -3686,14 +3686,13 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
 completeNextStageWithFetchFailure(3, 0, shuffleDep)
 scheduler.resubmitFailedStages()
 
-// Make sure shuffle merge is disabled for the retry
 val stage2 = scheduler.stageIdToStage(2).asInstanceOf[ShuffleMapStage]
-assert(!stage2.shuffleDep.shuffleMergeEnabled)
+assert(stage2.shuffleDep.shuffleMergeEnabled)

Review comment:
   I was thinking more about this ... should we always mark DETERMINATE stages with `setShuffleMergeAllowed(false)` when they complete? (See the sketch below.)
   It makes things cleaner, and in line with the expectation of whether mergers were used for the first execution or not.
   
   I don't feel very strongly about it though ...
   Thoughts?
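   For reference, a minimal sketch of that marking - it reuses the snippet proposed in the `processShuffleMapStageCompletion` comment earlier in this thread, so nothing here is new API (`shuffleStage` is the completing `ShuffleMapStage`):
   ```scala
   // Once a DETERMINATE stage completes, disallow merge so that a retry of the
   // same, already finalized stage never attempts push-based merge again.
   if (!shuffleStage.isIndeterminate && shuffleStage.shuffleDep.shuffleMergeEnabled) {
     shuffleStage.shuffleDep.setShuffleMergeAllowed(false)
   }
   ```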







[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

2022-01-28 Thread GitBox


mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r794997274



##
File path: core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
##
@@ -855,7 +855,7 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
 rpcEnv.shutdown()
   }
 
-  test("SPARK-37023: Avoid fetching merge status when shuffleMergeEnabled is false") {
+  test("SPARK-37023: Avoid fetching merge status when isShuffleMergeFinalizedMarked is false") {

Review comment:
   nit: isShuffleMergeFinalizedMarked -> useMergeResult?
   That is what we are actually testing here (this test went through some changes as the code evolved, but it looks like the test name was missed).

##
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##
@@ -4051,8 +4050,8 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
 runEvent(StageCancelled(0, Option("Explicit cancel check")))
 scheduler.handleShuffleMergeFinalized(shuffleStage1, shuffleStage1.shuffleDep.shuffleMergeId)
 
-assert(shuffleStage1.shuffleDep.shuffleMergeEnabled)
-assert(!shuffleStage1.shuffleDep.shuffleMergeFinalized)
+assert(shuffleStage1.shuffleDep.mergerLocs.nonEmpty)
+assert(!shuffleStage1.shuffleDep.isShuffleMergeFinalizedMarked)

Review comment:
   nit: revert?

##
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##
@@ -4147,7 +4146,206 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
 assert(finalizeTask2.delay == 0 && finalizeTask2.registerMergeResults)
   }
 
-/**
+  test("SPARK-34826: Adaptively fetch shuffle mergers") {
+initPushBasedShuffleConfs(conf)
+conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 2)
+DAGSchedulerSuite.clearMergerLocs()
+DAGSchedulerSuite.addMergerLocs(Seq("host1"))
+val parts = 2
+
+val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
+val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
+
+// Submit a reduce job that depends on the shuffle, which will create a map stage
+submit(reduceRdd, (0 until parts).toArray)
+
+runEvent(makeCompletionEvent(
+  taskSets(0).tasks(0), Success, makeMapStatus("hostA", parts),
+  Seq.empty, Array.empty, createFakeTaskInfoWithId(0)))
+
+val shuffleStage1 = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+assert(!shuffleStage1.shuffleDep.shuffleMergeEnabled)
+assert(mapOutputTracker.getShufflePushMergerLocations(0).isEmpty)
+
+DAGSchedulerSuite.addMergerLocs(Seq("host2", "host3"))
+
+// host2 executor added event to trigger registering of shuffle merger locations
+// as shuffle mergers are tracked separately for test
+runEvent(ExecutorAdded("exec2", "host2"))
+
+// Check if new shuffle merger locations are available for push or not
+assert(mapOutputTracker.getShufflePushMergerLocations(0).size == 2)
+assert(shuffleStage1.shuffleDep.getMergerLocs.size == 2)
+
+// Complete remaining tasks in ShuffleMapStage 0
+runEvent(makeCompletionEvent(taskSets(0).tasks(1), Success,
+  makeMapStatus("host1", parts), Seq.empty, Array.empty, createFakeTaskInfoWithId(1)))
+
+completeNextResultStageWithSuccess(1, 0)
+assert(results === Map(0 -> 42, 1 -> 42))
+
+results.clear()
+assertDataStructuresEmpty()
+  }
+
+  test("SPARK-34826: Adaptively fetch shuffle mergers with stage retry") {
+initPushBasedShuffleConfs(conf)
+conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 2)
+DAGSchedulerSuite.clearMergerLocs()
+DAGSchedulerSuite.addMergerLocs(Seq("host1"))
+val parts = 2
+
+val shuffleMapRdd1 = new MyRDD(sc, parts, Nil)
+val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new HashPartitioner(parts))
+val shuffleMapRdd2 = new MyRDD(sc, parts, Nil)
+val shuffleDep2 = new ShuffleDependency(shuffleMapRdd2, new HashPartitioner(parts))
+val reduceRdd = new MyRDD(sc, parts, List(shuffleDep1, shuffleDep2),
+  tracker = mapOutputTracker)

Review comment:
   Based on discussion 
[above](https://github.com/apache/spark/pull/34122/files#r790376895), we might 
need to end up merging this test with the next ("SPARK-34826: Adaptively fetch 
shuffle mergers with stage retry for indeterminate stage").

##
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##
@@ -1281,6 +1341,33 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
 }
   }
 
+  override def getShufflePushMergerLocations(shuffleId: Int): Seq[BlockManagerId] = {
+shufflePushMergerLocations.getOrElse(shuffleId, getMergerLocations(shuffleId))
+  }
+
+  private def getMergerLocations(shuffleId: Int): Seq[BlockManagerId] = 

[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

2022-01-28 Thread GitBox


mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r794718066



##
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##
@@ -145,20 +148,26 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
   }
 
   /**
-   * Returns true if push-based shuffle is disabled for this stage or empty RDD,
-   * or if the shuffle merge for this stage is finalized, i.e. the shuffle merge
-   * results for all partitions are available.
+   * Returns true if the RDD is an empty RDD or if the shuffle merge for this shuffle is
+   * finalized.
 */
   def shuffleMergeFinalized: Boolean = {
-// Empty RDD won't be computed therefore shuffle merge finalized should be true by default.
-if (shuffleMergeEnabled && numPartitions > 0) {
-  _shuffleMergedFinalized
+_shuffleMergedFinalized
+  }

Review comment:
   Thinking more, we are changing the semantics of `shuffleMergeFinalized` here, which was publicly exposed.
   What used to be `shuffleMergeFinalized` is now becoming `isShuffleMergeFinalizedIfEnabled` - so the behavior is exposed by another method.
   
   We should preserve the earlier semantics for the method ...
   For this PR, that would mean:
   
   1) Rename `shuffleMergeFinalized` as `isShuffleMergeFinalized` (a private[spark] method).
   
   2) Rename `isShuffleMergeFinalizedIfEnabled` to `shuffleMergeFinalized` - which will preserve the earlier behavior as well.
   
   Thoughts?
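   To make that concrete, a minimal sketch of the resulting method pair, assuming the rename lands as proposed (the raw check is shown under the `isShuffleMergeFinalizedMarked` name that later revisions of this PR use; `_shuffleMergedFinalized` is the existing flag from the diff above):
   ```scala
   // Raw check: has merge finalization actually been marked for this shuffle?
   private[spark] def isShuffleMergeFinalizedMarked: Boolean =
     _shuffleMergedFinalized
   
   // Keeps the pre-change public contract: trivially true when shuffle merge
   // is not enabled for this dependency.
   def shuffleMergeFinalized: Boolean =
     if (shuffleMergeEnabled) isShuffleMergeFinalizedMarked else true
   ```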







[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

2022-01-28 Thread GitBox


mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r794700609



##
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##
@@ -160,14 +160,14 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
* this shuffle is finalized.
*/
   def isShuffleMergeFinalizedIfEnabled: Boolean = {
-if (mergerLocs.nonEmpty) {
+if (shuffleMergeEnabled) {
   shuffleMergeFinalized
 } else {
   true
 }
   }
 
-  def newShuffleMergeState(): Unit = {
+  private[spark] def newShuffleMergeState(): Unit = {

Review comment:
   This was in 3.2, let us move this change to a separate jira







[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

2022-01-28 Thread GitBox


mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r794698873



##
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##
@@ -114,15 +114,6 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
 
   def shuffleMergeAllowed : Boolean = _shuffleMergeAllowed
 
-  // By default, shuffle merge is enabled for ShuffleDependency if shuffleMergeAllowed
-  private[this] var _shuffleMergeEnabled = shuffleMergeAllowed
-
-  private[spark] def setShuffleMergeEnabled(shuffleMergeEnabled: Boolean): Unit = {
-_shuffleMergeEnabled = shuffleMergeEnabled
-  }
-
-  def shuffleMergeEnabled : Boolean = _shuffleMergeEnabled

Review comment:
   @venkata91 Let us fix the changes from master (methods/variables which should be `private[spark]` but ended up getting exposed) that are not in 3.2/older - and open a jira for the others as a follow-up.







[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

2022-01-28 Thread GitBox


mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r794698873



##
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##
@@ -114,15 +114,6 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
 
   def shuffleMergeAllowed : Boolean = _shuffleMergeAllowed
 
-  // By default, shuffle merge is enabled for ShuffleDependency if shuffleMergeAllowed
-  private[this] var _shuffleMergeEnabled = shuffleMergeAllowed
-
-  private[spark] def setShuffleMergeEnabled(shuffleMergeEnabled: Boolean): Unit = {
-_shuffleMergeEnabled = shuffleMergeEnabled
-  }
-
-  def shuffleMergeEnabled : Boolean = _shuffleMergeEnabled

Review comment:
   @venkata91 Let us fix the changes from master, which are not in 3.2/older - and open a jira for the others as a follow-up.







[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

2022-01-27 Thread GitBox


mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r794072767



##
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##
@@ -114,15 +114,6 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
 
   def shuffleMergeAllowed : Boolean = _shuffleMergeAllowed
 
-  // By default, shuffle merge is enabled for ShuffleDependency if shuffleMergeAllowed
-  private[this] var _shuffleMergeEnabled = shuffleMergeAllowed
-
-  private[spark] def setShuffleMergeEnabled(shuffleMergeEnabled: Boolean): Unit = {
-_shuffleMergeEnabled = shuffleMergeEnabled
-  }
-
-  def shuffleMergeEnabled : Boolean = _shuffleMergeEnabled

Review comment:
   +CC @tgravescs, @Ngone51 







[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

2022-01-27 Thread GitBox


mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r794072564



##
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##
@@ -114,15 +114,6 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
 
   def shuffleMergeAllowed : Boolean = _shuffleMergeAllowed
 
-  // By default, shuffle merge is enabled for ShuffleDependency if shuffleMergeAllowed
-  private[this] var _shuffleMergeEnabled = shuffleMergeAllowed
-
-  private[spark] def setShuffleMergeEnabled(shuffleMergeEnabled: Boolean): Unit = {
-_shuffleMergeEnabled = shuffleMergeEnabled
-  }
-
-  def shuffleMergeEnabled : Boolean = _shuffleMergeEnabled

Review comment:
   Good call on `incPushCompleted`.
   Both `setMergerLocs` and `newShuffleMergeState` are in released versions, 
which makes it more difficult to make the change.







[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

2022-01-27 Thread GitBox


mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r793836705



##
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##
@@ -2608,8 +2608,12 @@ private[spark] class DAGScheduler(
 // isPushBasedShuffleEnabled and shuffleMergers need to be updated at the end.
 stage match {
   case s: ShuffleMapStage =>
-stage.latestInfo.setPushBasedShuffleEnabled(s.shuffleDep.getMergerLocs.nonEmpty)
-stage.latestInfo.setShuffleMergerCount(s.shuffleDep.getMergerLocs.size)
+val isPushShuffleEnabled =
+  s.shuffleDep.shuffleMergeAllowed && s.shuffleDep.getMergerLocs.nonEmpty

Review comment:
   When will we have the case where mergerLocs is non empty, but merge is 
not allowed ?
   Is this for retry of DETERMINATE stage ? If yes, do we want to change 
definition of `def shuffleMergeEnabled` to be
   `shuffleMergeAllowed && mergerLocs.nonEmpty` ?
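   A one-line sketch of that alternate definition, assuming the `_shuffleMergeEnabled` boolean is removed as discussed in the `Dependency.scala` comments below:
   ```scala
   // "Enabled" derived purely from existing state: merge must still be allowed
   // for this dependency AND merger locations must already have been set.
   def shuffleMergeEnabled: Boolean = shuffleMergeAllowed && mergerLocs.nonEmpty
   ```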







[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

2022-01-27 Thread GitBox


mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r793837336



##
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##
@@ -3686,14 +3686,13 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
 completeNextStageWithFetchFailure(3, 0, shuffleDep)
 scheduler.resubmitFailedStages()
 
-// Make sure shuffle merge is disabled for the retry
 val stage2 = scheduler.stageIdToStage(2).asInstanceOf[ShuffleMapStage]
-assert(!stage2.shuffleDep.shuffleMergeEnabled)
+assert(stage2.shuffleDep.shuffleMergeEnabled)

Review comment:
   Thanks for clarifying.







[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

2022-01-27 Thread GitBox


mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r793836705



##
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##
@@ -2608,8 +2608,12 @@ private[spark] class DAGScheduler(
 // isPushBasedShuffleEnabled and shuffleMergers need to be updated at the end.
 stage match {
   case s: ShuffleMapStage =>
-stage.latestInfo.setPushBasedShuffleEnabled(s.shuffleDep.getMergerLocs.nonEmpty)
-stage.latestInfo.setShuffleMergerCount(s.shuffleDep.getMergerLocs.size)
+val isPushShuffleEnabled =
+  s.shuffleDep.shuffleMergeAllowed && s.shuffleDep.getMergerLocs.nonEmpty

Review comment:
   When will we have the case where mergerLocs is non-empty, but merge is not allowed?







[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

2022-01-27 Thread GitBox


mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r793834972



##
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##
@@ -202,7 +204,7 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
   // Only used by DAGScheduler to coordinate shuffle merge finalization
   @transient private[this] var finalizeTask: Option[ScheduledFuture[_]] = None
 
-  def getFinalizeTask: Option[ScheduledFuture[_]] = finalizeTask
+  private[spark] def getFinalizeTask: Option[ScheduledFuture[_]] = finalizeTask
 
   def setFinalizeTask(task: ScheduledFuture[_]): Unit = {

Review comment:
   Make the setter `private[spark]` as well - a sketch of the resulting pair is below.
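   A compilable sketch of the suggested visibility, using a hypothetical stand-in class rather than the real `ShuffleDependency` (only the modifiers matter here):
   ```scala
   package org.apache.spark
   
   import java.util.concurrent.ScheduledFuture
   
   // Both the accessor and the mutator become private[spark]: the finalize
   // task is an implementation detail of a developer-facing API class.
   private[spark] class FinalizeTaskHolder {
     @transient private[this] var finalizeTask: Option[ScheduledFuture[_]] = None
   
     private[spark] def getFinalizeTask: Option[ScheduledFuture[_]] = finalizeTask
   
     private[spark] def setFinalizeTask(task: ScheduledFuture[_]): Unit = {
       finalizeTask = Some(task)
     }
   }
   ```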







[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

2022-01-27 Thread GitBox


mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r793834393



##
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##
@@ -114,15 +114,6 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
 
   def shuffleMergeAllowed : Boolean = _shuffleMergeAllowed
 
-  // By default, shuffle merge is enabled for ShuffleDependency if shuffleMergeAllowed
-  private[this] var _shuffleMergeEnabled = shuffleMergeAllowed
-
-  private[spark] def setShuffleMergeEnabled(shuffleMergeEnabled: Boolean): Unit = {
-_shuffleMergeEnabled = shuffleMergeEnabled
-  }
-
-  def shuffleMergeEnabled : Boolean = _shuffleMergeEnabled

Review comment:
   > In that case, should we also make shuffleMergeAllowed private[spark]? For now it is better to make everything private[spark] related to push-based shuffle, right? Thoughts?
   
   `shuffleMergeAllowed` is similar to `shuffleMergeEnabled` - it has value as a developer API, which developers can query for - the setters don't.
   FinalizeTask, on the other hand, is a purely internal impl detail.
   







[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

2022-01-27 Thread GitBox


mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r793833288



##
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##
@@ -114,15 +114,6 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
 
   def shuffleMergeAllowed : Boolean = _shuffleMergeAllowed
 
-  // By default, shuffle merge is enabled for ShuffleDependency if shuffleMergeAllowed
-  private[this] var _shuffleMergeEnabled = shuffleMergeAllowed
-
-  private[spark] def setShuffleMergeEnabled(shuffleMergeEnabled: Boolean): Unit = {
-_shuffleMergeEnabled = shuffleMergeEnabled
-  }
-
-  def shuffleMergeEnabled : Boolean = _shuffleMergeEnabled

Review comment:
   > We are using mergerLocs.isEmpty in other places as well. If we use shuffleMergeEnabled (mergerLocs.nonEmpty) and mergerLocs.isEmpty|nonEmpty in a few other places, wouldn't it be confusing?
   
   Yes, we should not blindly replace - only when the intent is to check if shuffle merge is enabled; though I would expect an empty/nonEmpty check against mergerLocs to pretty much be a proxy for whether shuffle merge is enabled.
   For example, the changes in the tests - the earlier intent was to check if merge was enabled, which got replaced with checks against mergerLocs.







[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

2022-01-26 Thread GitBox


mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r792766502



##
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##
@@ -114,15 +114,6 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
 
   def shuffleMergeAllowed : Boolean = _shuffleMergeAllowed
 
-  // By default, shuffle merge is enabled for ShuffleDependency if shuffleMergeAllowed
-  private[this] var _shuffleMergeEnabled = shuffleMergeAllowed
-
-  private[spark] def setShuffleMergeEnabled(shuffleMergeEnabled: Boolean): Unit = {
-_shuffleMergeEnabled = shuffleMergeEnabled
-  }
-
-  def shuffleMergeEnabled : Boolean = _shuffleMergeEnabled

Review comment:
   Let us keep this method - and make it
   `def shuffleMergeEnabled = mergerLocs.nonEmpty`
   What I meant was, let us remove the boolean `_shuffleMergeEnabled`, so that the rest of the code can depend on this as a state and not need to inspect the mergerLocs directly. (See the sketch below.)
   
   Also, this is part of the public API - so we can't remove it :-)
   Speaking of which, can you make `getFinalizeTask`/`setFinalizeTask` `private[spark]` as well? (`ShuffleDependency` is a developer API and we can't leak impl details.)
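   A self-contained sketch of that simplification, using a stand-in class (not Spark's actual `ShuffleDependency`; `BlockManagerId` is stubbed out here so the snippet compiles on its own):
   ```scala
   case class BlockManagerId(host: String, port: Int)
   
   class ShuffleMergeStateSketch {
     private[this] var _shuffleMergeAllowed = true
     private[this] var mergerLocs: Seq[BlockManagerId] = Nil
   
     def shuffleMergeAllowed: Boolean = _shuffleMergeAllowed
     def setShuffleMergeAllowed(allowed: Boolean): Unit = _shuffleMergeAllowed = allowed
   
     // No separate _shuffleMergeEnabled boolean: "enabled" is implied entirely
     // by whether merger locations have been set.
     def shuffleMergeEnabled: Boolean = mergerLocs.nonEmpty
   
     def setMergerLocs(locs: Seq[BlockManagerId]): Unit = {
       // Allowed is the superset precondition, per the discussion above.
       assert(shuffleMergeAllowed)
       mergerLocs = locs
     }
   
     def getMergerLocs: Seq[BlockManagerId] = mergerLocs
   }
   ```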
   







[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

2022-01-26 Thread GitBox


mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r792766502



##
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##
@@ -114,15 +114,6 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
 
   def shuffleMergeAllowed : Boolean = _shuffleMergeAllowed
 
-  // By default, shuffle merge is enabled for ShuffleDependency if shuffleMergeAllowed
-  private[this] var _shuffleMergeEnabled = shuffleMergeAllowed
-
-  private[spark] def setShuffleMergeEnabled(shuffleMergeEnabled: Boolean): Unit = {
-_shuffleMergeEnabled = shuffleMergeEnabled
-  }
-
-  def shuffleMergeEnabled : Boolean = _shuffleMergeEnabled

Review comment:
   Let us keep this method - and make it
   `def shuffleMergeEnabled = mergerLocs.nonEmpty`
   What I meant was, let us remove the boolean `_shuffleMergeEnabled`, so that the rest of the code can depend on this as a state and not need to inspect the mergerLocs directly.
   
   Also, this is part of the public API - so we can't remove it :-)
   Speaking of which, can you make `getFinalizeTask`/`setFinalizeTask` `private[spark]` as well? (This or a different PR - `ShuffleDependency` is a developer API and we can't leak impl details.)
   







[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

2022-01-26 Thread GitBox


mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r792766502



##
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##
@@ -114,15 +114,6 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
 
   def shuffleMergeAllowed : Boolean = _shuffleMergeAllowed
 
-  // By default, shuffle merge is enabled for ShuffleDependency if shuffleMergeAllowed
-  private[this] var _shuffleMergeEnabled = shuffleMergeAllowed
-
-  private[spark] def setShuffleMergeEnabled(shuffleMergeEnabled: Boolean): Unit = {
-_shuffleMergeEnabled = shuffleMergeEnabled
-  }
-
-  def shuffleMergeEnabled : Boolean = _shuffleMergeEnabled

Review comment:
   Let us keep this method - and make it
   `def shuffleMergeEnabled = mergerLocs.nonEmpty`
   What I meant was, let us remove the boolean `_shuffleMergeEnabled`, so that the rest of the code can depend on this as a state and not need to inspect the mergerLocs directly.
   
   Also, this is part of the public API - so we can't remove it :-)
   Speaking of which, can you make `getFinalizeTask` `private[spark]` as well? (This or a different PR - `ShuffleDependency` is a developer API and we can't leak impl details.)
   







[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

2022-01-26 Thread GitBox


mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r792766502



##
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##
@@ -114,15 +114,6 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
 
   def shuffleMergeAllowed : Boolean = _shuffleMergeAllowed
 
-  // By default, shuffle merge is enabled for ShuffleDependency if shuffleMergeAllowed
-  private[this] var _shuffleMergeEnabled = shuffleMergeAllowed
-
-  private[spark] def setShuffleMergeEnabled(shuffleMergeEnabled: Boolean): Unit = {
-_shuffleMergeEnabled = shuffleMergeEnabled
-  }
-
-  def shuffleMergeEnabled : Boolean = _shuffleMergeEnabled

Review comment:
   Let us keep this method - and make it
   `def shuffleMergeEnabled = mergerLocs.nonEmpty`
   What I meant was, let us remove the boolean `_shuffleMergeEnabled`, so that the rest of the code can depend on this as a state and not need to inspect the mergerLocs directly.
   
   Also, this is part of the public API - so we can't remove it :-)







[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

2022-01-26 Thread GitBox


mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r792766502



##
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##
@@ -114,15 +114,6 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
 
   def shuffleMergeAllowed : Boolean = _shuffleMergeAllowed
 
-  // By default, shuffle merge is enabled for ShuffleDependency if shuffleMergeAllowed
-  private[this] var _shuffleMergeEnabled = shuffleMergeAllowed
-
-  private[spark] def setShuffleMergeEnabled(shuffleMergeEnabled: Boolean): Unit = {
-_shuffleMergeEnabled = shuffleMergeEnabled
-  }
-
-  def shuffleMergeEnabled : Boolean = _shuffleMergeEnabled

Review comment:
   Let us keep this method - and make it
   `def shuffleMergeEnabled = mergerLocs.nonEmpty`
   What I meant was, let us remove the boolean `_shuffleMergeEnabled`, so that the rest of the code can depend on this as a state and not need to inspect the mergerLocs directly.

##
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##
@@ -3579,7 +3579,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
 submit(reduceRdd, (0 until parts).toArray)
 completeShuffleMapStageSuccessfully(0, 0, parts)

Review comment:
   Once we reintroduce the `shuffleMergeEnabled` method, all changes to 
this file (DAGSchedulerSuite) related to mergerLocs.isEmpty == 
!shuffleMergeEnabled (and reverse) can be reverted (for the latest 
[diff](https://github.com/apache/spark/pull/34122/commits/3b52bed7ea4bbdb127a4711201d8c8268ab06650#))
   

##
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##
@@ -4172,7 +4172,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
 
 // host2 executor added event to trigger registering of shuffle merger locations
 // as shuffle mergers are tracked separately for test
-runEvent(ExecutorAdded("host2", "host2"))
+runEvent(ExecutorAdded("host2", "exec2"))

Review comment:
   Order is reverse - `ExecutorAdded(execId: String, host: String)`
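   For clarity, the corrected call with the executor id first (matching the form the final version of the test uses; `runEvent` and `ExecutorAdded` are the existing `DAGSchedulerSuite` helpers):
   ```scala
   runEvent(ExecutorAdded("exec2", "host2"))
   ```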







[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

2022-01-26 Thread GitBox


mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r792761973



##
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##
@@ -135,6 +144,7 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
   def shuffleMergeId: Int = _shuffleMergeId
 
   def setMergerLocs(mergerLocs: Seq[BlockManagerId]): Unit = {
+assert(shuffleMergeEnabled || shuffleMergeAllowed)

Review comment:
   Yes, when we want it to be disabled (like with the retry of a DETERMINATE stage which was already finalized), we must set merge allowed to false. It is cleaner.
   Note - `def isShuffleMergeEnabled` or `def shuffleMergeEnabled` pointing to `mergerLocs.nonEmpty` is fine (and preferred to using mergerLocs directly) - we just don't need the state boolean.







[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

2022-01-26 Thread GitBox


mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r792761973



##
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##
@@ -135,6 +144,7 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
   def shuffleMergeId: Int = _shuffleMergeId
 
   def setMergerLocs(mergerLocs: Seq[BlockManagerId]): Unit = {
+assert(shuffleMergeEnabled || shuffleMergeAllowed)

Review comment:
   Yes, when we want it to be disabled (like with the retry of a DETERMINATE stage which was already finalized), we must set merge allowed to false. It is cleaner.







[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

2022-01-23 Thread GitBox


mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r790361031



##
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##
@@ -135,6 +144,7 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
   def shuffleMergeId: Int = _shuffleMergeId
 
   def setMergerLocs(mergerLocs: Seq[BlockManagerId]): Unit = {
+assert(shuffleMergeEnabled || shuffleMergeAllowed)

Review comment:
   `shuffleMergeAllowed` is a superset - we don't need `shuffleMergeEnabled` here (and actually, it is getting set after `setMergerLocs` in this PR).
   Also, see the comment [above](https://github.com/apache/spark/pull/34122/files#r785389352) on whether we need the variable (`_shuffleMergeEnabled`) at all.

##
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##
@@ -3686,14 +3686,13 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
 completeNextStageWithFetchFailure(3, 0, shuffleDep)
 scheduler.resubmitFailedStages()
 
-// Make sure shuffle merge is disabled for the retry
 val stage2 = scheduler.stageIdToStage(2).asInstanceOf[ShuffleMapStage]
-assert(!stage2.shuffleDep.shuffleMergeEnabled)
+assert(stage2.shuffleDep.shuffleMergeEnabled)

Review comment:
   Why did this behavior change?
   I would expect it to be the same before/after this PR.
   
   For a DETERMINATE stage, we should not have merge enabled for the retry if the 
previous attempt already finalized.

##
File path: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##
@@ -4147,7 +4146,210 @@ class DAGSchedulerSuite extends SparkFunSuite with 
TempLocalSparkContext with Ti
 assert(finalizeTask2.delay == 0 && finalizeTask2.registerMergeResults)
   }
 
-/**
+  test("SPARK-34826: Adaptively fetch shuffle mergers") {
+initPushBasedShuffleConfs(conf)
+conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 2)
+DAGSchedulerSuite.clearMergerLocs()
+DAGSchedulerSuite.addMergerLocs(Seq("host1"))
+val parts = 2
+
+val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+val shuffleDep = new ShuffleDependency(shuffleMapRdd, new 
HashPartitioner(parts))
+val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = 
mapOutputTracker)
+
+// Submit a reduce job that depends on this shuffle, which will create a map stage
+submit(reduceRdd, (0 until parts).toArray)
+
+runEvent(makeCompletionEvent(
+  taskSets(0).tasks(0), Success, makeMapStatus("hostA", parts),
+  Seq.empty, Array.empty, createFakeTaskInfoWithId(0)))
+
+val shuffleStage1 = 
scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+assert(shuffleStage1.shuffleDep.getMergerLocs.isEmpty)
+assert(mapOutputTracker.getShufflePushMergerLocations(0).isEmpty)
+
+DAGSchedulerSuite.addMergerLocs(Seq("host2", "host3"))
+
+// host2 executor added event to trigger registering of shuffle merger 
locations
+// as shuffle mergers are tracked separately for test
+runEvent(ExecutorAdded("host2", "host2"))

Review comment:
   nit: Use 'exec' instead of 'host' as prefix for executor id

##
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##
@@ -1176,6 +1223,9 @@ private[spark] class MapOutputTrackerWorker(conf: 
SparkConf) extends MapOutputTr
   // instantiate a serializer. See the followup to SPARK-36705 for more 
details.
   private lazy val fetchMergeResult = Utils.isPushBasedShuffleEnabled(conf, 
isDriver = false)
 
+  // Exposed for testing
+  val shufflePushMergerLocations = new ConcurrentHashMap[Int, 
Seq[BlockManagerId]]().asScala

Review comment:
   Sounds good, this looks fine.

##
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##
@@ -1369,24 +1370,34 @@ private[spark] class DAGScheduler(
* locations for block push/merge by getting the historical locations of 
past executors.
*/
   private def prepareShuffleServicesForShuffleMapStage(stage: 
ShuffleMapStage): Unit = {
-assert(stage.shuffleDep.shuffleMergeEnabled && 
!stage.shuffleDep.shuffleMergeFinalized)
+assert(stage.shuffleDep.shuffleMergeAllowed && 
!stage.shuffleDep.shuffleMergeFinalized)
 if (stage.shuffleDep.getMergerLocs.isEmpty) {
-  val mergerLocs = sc.schedulerBackend.getShufflePushMergerLocations(
-stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId)
-  if (mergerLocs.nonEmpty) {
-stage.shuffleDep.setMergerLocs(mergerLocs)
-logInfo(s"Push-based shuffle enabled for $stage (${stage.name}) with" +
-  s" ${stage.shuffleDep.getMergerLocs.size} merger locations")
-
-logDebug("List of shuffle push merger locations " +
-  s"${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}")
-  } else {
-stage.shuffleDep.setShuffleMergeEnabled(false)
-

[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

2022-01-15 Thread GitBox


mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r785388090



##
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##
@@ -1176,6 +1223,9 @@ private[spark] class MapOutputTrackerWorker(conf: 
SparkConf) extends MapOutputTr
   // instantiate a serializer. See the followup to SPARK-36705 for more 
details.
   private lazy val fetchMergeResult = Utils.isPushBasedShuffleEnabled(conf, 
isDriver = false)
 
+  // Exposed for testing
+  val shufflePushMergerLocations = new ConcurrentHashMap[Int, 
Seq[BlockManagerId]]().asScala

Review comment:
   This needs to account for the shuffle merge id as well - to ensure the 
merger locations are consistent.
   A possible flow is:
   a) attempt 0 has enough mergers, so executors cache the attempt 0 
mergers.
   b) one or more hosts get added to the exclude list before attempt 1.
   c) attempt 1 starts with merge allowed = true but merge enabled = false - 
so tasks start without mergers populated in the dependency (merge might get enabled 
as more executors are added).
   d) executors which ran attempt 0 will use mergers from the previous attempt - 
new executors will use mergers from attempt 1.
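   One way to handle this (a sketch, not the PR's code - the cache layout and 
names are assumptions) is to record the shuffleMergeId next to the cached 
mergers and treat a mismatching merge id as a cache miss, so executors that 
cached attempt 0's mergers refetch for attempt 1:

```scala
import java.util.concurrent.ConcurrentHashMap

// Sketch: shuffleId -> (shuffleMergeId, merger locations).
class MergerLocationCache {
  private val cache = new ConcurrentHashMap[Int, (Int, Seq[String])]()

  def get(shuffleId: Int, expectedMergeId: Int)
         (fetchFromDriver: => (Int, Seq[String])): Seq[String] = {
    Option(cache.get(shuffleId)) match {
      case Some((mergeId, mergers)) if mergeId == expectedMergeId =>
        mergers // cached value belongs to this attempt
      case _ =>
        val fetched = fetchFromDriver // absent or stale: ask the driver
        cache.put(shuffleId, fetched)
        fetched._2
    }
  }
}
```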
   







[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

2022-01-15 Thread GitBox


mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r785387856



##
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##
@@ -1281,6 +1331,22 @@ private[spark] class MapOutputTrackerWorker(conf: 
SparkConf) extends MapOutputTr
 }
   }
 
+  override def getShufflePushMergerLocations(shuffleId: Int): 
Seq[BlockManagerId] = {
+shufflePushMergerLocations.getOrElse(shuffleId, 
getMergerLocations(shuffleId))
+  }
+
+  private def getMergerLocations(shuffleId: Int): Seq[BlockManagerId] = {
+fetchingLock.withLock(shuffleId) {
+  val mergers = 
askTracker[Seq[BlockManagerId]](GetShufflePushMergerLocations(shuffleId))

Review comment:
   Check if `shufflePushMergerLocations(shuffleId)` is nonEmpty before 
fetching it again - in case of concurrent updates before we acquire the lock 
for the shuffleId.
   (See `getStatuses` or other uses of `fetchingLock` for an example.)
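   A sketch of the check-lock-recheck pattern being asked for (stub types; the 
per-key lock below merely stands in for `fetchingLock`):

```scala
import java.util.concurrent.ConcurrentHashMap

// Sketch: check the cache, take the per-shuffle lock, then check again before
// asking the driver, so a fetch that completed while we waited is reused.
class MergerFetcher(askDriver: Int => Seq[String]) {
  private val cache = new ConcurrentHashMap[Int, Seq[String]]()
  private val locks = new ConcurrentHashMap[Int, Object]()

  def getMergerLocations(shuffleId: Int): Seq[String] = {
    val cached = cache.get(shuffleId)
    if (cached != null) {
      cached
    } else {
      locks.computeIfAbsent(shuffleId, _ => new Object).synchronized {
        val recheck = cache.get(shuffleId) // may have been filled meanwhile
        if (recheck != null) {
          recheck
        } else {
          val fetched = askDriver(shuffleId)
          cache.put(shuffleId, fetched)
          fetched
        }
      }
    }
  }
}
```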







[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

2022-01-15 Thread GitBox


mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r785389352



##
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##
@@ -106,7 +106,17 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: 
ClassTag](
 
   // By default, shuffle merge is enabled for ShuffleDependency if push based 
shuffle
   // is enabled
-  private[this] var _shuffleMergeEnabled = canShuffleMergeBeEnabled()
+  private[this] var _shuffleMergeAllowed = canShuffleMergeBeEnabled()
+
+  private[spark] def setShuffleMergeAllowed(shuffleMergeAllowed: Boolean): 
Unit = {
+_shuffleMergeAllowed = shuffleMergeAllowed
+  }
+
+  def shuffleMergeAllowed : Boolean = _shuffleMergeAllowed
+
+  // By default, shuffle merge is enabled for ShuffleDependency if push based 
shuffle
+  // is enabled
+  private[this] var _shuffleMergeEnabled = shuffleMergeAllowed
 
   private[spark] def setShuffleMergeEnabled(shuffleMergeEnabled: Boolean): 
Unit = {
 _shuffleMergeEnabled = shuffleMergeEnabled

Review comment:
   With all the changes we have made, do we need this boolean? Or would 
`mergerLocs.nonEmpty` not do? (`def shuffleMergeEnabled: Boolean = 
mergerLocs.nonEmpty`)
   
   Also, in `setShuffleMergeEnabled` (if we are keeping it) or in 
`setMergerLocs`, add `assert(!shuffleMergeEnabled || shuffleMergeAllowed)`.







[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

2022-01-15 Thread GitBox


mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r785387151



##
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##
@@ -145,19 +155,30 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: 
ClassTag](
   }
 
   /**
-   * Returns true if push-based shuffle is disabled for this stage or empty 
RDD,
-   * or if the shuffle merge for this stage is finalized, i.e. the shuffle 
merge
-   * results for all partitions are available.
+   * Returns true if the RDD is an empty RDD or if the shuffle merge for this 
shuffle is
+   * finalized.
*/
-  def shuffleMergeFinalized: Boolean = {
+  def isShuffleMergeFinalized: Boolean = {
 // Empty RDD won't be computed therefore shuffle merge finalized should be 
true by default.
-if (shuffleMergeEnabled && numPartitions > 0) {
+if (numPartitions > 0) {

Review comment:
   Since `canShuffleMergeBeEnabled` already checks for `numPartitions > 0`, 
do we need this check here?

##
File path: core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
##
@@ -910,4 +910,32 @@ class MapOutputTrackerSuite extends SparkFunSuite with 
LocalSparkContext {
 rpcEnv.shutdown()
 slaveRpcEnv.shutdown()
   }
+
+  test("SPARK-34826: Adaptive shuffle mergers") {
+val newConf = new SparkConf
+newConf.set("spark.shuffle.push.based.enabled", "true")
+newConf.set("spark.shuffle.service.enabled", "true")
+
+// needs TorrentBroadcast so need a SparkContext
+withSpark(new SparkContext("local", "MapOutputTrackerSuite", newConf)) { 
sc =>
+  val masterTracker = 
sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+  val rpcEnv = sc.env.rpcEnv
+  val masterEndpoint = new MapOutputTrackerMasterEndpoint(rpcEnv, 
masterTracker, newConf)
+  rpcEnv.stop(masterTracker.trackerEndpoint)
+  rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME, masterEndpoint)
+
+  val slaveTracker = new MapOutputTrackerWorker(newConf)
+  slaveTracker.trackerEndpoint =
+rpcEnv.setupEndpointRef(rpcEnv.address, MapOutputTracker.ENDPOINT_NAME)
+
+  masterTracker.registerShuffle(20, 100, 100)
+  slaveTracker.updateEpoch(masterTracker.getEpoch)
+  val mergerLocs = (1 to 10).map(x => BlockManagerId(s"exec-$x", 
s"host-$x", 7337))
+  masterTracker.registerShufflePushMergerLocations(20, mergerLocs)
+
+  assert(slaveTracker.getShufflePushMergerLocations(20).size == 10)
+  slaveTracker.unregisterShuffle(20)
+  assert(slaveTracker.shufflePushMergerLocations.isEmpty)

Review comment:
   Note: We would need to enhance this test with the additional case 
mentioned above for `shufflePushMergerLocations`.
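   For example (a sketch continuing the test above with the APIs it already 
exercises; the exact assertions would depend on how the merge-id handling 
lands):

```scala
// Simulate a new attempt re-registering a different merger set for shuffle 20
// and make sure the worker does not keep serving stale cached locations.
masterTracker.unregisterShuffle(20)
masterTracker.registerShuffle(20, 100, 100)
val newMergerLocs = (1 to 5).map(x => BlockManagerId(s"exec-$x", s"host-$x", 7337))
masterTracker.registerShufflePushMergerLocations(20, newMergerLocs)
slaveTracker.unregisterShuffle(20) // drop the worker-side cached entry
assert(slaveTracker.getShufflePushMergerLocations(20).size == 5)
```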

##
File path: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##
@@ -4147,7 +4146,128 @@ class DAGSchedulerSuite extends SparkFunSuite with 
TempLocalSparkContext with Ti
 assert(finalizeTask2.delay == 0 && finalizeTask2.registerMergeResults)
   }
 
-/**
+  test("SPARK-34826: Adaptively fetch shuffle mergers") {
+initPushBasedShuffleConfs(conf)
+conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 6)
+DAGSchedulerSuite.clearMergerLocs()
+DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", 
"host5"))
+val parts = 7
+
+val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+val shuffleDep = new ShuffleDependency(shuffleMapRdd, new 
HashPartitioner(parts))
+val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = 
mapOutputTracker)
+
+// Submit a reduce job that depends on this shuffle, which will create a map stage
+submit(reduceRdd, (0 until parts).toArray)
+
+runEvent(makeCompletionEvent(
+  taskSets(0).tasks(0), Success, makeMapStatus("hostA", parts),
+  Seq.empty, Array.empty, createFakeTaskInfoWithId(0)))
+
+val shuffleStage1 = 
scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+assert(shuffleStage1.shuffleDep.getMergerLocs.isEmpty)
+assert(mapOutputTracker.getShufflePushMergerLocations(0).isEmpty)
+
+DAGSchedulerSuite.addMergerLocs(Seq("host6", "host7", "host8"))
+
+runEvent(makeCompletionEvent(
+  taskSets(0).tasks(1), Success, makeMapStatus("hostA", parts),
+  Seq.empty, Array.empty, createFakeTaskInfoWithId(1)))
+
+// Dummy executor added event to trigger registering of shuffle merger 
locations
+// as shuffle mergers are tracked separately for test
+runEvent(ExecutorAdded("dummy", "dummy"))

Review comment:
   Let us make the host valid (say `host6`, from the new set of 
hosts) - in case this code evolves in the future.

##
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##
@@ -1281,6 +1331,22 @@ private[spark] class MapOutputTrackerWorker(conf: 
SparkConf) extends MapOutputTr
 }
   }
 
+  override def getShufflePushMergerLocations(shuffleId: Int): 
Seq[BlockManagerId] = {
+

[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

2022-01-06 Thread GitBox


mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r780010060



##
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##
@@ -144,12 +144,16 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: 
ClassTag](
 _shuffleMergedFinalized = true
   }
 
+  def shuffleMergeFinalized: Boolean = {

Review comment:
   Let us split the state out into two variables - since they are 
fundamentally two things represented earlier by one boolean:
   
   * `shuffleMergeAllowed`
   * `shuffleMergeEnabled`
   
   If `shuffleMergeAllowed` is `false`, then all push based shuffle paths are 
ignored - it is statically set at the driver.
   It defaults to `canShuffleMergeBeEnabled` - but can be explicitly disabled: 
for example, when missing tasks are being computed for a determinate shuffle 
after it has been finalized.
   This would be set at the driver before the stage starts - and can be depended 
upon at executors to enable push based shuffle paths.
   
   `shuffleMergeEnabled` can be set to `true` only if `shuffleMergeAllowed` is 
`true` and additional conditions are met - for example, sufficient mergers are 
available.
   Earlier, this was determined statically when the stage started - with this PR, 
it can become `true` adaptively after the stage starts.
   `shuffleMergedFinalized` can remain as it was defined earlier.
   
   Thoughts?
   
   +CC @Ngone51, @otterc 
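   A standalone sketch of the proposed split (a stub type with illustrative 
names - not the actual `ShuffleDependency`):

```scala
// Sketch of the two-variable state described above.
class PushShuffleState(canShuffleMergeBeEnabled: Boolean) {
  // Statically decided at the driver; can be revoked explicitly, e.g. for a
  // retry of a determinate stage that was already finalized.
  private var _shuffleMergeAllowed = canShuffleMergeBeEnabled
  def shuffleMergeAllowed: Boolean = _shuffleMergeAllowed
  def setShuffleMergeAllowed(allowed: Boolean): Unit = _shuffleMergeAllowed = allowed

  // Becomes true adaptively, once sufficient mergers are available.
  private var mergerLocs: Seq[String] = Seq.empty
  def shuffleMergeEnabled: Boolean = mergerLocs.nonEmpty
  def setMergerLocs(locs: Seq[String]): Unit = {
    assert(shuffleMergeAllowed) // enabled implies allowed
    mergerLocs = locs
  }

  // Unchanged from before: flipped once merge finalization completes.
  private var _shuffleMergedFinalized = false
  def markShuffleMergeFinalized(): Unit = _shuffleMergedFinalized = true
  def shuffleMergeFinalized: Boolean = _shuffleMergedFinalized
}
```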










[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

2022-01-06 Thread GitBox


mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r779886670



##
File path: core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
##
@@ -39,7 +39,9 @@ class StageInfo(
 val taskMetrics: TaskMetrics = null,
 private[spark] val taskLocalityPreferences: Seq[Seq[TaskLocation]] = 
Seq.empty,
 private[spark] val shuffleDepId: Option[Int] = None,
-val resourceProfileId: Int) {
+val resourceProfileId: Int,
+private[spark] var isPushBasedShuffleEnabled: Boolean = false,
+private[spark] var shuffleMergerCount: Int = 0) {

Review comment:
   But they are `private[spark]` with no consumers within Spark right now, 
no?







[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

2022-01-06 Thread GitBox


mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r779886030



##
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##
@@ -2487,6 +2501,21 @@ private[spark] class DAGScheduler(
   executorFailureEpoch -= execId
 }
 shuffleFileLostEpoch -= execId
+
+if (pushBasedShuffleEnabled) {

Review comment:
   Ideally, we want to go into this `if` block only if a new host is added 
(since if the executor is on an existing/known host, the number of mergers is not 
increasing) - more specifically, when a new block manager on a new host is added (in 
`BlockManagerMasterEndpoint`).
   
   That is why I was thinking about doing it there ...
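   Something along these lines (hypothetical names; the real hook in 
`BlockManagerMasterEndpoint` would look different):

```scala
import scala.collection.mutable

// Sketch: only re-evaluate merger locations when the added executor is on a
// previously unseen host, since executors on known hosts cannot increase the
// number of distinct mergers.
class MergerRefreshGate {
  private val knownHosts = mutable.Set[String]()

  def onExecutorAdded(host: String)(refreshMergers: => Unit): Unit = {
    if (knownHosts.add(host)) { // add returns true only for a new host
      refreshMergers
    }
  }
}
```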







[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

2022-01-05 Thread GitBox


mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r779287873



##
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##
@@ -1281,6 +1333,22 @@ private[spark] class MapOutputTrackerWorker(conf: 
SparkConf) extends MapOutputTr
 }
   }
 
+  override def getShufflePushMergerLocations(shuffleId: Int): 
Seq[BlockManagerId] = {
+
shufflePushMergerLocations.get(shuffleId).getOrElse(getMergerLocations(shuffleId))
+  }
+
+  private def getMergerLocations(shuffleId: Int): Seq[BlockManagerId] = {
+fetchingLock.withLock(shuffleId) {

Review comment:
   Any issues with reusing `fetchingLock` here?
   +CC @Ngone51 in case there are concerns.

##
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##
@@ -1281,6 +1333,22 @@ private[spark] class MapOutputTrackerWorker(conf: 
SparkConf) extends MapOutputTr
 }
   }
 
+  override def getShufflePushMergerLocations(shuffleId: Int): 
Seq[BlockManagerId] = {
+
shufflePushMergerLocations.get(shuffleId).getOrElse(getMergerLocations(shuffleId))

Review comment:
   `shufflePushMergerLocations.getOrElse` here?

##
File path: core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
##
@@ -39,7 +39,9 @@ class StageInfo(
 val taskMetrics: TaskMetrics = null,
 private[spark] val taskLocalityPreferences: Seq[Seq[TaskLocation]] = 
Seq.empty,
 private[spark] val shuffleDepId: Option[Int] = None,
-val resourceProfileId: Int) {
+val resourceProfileId: Int,
+private[spark] var isPushBasedShuffleEnabled: Boolean = false,
+private[spark] var shuffleMergerCount: Int = 0) {

Review comment:
   Why do we need these?

##
File path: 
core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala
##
@@ -59,13 +60,26 @@ private[spark] class ShuffleWriteProcessor extends 
Serializable with Logging {
 rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: 
Product2[Any, Any]]])
   val mapStatus = writer.stop(success = true)
   if (mapStatus.isDefined) {
+val isPushBasedShuffleEnabled = 
Utils.isPushBasedShuffleEnabled(SparkEnv.get.conf,
+  isDriver = SparkContext.DRIVER_IDENTIFIER == SparkEnv.get.executorId)
+// Check if sufficient shuffle mergers are available now for the 
ShuffleMapTask to push
+if (isPushBasedShuffleEnabled && dep.getMergerLocs.isEmpty && 
!dep.shuffleMergeFinalized) {

Review comment:
   Review note: Here we are depending on the fact that a retry of a determinate 
stage will already be merge finalized - while indeterminate stages won't be - and so 
the `!dep.shuffleMergeFinalized` check takes care of not trying to fetch 
mergers/enable push based shuffle for the retry of a determinate stage.

##
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##
@@ -144,12 +144,16 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: 
ClassTag](
 _shuffleMergedFinalized = true
   }
 
+  def shuffleMergeFinalized: Boolean = {
+_shuffleMergedFinalized
+  }
+
   /**
* Returns true if push-based shuffle is disabled for this stage or empty 
RDD,
* or if the shuffle merge for this stage is finalized, i.e. the shuffle 
merge
* results for all partitions are available.
*/
-  def shuffleMergeFinalized: Boolean = {
+  def isShuffleMergeOutputsAvailable: Boolean = {

Review comment:
   The method name and its semantics are a bit confusing (even though we 
document the expectation).
   We are returning `true` here when push based shuffle is disabled. The 
expectation would be to respond with `false` for this method name.
   
   How about renaming the new `shuffleMergeFinalized` to 
`isShuffleMergeFinalizeEnabled` and making this method 
`isShuffleMergeFinalizeCompleted`?
   (Or something similar and descriptive.)

##
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##
@@ -1371,19 +1371,33 @@ private[spark] class DAGScheduler(
   private def prepareShuffleServicesForShuffleMapStage(stage: 
ShuffleMapStage): Unit = {
 assert(stage.shuffleDep.shuffleMergeEnabled && 
!stage.shuffleDep.shuffleMergeFinalized)
 if (stage.shuffleDep.getMergerLocs.isEmpty) {
+  getAndSetShufflePushMergerLocations(stage)
+}
+
+if (stage.shuffleDep.shuffleMergeEnabled) {
+  logInfo(("Shuffle merge enabled before starting the stage for %s (%s) 
with %d" +
+" merger locations").format(stage, stage.name, 
stage.shuffleDep.getMergerLocs.size))
+} else {
+  logInfo(("Shuffle merge disabled for %s (%s), but can get enabled later" 
+
+" adaptively once enough mergers are available").format(stage, 
stage.name))

Review comment:
   Review note: This method is invoked only for:
   * Initial stage attempt for a shuffle.
   * All attempts for an indeterminate stage (since previous outputs are discarded)
   * Only first attempt