Github user CodingCat commented on a diff in the pull request:
https://github.com/apache/spark/pull/19926#discussion_r156527709
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -447,296 +384,6 @@ class StreamExecution(
}
}
-  /**
-   * Populate the start offsets to start the execution at the current offsets stored in the sink
-   * (i.e. avoid reprocessing data that we have already processed). This function must be called
-   * before any processing occurs and will populate the following fields:
-   *  - currentBatchId
-   *  - committedOffsets
-   *  - availableOffsets
-   * The basic structure of this method is as follows:
-   *
-   * Identify (from the offset log) the offsets used to run the last batch
-   * IF last batch exists THEN
-   *   Set the next batch to be executed as the last recovered batch
-   *   Check the commit log to see which batch was committed last
-   *   IF the last batch was committed THEN
-   *     Call getBatch using the last batch start and end offsets
-   *     // ^^^^ above line is needed since some sources assume last batch always re-executes
-   *     Setup for a new batch i.e., start = last batch end, and identify new end
-   *   DONE
-   * ELSE
-   *   Identify a brand new batch
-   *   DONE
-   */
-  private def populateStartOffsets(sparkSessionToRunBatches: SparkSession): Unit = {
-    offsetLog.getLatest() match {
--- End diff ---
So, shall we also make them null here and let the child classes override them?
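
For concreteness, here is a minimal standalone sketch of one way to read that suggestion: rather than a literal null, leave the recovery hook abstract in the base class and let each execution mode override it. All types below (OffsetSeq, OffsetLog, the simplified signatures) are placeholder stand-ins for illustration, not Spark's actual API.

    // Standalone sketch; OffsetSeq and OffsetLog are simplified
    // stand-ins for Spark's metadata-log types, not the real API.
    trait OffsetSeq
    class OffsetLog {
      // Latest (batchId, offsets) entry recorded by a previous run, if any.
      def getLatest(): Option[(Long, OffsetSeq)] = None
    }

    abstract class StreamExecution {
      protected val offsetLog = new OffsetLog
      // Declared abstract so each execution mode decides how to recover
      // currentBatchId, committedOffsets, and availableOffsets.
      protected def populateStartOffsets(): Unit
    }

    class MicroBatchExecution extends StreamExecution {
      override protected def populateStartOffsets(): Unit =
        offsetLog.getLatest() match {
          case Some((batchId, _)) =>
            // Resume from the last batch recorded in the offset log.
            println(s"resuming at batch $batchId")
          case None =>
            // No prior run: identify a brand new batch.
            println("starting a new query at batch 0")
        }
    }

An abstract member would also spare the base class the null check it would otherwise need before calling the hook.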