Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20490#discussion_r167009143
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
    @@ -117,20 +118,43 @@ object DataWritingSparkTask extends Logging {
           writeTask: DataWriterFactory[InternalRow],
           context: TaskContext,
           iter: Iterator[InternalRow]): WriterCommitMessage = {
    -    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
    +    val stageId = context.stageId()
    +    val partId = context.partitionId()
    +    val attemptId = context.attemptNumber()
    +    val dataWriter = writeTask.createDataWriter(partId, attemptId)
     
         // write the data and commit this writer.
         Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
           iter.foreach(dataWriter.write)
    -      logInfo(s"Writer for partition ${context.partitionId()} is committing.")
    -      val msg = dataWriter.commit()
    -      logInfo(s"Writer for partition ${context.partitionId()} committed.")
    +
    +      val msg = if (writeTask.useCommitCoordinator) {
    --- End diff --
    
    Is it a good idea to add streaming to this commit?
    
    The streaming changes differ significantly, and it isn't clear how commit coordination would work for streaming writes: the OutputCommitCoordinator's `canCommit` method takes stage, partition, and attempt ids, not epochs. Either the other components aren't ready for commit coordination, or I'm not familiar enough with how it is done for streaming.
    
    I think we can keep the two separate, and I'm happy to open a follow-up 
issue for the streaming side.
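    To illustrate the mismatch: the coordinator protocol is keyed on (stage, partition, attempt), which a streaming epoch doesn't map onto. Below is a toy, hypothetical stand-in for the coordinator (not Spark's actual `OutputCommitCoordinator` implementation, and `ToyCommitCoordinator` is an invented name) showing the first-attempt-wins handshake that batch tasks rely on:
    
    ```scala
    // Toy sketch of commit coordination: the first attempt that asks to
    // commit a given (stage, partition) is granted; later attempts for the
    // same partition (e.g. speculative duplicates) are denied.
    class ToyCommitCoordinator {
      // Maps (stageId, partitionId) -> winning attemptNumber.
      private val granted = scala.collection.mutable.Map.empty[(Int, Int), Int]
    
      def canCommit(stageId: Int, partitionId: Int, attemptNumber: Int): Boolean =
        synchronized {
          granted.get((stageId, partitionId)) match {
            case Some(winner) => winner == attemptNumber // only the winner may retry
            case None =>
              granted((stageId, partitionId)) = attemptNumber
              true
          }
        }
    }
    
    object Demo extends App {
      val coord = new ToyCommitCoordinator
      // Two attempts for the same stage/partition race to commit:
      assert(coord.canCommit(stageId = 1, partitionId = 0, attemptNumber = 0))
      assert(!coord.canCommit(stageId = 1, partitionId = 0, attemptNumber = 1))
    }
    ```
    
    Note that every key in this protocol is a batch-execution concept; there is no slot for an epoch, which is why the streaming path would need its own coordination design.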

