Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21607#discussion_r197508752
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala ---
    @@ -153,25 +163,26 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
     
       // Marked private[scheduler] instead of private so this can be mocked in tests
       private[scheduler] def handleAskPermissionToCommit(
    -      stage: StageId,
    -      partition: PartitionId,
    -      attemptNumber: TaskAttemptNumber): Boolean = synchronized {
    +      stage: Int,
    +      stageAttempt: Int,
    +      partition: Int,
    +      attemptNumber: Int): Boolean = synchronized {
         authorizedCommittersByStage.get(stage) match {
           case Some(authorizedCommitters) =>
    -        authorizedCommitters(partition) match {
    -          case NO_AUTHORIZED_COMMITTER =>
    -            logDebug(s"Authorizing attemptNumber=$attemptNumber to commit for stage=$stage, " +
    -              s"partition=$partition")
    -            authorizedCommitters(partition) = attemptNumber
    -            true
    -          case existingCommitter =>
    -            logDebug(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage, " +
    -              s"partition=$partition; existingCommitter = $existingCommitter")
    -            false
    +        val existing = authorizedCommitters(partition)
    +        if (existing == null) {
    +          logDebug(s"Authorizing attemptNumber=$attemptNumber to commit for stage=$stage, " +
    --- End diff --
    
    $stage.$stageAttempt



---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to