Victsm commented on a change in pull request #30691:
URL: https://github.com/apache/spark/pull/30691#discussion_r647709610
##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1271,21 +1302,21 @@ private[spark] class DAGScheduler(
* locations for block push/merge by getting the historical locations of past executors.
*/
private def prepareShuffleServicesForShuffleMapStage(stage: ShuffleMapStage): Unit = {
- // TODO(SPARK-32920) Handle stage reuse/retry cases separately as without finalize
- // TODO changes we cannot disable shuffle merge for the retry/reuse cases
- val mergerLocs = sc.schedulerBackend.getShufflePushMergerLocations(
- stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId)
-
- if (mergerLocs.nonEmpty) {
- stage.shuffleDep.setMergerLocs(mergerLocs)
- logInfo(s"Push-based shuffle enabled for $stage (${stage.name}) with" +
- s" ${stage.shuffleDep.getMergerLocs.size} merger locations")
-
- logDebug("List of shuffle push merger locations " +
- s"${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}")
- } else {
- logInfo("No available merger locations." +
- s" Push-based shuffle disabled for $stage (${stage.name})")
+ assert(stage.shuffleDep.shuffleMergeEnabled && !stage.shuffleDep.shuffleMergeFinalized)
Review comment:
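Is it safe to include the second check (`!stage.shuffleDep.shuffleMergeFinalized`) in this assert?
This method could be called when submitting a retry of a map stage whose shuffle merge has already been finalized, in which case the assert would fail.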
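
To illustrate the concern, here is a minimal sketch of one possible alternative, assuming a retry can in fact reach this method after merge finalization. `MergeCheckSketch`, `MergeState`, and `shouldPrepareMergers` are hypothetical stand-ins and not part of Spark; only the two flag names come from the diff. The idea is to keep the assert for the precondition that should always hold and to branch on the state that a retry may legitimately be in.

```scala
object MergeCheckSketch {
  // Hypothetical stand-in for the two ShuffleDependency flags used in the assert.
  final case class MergeState(
      shuffleMergeEnabled: Boolean,
      shuffleMergeFinalized: Boolean)

  // Returns true when merger locations should be prepared for this stage attempt.
  def shouldPrepareMergers(state: MergeState): Boolean = {
    // Assumed invariant: callers only invoke this when merge is enabled.
    assert(state.shuffleMergeEnabled, "shuffle merge must be enabled")
    // A finalized merge is treated as an expected state on retry,
    // so it is handled with a branch rather than an assert.
    !state.shuffleMergeFinalized
  }
}
```

With this shape, a retry of an already finalized stage would skip merger preparation instead of tripping the assert. Whether skipping is the right behavior for the retry case is a separate question for this PR.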