GitHub user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14553#discussion_r84549552
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -337,17 +343,27 @@ class StreamExecution(
         }
         if (hasNewData) {
           reportTimeTaken(OFFSET_WAL_WRITE_LATENCY) {
    -        assert(
    -          offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
    +        assert(offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
               s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
             logInfo(s"Committed offsets for batch $currentBatchId.")
     
    +        // NOTE: The following code is correct because runBatches() processes exactly one
    +        // batch at a time. If we add pipeline parallelism (multiple batches in flight at
    +        // the same time), this cleanup logic will need to change.
    +
    +        // Now that we've updated the scheduler's persistent checkpoint, it is safe for the
    +        // sources to discard data from the previous batch.
    +        val prevBatchOff = offsetLog.get(currentBatchId - 1)
    +        if (prevBatchOff.isDefined) {
    +          prevBatchOff.get.toStreamProgress(sources).foreach {
    +            case (src, off) => src.commit(off)
    +          }
    +        }
    +
             // Now that we have logged the new batch, no further processing will happen for
    -        // the previous batch, and it is safe to discard the old metadata.
    -        // Note that purge is exclusive, i.e. it purges everything before currentBatchId.
    -        // NOTE: If StreamExecution implements pipeline parallelism (multiple batches in
    -        // flight at the same time), this cleanup logic will need to change.
    -        offsetLog.purge(currentBatchId)
    +        // the batch before the previous batch, and it is safe to discard the old metadata.
    +        // Note that purge is exclusive, i.e. it purges everything before the target ID.
    +        offsetLog.purge(currentBatchId - 1)
    --- End diff --
    
    nit: this can be `offsetLog.purge(currentBatchId)` since `purge` is exclusive; then you can revert the changes to StreamingQuerySuite.
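The nit relies on two facts from the hunk: `purge` is exclusive, and the previous batch's offsets are read (and committed to the sources) before the purge runs. The plain-Scala sketch below models that ordering under simplifying assumptions: `OffsetLogSketch`, `InMemoryOffsetLog`, `RecordingSource`, `logBatch` and its `purgeThreshold` parameter are hypothetical names, not Spark APIs; each source's offset is a single `Long` rather than a `CompositeOffset`; and the log lives in memory instead of in the checkpoint directory.

```scala
import scala.collection.mutable

object OffsetLogSketch {
  // Hypothetical in-memory stand-in for the offset write-ahead log. It models only
  // the three calls the hunk uses: add, get and an exclusive purge.
  final class InMemoryOffsetLog[T] {
    private val entries = mutable.TreeMap.empty[Long, T]

    // Returns false if batchId is already present; the hunk asserts on this
    // result to detect a second streaming job writing to the same log.
    def add(batchId: Long, offsets: T): Boolean =
      if (entries.contains(batchId)) false
      else { entries(batchId) = offsets; true }

    def get(batchId: Long): Option[T] = entries.get(batchId)

    // Exclusive: removes every entry whose ID is strictly less than the threshold.
    def purge(thresholdBatchId: Long): Unit = {
      val stale = entries.keys.toList.filter(_ < thresholdBatchId)
      entries --= stale
    }

    // Remaining batch IDs in ascending order.
    def batchIds: List[Long] = entries.keys.toList
  }

  // Hypothetical source that records the offsets it is asked to commit.
  final class RecordingSource {
    val committed: mutable.ArrayBuffer[Long] = mutable.ArrayBuffer.empty[Long]
    def commit(offset: Long): Unit = committed += offset
  }

  // One pass of the logic in the hunk. purgeThreshold is a parameter (an assumption
  // of this sketch) so that both thresholds discussed in this thread can be compared.
  def logBatch(
      log: InMemoryOffsetLog[Map[RecordingSource, Long]],
      currentBatchId: Long,
      available: Map[RecordingSource, Long],
      purgeThreshold: Long): Unit = {
    // 1. Persist the new batch's offsets first.
    assert(log.add(currentBatchId, available),
      s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
    // 2. The new batch is durable, so sources may discard data up to the previous batch.
    log.get(currentBatchId - 1).foreach { prev =>
      prev.foreach { case (src, off) => src.commit(off) }
    }
    // 3. The previous entry was already read in step 2, so this purge cannot affect it.
    log.purge(purgeThreshold)
  }
}
```

Feeding three batches with offsets 10, 20 and 30 through `logBatch`, the source is asked to commit 10 and then 20 whether `purgeThreshold` is `currentBatchId - 1` or `currentBatchId`. The only difference is what stays in the log: batches 1 and 2 in the first case, batch 2 alone in the second. The sketch does not model restart recovery, which also depends on which entries remain in the log.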

