[GitHub] [spark] mridulm commented on pull request #40393: [SPARK-40082] Schedule mergeFinalize when push merge shuffleMapStage retry but no running tasks

2023-03-21 Thread via GitHub


mridulm commented on PR #40393:
URL: https://github.com/apache/spark/pull/40393#issuecomment-1478905577

   Is your Jira id `StoveM`, @Stove-hust?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]




2023-03-21 Thread via GitHub


mridulm commented on PR #40393:
URL: https://github.com/apache/spark/pull/40393#issuecomment-1478833979

   I could not cherry-pick this into 3.4 and 3.3; we should fix those branches as well, IMO.
   Can you create PRs against those two branches, @Stove-hust? Thanks!






2023-03-21 Thread via GitHub


mridulm commented on PR #40393:
URL: https://github.com/apache/spark/pull/40393#issuecomment-1478833621

   Merged to master.
   Thanks for working on this, @Stove-hust!
   Thanks for the review @otterc :-)






2023-03-20 Thread via GitHub


mridulm commented on PR #40393:
URL: https://github.com/apache/spark/pull/40393#issuecomment-1476585572

   The test failure is unrelated to this PR; once the changes above are made, the re-execution should pass.






2023-03-19 Thread via GitHub


mridulm commented on PR #40393:
URL: https://github.com/apache/spark/pull/40393#issuecomment-1475538274

   Technically, 3 :-)
   The UT I added will generate two tests: one with push-based shuffle enabled and one without.
   And we have the initial test you added.
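   The two generated tests come from a plain Scala `for` loop around the test registration, one call per flag value, each with a distinct name. A minimal sketch of the mechanism (illustration only: `ParamTestSketch` and its `test` method are stand-ins defined here, not ScalaTest's registrar, which defers the body for later execution):

   ```scala
   // Illustration only: a stand-in for ScalaTest's `test(name) { body }` registrar.
   object ParamTestSketch {
     private var names = List.empty[String]

     // ScalaTest records (name, body) pairs like this and runs them later.
     def test(name: String)(body: => Unit): Unit = names = names :+ name

     // Mirrors the suite's registration loop: Seq(true, false) => two test names.
     def registeredNames: List[String] = {
       names = Nil
       for (pushBasedShuffleEnabled <- Seq(true, false)) {
         test("SPARK-40082: recomputation should not hang. " +
             s"pushBasedShuffleEnabled = $pushBasedShuffleEnabled") { () }
       }
       names
     }

     def main(args: Array[String]): Unit = registeredNames.foreach(println)
   }
   ```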






2023-03-18 Thread via GitHub


mridulm commented on PR #40393:
URL: https://github.com/apache/spark/pull/40393#issuecomment-1474986959

   @Stove-hust To clarify: I meant add this as well.
   We should keep the UT you added; it is important to test the specific expectation as it stands today.






2023-03-18 Thread via GitHub


mridulm commented on PR #40393:
URL: https://github.com/apache/spark/pull/40393#issuecomment-1474782673

   Instead of testing specifically for the flag, which is subject to change as the implementation evolves, we should test for behavior here.

   This is the reproducible test I was using (with some changes) to test approaches for this bug:
   
   ```scala
   for (pushBasedShuffleEnabled <- Seq(true, false)) {
     test("SPARK-40082: recomputation of shuffle map stage with no pending partitions should not " +
         s"hang. pushBasedShuffleEnabled = $pushBasedShuffleEnabled") {

       if (pushBasedShuffleEnabled) {
         initPushBasedShuffleConfs(conf)
         DAGSchedulerSuite.clearMergerLocs()
         DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5"))
       }

       var taskIdCount = 0

       var completedStage: List[Int] = Nil
       val listener = new SparkListener() {
         override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
           completedStage = completedStage :+ event.stageInfo.stageId
         }
       }
       sc.addSparkListener(listener)

       val fetchFailParentPartition = 0

       val shuffleMapRdd0 = new MyRDD(sc, 2, Nil)
       val shuffleDep0 = new ShuffleDependency(shuffleMapRdd0, new HashPartitioner(2))

       val shuffleMapRdd1 = new MyRDD(sc, 2, List(shuffleDep0), tracker = mapOutputTracker)
       val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new HashPartitioner(2))

       val reduceRdd = new MyRDD(sc, 2, List(shuffleDep1), tracker = mapOutputTracker)

       // Submit the initial mapper stage, generating shuffle output for the first reducer stage.
       submitMapStage(shuffleDep0)

       // Map stage completes successfully.
       completeShuffleMapStageSuccessfully(0, 0, 3, Seq("hostA", "hostB"))
       taskIdCount += 2
       assert(completedStage === List(0))

       // Now submit the first reducer stage.
       submitMapStage(shuffleDep1)

       def createTaskInfo(speculative: Boolean): TaskInfo = {
         val taskInfo = new TaskInfo(
           taskId = taskIdCount,
           index = 0,
           attemptNumber = 0,
           partitionId = 0,
           launchTime = 0L,
           executorId = "",
           host = "hostC",
           TaskLocality.ANY,
           speculative = speculative)
         taskIdCount += 1
         taskInfo
       }

       val normalTask = createTaskInfo(speculative = false)
       val speculativeTask = createTaskInfo(speculative = true)

       // Fail task 1.0 due to FetchFailed, and make 1.1 succeed.
       runEvent(makeCompletionEvent(taskSets(1).tasks(0),
         FetchFailed(makeBlockManagerId("hostA"), shuffleDep0.shuffleId, normalTask.taskId,
           fetchFailParentPartition, normalTask.index, "ignored"),
         result = null,
         Seq.empty,
         Array.empty,
         normalTask))

       // Make the speculative task succeed after the initial task has failed.
       runEvent(makeCompletionEvent(taskSets(1).tasks(0), Success,
         result = MapStatus(BlockManagerId("hostD-exec1", "hostD", 34512),
           Array.fill[Long](2)(2), mapTaskId = speculativeTask.taskId),
         taskInfo = speculativeTask))

       // The second task, for partition 1, succeeds as well.
       runEvent(makeCompletionEvent(taskSets(1).tasks(1), Success,
         result = MapStatus(BlockManagerId("hostE-exec2", "hostE", 23456),
           Array.fill[Long](2)(2), mapTaskId = taskIdCount)))
       taskIdCount += 1

       sc.listenerBus.waitUntilEmpty()
       assert(completedStage === List(0, 2))

       // The stages will now get resubmitted due to the failure.
       Thread.sleep(DAGScheduler.RESUBMIT_TIMEOUT * 2)

       // Parent map stage resubmitted.
       assert(scheduler.runningStages.size === 1)
       val mapStage = scheduler.runningStages.head

       // Stage 1 is the same as stage 0, but created for the ShuffleMapTask 2,
       // as it is a different job.
       assert(mapStage.id === 1)
       assert(mapStage.latestInfo.failureReason.isEmpty)
       // Only the partition reported in the fetch failure is resubmitted.
       assert(mapStage.latestInfo.numTasks === 1)

       val stage0Retry = taskSets.filter(_.stageId == 1)
       assert(stage0Retry.size === 1)
       // Make the original task succeed.
       runEvent(makeCompletionEvent(stage0Retry.head.tasks(fetchFailParentPartition), Success,
         result = MapStatus(BlockManagerId("hostF-exec1", "hostF", 12345),
           Array.fill[Long](2)(2), mapTaskId = taskIdCount)))
       Thread.sleep(DAGScheduler.RESUBMIT_TIMEOUT * 2)

       // The retries should succeed.
       sc.listenerBus.waitUntilEmpty()
       assert(completedStage === List(0, 2, 1, 2))
       // ... (the archived message is truncated here)
     }
   }
   ```


2023-03-16 Thread via GitHub


mridulm commented on PR #40393:
URL: https://github.com/apache/spark/pull/40393#issuecomment-1473171295

   So this is an interesting coincidence: I literally encountered a production job that seems to be hitting this exact same issue :-)
   I was in the process of creating a test case, but my intuition was along the same lines as this PR.

   Can you create a test case to validate this behavior, @Stove-hust?
   Essentially, it should fail with current master and succeed after this change.

   Thanks for working on this fix!

