GitHub user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20097#discussion_r159552400
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala ---
    @@ -409,7 +475,12 @@ class MicroBatchExecution(
     
         reportTimeTaken("addBatch") {
           SQLExecution.withNewExecutionId(sparkSessionToRunBatch, lastExecution) {
    -        sink.addBatch(currentBatchId, nextBatch)
    +        sink match {
    +          case s: Sink => s.addBatch(currentBatchId, nextBatch)
    +          case s: MicroBatchWriteSupport =>
    +            // Execute the V2 writer node in the query plan.
    +            nextBatch.collect()
    --- End diff --
    
    Make it clear in the code comment that this collect() does not accumulate any
    data; it only forces execution of the writer node.
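
A minimal sketch of the behavior the requested comment should describe, in plain Scala with no Spark dependency. Every name here (`PlanNode`, `SourceNode`, `WriterNode`, `LazyBatch`) is a hypothetical stand-in, not a Spark class. It assumes the writer node consumes its child's rows and emits none, so `collect()` returns an empty result and serves only to trigger the write.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of a lazily executed plan; not Spark's API.
object CollectForcesWriter {
  trait PlanNode {
    def execute(): Seq[String]
  }

  // Leaf node that produces the rows of the batch.
  final class SourceNode(rows: Seq[String]) extends PlanNode {
    def execute(): Seq[String] = rows
  }

  // Root node that writes every child row to the sink and emits no rows itself.
  final class WriterNode(child: PlanNode, sink: mutable.Buffer[String]) extends PlanNode {
    def execute(): Seq[String] = {
      sink ++= child.execute()
      Seq.empty
    }
  }

  // Nothing runs until collect() is called, mirroring a lazy batch.
  final class LazyBatch(root: PlanNode) {
    def collect(): Array[String] = root.execute().toArray
  }

  def main(args: Array[String]): Unit = {
    val sink = mutable.ArrayBuffer.empty[String]
    val batch = new LazyBatch(new WriterNode(new SourceNode(Seq("a", "b")), sink))

    // The write happens only as a side effect of collect().
    val result = batch.collect()

    println(s"rows returned to caller: ${result.length}")
    println(s"rows written to sink: ${sink.size}")
  }
}
```

In this model the caller gets back an empty array while the sink receives both rows, which is the distinction the code comment next to `nextBatch.collect()` should state.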

