Github user brkyvz commented on a diff in the pull request:
https://github.com/apache/spark/pull/21220#discussion_r185887201
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
---
@@ -266,93 +276,62 @@ class MicroBatchExecution(
}
/**
- * Queries all of the sources to see if any new data is available. When
there is new data the
- * batchId counter is incremented and a new log entry is written with
the newest offsets.
+ * Attempts to construct the next batch based on whether new data is
available and/or updated
+ * metadata is such that another batch needs to be run for state clean
up / additional output
+ * generation even without new data. Returns true only if the next batch
should be executed.
+ *
+ * Here is the high-level logic on how this constructs the next batch.
+ * - Check each source whether new data is available
+ * - Update the query's metadata and check using the last execution
whether there is any need
+ * to run another batch (for state clean up, etc.)
+ * - If either of the above is true, then construct the next batch by
committing to the offset
+ * log the range of offsets that the next batch will process.
*/
- private def constructNextBatch(): Unit = {
- // Check to see what new data is available.
- val hasNewData = {
- awaitProgressLock.lock()
- try {
- // Generate a map from each unique source to the next available
offset.
- val latestOffsets: Map[BaseStreamingSource, Option[Offset]] =
uniqueSources.map {
- case s: Source =>
- updateStatusMessage(s"Getting offsets from $s")
- reportTimeTaken("getOffset") {
- (s, s.getOffset)
- }
- case s: MicroBatchReader =>
- updateStatusMessage(s"Getting offsets from $s")
- reportTimeTaken("setOffsetRange") {
- // Once v1 streaming source execution is gone, we can
refactor this away.
- // For now, we set the range here to get the source to infer
the available end offset,
- // get that offset, and then set the range again when we
later execute.
- s.setOffsetRange(
- toJava(availableOffsets.get(s).map(off =>
s.deserializeOffset(off.json))),
- Optional.empty())
- }
-
- val currentOffset = reportTimeTaken("getEndOffset") {
s.getEndOffset() }
- (s, Option(currentOffset))
- }.toMap
- availableOffsets ++= latestOffsets.filter { case (_, o) =>
o.nonEmpty }.mapValues(_.get)
-
- if (dataAvailable) {
- true
- } else {
- noNewData = true
- false
+ private def constructNextBatch(): Boolean = withProgressLocked {
+ // If new data is already available that means this method has already
been called before
+ // and it must have already committed the offset range of next batch
to the offset log.
+ // Hence do nothing, just return true.
+ if (isNewDataAvailable) return true
--- End diff --
how is this possible?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]