GitHub user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/20097#discussion_r159552400
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala ---
@@ -409,7 +475,12 @@ class MicroBatchExecution(
       reportTimeTaken("addBatch") {
         SQLExecution.withNewExecutionId(sparkSessionToRunBatch, lastExecution) {
-          sink.addBatch(currentBatchId, nextBatch)
+          sink match {
+            case s: Sink => s.addBatch(currentBatchId, nextBatch)
+            case s: MicroBatchWriteSupport =>
+              // Execute the V2 writer node in the query plan.
+              nextBatch.collect()
--- End diff --
Make it clear in the comment that this collect() does not accumulate any
data; it only forces execution of the writer.
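
As a minimal, Spark-free sketch of the point the comment should make (hypothetical names, not the PR's code): a terminal action on a lazy pipeline can exist purely to trigger side effects, with the "collected" result carrying no data worth keeping.

```scala
// Hypothetical sketch: a lazy pipeline whose elements carry no data,
// only side effects (standing in for the V2 writer in the query plan).
object ForceExecution {
  var written = 0 // stands in for rows written by the sink

  def run(): List[Unit] = {
    // A lazy view: nothing executes until a terminal operation is applied.
    val pipeline = (1 to 3).view.map { i =>
      written += i // the side effect (the "write")
      ()           // each element is Unit, so nothing is really accumulated
    }
    assert(written == 0) // still lazy: the "writer" has not run yet
    // The terminal action (analogous to collect()) forces execution.
    pipeline.toList
  }
}
```

Here `run()` returns `List((), (), ())`, a list of `Unit` values, while `written` ends up as 6: the terminal action forced the writes but accumulated no meaningful data, which is exactly what the comment on `nextBatch.collect()` should spell out.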
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]