Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/20097#discussion_r159365350
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala ---
@@ -357,7 +397,16 @@ class MicroBatchExecution(
s"DataFrame returned by getBatch from $source did not have
isStreaming=true\n" +
s"${batch.queryExecution.logical}")
logDebug(s"Retrieving data from $source: $current -> $available")
- Some(source -> batch)
+ Some(source -> batch.logicalPlan)
+ case (reader: MicroBatchReader, available)
+ if committedOffsets.get(reader).map(_ !=
available).getOrElse(true) =>
+ val current = committedOffsets.get(reader).map(off =>
reader.deserializeOffset(off.json))
+ reader.setOffsetRange(
+ Optional.ofNullable(current.orNull),
+
Optional.of(available.asInstanceOf[v2.streaming.reader.Offset]))
+ logDebug(s"Retrieving data from $reader: $current -> $available")
+ Some(reader ->
+ new
StreamingDataSourceV2Relation(reader.readSchema().toAttributes, reader))
--- End diff ---
Maybe name this MicrobatchDataSourceV2Relation to be more specific?
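
For illustration only, here is a minimal sketch of what the renamed relation could look like, assuming it just wraps the reader's output attributes and the MicroBatchReader itself. The LeafNode base, the field names, and the import path (following the v2.streaming.reader package seen in this diff) are assumptions, not the PR's actual definition:

    import org.apache.spark.sql.catalyst.expressions.AttributeReference
    import org.apache.spark.sql.catalyst.plans.logical.LeafNode
    import org.apache.spark.sql.sources.v2.streaming.reader.MicroBatchReader

    // Hypothetical shape of the renamed class; the real relation in this PR
    // may extend a shared DataSourceV2 relation and carry additional fields.
    case class MicroBatchDataSourceV2Relation(
        output: Seq[AttributeReference],
        reader: MicroBatchReader)
      extends LeafNode {
      // Marked as streaming so MicroBatchExecution treats it as a streaming leaf.
      override def isStreaming: Boolean = true
    }

The call site in the hunk above would then read something like
Some(reader -> MicroBatchDataSourceV2Relation(reader.readSchema().toAttributes, reader)).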