Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20490#discussion_r167386169
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
    @@ -116,21 +118,45 @@ object DataWritingSparkTask extends Logging {
       def run(
           writeTask: DataWriterFactory[InternalRow],
           context: TaskContext,
    -      iter: Iterator[InternalRow]): WriterCommitMessage = {
    -    val dataWriter = writeTask.createDataWriter(context.partitionId(), 
context.attemptNumber())
    +      iter: Iterator[InternalRow],
    +      useCommitCoordinator: Boolean): WriterCommitMessage = {
    +    val stageId = context.stageId()
    +    val partId = context.partitionId()
    +    val attemptId = context.attemptNumber()
    +    val dataWriter = writeTask.createDataWriter(partId, attemptId)
     
         // write the data and commit this writer.
         Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
           iter.foreach(dataWriter.write)
    -      logInfo(s"Writer for partition ${context.partitionId()} is 
committing.")
    -      val msg = dataWriter.commit()
    -      logInfo(s"Writer for partition ${context.partitionId()} committed.")
    +
    +      val msg = if (useCommitCoordinator) {
    +        val coordinator = SparkEnv.get.outputCommitCoordinator
    +        val commitAuthorized = coordinator.canCommit(context.stageId(), 
partId, attemptId)
    +        if (commitAuthorized) {
    +          logInfo(s"Writer for stage $stageId, task $partId.$attemptId is 
authorized to commit.")
    +          dataWriter.commit()
    +
    --- End diff --
    
    nit: remove the unnecessary blank line after `dataWriter.commit()`.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to