Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20936#discussion_r182571570
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
 ---
    @@ -137,30 +137,65 @@ private[continuous] class EpochCoordinator(
       private val partitionOffsets =
         mutable.Map[(Long, Int), PartitionOffset]()
     
    +  private var lastCommittedEpoch = startEpoch - 1
    +  // Remembers epochs that have to wait for previous epochs to be 
committed first.
    +  private val epochsWaitingToBeCommitted = mutable.HashSet.empty[Long]
    --- End diff --
    
    This is orthogonal to the current PR, but I realized that both this and the 
commits/offsets maps are unbounded queues. We probably should introduce some 
SQLConf for the maximum epoch backlog, and report an error when too many stack 
up. I'll file a JIRA ticket for this.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to