Github user tgravescs commented on a diff in the pull request:
https://github.com/apache/spark/pull/21607#discussion_r197508752
--- Diff:
core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala ---
@@ -153,25 +163,26 @@ private[spark] class OutputCommitCoordinator(conf:
SparkConf, isDriver: Boolean)
// Marked private[scheduler] instead of private so this can be mocked in
tests
private[scheduler] def handleAskPermissionToCommit(
- stage: StageId,
- partition: PartitionId,
- attemptNumber: TaskAttemptNumber): Boolean = synchronized {
+ stage: Int,
+ stageAttempt: Int,
+ partition: Int,
+ attemptNumber: Int): Boolean = synchronized {
authorizedCommittersByStage.get(stage) match {
case Some(authorizedCommitters) =>
- authorizedCommitters(partition) match {
- case NO_AUTHORIZED_COMMITTER =>
- logDebug(s"Authorizing attemptNumber=$attemptNumber to commit
for stage=$stage, " +
- s"partition=$partition")
- authorizedCommitters(partition) = attemptNumber
- true
- case existingCommitter =>
- logDebug(s"Denying attemptNumber=$attemptNumber to commit for
stage=$stage, " +
- s"partition=$partition; existingCommitter =
$existingCommitter")
- false
+ val existing = authorizedCommitters(partition)
+ if (existing == null) {
+ logDebug(s"Authorizing attemptNumber=$attemptNumber to commit
for stage=$stage, " +
--- End diff --
$stage.$stageAttempt
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]