Github user tgravescs commented on a diff in the pull request:
https://github.com/apache/spark/pull/21606#discussion_r197290177
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
---
@@ -123,33 +121,34 @@ object DataWritingSparkTask extends Logging {
val msg = if (useCommitCoordinator) {
val coordinator = SparkEnv.get.outputCommitCoordinator
- val commitAuthorized = coordinator.canCommit(stageId,
stageAttempt, partId, attemptId)
+ val commitAuthorized = coordinator.canCommit(stageId,
stageAttempt, partId,
+ context.attemptNumber())
if (commitAuthorized) {
- logInfo(s"Writer for stage $stageId / $stageAttempt, " +
- s"task $partId.$attemptId is authorized to commit.")
+ logInfo(s"Writer for stage $stageId.$stageAttempt, " +
+ s"task $partId.$taskId is authorized to commit.")
dataWriter.commit()
} else {
- val message = s"Stage $stageId / $stageAttempt, " +
- s"task $partId.$attemptId: driver did not authorize commit"
+ val message = s"Stage $stageId.$stageAttempt, " +
+ s"task $partId.$taskId: driver did not authorize commit"
logInfo(message)
// throwing CommitDeniedException will trigger the catch block
for abort
- throw new CommitDeniedException(message, stageId, partId,
attemptId)
+ throw new CommitDeniedException(message, stageId, partId, taskId)
--- End diff --
I think these and above messages should be the attempt number to match the
output committer
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]