Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r207978414
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala 
---
    @@ -122,24 +119,22 @@ case class MemoryStream[A : Encoder](id: Int, 
sqlContext: SQLContext)
     
       override def toString: String = 
s"MemoryStream[${Utils.truncatedString(output, ",")}]"
     
    -  override def setOffsetRange(start: Optional[OffsetV2], end: 
Optional[OffsetV2]): Unit = {
    -    synchronized {
    -      startOffset = start.orElse(LongOffset(-1)).asInstanceOf[LongOffset]
    -      endOffset = end.orElse(currentOffset).asInstanceOf[LongOffset]
    -    }
    -  }
    -
       override def deserializeOffset(json: String): OffsetV2 = 
LongOffset(json.toLong)
     
    -  override def getStartOffset: OffsetV2 = synchronized {
    -    if (startOffset.offset == -1) null else startOffset
    +  override def initialOffset: OffsetV2 = LongOffset(-1)
    +
    +  override def latestOffset(start: OffsetV2): OffsetV2 = {
    +    if (currentOffset.offset == -1) null else currentOffset
    --- End diff --
    
    Yes, I agree. The V1 API didn't require sources to implement a "this is the 
beginning of the stream, read everything" offset, but that was a mistake we 
should make sure to remedy here.
    
    A followup PR makes sense, because there's some stream execution logic that 
can be greatly simplified when all sources have a real initial offset.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to