Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/21220#discussion_r185961340
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
---
@@ -266,93 +276,62 @@ class MicroBatchExecution(
}
  /**
-   * Queries all of the sources to see if any new data is available. When there is new data the
-   * batchId counter is incremented and a new log entry is written with the newest offsets.
+   * Attempts to construct the next batch based on whether new data is available and/or updated
+   * metadata is such that another batch needs to be run for state clean up / additional output
+   * generation even without new data. Returns true only if the next batch should be executed.
+   *
+   * Here is the high-level logic on how this constructs the next batch.
+   * - Check each source whether new data is available
+   * - Update the query's metadata and check using the last execution whether there is any need
+   *   to run another batch (for state clean up, etc.)
+   * - If either of the above is true, then construct the next batch by committing to the offset
+   *   log the range of offsets that the next batch will process.
   */
-  private def constructNextBatch(): Unit = {
-    // Check to see what new data is available.
-    val hasNewData = {
-      awaitProgressLock.lock()
-      try {
-        // Generate a map from each unique source to the next available offset.
-        val latestOffsets: Map[BaseStreamingSource, Option[Offset]] = uniqueSources.map {
-          case s: Source =>
-            updateStatusMessage(s"Getting offsets from $s")
-            reportTimeTaken("getOffset") {
-              (s, s.getOffset)
-            }
-          case s: MicroBatchReader =>
-            updateStatusMessage(s"Getting offsets from $s")
-            reportTimeTaken("setOffsetRange") {
-              // Once v1 streaming source execution is gone, we can refactor this away.
-              // For now, we set the range here to get the source to infer the available end offset,
-              // get that offset, and then set the range again when we later execute.
-              s.setOffsetRange(
-                toJava(availableOffsets.get(s).map(off => s.deserializeOffset(off.json))),
-                Optional.empty())
-            }
-
-            val currentOffset = reportTimeTaken("getEndOffset") { s.getEndOffset() }
-            (s, Option(currentOffset))
-        }.toMap
-        availableOffsets ++= latestOffsets.filter { case (_, o) => o.nonEmpty }.mapValues(_.get)
-
-        if (dataAvailable) {
-          true
-        } else {
-          noNewData = true
-          false
+  private def constructNextBatch(): Boolean = withProgressLocked {
+    // If new data is already available, that means this method has already been called before
+    // and it must have already committed the offset range of the next batch to the offset log.
+    // Hence do nothing, just return true.
+    if (isNewDataAvailable) return true
--- End diff --
This condition is possible when restarting. If it finds that the last batch was planned but not completed, then there is new data already available and committed to the offset log.
---
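The decision described in the new doc comment (run a batch when a source has new data, or when metadata such as state-cleanup requirements demands one, and only then commit the offset range) can be sketched as a standalone model. This is a simplified illustration, not Spark's actual API: `Source`, `Offset`, and `constructNextBatch` here are minimal stand-ins for the real `BaseStreamingSource`, offset log, and `MicroBatchExecution` machinery.

```scala
// Hypothetical, simplified model of the constructNextBatch() decision logic.
object NextBatchSketch {
  // Stand-in for a serialized source offset (the real one is JSON-backed).
  final case class Offset(json: String)

  // Stand-in for a streaming source that may report a new end offset.
  trait Source { def latestOffset: Option[Offset] }

  /**
   * Returns the updated committed-offset map and whether the next batch
   * should be executed.
   */
  def constructNextBatch(
      sources: Seq[Source],
      committed: Map[Source, Offset],
      needsAnotherBatchForStateCleanup: Boolean): (Map[Source, Offset], Boolean) = {
    // 1. Ask each source for its latest available offset.
    val latest: Map[Source, Offset] =
      sources.flatMap(s => s.latestOffset.map(o => (s, o))).toMap

    // 2. New data exists if any source moved past its committed offset.
    val hasNewData = latest.exists { case (s, o) => !committed.get(s).contains(o) }

    // 3. Run another batch if there is new data OR updated metadata requires
    //    one (e.g. state cleanup), even without new data.
    val shouldRun = hasNewData || needsAnotherBatchForStateCleanup

    // 4. Only when a batch will run, "commit" the offset range it will process.
    val newCommitted = if (shouldRun) committed ++ latest else committed
    (newCommitted, shouldRun)
  }
}
```

This also models the restart case the reviewer describes: if the offsets were already committed (batch planned but not completed), re-running the decision with the same inputs still reports that the batch should execute, without advancing the committed range further.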