Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/22009#discussion_r207976440
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
---
@@ -122,24 +119,22 @@ case class MemoryStream[A : Encoder](id: Int,
sqlContext: SQLContext)
override def toString: String =
s"MemoryStream[${Utils.truncatedString(output, ",")}]"
- override def setOffsetRange(start: Optional[OffsetV2], end:
Optional[OffsetV2]): Unit = {
- synchronized {
- startOffset = start.orElse(LongOffset(-1)).asInstanceOf[LongOffset]
- endOffset = end.orElse(currentOffset).asInstanceOf[LongOffset]
- }
- }
-
override def deserializeOffset(json: String): OffsetV2 =
LongOffset(json.toLong)
- override def getStartOffset: OffsetV2 = synchronized {
- if (startOffset.offset == -1) null else startOffset
+ override def initialOffset: OffsetV2 = LongOffset(-1)
+
+ override def latestOffset(start: OffsetV2): OffsetV2 = {
+ if (currentOffset.offset == -1) null else currentOffset
--- End diff --
I feel it's more reasonable to forbid data sources from returning null offsets,
and to use `latestOffset != startOffset` on the streaming execution side to
indicate whether a new batch is available. But I'll leave that to a follow-up PR.
cc @jose-torres
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]