Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/21577#discussion_r196246700
--- Diff: core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala ---
@@ -131,16 +139,17 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
reason match {
case Success =>
// The task output has been committed successfully
- case denied: TaskCommitDenied =>
- logInfo(s"Task was denied committing, stage: $stage, partition: $partition, " +
- s"attempt: $attemptNumber")
- case otherReason =>
+ case _: TaskCommitDenied =>
+ logInfo(s"Task was denied committing, stage: $stage / $stageAttempt, " +
+ s"partition: $partition, attempt: $attemptNumber")
+ case _ =>
// Mark the attempt as failed to blacklist from future commit protocol
- stageState.failures.getOrElseUpdate(partition, mutable.Set()) += attemptNumber
- if (stageState.authorizedCommitters(partition) == attemptNumber) {
+ val taskId = TaskIdentifier(stageAttempt, attemptNumber)
+ stageState.failures.getOrElseUpdate(partition, mutable.Set()) += taskId
+ if (stageState.authorizedCommitters(partition) == taskId) {
logDebug(s"Authorized committer (attemptNumber=$attemptNumber, stage=$stage, " +
s"partition=$partition) failed; clearing lock")
- stageState.authorizedCommitters(partition) = NO_AUTHORIZED_COMMITTER
+ stageState.authorizedCommitters(partition) = null
--- End diff --
Less memory usage, at least. Not sure what advantage using `Option` would
bring here.
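
For readers following the thread, here is a rough sketch of the trade-off being discussed. It is not the code in the PR: `TaskIdentifier` here is a stand-in with assumed field names, and `CommitterSlots` and its helpers are hypothetical. With a plain array, an empty slot is `null` and a filled slot points straight at the identifier. With `Array[Option[...]]`, empty slots share the `None` singleton, but every filled slot allocates an extra `Some` wrapper.

```scala
// Stand-in for the TaskIdentifier in the diff; the field names are assumptions.
case class TaskIdentifier(stageAttempt: Int, taskAttempt: Int)

// Hypothetical helper object, only for illustration.
object CommitterSlots {
  // null-based: one reference per partition, null means "no authorized committer".
  def newNullSlots(numPartitions: Int): Array[TaskIdentifier] =
    new Array[TaskIdentifier](numPartitions)

  // Option-based: None means "no authorized committer";
  // a filled slot holds a Some wrapper around the identifier.
  def newOptionSlots(numPartitions: Int): Array[Option[TaskIdentifier]] =
    Array.fill[Option[TaskIdentifier]](numPartitions)(None)

  // Scala's == is null-safe, so comparing an empty (null) slot is fine.
  def isAuthorized(slots: Array[TaskIdentifier], partition: Int,
      id: TaskIdentifier): Boolean =
    slots(partition) == id

  def isAuthorizedOpt(slots: Array[Option[TaskIdentifier]], partition: Int,
      id: TaskIdentifier): Boolean =
    slots(partition).contains(id)
}
```

Both variants answer the "is this attempt the authorized committer" check the same way; the difference is the extra wrapper object per filled slot versus the type-level reminder that a slot may be empty.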