mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r779287873
##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1281,6 +1333,22 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
}
}
+  override def getShufflePushMergerLocations(shuffleId: Int): Seq[BlockManagerId] = {
+    shufflePushMergerLocations.get(shuffleId).getOrElse(getMergerLocations(shuffleId))
+  }
+
+  private def getMergerLocations(shuffleId: Int): Seq[BlockManagerId] = {
+    fetchingLock.withLock(shuffleId) {
Review comment:
Any issues with reusing `fetchingLock` here?
+CC @Ngone51 in case there are concerns.
##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1281,6 +1333,22 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
}
}
+  override def getShufflePushMergerLocations(shuffleId: Int): Seq[BlockManagerId] = {
+    shufflePushMergerLocations.get(shuffleId).getOrElse(getMergerLocations(shuffleId))
Review comment:
`shufflePushMergerLocations.getOrElse` here?
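For illustration, the suggested simplification can be sketched on a plain Scala map (the names below are stand-ins, not the PR's actual fields). Note that both forms take the default by-name, so the fallback fetch still only runs on a miss:

```scala
import scala.collection.concurrent.TrieMap

object GetOrElseSketch {
  // Stand-in for shufflePushMergerLocations (shuffleId -> merger addresses).
  val mergerLocations = TrieMap[Int, Seq[String]](1 -> Seq("hostA:7337"))

  var fallbackCalls = 0
  // Stand-in for getMergerLocations: the remote fetch on a cache miss.
  def fetchMergerLocations(shuffleId: Int): Seq[String] = {
    fallbackCalls += 1
    Seq("fetched:7337")
  }

  def main(args: Array[String]): Unit = {
    // Current form in the diff: get(k) returns an Option, then Option.getOrElse.
    val before = mergerLocations.get(1).getOrElse(fetchMergerLocations(1))
    // Suggested form: a single getOrElse on the map itself.
    val after = mergerLocations.getOrElse(1, fetchMergerLocations(1))
    assert(before == after)
    // Both defaults are by-name, so the fetch never ran for the cache hit.
    assert(fallbackCalls == 0)
  }
}
```

Behavior is identical; the single-call form just reads more directly.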
##########
File path: core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
##########
@@ -39,7 +39,9 @@ class StageInfo(
val taskMetrics: TaskMetrics = null,
    private[spark] val taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty,
private[spark] val shuffleDepId: Option[Int] = None,
- val resourceProfileId: Int) {
+ val resourceProfileId: Int,
+ private[spark] var isPushBasedShuffleEnabled: Boolean = false,
+ private[spark] var shuffleMergerCount: Int = 0) {
Review comment:
Why do we need these?
##########
File path: core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala
##########
@@ -59,13 +60,26 @@ private[spark] class ShuffleWriteProcessor extends Serializable with Logging {
        rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
      val mapStatus = writer.stop(success = true)
      if (mapStatus.isDefined) {
+        val isPushBasedShuffleEnabled = Utils.isPushBasedShuffleEnabled(SparkEnv.get.conf,
+          isDriver = SparkContext.DRIVER_IDENTIFIER == SparkEnv.get.executorId)
+        // Check if sufficient shuffle mergers are available now for the ShuffleMapTask to push
+        if (isPushBasedShuffleEnabled && dep.getMergerLocs.isEmpty && !dep.shuffleMergeFinalized) {
Review comment:
Review note: Here we depend on the fact that a retry of a determinate stage will already be merge finalized, while an indeterminate stage won't be, so the `!dep.shuffleMergeFinalized` check takes care of not trying to fetch mergers/enable push-based shuffle for a retry of a determinate stage.
##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -144,12 +144,16 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
     _shuffleMergedFinalized = true
   }
+  def shuffleMergeFinalized: Boolean = {
+    _shuffleMergedFinalized
+  }
+
   /**
    * Returns true if push-based shuffle is disabled for this stage or empty RDD,
    * or if the shuffle merge for this stage is finalized, i.e. the shuffle merge
    * results for all partitions are available.
    */
-  def shuffleMergeFinalized: Boolean = {
+  def isShuffleMergeOutputsAvailable: Boolean = {
Review comment:
The method name and its semantics are a bit confusing (even though we document the expectation).
We return `true` here when push-based shuffle is disabled, whereas for this method name the expectation would be `false`.
How about renaming the new `shuffleMergeFinalized` to `isShuffleMergeFinalizeEnabled` and making this method `isShuffleMergeFinalizeCompleted`?
(Or something similarly descriptive.)
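A minimal sketch of the semantics this comment calls out (class and field names are simplified stand-ins, not the actual `ShuffleDependency` code): the method answers `true` even when no merge outputs exist, simply because push-based shuffle is disabled.

```scala
// Simplified stand-in for the ShuffleDependency flags under discussion.
class DepSketch(val shuffleMergeEnabled: Boolean) {
  private var _shuffleMergedFinalized = false
  def markFinalized(): Unit = _shuffleMergedFinalized = true

  // Mirrors the documented behavior: true if push-based shuffle is
  // disabled OR the merge for this stage has been finalized.
  def isShuffleMergeOutputsAvailable: Boolean =
    !shuffleMergeEnabled || _shuffleMergedFinalized
}

object DepSketchDemo {
  def main(args: Array[String]): Unit = {
    val pushDisabled = new DepSketch(shuffleMergeEnabled = false)
    // The surprising case: no merge outputs exist, yet the method says true.
    assert(pushDisabled.isShuffleMergeOutputsAvailable)

    val pushEnabled = new DepSketch(shuffleMergeEnabled = true)
    assert(!pushEnabled.isShuffleMergeOutputsAvailable)
    pushEnabled.markFinalized()
    assert(pushEnabled.isShuffleMergeOutputsAvailable)
  }
}
```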
##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1371,19 +1371,33 @@ private[spark] class DAGScheduler(
  private def prepareShuffleServicesForShuffleMapStage(stage: ShuffleMapStage): Unit = {
    assert(stage.shuffleDep.shuffleMergeEnabled && !stage.shuffleDep.shuffleMergeFinalized)
    if (stage.shuffleDep.getMergerLocs.isEmpty) {
+      getAndSetShufflePushMergerLocations(stage)
+    }
+
+    if (stage.shuffleDep.shuffleMergeEnabled) {
+      logInfo(("Shuffle merge enabled before starting the stage for %s (%s) with %d" +
+        " merger locations").format(stage, stage.name, stage.shuffleDep.getMergerLocs.size))
+    } else {
+      logInfo(("Shuffle merge disabled for %s (%s), but can get enabled later" +
+        " adaptively once enough mergers are available").format(stage, stage.name))
Review comment:
Review note: This method is invoked only for:
* The initial stage attempt for a shuffle.
* All attempts for an indeterminate stage (since previous outputs are discarded).
* Only the first attempt for a determinate stage, not subsequent attempts.
Given this, the log message is correct.
##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -429,13 +441,17 @@ private[spark] case class GetMapOutputStatuses(shuffleId: Int)
  extends MapOutputTrackerMessage
private[spark] case class GetMapAndMergeResultStatuses(shuffleId: Int)
  extends MapOutputTrackerMessage
+private[spark] case class GetShufflePushMergerLocations(shuffleId: Int)
+  extends MapOutputTrackerMessage
private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage
private[spark] sealed trait MapOutputTrackerMasterMessage
private[spark] case class GetMapOutputMessage(shuffleId: Int,
  context: RpcCallContext) extends MapOutputTrackerMasterMessage
private[spark] case class GetMapAndMergeOutputMessage(shuffleId: Int,
  context: RpcCallContext) extends MapOutputTrackerMasterMessage
+private[spark] case class GetShuffleMergersMessage(shuffleId: Int,
+  context: RpcCallContext) extends MapOutputTrackerMasterMessage
Review comment:
Rename to `GetShufflePushMergersMessage` so it is named similarly to its `MapOutputTrackerMessage` counterpart?
##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2487,6 +2501,21 @@ private[spark] class DAGScheduler(
executorFailureEpoch -= execId
}
shuffleFileLostEpoch -= execId
+
+ if (pushBasedShuffleEnabled) {
Review comment:
Do this only if the new executor is on a new host?
One thought exercise would be to fire an event to DAGScheduler from `BlockManagerMasterEndpoint.addMergerLocation` when a new merger location is added; `getAndSetShufflePushMergerLocations` can only end up populating new mergers in that case anyway.
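A rough sketch of that thought exercise (the event and handler names below are invented for illustration; real wiring would go through the DAGScheduler event loop): react once per newly registered merger host instead of rescanning on every executor registration.

```scala
import scala.collection.mutable

// Hypothetical event that BlockManagerMasterEndpoint.addMergerLocation
// could post when a merger registers on a previously unseen host.
case class ShufflePushMergerAdded(host: String)

// Minimal stand-in for the scheduler side: only stages that were submitted
// without mergers are tracked, and only the event triggers an update.
class MergerEventHandlerSketch {
  // Shuffle ids of running stages still awaiting merger locations.
  val stagesAwaitingMergers = mutable.Set.empty[Int]
  val assignedMergers = mutable.Map.empty[Int, Seq[String]]

  def handle(event: ShufflePushMergerAdded): Unit = {
    stagesAwaitingMergers.foreach { shuffleId =>
      assignedMergers(shuffleId) = Seq(event.host)
    }
    stagesAwaitingMergers.clear()
  }
}

object MergerEventDemo {
  def main(args: Array[String]): Unit = {
    val handler = new MergerEventHandlerSketch
    handler.stagesAwaitingMergers ++= Seq(10, 12)
    handler.handle(ShufflePushMergerAdded("hostA"))
    assert(handler.assignedMergers.keySet == Set(10, 12))
    assert(handler.stagesAwaitingMergers.isEmpty)
  }
}
```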
##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -144,12 +144,16 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C:
ClassTag](
_shuffleMergedFinalized = true
}
+ def shuffleMergeFinalized: Boolean = {
Review comment:
This redefinition of `shuffleMergeFinalized `, particularly given the
earlier semantics of `shuffleMergeFinalized`, is a bit confusing - I need to
relook at how `shuffleMergeFinalized` is used to ensure there are no issues
with this.
##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -457,6 +473,12 @@ private[spark] class MapOutputTrackerMasterEndpoint(
      logInfo(s"Asked to send map/merge result locations for shuffle $shuffleId to $hostPort")
      tracker.post(GetMapAndMergeOutputMessage(shuffleId, context))
+    case GetShufflePushMergerLocations(shuffleId: Int) =>
+      val hostPort = context.senderAddress.hostPort
+      logInfo(s"Asked to send shuffle push merger locations for shuffle" +
+        s" $shuffleId to $hostPort")
Review comment:
nit: inline `hostPort` into the `logInfo` call.
Same below as well.
##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2487,6 +2501,21 @@ private[spark] class DAGScheduler(
executorFailureEpoch -= execId
}
shuffleFileLostEpoch -= execId
+
+ if (pushBasedShuffleEnabled) {
+      // Only set merger locations for stages that are not yet finished and have empty mergers
+      shuffleIdToMapStage.filter { case (_, stage) =>
Review comment:
Instead of iterating over all entries in `shuffleIdToMapStage` -
materialize subset of stages which satisfy the condition
`stage.shuffleDep.getMergerLocs.isEmpty` in a `Set` ?
We know which stages need to be in this set based on whether stage is
candidate for enabling push based shuffle and which had the initial set of
mergers empty while submitting the stage.
We can clean up the `Set` when stage completes.
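A hedged sketch of the suggested bookkeeping (names such as `stagesPendingMergers` are invented for illustration): maintain the candidate set incrementally at stage submission and completion, so the executor-added path touches only that small set rather than filtering the whole map.

```scala
import scala.collection.mutable

// Illustrative stand-in for the DAGScheduler state involved; the stage
// itself is reduced to a hasMergers flag for brevity.
class PendingMergerSetSketch {
  val shuffleIdToMapStage = mutable.Map.empty[Int, Boolean] // shuffleId -> hasMergers
  // Materialized subset: push-based-shuffle candidates submitted without mergers.
  val stagesPendingMergers = mutable.Set.empty[Int]

  def submitStage(shuffleId: Int, hasMergers: Boolean): Unit = {
    shuffleIdToMapStage(shuffleId) = hasMergers
    if (!hasMergers) stagesPendingMergers += shuffleId
  }

  // Clean up the set when the stage completes.
  def stageCompleted(shuffleId: Int): Unit = stagesPendingMergers -= shuffleId

  // On executor added: consult only the pending set, no full-map scan.
  def onExecutorAdded(): Set[Int] = stagesPendingMergers.toSet
}

object PendingMergerDemo {
  def main(args: Array[String]): Unit = {
    val s = new PendingMergerSetSketch
    s.submitStage(1, hasMergers = true)
    s.submitStage(2, hasMergers = false)
    s.submitStage(3, hasMergers = false)
    s.stageCompleted(3)
    assert(s.onExecutorAdded() == Set(2))
  }
}
```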
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]