Github user jose-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/20936#discussion_r182571570
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
---
@@ -137,30 +137,65 @@ private[continuous] class EpochCoordinator(
private val partitionOffsets =
mutable.Map[(Long, Int), PartitionOffset]()
+ private var lastCommittedEpoch = startEpoch - 1
+ // Remembers epochs that have to wait for previous epochs to be committed first.
+ private val epochsWaitingToBeCommitted = mutable.HashSet.empty[Long]
--- End diff --
This is orthogonal to the current PR, but I realized that both this set and the
commits/offsets maps can grow without bound. We should probably introduce a
SQLConf for the maximum epoch backlog and report an error when too many epochs
stack up. I'll file a JIRA ticket for this.
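
A minimal sketch of the bounded-backlog idea, not part of this PR. The class name `BoundedEpochBacklog` and the `maxEpochBacklog` parameter are hypothetical; in practice the limit would presumably come from a new SQLConf entry, which does not exist yet:

```scala
import scala.collection.mutable

// Hypothetical helper (not in EpochCoordinator): tracks epochs waiting to be
// committed and fails once the backlog would exceed a configured maximum.
class BoundedEpochBacklog(maxEpochBacklog: Int) {
  require(maxEpochBacklog > 0, "maxEpochBacklog must be positive")

  private val epochsWaitingToBeCommitted = mutable.HashSet.empty[Long]

  // Record an epoch that has to wait for earlier epochs to commit.
  // Throws before mutating state if the backlog is already full.
  def add(epoch: Long): Unit = {
    if (!epochsWaitingToBeCommitted.contains(epoch) &&
        epochsWaitingToBeCommitted.size >= maxEpochBacklog) {
      throw new IllegalStateException(
        s"Epoch backlog exceeds the maximum of $maxEpochBacklog; " +
          s"cannot queue epoch $epoch")
    }
    epochsWaitingToBeCommitted += epoch
  }

  // Drop an epoch once it has been committed.
  def remove(epoch: Long): Unit = epochsWaitingToBeCommitted -= epoch

  def size: Int = epochsWaitingToBeCommitted.size
}
```

The same check could be applied to the commits and offsets maps; whether to fail the query or only log a warning is a design choice left open here.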
---