Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/20386#discussion_r164680686
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
---
@@ -135,14 +142,21 @@ class MemoryStreamWriter(val sink: MemorySinkV2, outputMode: OutputMode)
override def createWriterFactory: MemoryWriterFactory =
MemoryWriterFactory(outputMode)
- override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
+ private val messages = new ArrayBuffer[WriterCommitMessage]()
+
+ override def add(message: WriterCommitMessage): Unit = synchronized {
+ messages += message
+ }
+
+ override def commit(epochId: Long): Unit = synchronized {
val newRows = messages.flatMap {
case message: MemoryWriterCommitMessage => message.data
- }
+ }.toArray
sink.write(epochId, outputMode, newRows)
+ messages.clear()
}
- override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
+ override def abort(epochId: Long): Unit = {
--- End diff --
ditto
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]