Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13286#discussion_r64495984
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala 
---
    @@ -114,35 +114,48 @@ case class MemoryStream[A : Encoder](id: Int, 
sqlContext: SQLContext)
      * A sink that stores the results in memory. This [[Sink]] is primarily 
intended for use in unit
      * tests and does not provide durability.
      */
    -class MemorySink(val schema: StructType) extends Sink with Logging {
    +class MemorySink(val schema: StructType, outputMode: OutputMode) extends 
Sink with Logging {
    +
    +  private case class AddedData(batchId: Long, data: Array[Row])
    +
       /** An order list of batches that have been written to this [[Sink]]. */
       @GuardedBy("this")
    -  private val batches = new ArrayBuffer[Array[Row]]()
    +  private val batches = new ArrayBuffer[AddedData]()
     
       /** Returns all rows that are stored in this [[Sink]]. */
       def allData: Seq[Row] = synchronized {
    -    batches.flatten
    +    batches.map(_.data).flatten
       }
     
    -  def latestBatchId: Option[Int] = synchronized {
    -    if (batches.size == 0) None else Some(batches.size - 1)
    +  def latestBatchId: Option[Long] = synchronized {
    +    batches.lastOption.map(_.batchId)
       }
     
    -  def lastBatch: Seq[Row] = synchronized { batches.last }
    +  def latestBatchData: Seq[Row] = synchronized { 
batches.lastOption.toSeq.flatten(_.data) }
     
       def toDebugString: String = synchronized {
    -    batches.zipWithIndex.map { case (b, i) =>
    -      val dataStr = try b.mkString(" ") catch {
    +    batches.map { case AddedData(batchId, data) =>
    +      val dataStr = try data.mkString(" ") catch {
             case NonFatal(e) => "[Error converting to string]"
           }
    -      s"$i: $dataStr"
    +      s"$batchId: $dataStr"
         }.mkString("\n")
       }
     
       override def addBatch(batchId: Long, data: DataFrame): Unit = 
synchronized {
    -    if (batchId == batches.size) {
    -      logDebug(s"Committing batch $batchId")
    -      batches.append(data.collect())
    +    if (latestBatchId.isEmpty || batchId > latestBatchId.get) {
    +      logDebug(s"Committing batch $batchId to $this")
    +      outputMode match {
    +        case OutputMode.Append | OutputMode.Update =>
    +          batches.append(AddedData(batchId, data.collect()))
    +
    +        case OutputMode.Complete =>
    +          batches.clear()
    +          batches.append(AddedData(batchId, data.collect()))
    +
    +        case _ =>
    +          throw new IllegalArgumentException("Data source ")
    --- End diff --
    
    thats a mistake, i left it incomplete. thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to