viirya commented on a change in pull request #23563: [SPARK-26634] Do not allow task of FetchFailureStage commit in OutputCommitCoordinator
URL: https://github.com/apache/spark/pull/23563#discussion_r256641730
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
 ##########
 @@ -114,13 +115,16 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
    * yet been initialized.
    *
    * @param stage the stage id.
+   * @param stageAttemptNumber the stage attempt number.
   * @param maxPartitionId the maximum partition id that could appear in this stage's tasks (i.e.
   *                       the maximum possible value of `context.partitionId`).
    */
-  private[scheduler] def stageStart(stage: Int, maxPartitionId: Int): Unit = synchronized {
+  private[scheduler] def stageStart(
+    stage: Int, stageAttemptNumber: Int, maxPartitionId: Int): Unit = synchronized {
     stageStates.get(stage) match {
       case Some(state) =>
         require(state.authorizedCommitters.length == maxPartitionId + 1)
+        state.latestStageAttempt = stageAttemptNumber
 
 Review comment:
   Don't we need to check whether the current `latestStageAttempt` is less than `stageAttemptNumber` before updating it?
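   For illustration, here is a minimal sketch of the guarded, monotonic update this suggests. The `StageState` model below is simplified and hypothetical (in the real coordinator, `authorizedCommitters` holds task identifiers, not ints, and the update happens inside `stageStart` under `synchronized`); it only shows the shape of the check:

```scala
object AttemptGuardSketch {
  // Simplified, hypothetical stand-in for the coordinator's per-stage state.
  class StageState(maxPartitionId: Int) {
    val authorizedCommitters = new Array[Int](maxPartitionId + 1)
    var latestStageAttempt: Int = -1
  }

  def updateLatestAttempt(state: StageState, stageAttemptNumber: Int): Unit = {
    // Only move the attempt number forward: a late stageStart from an
    // older attempt must not roll latestStageAttempt back.
    if (stageAttemptNumber > state.latestStageAttempt) {
      state.latestStageAttempt = stageAttemptNumber
    }
  }

  def main(args: Array[String]): Unit = {
    val state = new StageState(maxPartitionId = 3)
    updateLatestAttempt(state, stageAttemptNumber = 1)
    updateLatestAttempt(state, stageAttemptNumber = 0) // stale attempt, ignored
    assert(state.latestStageAttempt == 1)
  }
}
```

   Whether `stageStart` can in fact be invoked with a stale attempt number depends on scheduler event ordering, which the PR discussion would need to confirm.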
