Github user yanlin-Lynn commented on a diff in the pull request:
https://github.com/apache/spark/pull/21392#discussion_r190109933
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
---
@@ -153,9 +161,13 @@ private[continuous] class EpochCoordinator(
// If not, add the epoch being currently processed to epochs waiting
to be committed,
// otherwise commit it.
if (lastCommittedEpoch != epoch - 1) {
- logDebug(s"Epoch $epoch has received commits from all partitions "
+
- s"and is waiting for epoch ${epoch - 1} to be committed first.")
- epochsWaitingToBeCommitted.add(epoch)
+ if (epochsWaitingToBeCommitted.size == maxEpochBacklog) {
+ maxEpochBacklogExceeded = true
+ } else {
+ logDebug(s"Epoch $epoch has received commits from all partitions
" +
+ s"and is waiting for epoch ${epoch - 1} to be committed
first.")
+ epochsWaitingToBeCommitted.add(epoch)
--- End diff --
once maxEpochBacklogExceeded is set to true, can never set to false again?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]