GitHub user joseph-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19926#discussion_r156470973

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -447,296 +384,6 @@ class StreamExecution(
           }
         }
     
    -  /**
    -   * Populate the start offsets to start the execution at the current offsets stored in the sink
    -   * (i.e. avoid reprocessing data that we have already processed). This function must be called
    -   * before any processing occurs and will populate the following fields:
    -   *  - currentBatchId
    -   *  - committedOffsets
    -   *  - availableOffsets
    -   *  The basic structure of this method is as follows:
    -   *
    -   *  Identify (from the offset log) the offsets used to run the last batch
    -   *  IF last batch exists THEN
    -   *    Set the next batch to be executed as the last recovered batch
    -   *    Check the commit log to see which batch was committed last
    -   *    IF the last batch was committed THEN
    -   *      Call getBatch using the last batch start and end offsets
    -   *      // ^^^^ above line is needed since some sources assume last batch always re-executes
    -   *      Setup for a new batch i.e., start = last batch end, and identify new end
    -   *    DONE
    -   *  ELSE
    -   *    Identify a brand new batch
    -   *  DONE
    -   */
    -  private def populateStartOffsets(sparkSessionToRunBatches: SparkSession): Unit = {
    -    offsetLog.getLatest() match {
    --- End diff --

    The offset log currently has a strict schema that commit information wouldn't fit into. I was planning to keep both logs (the offset log and the commit log) in the continuous implementation.
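To illustrate why commit status can live in its own log without touching the offset log's schema, here is a minimal sketch of the recovery decision described in the removed doc comment. It is not Spark's implementation: the object `StartOffsetRecoverySketch`, the case class `RecoveryPlan`, and the modelling of the offset log as a `Map[Long, Long]` (batch id to end offset) and the commit log as a `Set[Long]` of committed batch ids are all hypothetical simplifications. It also omits the `getBatch` replay call that the doc comment mentions for sources that assume the last batch always re-executes.

```scala
// A minimal sketch, assuming simplified in-memory logs; NOT Spark's actual code.
// Hypothetical stand-ins: the offset log maps batchId -> end offset (a single Long here),
// and the commit log is just the set of batchIds that finished successfully.
object StartOffsetRecoverySketch {

  // Hypothetical result type mirroring the fields the doc comment says get populated.
  final case class RecoveryPlan(
      currentBatchId: Long,
      committedOffset: Option[Long], // start of the batch to run next, if known
      availableOffset: Option[Long], // end of the batch to run next, if already planned
      rerunLastBatch: Boolean)

  def populateStartOffsets(offsetLog: Map[Long, Long], commitLog: Set[Long]): RecoveryPlan =
    if (offsetLog.isEmpty) {
      // No batch was ever planned: start a brand-new batch 0 with unknown offsets.
      RecoveryPlan(0L, None, None, rerunLastBatch = false)
    } else {
      val lastBatchId = offsetLog.keys.max
      val lastEnd = offsetLog(lastBatchId)
      if (commitLog.contains(lastBatchId)) {
        // Last batch committed: the next batch starts at its end; the new end is still unknown.
        RecoveryPlan(lastBatchId + 1, Some(lastEnd), None, rerunLastBatch = false)
      } else {
        // Last batch was planned but never committed: re-run it with exactly the same offsets.
        RecoveryPlan(lastBatchId, offsetLog.get(lastBatchId - 1), Some(lastEnd), rerunLastBatch = true)
      }
    }

  def main(args: Array[String]): Unit = {
    val offsets = Map(0L -> 10L, 1L -> 25L)
    // Batch 1 was planned but not committed, so it is re-run.
    println(populateStartOffsets(offsets, commitLog = Set(0L)))
    // Batch 1 was committed, so batch 2 starts where batch 1 ended.
    println(populateStartOffsets(offsets, commitLog = Set(0L, 1L)))
  }
}
```

The offset log only ever records what each batch planned to read; whether that batch finished is answered by the separate commit log, which is the split the comment proposes to keep.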