mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r779287873



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1281,6 +1333,22 @@ private[spark] class MapOutputTrackerWorker(conf: 
SparkConf) extends MapOutputTr
     }
   }
 
+  override def getShufflePushMergerLocations(shuffleId: Int): 
Seq[BlockManagerId] = {
+    
shufflePushMergerLocations.get(shuffleId).getOrElse(getMergerLocations(shuffleId))
+  }
+
+  private def getMergerLocations(shuffleId: Int): Seq[BlockManagerId] = {
+    fetchingLock.withLock(shuffleId) {

Review comment:
       Any issues with reusing `fetchingLock` here ?
   +CC @Ngone51 in case there are concerns.

##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1281,6 +1333,22 @@ private[spark] class MapOutputTrackerWorker(conf: 
SparkConf) extends MapOutputTr
     }
   }
 
+  override def getShufflePushMergerLocations(shuffleId: Int): 
Seq[BlockManagerId] = {
+    
shufflePushMergerLocations.get(shuffleId).getOrElse(getMergerLocations(shuffleId))

Review comment:
       `shufflePushMergerLocations.getOrElse` here ?

##########
File path: core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
##########
@@ -39,7 +39,9 @@ class StageInfo(
     val taskMetrics: TaskMetrics = null,
     private[spark] val taskLocalityPreferences: Seq[Seq[TaskLocation]] = 
Seq.empty,
     private[spark] val shuffleDepId: Option[Int] = None,
-    val resourceProfileId: Int) {
+    val resourceProfileId: Int,
+    private[spark] var isPushBasedShuffleEnabled: Boolean = false,
+    private[spark] var shuffleMergerCount: Int = 0) {

Review comment:
       Why do we need these ?

##########
File path: 
core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala
##########
@@ -59,13 +60,26 @@ private[spark] class ShuffleWriteProcessor extends 
Serializable with Logging {
         rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: 
Product2[Any, Any]]])
       val mapStatus = writer.stop(success = true)
       if (mapStatus.isDefined) {
+        val isPushBasedShuffleEnabled = 
Utils.isPushBasedShuffleEnabled(SparkEnv.get.conf,
+          isDriver = SparkContext.DRIVER_IDENTIFIER == SparkEnv.get.executorId)
+        // Check if sufficient shuffle mergers are available now for the 
ShuffleMapTask to push
+        if (isPushBasedShuffleEnabled && dep.getMergerLocs.isEmpty && 
!dep.shuffleMergeFinalized) {

Review comment:
       Review note: Here we are depending on the fact that retry of determinate 
stages will be merge finalized - while indeterminate stages wont be - and so 
the `!dep.shuffleMergeFinalized` takes care of not trying to fetch 
mergers/enable push based shuffle for retry of determinate stage.

##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -144,12 +144,16 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: 
ClassTag](
     _shuffleMergedFinalized = true
   }
 
+  def shuffleMergeFinalized: Boolean = {
+    _shuffleMergedFinalized
+  }
+
   /**
    * Returns true if push-based shuffle is disabled for this stage or empty 
RDD,
    * or if the shuffle merge for this stage is finalized, i.e. the shuffle 
merge
    * results for all partitions are available.
    */
-  def shuffleMergeFinalized: Boolean = {
+  def isShuffleMergeOutputsAvailable: Boolean = {

Review comment:
       The method name and semantics of it are a bit confusing (even though we 
document the expectation).
   We are returning `true` here when push based shuffle is disabled. The 
expectation would be to respond with `false` for this method name.
   
   How about rename the new `shuffleMergeFinalized` to 
`isShufleMergeFinalizeEnabled` and make this method 
`isShuffleMergeFinalizeCompleted` ?
   (Or something similar and descriptive)

##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1371,19 +1371,33 @@ private[spark] class DAGScheduler(
   private def prepareShuffleServicesForShuffleMapStage(stage: 
ShuffleMapStage): Unit = {
     assert(stage.shuffleDep.shuffleMergeEnabled && 
!stage.shuffleDep.shuffleMergeFinalized)
     if (stage.shuffleDep.getMergerLocs.isEmpty) {
+      getAndSetShufflePushMergerLocations(stage)
+    }
+
+    if (stage.shuffleDep.shuffleMergeEnabled) {
+      logInfo(("Shuffle merge enabled before starting the stage for %s (%s) 
with %d" +
+        " merger locations").format(stage, stage.name, 
stage.shuffleDep.getMergerLocs.size))
+    } else {
+      logInfo(("Shuffle merge disabled for %s (%s), but can get enabled later" 
+
+        " adaptively once enough mergers are available").format(stage, 
stage.name))

Review comment:
       Review note: This method is invoked only for:
   * Initial stage attempt for a shuffle.
   * All attempts for an indeterminate stage (since prev outputs are discarded)
   * Only first attempt for a determinate stage, and not for subsequent 
attempts.
   
   Given this, the log message is correct.

##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -429,13 +441,17 @@ private[spark] case class GetMapOutputStatuses(shuffleId: 
Int)
   extends MapOutputTrackerMessage
 private[spark] case class GetMapAndMergeResultStatuses(shuffleId: Int)
   extends MapOutputTrackerMessage
+private[spark] case class GetShufflePushMergerLocations(shuffleId: Int)
+  extends MapOutputTrackerMessage
 private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage
 
 private[spark] sealed trait MapOutputTrackerMasterMessage
 private[spark] case class GetMapOutputMessage(shuffleId: Int,
   context: RpcCallContext) extends MapOutputTrackerMasterMessage
 private[spark] case class GetMapAndMergeOutputMessage(shuffleId: Int,
   context: RpcCallContext) extends MapOutputTrackerMasterMessage
+private[spark] case class GetShuffleMergersMessage(shuffleId: Int,
+  context: RpcCallContext) extends MapOutputTrackerMasterMessage

Review comment:
       Rename to `GetShuflePushMergersMessage` to name it similar to its 
`MapOutputTrackerMessage` message ?
   

##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2487,6 +2501,21 @@ private[spark] class DAGScheduler(
       executorFailureEpoch -= execId
     }
     shuffleFileLostEpoch -= execId
+
+    if (pushBasedShuffleEnabled) {

Review comment:
       Do this only if new executor is on a new host ?
   One thought exercise would be to fire an event to DAGScheduler from 
`BlockManagerMasterEndpoint.addMergerLocation` when a new merger location is 
added -  `getAndSetShufflePushMergerLocations` could result in populating new 
mergers only in that case anyway.

##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -144,12 +144,16 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: 
ClassTag](
     _shuffleMergedFinalized = true
   }
 
+  def shuffleMergeFinalized: Boolean = {

Review comment:
       This redefinition of `shuffleMergeFinalized `, particularly given the 
earlier semantics of `shuffleMergeFinalized`, is a bit confusing - I need to 
relook at how `shuffleMergeFinalized` is used to ensure there are no issues 
with this.

##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -457,6 +473,12 @@ private[spark] class MapOutputTrackerMasterEndpoint(
       logInfo(s"Asked to send map/merge result locations for shuffle 
$shuffleId to $hostPort")
       tracker.post(GetMapAndMergeOutputMessage(shuffleId, context))
 
+    case GetShufflePushMergerLocations(shuffleId: Int) =>
+      val hostPort = context.senderAddress.hostPort
+      logInfo(s"Asked to send shuffle push merger locations for shuffle" +
+        s" $shuffleId to $hostPort")

Review comment:
       nit: move `hostPort` into the `logInfo`
   Same below as well.

##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2487,6 +2501,21 @@ private[spark] class DAGScheduler(
       executorFailureEpoch -= execId
     }
     shuffleFileLostEpoch -= execId
+
+    if (pushBasedShuffleEnabled) {
+      // Only set merger locations for stages that are not yet finished and 
have empty mergers
+      shuffleIdToMapStage.filter { case (_, stage) =>

Review comment:
       Instead of iterating over all entries in `shuffleIdToMapStage` - 
materialize subset of stages which satisfy the condition 
`stage.shuffleDep.getMergerLocs.isEmpty` in a `Set` ?
   We know which stages need to be in this set based on whether stage is 
candidate for enabling push based shuffle and which had the initial set of 
mergers empty while submitting the stage.
   We can clean up the `Set` when stage completes.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to