Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/21239#discussion_r186320417
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
---
@@ -123,15 +123,7 @@ class ContinuousExecution(
}
committedOffsets = nextOffsets.toStreamProgress(sources)
- // Get to an epoch ID that has definitely never been sent to a
sink before. Since sink
- // commit happens between offset log write and commit log write,
this means an epoch ID
- // which is not in the offset log.
- val (latestOffsetEpoch, _) = offsetLog.getLatest().getOrElse {
- throw new IllegalStateException(
- s"Offset log had no latest element. This shouldn't be possible
because nextOffsets is" +
- s"an element.")
- }
- currentBatchId = latestOffsetEpoch + 1
+ currentBatchId = latestEpochId + 1
--- End diff --
nit: remove the line above.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]