Github user jose-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/21392#discussion_r190112873
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
---
@@ -233,9 +235,15 @@ class ContinuousExecution(
           }
           false
         } else if (isActive) {
-          currentBatchId = epochEndpoint.askSync[Long](IncrementAndGetEpoch)
-          logInfo(s"New epoch $currentBatchId is starting.")
-          true
+          val maxBacklogExceeded = epochEndpoint.askSync[Boolean](CheckIfMaxBacklogIsExceeded)
+          if (maxBacklogExceeded) {
+            throw new IllegalStateException(
+              "Size of the epochs queue has exceeded maximum allowed epoch backlog.")
--- End diff ---
Agreed that the code as written won't shut down the stream. But I think it
does make sense to kill the stream rather than wait for old epochs. If we
end up with a large backlog, it's almost surely because some partition isn't
making any progress, so I wouldn't expect Spark to ever be able to catch up.
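To make the shutdown concern concrete, here is a minimal, self-contained sketch. It deliberately does not use Spark's actual classes; the thread layout and names are assumptions that only mimic the epoch-update helper thread in ContinuousExecution. The point it illustrates: an exception thrown on a helper thread dies with that thread, so the backlog error has to be handed back to the loop that owns the stream before anything can actually stop.

import java.util.concurrent.atomic.AtomicReference

object BacklogShutdownSketch {
  // Holds the first fatal error reported by any helper thread.
  private val fatalError = new AtomicReference[Throwable](null)

  def main(args: Array[String]): Unit = {
    val epochUpdateThread = new Thread(() => {
      val maxBacklogExceeded = true // stand-in for the coordinator RPC check
      if (maxBacklogExceeded) {
        // Throwing here would only kill this helper thread, so record the
        // error where the main loop can see it instead.
        fatalError.compareAndSet(
          null,
          new IllegalStateException(
            "Size of the epochs queue has exceeded maximum allowed epoch backlog."))
      }
    })
    epochUpdateThread.start()
    epochUpdateThread.join()

    // Main query loop: notice the reported fatal error and fail the stream.
    Option(fatalError.get()).foreach { e =>
      println(s"Stopping stream: ${e.getMessage}")
      throw e
    }
  }
}

In Spark terms, the equivalent would be reporting the backlog failure to whatever owns the query lifecycle so the stream terminates with that error, rather than relying on the throw inside the epoch-advancement callback alone.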
---