Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21239#discussion_r186320417
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
 ---
    @@ -123,15 +123,7 @@ class ContinuousExecution(
             }
             committedOffsets = nextOffsets.toStreamProgress(sources)
     
    -        // Get to an epoch ID that has definitely never been sent to a 
sink before. Since sink
    -        // commit happens between offset log write and commit log write, 
this means an epoch ID
    -        // which is not in the offset log.
    -        val (latestOffsetEpoch, _) = offsetLog.getLatest().getOrElse {
    -          throw new IllegalStateException(
    -            s"Offset log had no latest element. This shouldn't be possible 
because nextOffsets is" +
    -              s"an element.")
    -        }
    -        currentBatchId = latestOffsetEpoch + 1
    +        currentBatchId = latestEpochId + 1
    --- End diff --
    
    nit: remove the line above.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to