Github user brkyvz commented on a diff in the pull request:
https://github.com/apache/spark/pull/21220#discussion_r185890809
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala ---
@@ -266,93 +276,62 @@ class MicroBatchExecution(
   }
 
   /**
-   * Queries all of the sources to see if any new data is available. When there is new data the
-   * batchId counter is incremented and a new log entry is written with the newest offsets.
+   * Attempts to construct the next batch based on whether new data is available and/or updated
+   * metadata is such that another batch needs to be run for state clean up / additional output
+   * generation even without new data. Returns true only if the next batch should be executed.
+   *
+   * Here is the high-level logic on how this constructs the next batch.
+   * - Check each source whether new data is available
+   * - Update the query's metadata and check using the last execution whether there is any need
+   *   to run another batch (for state clean up, etc.)
+   * - If either of the above is true, then construct the next batch by committing to the offset
+   *   log the range of offsets that the next batch will process.
   */
-  private def constructNextBatch(): Unit = {
-    // Check to see what new data is available.
-    val hasNewData = {
-      awaitProgressLock.lock()
-      try {
-        // Generate a map from each unique source to the next available offset.
-        val latestOffsets: Map[BaseStreamingSource, Option[Offset]] = uniqueSources.map {
-          case s: Source =>
-            updateStatusMessage(s"Getting offsets from $s")
-            reportTimeTaken("getOffset") {
-              (s, s.getOffset)
-            }
-          case s: MicroBatchReader =>
-            updateStatusMessage(s"Getting offsets from $s")
-            reportTimeTaken("setOffsetRange") {
-              // Once v1 streaming source execution is gone, we can refactor this away.
-              // For now, we set the range here to get the source to infer the available end offset,
-              // get that offset, and then set the range again when we later execute.
-              s.setOffsetRange(
-                toJava(availableOffsets.get(s).map(off => s.deserializeOffset(off.json))),
-                Optional.empty())
-            }
-
-            val currentOffset = reportTimeTaken("getEndOffset") { s.getEndOffset() }
-            (s, Option(currentOffset))
-        }.toMap
-        availableOffsets ++= latestOffsets.filter { case (_, o) => o.nonEmpty }.mapValues(_.get)
-
-        if (dataAvailable) {
-          true
-        } else {
-          noNewData = true
-          false
+  private def constructNextBatch(): Boolean = withProgressLocked {
+    // If new data is already available that means this method has already been called before
+    // and it must have already committed the offset range of the next batch to the offset log.
+    // Hence do nothing, just return true.
+    if (isNewDataAvailable) return true
+
+    // Generate a map from each unique source to the next available offset.
+    val latestOffsets: Map[BaseStreamingSource, Option[Offset]] = uniqueSources.map {
+      case s: Source =>
+        updateStatusMessage(s"Getting offsets from $s")
+        reportTimeTaken("getOffset") {
+          (s, s.getOffset)
         }
-      } finally {
-        awaitProgressLock.unlock()
-      }
-    }
-    if (hasNewData) {
-      var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs
-      // Update the eventTime watermarks if we find any in the plan.
-      if (lastExecution != null) {
-        lastExecution.executedPlan.collect {
-          case e: EventTimeWatermarkExec => e
-        }.zipWithIndex.foreach {
-          case (e, index) if e.eventTimeStats.value.count > 0 =>
-            logDebug(s"Observed event time stats $index: ${e.eventTimeStats.value}")
-            val newWatermarkMs = e.eventTimeStats.value.max - e.delayMs
-            val prevWatermarkMs = watermarkMsMap.get(index)
-            if (prevWatermarkMs.isEmpty || newWatermarkMs > prevWatermarkMs.get) {
-              watermarkMsMap.put(index, newWatermarkMs)
-            }
-
-          // Populate 0 if we haven't seen any data yet for this watermark node.
-          case (_, index) =>
-            if (!watermarkMsMap.isDefinedAt(index)) {
-              watermarkMsMap.put(index, 0)
-            }
+      case s: MicroBatchReader =>
+        updateStatusMessage(s"Getting offsets from $s")
+        reportTimeTaken("setOffsetRange") {
+          // Once v1 streaming source execution is gone, we can refactor this away.
+          // For now, we set the range here to get the source to infer the available end offset,
+          // get that offset, and then set the range again when we later execute.
+          s.setOffsetRange(
+            toJava(availableOffsets.get(s).map(off => s.deserializeOffset(off.json))),
+            Optional.empty())
         }
-        // Update the global watermark to the minimum of all watermark nodes.
-        // This is the safest option, because only the global watermark is fault-tolerant. Making
-        // it the minimum of all individual watermarks guarantees it will never advance past where
-        // any individual watermark operator would be if it were in a plan by itself.
-        if(!watermarkMsMap.isEmpty) {
-          val newWatermarkMs = watermarkMsMap.minBy(_._2)._2
-          if (newWatermarkMs > batchWatermarkMs) {
-            logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms")
-            batchWatermarkMs = newWatermarkMs
-          } else {
-            logDebug(
-              s"Event time didn't move: $newWatermarkMs < " +
-                s"$batchWatermarkMs")
-          }
-        }
-      }
-      offsetSeqMetadata = offsetSeqMetadata.copy(
-        batchWatermarkMs = batchWatermarkMs,
-        batchTimestampMs = triggerClock.getTimeMillis()) // Current batch timestamp in milliseconds
+        val currentOffset = reportTimeTaken("getEndOffset") { s.getEndOffset() }
+        (s, Option(currentOffset))
+    }.toMap
+    availableOffsets ++= latestOffsets.filter { case (_, o) => o.nonEmpty }.mapValues(_.get)
+
+    // Update the query metadata
+    offsetSeqMetadata = offsetSeqMetadata.copy(
+      batchWatermarkMs = watermarkTracker.currentWatermark,
+      batchTimestampMs = triggerClock.getTimeMillis())
+
+    // Check whether next batch should be constructed
+    val lastExecutionRequiresAnotherBatch =
+      sparkSession.sessionState.conf.streamingNoDataMicroBatchesEnabled &&
--- End diff ---
nit: do we need to use `sparkSessionForStream`? I guess not
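
The doc comment in the diff above describes three steps: poll each source for
newly available offsets, update the query metadata, and commit the next
batch's offset range to the offset log only if a batch should actually run.
Here is a minimal, self-contained Scala sketch of that control flow. Every
name in it (SimpleSource, OffsetLog, the parameters) is a hypothetical
stand-in for illustration, not Spark's actual API:

// Hypothetical model of the constructNextBatch() control flow; not Spark code.
object ConstructNextBatchSketch {

  // A source that can report its latest available offset, if any.
  trait SimpleSource {
    def latestOffset: Option[Long]
  }

  // A toy write-ahead offset log keyed by batch id.
  final class OffsetLog {
    private var entries = Map.empty[Long, Map[SimpleSource, Long]]
    def add(batchId: Long, offsets: Map[SimpleSource, Long]): Unit =
      entries += batchId -> offsets
  }

  var currentBatchId: Long = 0L
  var availableOffsets: Map[SimpleSource, Long] = Map.empty
  var committedOffsets: Map[SimpleSource, Long] = Map.empty
  val offsetLog = new OffsetLog

  // New data exists if some source's available offset differs from the
  // offset the last completed batch committed for it.
  def isNewDataAvailable: Boolean =
    availableOffsets.exists { case (s, o) => !committedOffsets.get(s).contains(o) }

  // Returns true only if the next batch should be executed, mirroring the
  // doc comment: new data, or a no-data batch forced by the last execution
  // (state cleanup, watermark-driven output, etc.).
  def constructNextBatch(
      sources: Seq[SimpleSource],
      lastExecutionRequiresAnotherBatch: Boolean): Boolean = {
    // Step 1: poll every source for its latest available offset.
    val latestOffsets = sources.flatMap(s => s.latestOffset.map(s -> _)).toMap
    availableOffsets ++= latestOffsets

    // Step 2 would update the query metadata (watermark, batch timestamp) here.

    // Step 3: commit the offset range only when a batch should run.
    val shouldConstruct = isNewDataAvailable || lastExecutionRequiresAnotherBatch
    if (shouldConstruct) {
      offsetLog.add(currentBatchId, availableOffsets)
    }
    shouldConstruct
  }
}

The removed watermark block is also worth a note: as its comment says, the
global watermark is kept at the minimum of all per-operator watermarks, so it
never advances past the operator that is furthest behind. With hypothetical
values:

// Per-operator watermarks in ms, keyed by operator index (made-up numbers).
val watermarkMsMap = Map(0 -> 5000L, 1 -> 3000L)
val globalWatermarkMs = watermarkMsMap.minBy(_._2)._2  // 3000, held back by operator 1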
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]