Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20097#discussion_r159961186
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
 ---
    @@ -357,31 +400,39 @@ class MicroBatchExecution(
                 s"DataFrame returned by getBatch from $source did not have 
isStreaming=true\n" +
                   s"${batch.queryExecution.logical}")
               logDebug(s"Retrieving data from $source: $current -> $available")
    -          Some(source -> batch)
    +          Some(source -> batch.logicalPlan)
    +        case (reader: MicroBatchReader, available)
    +          if committedOffsets.get(reader).map(_ != 
available).getOrElse(true) =>
    +          val current = committedOffsets.get(reader).map(off => 
reader.deserializeOffset(off.json))
    +          reader.setOffsetRange(
    +            toJava(current),
    +            
Optional.of(available.asInstanceOf[v2.streaming.reader.Offset]))
    --- End diff --
    
    `v2.streaming.reader.Offset` is being used in a lot of places. Please 
rename it to OffsetV2 in the imports and use that in all places.
      


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to