Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r207976440
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala 
---
    @@ -122,24 +119,22 @@ case class MemoryStream[A : Encoder](id: Int, 
sqlContext: SQLContext)
     
       override def toString: String = 
s"MemoryStream[${Utils.truncatedString(output, ",")}]"
     
    -  override def setOffsetRange(start: Optional[OffsetV2], end: 
Optional[OffsetV2]): Unit = {
    -    synchronized {
    -      startOffset = start.orElse(LongOffset(-1)).asInstanceOf[LongOffset]
    -      endOffset = end.orElse(currentOffset).asInstanceOf[LongOffset]
    -    }
    -  }
    -
       override def deserializeOffset(json: String): OffsetV2 = 
LongOffset(json.toLong)
     
    -  override def getStartOffset: OffsetV2 = synchronized {
    -    if (startOffset.offset == -1) null else startOffset
    +  override def initialOffset: OffsetV2 = LongOffset(-1)
    +
    +  override def latestOffset(start: OffsetV2): OffsetV2 = {
    +    if (currentOffset.offset == -1) null else currentOffset
    --- End diff --
    
    I feel it's more reasonable to forbid data source to return null offsets, 
and use `latestOffset != startOffset` at the streaming execution side to 
indicate if there is a new batch available. But I'll leave it to followup PR.
    
    cc @jose-torres 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to