Github user steveloughran commented on a diff in the pull request:
https://github.com/apache/spark/pull/20490#discussion_r166448459
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
---
@@ -117,20 +118,43 @@ object DataWritingSparkTask extends Logging {
writeTask: DataWriterFactory[InternalRow],
context: TaskContext,
iter: Iterator[InternalRow]): WriterCommitMessage = {
- val dataWriter = writeTask.createDataWriter(context.partitionId(),
context.attemptNumber())
+ val stageId = context.stageId()
+ val partId = context.partitionId()
+ val attemptId = context.attemptNumber()
+ val dataWriter = writeTask.createDataWriter(partId, attemptId)
// write the data and commit this writer.
Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
iter.foreach(dataWriter.write)
- logInfo(s"Writer for partition ${context.partitionId()} is
committing.")
- val msg = dataWriter.commit()
- logInfo(s"Writer for partition ${context.partitionId()} committed.")
+
+ val msg = if (writeTask.useCommitCoordinator) {
+ val coordinator = SparkEnv.get.outputCommitCoordinator
+ val commitAuthorized = coordinator.canCommit(context.stageId(),
partId, attemptId)
+ if (commitAuthorized) {
+ logInfo(s"Writer for stage $stageId, task $partId.$attemptId is
authorized to commit.")
+ dataWriter.commit()
+
+ } else {
+ val message = s"Stage $stageId, task $partId.$attemptId: driver
did not authorize commit"
+ logInfo(message)
+ // throwing CommitDeniedException will trigger the catch block
for abort
+ throw new CommitDeniedException(message, stageId, partId,
attemptId)
+ }
+
+ } else {
+ logInfo(s"Writer for partition ${context.partitionId()} is
committing.")
+ dataWriter.commit()
+ }
+
+ logInfo(s"Writer for stage $stageId, task $partId.$attemptId
committed.")
--- End diff --
The timing is implicitly available from the logs anyway, but I've found
explicitly tracking the duration of these operations useful.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]