GitHub user tgravescs commented on a diff in the pull request:
https://github.com/apache/spark/pull/21577#discussion_r196174240
--- Diff:
core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala ---
@@ -155,47 +164,41 @@ private[spark] class OutputCommitCoordinator(conf:
SparkConf, isDriver: Boolean)
// Marked private[scheduler] instead of private so this can be mocked in
tests
private[scheduler] def handleAskPermissionToCommit(
- stage: StageId,
- partition: PartitionId,
- attemptNumber: TaskAttemptNumber): Boolean = synchronized {
+ stage: Int,
+ stageAttempt: Int,
+ partition: Int,
+ attemptNumber: Int): Boolean = synchronized {
stageStates.get(stage) match {
- case Some(state) if attemptFailed(state, partition, attemptNumber) =>
- logInfo(s"Denying attemptNumber=$attemptNumber to commit for
stage=$stage," +
- s" partition=$partition as task attempt $attemptNumber has
already failed.")
+ case Some(state) if attemptFailed(state, stageAttempt, partition,
attemptNumber) =>
+ logInfo(s"Commit denied for stage=$stage/$attemptNumber,
partition=$partition: " +
+ s"task attempt $attemptNumber already marked as failed.")
false
case Some(state) =>
- state.authorizedCommitters(partition) match {
- case NO_AUTHORIZED_COMMITTER =>
- logDebug(s"Authorizing attemptNumber=$attemptNumber to commit
for stage=$stage, " +
- s"partition=$partition")
- state.authorizedCommitters(partition) = attemptNumber
- true
- case existingCommitter =>
- // Coordinator should be idempotent when receiving
AskPermissionToCommit.
- if (existingCommitter == attemptNumber) {
- logWarning(s"Authorizing duplicate request to commit for " +
- s"attemptNumber=$attemptNumber to commit for
stage=$stage," +
- s" partition=$partition; existingCommitter =
$existingCommitter." +
- s" This can indicate dropped network traffic.")
- true
- } else {
- logDebug(s"Denying attemptNumber=$attemptNumber to commit
for stage=$stage, " +
- s"partition=$partition; existingCommitter =
$existingCommitter")
- false
- }
+ val existing = state.authorizedCommitters(partition)
+ if (existing == null) {
+ logDebug(s"Commit allowed for stage=$stage/$attemptNumber,
partition=$partition: " +
+ s"task attempt $attemptNumber")
+ state.authorizedCommitters(partition) =
TaskIdentifier(stageAttempt, attemptNumber)
+ true
+ } else {
+ logDebug(s"Commit denied for stage=$stage/$attemptNumber,
partition=$partition: " +
--- End diff --
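It would be nice to include the stage attempt in the log messages as well. Right now the `stage=$stage/$attemptNumber` prefix shows the task attempt number where the stage attempt (`stageAttempt`) would be expected.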
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]