Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21220#discussion_r185890809
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala ---
    @@ -266,93 +276,62 @@ class MicroBatchExecution(
       }
     
       /**
    -   * Queries all of the sources to see if any new data is available. When there is new data the
    -   * batchId counter is incremented and a new log entry is written with the newest offsets.
    +   * Attempts to construct the next batch based on whether new data is available and/or updated
    +   * metadata is such that another batch needs to be run for state clean up / additional output
    +   * generation even without new data. Returns true only if the next batch should be executed.
    +   *
    +   * Here is the high-level logic on how this constructs the next batch.
    +   * - Check each source whether new data is available
    +   * - Update the query's metadata and check, using the last execution, whether there is any need
    +   *   to run another batch (for state clean up, etc.)
    +   * - If either of the above is true, then construct the next batch by committing to the offset
    +   *   log the range of offsets that the next batch will process.
        */
    -  private def constructNextBatch(): Unit = {
    -    // Check to see what new data is available.
    -    val hasNewData = {
    -      awaitProgressLock.lock()
    -      try {
    -        // Generate a map from each unique source to the next available offset.
    -        val latestOffsets: Map[BaseStreamingSource, Option[Offset]] = uniqueSources.map {
    -          case s: Source =>
    -            updateStatusMessage(s"Getting offsets from $s")
    -            reportTimeTaken("getOffset") {
    -              (s, s.getOffset)
    -            }
    -          case s: MicroBatchReader =>
    -            updateStatusMessage(s"Getting offsets from $s")
    -            reportTimeTaken("setOffsetRange") {
    -              // Once v1 streaming source execution is gone, we can refactor this away.
    -              // For now, we set the range here to get the source to infer the available end offset,
    -              // get that offset, and then set the range again when we later execute.
    -              s.setOffsetRange(
    -                toJava(availableOffsets.get(s).map(off => s.deserializeOffset(off.json))),
    -                Optional.empty())
    -            }
    -
    -            val currentOffset = reportTimeTaken("getEndOffset") { s.getEndOffset() }
    -            (s, Option(currentOffset))
    -        }.toMap
    -        availableOffsets ++= latestOffsets.filter { case (_, o) => o.nonEmpty }.mapValues(_.get)
    -
    -        if (dataAvailable) {
    -          true
    -        } else {
    -          noNewData = true
    -          false
    +  private def constructNextBatch(): Boolean = withProgressLocked {
    +    // If new data is already available, then this method has already been called before
    +    // and it must have already committed the offset range of the next batch to the offset log.
    +    // Hence do nothing, just return true.
    +    if (isNewDataAvailable) return true
    +
    +    // Generate a map from each unique source to the next available offset.
    +    val latestOffsets: Map[BaseStreamingSource, Option[Offset]] = uniqueSources.map {
    +      case s: Source =>
    +        updateStatusMessage(s"Getting offsets from $s")
    +        reportTimeTaken("getOffset") {
    +          (s, s.getOffset)
             }
    -      } finally {
    -        awaitProgressLock.unlock()
    -      }
    -    }
    -    if (hasNewData) {
    -      var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs
    -      // Update the eventTime watermarks if we find any in the plan.
    -      if (lastExecution != null) {
    -        lastExecution.executedPlan.collect {
    -          case e: EventTimeWatermarkExec => e
    -        }.zipWithIndex.foreach {
    -          case (e, index) if e.eventTimeStats.value.count > 0 =>
    -            logDebug(s"Observed event time stats $index: ${e.eventTimeStats.value}")
    -            val newWatermarkMs = e.eventTimeStats.value.max - e.delayMs
    -            val prevWatermarkMs = watermarkMsMap.get(index)
    -            if (prevWatermarkMs.isEmpty || newWatermarkMs > prevWatermarkMs.get) {
    -              watermarkMsMap.put(index, newWatermarkMs)
    -            }
    -
    -          // Populate 0 if we haven't seen any data yet for this watermark node.
    -          case (_, index) =>
    -            if (!watermarkMsMap.isDefinedAt(index)) {
    -              watermarkMsMap.put(index, 0)
    -            }
    +      case s: MicroBatchReader =>
    +        updateStatusMessage(s"Getting offsets from $s")
    +        reportTimeTaken("setOffsetRange") {
    +          // Once v1 streaming source execution is gone, we can refactor this away.
    +          // For now, we set the range here to get the source to infer the available end offset,
    +          // get that offset, and then set the range again when we later execute.
    +          s.setOffsetRange(
    +            toJava(availableOffsets.get(s).map(off => s.deserializeOffset(off.json))),
    +            Optional.empty())
             }
     
    -        // Update the global watermark to the minimum of all watermark nodes.
    -        // This is the safest option, because only the global watermark is fault-tolerant. Making
    -        // it the minimum of all individual watermarks guarantees it will never advance past where
    -        // any individual watermark operator would be if it were in a plan by itself.
    -        if(!watermarkMsMap.isEmpty) {
    -          val newWatermarkMs = watermarkMsMap.minBy(_._2)._2
    -          if (newWatermarkMs > batchWatermarkMs) {
    -            logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms")
    -            batchWatermarkMs = newWatermarkMs
    -          } else {
    -            logDebug(
    -              s"Event time didn't move: $newWatermarkMs < " +
    -                s"$batchWatermarkMs")
    -          }
    -        }
    -      }
    -      offsetSeqMetadata = offsetSeqMetadata.copy(
    -        batchWatermarkMs = batchWatermarkMs,
    -        batchTimestampMs = triggerClock.getTimeMillis()) // Current batch timestamp in milliseconds
    +        val currentOffset = reportTimeTaken("getEndOffset") { s.getEndOffset() }
    +        (s, Option(currentOffset))
    +    }.toMap
    +    availableOffsets ++= latestOffsets.filter { case (_, o) => o.nonEmpty }.mapValues(_.get)
    +
    +    // Update the query metadata
    +    offsetSeqMetadata = offsetSeqMetadata.copy(
    +      batchWatermarkMs = watermarkTracker.currentWatermark,
    +      batchTimestampMs = triggerClock.getTimeMillis())
    +
    +    // Check whether next batch should be constructed
    +    val lastExecutionRequiresAnotherBatch =
    +      sparkSession.sessionState.conf.streamingNoDataMicroBatchesEnabled &&
    --- End diff --
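
    For context on the watermark changes above: the diff replaces the inlined per-operator watermark bookkeeping with `watermarkTracker.currentWatermark`. A self-contained sketch of the rule the removed block implemented (global watermark = minimum over all operator watermarks, never moving backwards) is below; the names here are hypothetical, not Spark's actual API:
    
    ```scala
    // Sketch of the global-watermark rule from the removed code: take the minimum
    // across all per-operator watermarks, and never let the global value regress.
    // `nextGlobalWatermark` and its parameters are illustrative names only.
    object WatermarkRuleSketch {
      def nextGlobalWatermark(
          currentWatermarkMs: Long,
          operatorWatermarksMs: Map[Int, Long]): Long = {
        if (operatorWatermarksMs.isEmpty) {
          currentWatermarkMs
        } else {
          // Advance only if the minimum across operators moved past the current value.
          math.max(currentWatermarkMs, operatorWatermarksMs.values.min)
        }
      }
    }
    ```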
    
    nit: do we need to use `sparkSessionForStream`? I guess not
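    
    Separately, the new method body runs under `withProgressLocked` instead of the explicit `awaitProgressLock.lock()` / `try` / `finally` in the removed version. A minimal sketch of such a helper, assuming it simply encapsulates that same pattern (the helper body is a guess from the removed code, not the actual implementation):
    
    ```scala
    import java.util.concurrent.locks.ReentrantLock
    
    // Hypothetical reconstruction of a withProgressLocked-style helper, based on
    // the lock()/try/finally pattern visible in the removed code above.
    class ProgressLockSketch {
      private val awaitProgressLock = new ReentrantLock()
    
      // Runs `body` while holding the progress lock, releasing it even on failure.
      def withProgressLocked[T](body: => T): T = {
        awaitProgressLock.lock()
        try {
          body
        } finally {
          awaitProgressLock.unlock()
        }
      }
    }
    ```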


---
