Github user frreiss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14553#discussion_r84569335
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -337,17 +343,27 @@ class StreamExecution(
         }
         if (hasNewData) {
           reportTimeTaken(OFFSET_WAL_WRITE_LATENCY) {
    -        assert(
    -          offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
    +        assert(offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
              s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
             logInfo(s"Committed offsets for batch $currentBatchId.")
     
    +        // NOTE: The following code is correct because runBatches() processes exactly one
    +        // batch at a time. If we add pipeline parallelism (multiple batches in flight at
    +        // the same time), this cleanup logic will need to change.
    +
    +        // Now that we've updated the scheduler's persistent checkpoint, it is safe for the
    +        // sources to discard data from the previous batch.
    +        val prevBatchOff = offsetLog.get(currentBatchId - 1)
    +        if (prevBatchOff.isDefined) {
    +          prevBatchOff.get.toStreamProgress(sources).foreach {
    +            case (src, off) => src.commit(off)
    +          }
    +        }
    +
             // Now that we have logged the new batch, no further processing will happen for
    -        // the previous batch, and it is safe to discard the old metadata.
    -        // Note that purge is exclusive, i.e. it purges everything before currentBatchId.
    -        // NOTE: If StreamExecution implements pipeline parallelism (multiple batches in
    -        // flight at the same time), this cleanup logic will need to change.
    -        offsetLog.purge(currentBatchId)
    +        // the batch before the previous batch, and it is safe to discard the old metadata.
    +        // Note that purge is exclusive, i.e. it purges everything before the target ID.
    +        offsetLog.purge(currentBatchId - 1)
    --- End diff --
    
    I can move this change to another JIRA if you'd like, but we really should change `currentBatchId` to `currentBatchId - 1` at some point. The call to `offsetLog.purge(currentBatchId)`, which I introduced in my PR for SPARK-17513, contains a subtle bug. The recovery logic in `populateStartOffsets()` reads the last and second-to-last entries in `offsetLog` and uses them to populate `availableOffsets` and `committedOffsets`, respectively. Calling `offsetLog.purge(currentBatchId)` at line 350/366 truncates `offsetLog` to a single entry, which leaves `committedOffsets` empty on recovery, which in turn causes the first call to `getBatch()` for any source to have `None` as its first argument. Sources that do not prune buffered data in their `commit()` methods will return previously committed data in response to such a `getBatch()` call.
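    To make the off-by-one concrete, here is a toy sketch (hypothetical names, not the real `HDFSMetadataLog` API) of an exclusive purge and the two-entry invariant that recovery relies on:

    ```scala
    // Toy model: offsetLog keyed by batch id; purge is exclusive of its argument,
    // mirroring the semantics described above. Recovery needs both the last and
    // second-to-last entries, so purging up to currentBatchId destroys one of them.
    import scala.collection.mutable

    object PurgeOffByOne {
      val offsetLog = mutable.SortedMap[Long, String]()

      // Exclusive purge: removes every entry with an id below thresholdId.
      def purge(thresholdId: Long): Unit =
        offsetLog.keys.toList.filter(_ < thresholdId).foreach(offsetLog.remove)

      def main(args: Array[String]): Unit = {
        offsetLog ++= Seq(0L -> "offsets0", 1L -> "offsets1", 2L -> "offsets2")
        val currentBatchId = 2L

        // Buggy cleanup: only the entry for batch 2 survives, so the
        // second-to-last entry that recovery needs is gone.
        purge(currentBatchId)
        val secondToLast = offsetLog.keys.toSeq.dropRight(1).lastOption
        assert(offsetLog.size == 1 && secondToLast.isEmpty)

        // With purge(currentBatchId - 1), both entries recovery needs survive.
        offsetLog ++= Seq(0L -> "offsets0", 1L -> "offsets1")
        purge(currentBatchId - 1)
        assert(offsetLog.keySet == Set(1L, 2L))
      }
    }
    ```

    With the fixed threshold, `populateStartOffsets()` can still find a second-to-last entry and rebuild `committedOffsets` after a restart.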

