[ https://issues.apache.org/jira/browse/KAFKA-13249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17406988#comment-17406988 ]
A. Sophie Blee-Goldman commented on KAFKA-13249:
------------------------------------------------

Thanks for the detailed report! I agree with your analysis: it does appear that we can end up writing stale offsets if the thread is shut down immediately following a "normal" commit that occurs during active processing. And given the small commit interval typical of EOS applications, we almost always perform this kind of commit before breaking out of the StreamThread's processing loop, meaning most of the time we will indeed have just committed all tasks when we get to checking the shutdown signal and entering {{TaskManager.shutdown()}}.

I'll give your PR a pass -- thanks also for submitting a patch with the bug report :)

> Checkpoints do not contain latest offsets on shutdown when using EOS
> --------------------------------------------------------------------
>
>                 Key: KAFKA-13249
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13249
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.7.0, 2.8.0
>            Reporter: Oliver Hutchison
>            Priority: Major
>
> When using EOS the {{.checkpoint}} file created when a stateful streams app
> is shut down does not always contain changelog offsets which represent the
> latest state of the state store. The offsets can often be behind the end of
> the changelog - sometimes quite significantly.
> This leads to a state restore being required when the streams app restarts
> after shutting down cleanly, as streams thinks (based on the incorrect offsets
> in the checkpoint) that the state store is not up to date with the changelog.
> This is increasing the time it takes to do a clean restart of a single
> instance streams app from around 10 seconds to sometimes over 2 minutes in our
> case.
> I suspect the bug appears because of an assumption about the {{commitNeeded}}
> field in the following method in {{StreamTask}}:
> {code:java}
> protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
>     // commitNeeded indicates we may have processed some records since last commit
>     // and hence we need to refresh checkpointable offsets regardless whether we should checkpoint or not
>     if (commitNeeded) {
>         stateMgr.updateChangelogOffsets(checkpointableOffsets());
>     }
>     super.maybeWriteCheckpoint(enforceCheckpoint);
> }
> {code}
> In a steady state case for a simple single instance, single thread streams app,
> where the app simply starts, runs and then shuts down, the
> {{if (commitNeeded)}} test always fails when running with EOS, which results in
> the latest checkpoint offsets never getting updated into the {{stateMgr}}.
> Tracing back to the callers of {{maybeWriteCheckpoint}} it's easy to see this
> is the case, as there's only 1 place in the code which calls
> {{maybeWriteCheckpoint}} during this steady state: the
> {{postCommit(final boolean enforceCheckpoint)}} method, specifically the call
> in the {{RUNNING}} state.
> {code:java}
> case RUNNING:
>     if (enforceCheckpoint || !eosEnabled) {
>         maybeWriteCheckpoint(enforceCheckpoint);
>     }
>     log.debug("Finalized commit for {} task with eos {} enforce checkpoint {}", state(), eosEnabled, enforceCheckpoint);
>     break;
> {code}
> We can see from this code that {{maybeWriteCheckpoint}} will only ever be
> called if {{enforceCheckpoint=true}}, because we know {{eosEnabled=true}} as
> we're running with EOS.
> So then where does {{postCommit}} get called with {{enforceCheckpoint=true}}?
> Again looking only at the steady state case, we find that it's only called
> from {{TaskManager.tryCloseCleanAllActiveTasks}}, which is only called from
> {{TaskManager.shutdown}}.
> The thing about the call in {{tryCloseCleanAllActiveTasks}} is that it
> happens *after* all active tasks have committed.
> This means that {{StreamTask.commitNeeded=false}} for all tasks, so it
> follows that the test back in {{maybeWriteCheckpoint}} always fails and we
> don't end up getting the latest offsets stored into the state manager.
> I think the fix is to simply change the test in {{maybeWriteCheckpoint}} to
> be {{if (commitNeeded || enforceCheckpoint) { ...}}, as we know we must always
> update the changelog offsets before we write the checkpoint.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
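
For readers following the report, the sequence it describes can be reproduced in a small standalone model. This is only a sketch and not Kafka code: {{CheckpointSketch}} and all of its fields and methods are hypothetical stand-ins. It assumes that a regular commit clears {{commitNeeded}} (as the report states) and that the checkpoint file is only written when {{enforceCheckpoint=true}}. The {{fixed}} flag switches between the current condition and the one proposed in the report.

```java
// Hypothetical, simplified model of the flow described in the report.
// None of these names are Kafka APIs; they only mirror the quoted snippets.
class CheckpointSketch {
    private final boolean fixed;          // true = use the condition proposed in the report
    private final boolean eosEnabled = true;
    private boolean commitNeeded = false;
    private long processedOffset = 0L;    // latest changelog offset produced by processing
    private long stateMgrOffset = 0L;     // offset currently known to the state manager
    private long checkpointFileOffset = -1L; // simulated .checkpoint content, -1 = not written

    CheckpointSketch(final boolean fixed) {
        this.fixed = fixed;
    }

    void process() {
        processedOffset++;
        commitNeeded = true;
    }

    // Mirrors the RUNNING branch of postCommit quoted in the report.
    // Assumption: the commit status is cleared once the commit completes.
    void postCommit(final boolean enforceCheckpoint) {
        if (enforceCheckpoint || !eosEnabled) {
            maybeWriteCheckpoint(enforceCheckpoint);
        }
        commitNeeded = false;
    }

    private void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
        final boolean refresh = fixed ? (commitNeeded || enforceCheckpoint) : commitNeeded;
        if (refresh) {
            stateMgrOffset = processedOffset; // stands in for updateChangelogOffsets(...)
        }
        if (enforceCheckpoint) {
            checkpointFileOffset = stateMgrOffset; // stands in for writing the file
        }
    }

    // Process five records, do a normal EOS commit, then shut down cleanly.
    static long checkpointAfterCleanShutdown(final boolean fixed) {
        final CheckpointSketch task = new CheckpointSketch(fixed);
        for (int i = 0; i < 5; i++) {
            task.process();
        }
        task.postCommit(false); // regular commit during active processing
        task.postCommit(true);  // commit issued from the shutdown path
        return task.checkpointFileOffset;
    }

    public static void main(final String[] args) {
        System.out.println("current condition:  " + checkpointAfterCleanShutdown(false));
        System.out.println("proposed condition: " + checkpointAfterCleanShutdown(true));
    }
}
```

In this model the current condition leaves the checkpoint at the initial offset even though five records were processed, while the proposed condition records the latest offset. Whether the real code paths match this simplification would need to be confirmed against the Kafka source.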