Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11804#discussion_r56908545
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -52,13 +55,28 @@ class StreamExecution(
       /** Minimum amount of time in between the start of each batch. */
       private val minBatchTime = 10
     
    -  /** Tracks how much data we have processed from each input source. */
    -  private[sql] val streamProgress = new StreamProgress
    +  /**
    +   * Tracks how much data we have processed and committed to the sink or state store from each
    +   * input source.
    +   */
    +  private[sql] val committedOffsets = new StreamProgress
    +
    +  /**
    +   * Tracks the offsets that are available to be processed, but have not yet be committed to the
    +   * sink.
    +   */
    +  private[sql] val availableOffsets = new StreamProgress
    +
    +  /** The current batchId or -1 if execution has not yet been initialized. */
    +  private[sql] var currentBatchId: Long = -1
     
       /** All stream sources present the query plan. */
       private val sources =
         logicalPlan.collect { case s: StreamingRelation => s.source }
     
    +  /** A list of unique sources in the query plan. */
    +  private val uniqueSources = sources.distinct
    --- End diff --
    
    Avoid calling possibly expensive operations like getMaxOffset multiple times
    if the same source appears more than once in a query.
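
    A rough sketch of the idea, not the code in this PR: `CountingSource` is a
    hypothetical stand-in for a real source (it is not Spark's `Source` trait),
    and the call counter is there only to make the saving visible. It assumes
    the same source instance shows up twice in the plan, for example in a
    self-join.

    ```scala
    // Hypothetical stand-in for a streaming source; NOT Spark's Source trait.
    final class CountingSource(val name: String, maxOffset: Long) {
      var calls: Int = 0
      // Pretend this is expensive, e.g. a round trip to an external system.
      def getMaxOffset: Long = { calls += 1; maxOffset }
    }

    object UniqueSourcesSketch {
      val a = new CountingSource("a", 10L)
      val b = new CountingSource("b", 20L)

      // The same instance appears twice, as it might in a self-join.
      val sources: Seq[CountingSource] = Seq(a, b, a)

      // distinct keeps the first occurrence of each element, so `a` is kept once.
      val uniqueSources: Seq[CountingSource] = sources.distinct

      // One getMaxOffset call per distinct source instead of one per occurrence.
      val latestOffsets: Map[String, Long] =
        uniqueSources.map(s => s.name -> s.getMaxOffset).toMap
    }
    ```

    Iterating over `sources` instead would call `getMaxOffset` on `a` twice per
    batch for the same answer.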

