Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20490#discussion_r166360278

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
    @@ -117,20 +118,43 @@ object DataWritingSparkTask extends Logging {
           writeTask: DataWriterFactory[InternalRow],
           context: TaskContext,
           iter: Iterator[InternalRow]): WriterCommitMessage = {
    -    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
    +    val stageId = context.stageId()
    +    val partId = context.partitionId()
    +    val attemptId = context.attemptNumber()
    +    val dataWriter = writeTask.createDataWriter(partId, attemptId)

         // write the data and commit this writer.
         Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
           iter.foreach(dataWriter.write)
    -      logInfo(s"Writer for partition ${context.partitionId()} is committing.")
    -      val msg = dataWriter.commit()
    -      logInfo(s"Writer for partition ${context.partitionId()} committed.")
    +
    +      val msg = if (writeTask.useCommitCoordinator) {
    +        val coordinator = SparkEnv.get.outputCommitCoordinator
    --- End diff --
    
    The API is flexible. The problem is that it defaults to no coordination, which causes correctness bugs. The safe option is to coordinate commits by default. If an implementation doesn't change the default, then it at least won't duplicate task outputs in job commit. Worst case is that it takes a little longer for committers that don't need coordination. On the other hand, not making this the default will cause some writers to work most of the time, but duplicate data in some cases.
    
    What do you think is the downside to adding this?
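    To illustrate what coordinated commits buy: below is a minimal, self-contained sketch of the idea, not Spark's actual `OutputCommitCoordinator` API (`ToyCommitCoordinator` and its method shapes are hypothetical). It authorizes at most one task attempt per partition, so when speculative attempts race, only the first attempt to ask is allowed to commit and a duplicate attempt is denied.

```scala
// Hypothetical toy coordinator, NOT Spark's OutputCommitCoordinator.
// It records the first attempt to request commit rights for each
// partition; later attempts for the same partition are denied, which
// is the property that prevents duplicate task outputs at job commit.
object ToyCommitCoordinator {
  // partitionId -> attemptNumber that won commit rights
  private val authorized = scala.collection.mutable.Map.empty[Int, Int]

  def canCommit(partitionId: Int, attemptNumber: Int): Boolean = synchronized {
    authorized.get(partitionId) match {
      // The same winning attempt may ask again (idempotent re-check).
      case Some(winner) => winner == attemptNumber
      case None =>
        authorized(partitionId) = attemptNumber
        true
    }
  }
}

// A speculative duplicate of partition 0 loses the race:
val first  = ToyCommitCoordinator.canCommit(partitionId = 0, attemptNumber = 0) // true
val second = ToyCommitCoordinator.canCommit(partitionId = 0, attemptNumber = 1) // false
```

    A writer whose commit is idempotent pays only the extra round trip to the driver; one whose commit is not idempotent is protected by default.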
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org