GitHub user joseph-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19926#discussion_r156470973
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -447,296 +384,6 @@ class StreamExecution(
         }
       }
     
    -  /**
    -   * Populate the start offsets to start the execution at the current offsets stored in the sink
    -   * (i.e. avoid reprocessing data that we have already processed). This function must be called
    -   * before any processing occurs and will populate the following fields:
    -   *  - currentBatchId
    -   *  - committedOffsets
    -   *  - availableOffsets
    -   *  The basic structure of this method is as follows:
    -   *
    -   *  Identify (from the offset log) the offsets used to run the last batch
    -   *  IF last batch exists THEN
    -   *    Set the next batch to be executed as the last recovered batch
    -   *    Check the commit log to see which batch was committed last
    -   *    IF the last batch was committed THEN
    -   *      Call getBatch using the last batch start and end offsets
    -   *      // ^^^^ above line is needed since some sources assume last batch always re-executes
    -   *      Setup for a new batch i.e., start = last batch end, and identify new end
    -   *    DONE
    -   *  ELSE
    -   *    Identify a brand new batch
    -   *  DONE
    -   */
    -  private def populateStartOffsets(sparkSessionToRunBatches: SparkSession): Unit = {
    -    offsetLog.getLatest() match {
    --- End diff ---
    
    The offset log currently has a strict schema that commit information wouldn't fit into. I was planning to keep both logs in the continuous implementation.
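
    For readers following the thread, below is a minimal, self-contained sketch of the recovery flow the removed doc comment describes. The Offset, MetadataLog, and field definitions here are simplified stand-ins invented for illustration, not Spark's actual classes, and the commit log is modeled as carrying no payload per batch:

        // Simplified sketch of populateStartOffsets-style recovery.
        // Offset and MetadataLog are illustrative stand-ins, not Spark classes.
        object RecoverySketch {
          type Offset = Long

          // Maps a batch id to the metadata written for that batch.
          final case class MetadataLog[T](entries: Map[Long, T]) {
            def getLatest(): Option[(Long, T)] =
              if (entries.isEmpty) None else Some(entries.maxBy(_._1))
            def get(batchId: Long): Option[T] = entries.get(batchId)
          }

          var currentBatchId: Long = -1L
          var committedOffsets: Option[Offset] = None
          var availableOffsets: Option[Offset] = None

          def populateStartOffsets(offsetLog: MetadataLog[Offset],
                                   commitLog: MetadataLog[Unit]): Unit = {
            offsetLog.getLatest() match {
              case Some((latestBatchId, latestEnd)) =>
                // A batch was planned before restart: re-run it by default.
                currentBatchId = latestBatchId
                availableOffsets = Some(latestEnd)
                commitLog.getLatest() match {
                  case Some((committedBatchId, _)) if committedBatchId == latestBatchId =>
                    // The planned batch already committed, so set up a new batch
                    // whose start offsets are the last batch's end offsets.
                    committedOffsets = Some(latestEnd)
                    currentBatchId = latestBatchId + 1
                  case _ =>
                    // The planned batch never committed: re-execute it, starting
                    // from the offsets the previous batch ended at (if any).
                    committedOffsets = offsetLog.get(latestBatchId - 1)
                }
              case None =>
                // No offset log at all: brand new query, start at batch 0.
                currentBatchId = 0L
            }
          }
        }

    For example, an offset log with entries through batch 2 but a commit log ending at batch 1 would leave currentBatchId at 2, so the uncommitted batch is re-executed from its recorded offsets.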

