Github user frreiss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14553#discussion_r84569335

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -337,17 +343,27 @@ class StreamExecution(
           }
           if (hasNewData) {
             reportTimeTaken(OFFSET_WAL_WRITE_LATENCY) {
    -          assert(
    -            offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
    +          assert(offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
                 s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
               logInfo(s"Committed offsets for batch $currentBatchId.")
    +
    +          // NOTE: The following code is correct because runBatches() processes exactly one
    +          // batch at a time. If we add pipeline parallelism (multiple batches in flight at
    +          // the same time), this cleanup logic will need to change.
    +
    +          // Now that we've updated the scheduler's persistent checkpoint, it is safe for the
    +          // sources to discard data from the previous batch.
    +          val prevBatchOff = offsetLog.get(currentBatchId - 1)
    +          if (prevBatchOff.isDefined) {
    +            prevBatchOff.get.toStreamProgress(sources).foreach {
    +              case (src, off) => src.commit(off)
    +            }
    +          }
    +
               // Now that we have logged the new batch, no further processing will happen for
    -          // the previous batch, and it is safe to discard the old metadata.
    -          // Note that purge is exclusive, i.e. it purges everything before currentBatchId.
    -          // NOTE: If StreamExecution implements pipeline parallelism (multiple batches in
    -          // flight at the same time), this cleanup logic will need to change.
    -          offsetLog.purge(currentBatchId)
    +          // the batch before the previous batch, and it is safe to discard the old metadata.
    +          // Note that purge is exclusive, i.e. it purges everything before the target ID.
    +          offsetLog.purge(currentBatchId - 1)
    --- End diff --

    I can move this change to another JIRA if you'd like, but we really should change `currentBatchId` to `currentBatchId - 1` at some point.
The call to `offsetLog.purge(currentBatchId)`, which I introduced in my PR for SPARK-17513, contains a subtle bug. The recovery logic in `populateStartOffsets()` reads the last and second-to-last entries in `offsetLog` and uses them to populate `availableOffsets` and `committedOffsets`, respectively. Calling `offsetLog.purge(currentBatchId)` at line 350/366 truncates the `offsetLog` to a single entry, which leaves `committedOffsets` empty on recovery, which in turn causes the first call to `getBatch()` for any source to have `None` as its first argument. Sources that do not prune buffered data in their `commit()` methods will return previously committed data in response to such a `getBatch()` call.
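To make the off-by-one concrete, here is a hypothetical, self-contained Scala sketch. It is not Spark's actual `HDFSMetadataLog`; the toy `purge`/`recover` helpers and the `offset-N` values are invented for illustration, but they model the relevant semantics: purge is exclusive, and recovery needs both the last and second-to-last log entries.

```scala
// Toy model (NOT Spark's HDFSMetadataLog) illustrating why the purge target
// must be currentBatchId - 1: recovery reads BOTH the last entry (to seed
// availableOffsets) and the second-to-last entry (to seed committedOffsets).
object PurgeOffByOneSketch {
  // Minimal stand-in for the offset log: batchId -> serialized offset, sorted by batchId.
  val offsetLog = scala.collection.mutable.TreeMap.empty[Long, String]

  // Purge is exclusive: drop every entry with batchId < thresholdBatchId.
  def purge(thresholdBatchId: Long): Unit =
    offsetLog.keys.toList.filter(_ < thresholdBatchId).foreach(offsetLog.remove)

  // Mirrors the shape of populateStartOffsets(): returns (availableOffsets,
  // committedOffsets) recovered from the last two log entries.
  def recover(): (Option[String], Option[String]) =
    offsetLog.toSeq.takeRight(2) match {
      case Seq((_, committed), (_, available)) => (Some(available), Some(committed))
      case Seq((_, available))                 => (Some(available), None)
      case _                                   => (None, None)
    }

  def main(args: Array[String]): Unit = {
    (0L to 3L).foreach(id => offsetLog(id) = s"offset-$id")
    val currentBatchId = 3L

    // Buggy cleanup: purge(currentBatchId) would leave only entry 3, so
    // committedOffsets comes back as None and the first getBatch() after
    // restart gets None as its start offset.

    // Fixed cleanup keeps two entries (batches 2 and 3):
    purge(currentBatchId - 1)
    val (available, committed) = recover()
    println(s"available=$available committed=$committed")
    // committed is defined, so getBatch() receives Some(start) on recovery.
  }
}
```

With the buggy `purge(currentBatchId)` the same recovery path would yield `committed = None`, which is exactly the failure mode described above for sources that do not prune buffered data in `commit()`.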