Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21220#discussion_r185961340
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala ---
    @@ -266,93 +276,62 @@ class MicroBatchExecution(
       }
     
       /**
    -   * Queries all of the sources to see if any new data is available. When there is new data the
    -   * batchId counter is incremented and a new log entry is written with the newest offsets.
    +   * Attempts to construct the next batch based on whether new data is available and/or whether
    +   * updated metadata requires another batch to be run for state clean up / additional output
    +   * generation even without new data. Returns true only if the next batch should be executed.
    +   *
    +   * Here is the high-level logic on how this constructs the next batch.
    +   * - Check each source whether new data is available
    +   * - Update the query's metadata and check using the last execution whether there is any need
    +   *   to run another batch (for state clean up, etc.)
    +   * - If either of the above is true, then construct the next batch by committing to the offset
    +   *   log the range of offsets that the next batch will process.
        */
    -  private def constructNextBatch(): Unit = {
    -    // Check to see what new data is available.
    -    val hasNewData = {
    -      awaitProgressLock.lock()
    -      try {
    -        // Generate a map from each unique source to the next available offset.
    -        val latestOffsets: Map[BaseStreamingSource, Option[Offset]] = uniqueSources.map {
    -          case s: Source =>
    -            updateStatusMessage(s"Getting offsets from $s")
    -            reportTimeTaken("getOffset") {
    -              (s, s.getOffset)
    -            }
    -          case s: MicroBatchReader =>
    -            updateStatusMessage(s"Getting offsets from $s")
    -            reportTimeTaken("setOffsetRange") {
    -              // Once v1 streaming source execution is gone, we can refactor this away.
    -              // For now, we set the range here to get the source to infer the available end offset,
    -              // get that offset, and then set the range again when we later execute.
    -              s.setOffsetRange(
    -                toJava(availableOffsets.get(s).map(off => s.deserializeOffset(off.json))),
    -                Optional.empty())
    -            }
    -
    -            val currentOffset = reportTimeTaken("getEndOffset") { s.getEndOffset() }
    -            (s, Option(currentOffset))
    -        }.toMap
    -        availableOffsets ++= latestOffsets.filter { case (_, o) => o.nonEmpty }.mapValues(_.get)
    -
    -        if (dataAvailable) {
    -          true
    -        } else {
    -          noNewData = true
    -          false
    +  private def constructNextBatch(): Boolean = withProgressLocked {
    +    // If new data is already available, that means this method has already been called before
    +    // and it must have already committed the offset range of the next batch to the offset log.
    +    // Hence do nothing, just return true.
    +    if (isNewDataAvailable) return true
    --- End diff ---
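
    A minimal, self-contained sketch of the control flow the new doc comment
    above describes. All names below (checkSourcesForNewOffsets,
    lastExecutionRequiresAnotherBatch, commitOffsetRangeToLog) are hypothetical
    stand-ins for MicroBatchExecution's private state and helpers, not the
    real Spark internals:

        object ConstructNextBatchSketch {
          // Set once a batch has been planned (offsets committed) but not yet run.
          var isNewDataAvailable: Boolean = false

          // Stand-in: poll every source for offsets newer than the last processed ones.
          def checkSourcesForNewOffsets(): Boolean = false

          // Stand-in: does updated metadata (watermark, batch timestamp) require
          // another batch even without new data, e.g. for state cleanup?
          def lastExecutionRequiresAnotherBatch(): Boolean = false

          // Stand-in: durably record the offset range the next batch will process.
          def commitOffsetRangeToLog(): Unit = { isNewDataAvailable = true }

          def constructNextBatch(): Boolean = {
            // Already planned (by an earlier call, or restart recovery): just run it.
            if (isNewDataAvailable) return true

            val hasNewData = checkSourcesForNewOffsets()
            val needsMetadataBatch = lastExecutionRequiresAnotherBatch()

            if (hasNewData || needsMetadataBatch) {
              commitOffsetRangeToLog() // write the offset log entry for the next batch
              true
            } else {
              false // no batch to run; the caller can block until new data arrives
            }
          }
        }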
    
    This condition is possible when restarting. If the restart finds that the last
batch was planned but not completed, then new data is already available and
its offset range is already committed to the offset log.
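
    To make that restart case concrete, here is a hedged sketch of the check,
    assuming only that the offset log records planned batches and the commit
    log records completed ones; the object, method, and parameter names are
    hypothetical, not Spark's API:

        object RestartRecoverySketch {
          // Was the newest planned batch (offset log) ever completed (commit log)?
          def lastBatchPlannedButNotCompleted(
              latestPlannedBatchId: Option[Long],   // newest entry in the offset log
              latestCompletedBatchId: Option[Long]  // newest entry in the commit log
          ): Boolean = (latestPlannedBatchId, latestCompletedBatchId) match {
            case (Some(planned), Some(done)) => planned > done // planned, never finished
            case (Some(_), None)             => true           // first batch never finished
            case _                           => false          // nothing planned yet
          }
        }

    If this returns true on restart, the offset range of the next batch is
    already in the offset log, so isNewDataAvailable holds and
    constructNextBatch() can return true immediately, which is exactly the
    situation described above.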

