Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19984#discussion_r158361942
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
    @@ -109,6 +125,42 @@ object DataWritingSparkTask extends Logging {
           logError(s"Writer for partition ${context.partitionId()} aborted.")
         })
       }
    +
    +  def runContinuous(
    +      writeTask: DataWriterFactory[InternalRow],
    +      context: TaskContext,
    +      iter: Iterator[InternalRow]): WriterCommitMessage = {
    +    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
    +    val runId = context.getLocalProperty(ContinuousExecution.RUN_ID_KEY)
    +    var currentMsg: WriterCommitMessage = null
    +    var currentEpoch = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
    +
    +    do {
    +      // write the data and commit this writer.
    +      Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
    +        try {
    +          iter.foreach(dataWriter.write)
    +          logInfo(s"Writer for partition ${context.partitionId()} is committing.")
    +          val msg = dataWriter.commit()
    +          logInfo(s"Writer for partition ${context.partitionId()} committed.")
    +          EpochCoordinatorRef.get(runId, SparkEnv.get).send(
    --- End diff --
    
    nit: `EpochCoordinatorRef.get` is not cheap. You can store it outside the loop.
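
    For illustration, a minimal, self-contained sketch of the suggested hoisting (not the PR's code; the names here are hypothetical, with `expensiveLookup` playing the role of `EpochCoordinatorRef.get`):

        // Toy example: hoist an expensive lookup out of a loop. The names are
        // hypothetical; `expensiveLookup` stands in for EpochCoordinatorRef.get,
        // which resolves an RPC endpoint and is costly to call repeatedly.
        object HoistLookupExample {
          def expensiveLookup(runId: String): String => Unit = {
            Thread.sleep(10) // simulate the endpoint-resolution cost
            msg => println(s"[$runId] $msg")
          }

          def main(args: Array[String]): Unit = {
            // Resolve once, outside the loop, as the review suggests.
            val send = expensiveLookup("run-42")
            var epoch = 0L
            while (epoch < 3) {
              send(s"commit for epoch $epoch") // reuse the cached reference
              epoch += 1
            }
          }
        }

    Applied to `runContinuous`, the same one-line move would hoist something like `val epochCoordinator = EpochCoordinatorRef.get(runId, SparkEnv.get)` above the `do` loop (the local name is illustrative) and call `epochCoordinator.send(...)` inside it.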

