Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20386#discussion_r164680686
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala ---
    @@ -135,14 +142,21 @@ class MemoryStreamWriter(val sink: MemorySinkV2, outputMode: OutputMode)
     
       override def createWriterFactory: MemoryWriterFactory = MemoryWriterFactory(outputMode)
     
    -  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
    +  private val messages = new ArrayBuffer[WriterCommitMessage]()
    +
    +  override def add(message: WriterCommitMessage): Unit = synchronized {
    +    messages += message
    +  }
    +
    +  override def commit(epochId: Long): Unit = synchronized {
         val newRows = messages.flatMap {
           case message: MemoryWriterCommitMessage => message.data
    -    }
    +    }.toArray
         sink.write(epochId, outputMode, newRows)
    +    messages.clear()
       }
     
    -  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
    +  override def abort(epochId: Long): Unit = {
    --- End diff --
    
    ditto


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to