Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/20386#discussion_r164680538
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala
---
@@ -39,13 +41,20 @@ class ConsoleWriter(schema: StructType, options:
DataSourceV2Options)
def createWriterFactory(): DataWriterFactory[Row] =
PackedRowWriterFactory
- override def commit(epochId: Long, messages:
Array[WriterCommitMessage]): Unit = {
+ private val messages = new ArrayBuffer[WriterCommitMessage]()
+
+ override def add(message: WriterCommitMessage): Unit = synchronized {
+ messages += message
+ }
+
+ override def commit(epochId: Long): Unit = synchronized {
// We have to print a "Batch" label for the epoch for compatibility
with the pre-data source V2
// behavior.
- printRows(messages, schema, s"Batch: $epochId")
+ printRows(messages.toArray, schema, s"Batch: $epochId")
+ messages.clear()
}
- def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
+ def abort(epochId: Long): Unit = {}
--- End diff --
we should clear the message array in abort too.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]