GitHub user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21577#discussion_r196174240
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala ---
    @@ -155,47 +164,41 @@ private[spark] class OutputCommitCoordinator(conf: 
SparkConf, isDriver: Boolean)
     
       // Marked private[scheduler] instead of private so this can be mocked in 
tests
       private[scheduler] def handleAskPermissionToCommit(
    -      stage: StageId,
    -      partition: PartitionId,
    -      attemptNumber: TaskAttemptNumber): Boolean = synchronized {
    +      stage: Int,
    +      stageAttempt: Int,
    +      partition: Int,
    +      attemptNumber: Int): Boolean = synchronized {
         stageStates.get(stage) match {
    -      case Some(state) if attemptFailed(state, partition, attemptNumber) =>
    -        logInfo(s"Denying attemptNumber=$attemptNumber to commit for 
stage=$stage," +
    -          s" partition=$partition as task attempt $attemptNumber has 
already failed.")
    +      case Some(state) if attemptFailed(state, stageAttempt, partition, 
attemptNumber) =>
    +        logInfo(s"Commit denied for stage=$stage/$attemptNumber, 
partition=$partition: " +
    +          s"task attempt $attemptNumber already marked as failed.")
             false
           case Some(state) =>
    -        state.authorizedCommitters(partition) match {
    -          case NO_AUTHORIZED_COMMITTER =>
    -            logDebug(s"Authorizing attemptNumber=$attemptNumber to commit 
for stage=$stage, " +
    -              s"partition=$partition")
    -            state.authorizedCommitters(partition) = attemptNumber
    -            true
    -          case existingCommitter =>
    -            // Coordinator should be idempotent when receiving 
AskPermissionToCommit.
    -            if (existingCommitter == attemptNumber) {
    -              logWarning(s"Authorizing duplicate request to commit for " +
    -                s"attemptNumber=$attemptNumber to commit for 
stage=$stage," +
    -                s" partition=$partition; existingCommitter = 
$existingCommitter." +
    -                s" This can indicate dropped network traffic.")
    -              true
    -            } else {
    -              logDebug(s"Denying attemptNumber=$attemptNumber to commit 
for stage=$stage, " +
    -                s"partition=$partition; existingCommitter = 
$existingCommitter")
    -              false
    -            }
    +        val existing = state.authorizedCommitters(partition)
    +        if (existing == null) {
    +          logDebug(s"Commit allowed for stage=$stage/$attemptNumber, 
partition=$partition: " +
    +            s"task attempt $attemptNumber")
    +          state.authorizedCommitters(partition) = 
TaskIdentifier(stageAttempt, attemptNumber)
    +          true
    +        } else {
    +          logDebug(s"Commit denied for stage=$stage/$attemptNumber, 
partition=$partition: " +
    --- End diff --
    
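    It would be nice to include the stage attempt in the log messages as well.
    (As quoted above, the messages print `stage=$stage/$attemptNumber`, i.e. the
    task attempt number, so the stage attempt itself is not logged.)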


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to