GitHub user jose-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/22009#discussion_r207978414
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---
@@ -122,24 +119,22 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
override def toString: String = s"MemoryStream[${Utils.truncatedString(output, ",")}]"
- override def setOffsetRange(start: Optional[OffsetV2], end: Optional[OffsetV2]): Unit = {
- synchronized {
- startOffset = start.orElse(LongOffset(-1)).asInstanceOf[LongOffset]
- endOffset = end.orElse(currentOffset).asInstanceOf[LongOffset]
- }
- }
-
override def deserializeOffset(json: String): OffsetV2 = LongOffset(json.toLong)
- override def getStartOffset: OffsetV2 = synchronized {
- if (startOffset.offset == -1) null else startOffset
+ override def initialOffset: OffsetV2 = LongOffset(-1)
+
+ override def latestOffset(start: OffsetV2): OffsetV2 = {
+ if (currentOffset.offset == -1) null else currentOffset
--- End diff ---
Yes, I agree. The V1 API didn't require sources to implement a "this is the
beginning of the stream, read everything" offset, but that was a mistake we
should remedy here.
A follow-up PR makes sense, because some stream execution logic can be
greatly simplified once every source has a real initial offset.
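
To make the point concrete, here's a minimal sketch of the pattern (the trait
and class names here are hypothetical, not the actual Spark API): when a
source guarantees a real initial offset, `latestOffset` can drop the null
sentinel seen in the diff above.

```scala
// Hypothetical, simplified sketch -- not Spark's MicroBatchStream interface.
case class LongOffset(offset: Long)

trait MicroBatchSourceSketch {
  // A real "beginning of the stream" offset, always available.
  def initialOffset: LongOffset
  // Never returns null: an empty stream simply reports initialOffset.
  def latestOffset(start: LongOffset): LongOffset
}

class SketchMemoryStream extends MicroBatchSourceSketch {
  @volatile private var currentOffset: LongOffset = LongOffset(-1)

  // The stream starts just before the first record, so "read everything"
  // is expressible as a plain offset rather than a null sentinel.
  override def initialOffset: LongOffset = LongOffset(-1)

  // With a guaranteed initial offset, the engine no longer needs the
  // "if (currentOffset.offset == -1) null else currentOffset" special case.
  override def latestOffset(start: LongOffset): LongOffset = currentOffset

  def addData(count: Int): Unit = synchronized {
    currentOffset = LongOffset(currentOffset.offset + count)
  }
}
```

An empty stream then reports `LongOffset(-1)` (its initial offset) instead of
null, so the engine can treat "no data yet" as an ordinary empty offset range.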
---