Github user frreiss commented on a diff in the pull request:
https://github.com/apache/spark/pull/14553#discussion_r84569335
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -337,17 +343,27 @@ class StreamExecution(
     }
     if (hasNewData) {
       reportTimeTaken(OFFSET_WAL_WRITE_LATENCY) {
-        assert(
-          offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
+        assert(offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
           s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
         logInfo(s"Committed offsets for batch $currentBatchId.")
+
+        // NOTE: The following code is correct because runBatches() processes exactly one
+        // batch at a time. If we add pipeline parallelism (multiple batches in flight at
+        // the same time), this cleanup logic will need to change.
+
+        // Now that we've updated the scheduler's persistent checkpoint, it is safe for the
+        // sources to discard data from the previous batch.
+        val prevBatchOff = offsetLog.get(currentBatchId - 1)
+        if (prevBatchOff.isDefined) {
+          prevBatchOff.get.toStreamProgress(sources).foreach {
+            case (src, off) => src.commit(off)
+          }
+        }
+
         // Now that we have logged the new batch, no further processing will happen for
-        // the previous batch, and it is safe to discard the old metadata.
-        // Note that purge is exclusive, i.e. it purges everything before currentBatchId.
-        // NOTE: If StreamExecution implements pipeline parallelism (multiple batches in
-        // flight at the same time), this cleanup logic will need to change.
-        offsetLog.purge(currentBatchId)
+        // the batch before the previous batch, and it is safe to discard the old metadata.
+        // Note that purge is exclusive, i.e. it purges everything before the target ID.
+        offsetLog.purge(currentBatchId - 1)
--- End diff --
I can move this change to another JIRA if you'd like, but we really should
change `currentBatchId` to `currentBatchId - 1` at some point. The call to
`offsetLog.purge(currentBatchId)`, which I introduced in my PR for SPARK-17513,
contains a subtle bug. The recovery logic in `populateStartOffsets()` reads the
last and second-to-last entries in `offsetLog`. `populateStartOffsets()` uses
those entries to populate `availableOffsets` and `committedOffsets`,
respectively. Calling `offsetLog.purge(currentBatchId)` at line 350/366 results
in the `offsetLog` being truncated to one entry, which in turn results in
`committedOffsets` being left empty on recovery, which in turn causes the first
call to `getBatch()` for any source to have `None` as its first argument.
Sources that do not prune buffered data in their `commit()` methods will return
a previously committed data in response to such a `getBatch()` call.
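To make the failure mode concrete, here is a small standalone sketch (not Spark's actual classes; `ToyOffsetLog`, `lastTwo`, and `run` are hypothetical stand-ins) that models the offset log as a sorted map with an exclusive `purge()`, and mimics `populateStartOffsets()` reading the last and second-to-last entries:

```scala
import scala.collection.mutable

// Hypothetical model of the offset log: batch ID -> serialized offset.
class ToyOffsetLog {
  private val entries = mutable.SortedMap.empty[Long, String]
  def add(batchId: Long, off: String): Boolean = entries.put(batchId, off).isEmpty
  // Exclusive purge: drops every entry strictly below thresholdId.
  def purge(thresholdId: Long): Unit =
    entries.keys.toList.filter(_ < thresholdId).foreach(entries.remove)
  // Recovery reads the last entry (-> availableOffsets) and the
  // second-to-last entry (-> committedOffsets).
  def lastTwo: (Option[(Long, String)], Option[(Long, String)]) =
    entries.toSeq.takeRight(2) match {
      case Seq(prev, last) => (Some(last), Some(prev))
      case Seq(last)       => (Some(last), None)
      case _               => (None, None)
    }
}

object PurgeDemo {
  // Run three batches, purging after each one with the given policy.
  def run(purgeAt: Long => Long): (Option[(Long, String)], Option[(Long, String)]) = {
    val log = new ToyOffsetLog
    for (batchId <- 0L to 2L) {
      log.add(batchId, s"off-$batchId")
      log.purge(purgeAt(batchId))
    }
    log.lastTwo
  }

  def main(args: Array[String]): Unit = {
    // purge(currentBatchId): only one entry survives, so on recovery
    // committedOffsets is empty and getBatch() gets None as its start.
    val (_, committedBuggy) = run(identity)
    // purge(currentBatchId - 1): two entries survive, so committedOffsets
    // can be restored from the second-to-last entry.
    val (_, committedFixed) = run(_ - 1)
    println(s"buggy committedOffsets: $committedBuggy") // prints None
    println(s"fixed committedOffsets: $committedFixed") // prints Some((1,off-1))
  }
}
```

The sketch only illustrates the truncation arithmetic; the real recovery path in `populateStartOffsets()` additionally replays the last batch, which is why the second-to-last entry matters.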