[ https://issues.apache.org/jira/browse/KAFKA-13249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Guozhang Wang resolved KAFKA-13249.
-----------------------------------
    Fix Version/s: 3.1.0
       Resolution: Fixed

> Checkpoints do not contain latest offsets on shutdown when using EOS
> --------------------------------------------------------------------
>
>                 Key: KAFKA-13249
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13249
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.0.0, 2.7.0, 2.8.0
>            Reporter: Oliver Hutchison
>            Assignee: Oliver Hutchison
>            Priority: Major
>             Fix For: 3.1.0
>
>
> When using EOS, the {{.checkpoint}} file created when a stateful streams app
> is shut down does not always contain changelog offsets that represent the
> latest state of the state store. The offsets can often be behind the end of
> the changelog, sometimes quite significantly.
>
> This leads to a state restore being required when the streams app restarts
> after shutting down cleanly, because streams thinks (based on the incorrect
> offsets in the checkpoint) that the state store is not up to date with the
> changelog. In our case, this increases the time it takes to do a clean
> restart of a single-instance streams app from around 10 seconds to sometimes
> over 2 minutes.
>
> I suspect the bug appears because of an assumption about the {{commitNeeded}}
> field in the following method in {{StreamTask}}:
> {code:java}
> protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
>     // commitNeeded indicates we may have processed some records since last commit
>     // and hence we need to refresh checkpointable offsets regardless whether we should checkpoint or not
>     if (commitNeeded) {
>         stateMgr.updateChangelogOffsets(checkpointableOffsets());
>     }
>     super.maybeWriteCheckpoint(enforceCheckpoint);
> }
> {code}
> In the steady-state case of a simple single-instance, single-thread streams
> app that simply starts, runs and then shuts down, the {{if (commitNeeded)}}
> test always fails when running with EOS, so the latest checkpointable offsets
> never get updated into the {{stateMgr}}.
>
> Tracing back to the callers of {{maybeWriteCheckpoint}}, it's easy to see why:
> there is only one place in the code that calls {{maybeWriteCheckpoint}} during
> this steady state, the {{postCommit(final boolean enforceCheckpoint)}} method,
> specifically the call in the {{RUNNING}} state:
> {code:java}
> case RUNNING:
>     if (enforceCheckpoint || !eosEnabled) {
>         maybeWriteCheckpoint(enforceCheckpoint);
>     }
>     log.debug("Finalized commit for {} task with eos {} enforce checkpoint {}",
>         state(), eosEnabled, enforceCheckpoint);
>     break;
> {code}
> We can see from this code that {{maybeWriteCheckpoint}} will only ever be
> called if {{enforceCheckpoint=true}}, because we know {{eosEnabled=true}} when
> running with EOS.
>
> So where does {{postCommit}} get called with {{enforceCheckpoint=true}}?
> Again looking only at the steady-state case, it is only called from
> {{TaskManager.tryCloseCleanAllActiveTasks}}, which in turn is only called
> from {{TaskManager.shutdown}}.
>
> The thing about the call in {{tryCloseCleanAllActiveTasks}} is that it
> happens *after* all active tasks have committed.
> This means that {{StreamTask.commitNeeded=false}} for all tasks, so the test
> back in {{maybeWriteCheckpoint}} always fails and the latest offsets never get
> stored in the state manager.
>
> I think the fix is simply to change the test in {{maybeWriteCheckpoint}} to
> {{if (commitNeeded || enforceCheckpoint)}}, since we know we must always
> update the changelog offsets before we write the checkpoint.
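The call path described in the report can be illustrated with a small, self-contained Java model. Everything in it is a hypothetical simplification for illustration, not Kafka's {{StreamTask}} or state manager code: the {{CheckpointSketch}} and {{Task}} names, the single changelog offset per task, clearing {{commitNeeded}} at the end of {{postCommit}}, and the assumption that the base checkpoint logic only writes the file when the checkpoint is enforced. It runs one start, process, commit, shutdown cycle and reports the offset left in the modelled {{.checkpoint}} file, using either the original {{commitNeeded}} test or the proposed {{commitNeeded || enforceCheckpoint}} test.

```java
// Hypothetical, simplified model of the checkpoint path described in KAFKA-13249.
// All names are illustrative; this is NOT Kafka's actual StreamTask code.
final class CheckpointSketch {

    static final class Task {
        private final boolean eosEnabled;
        private final boolean applyProposedFix; // use `commitNeeded || enforceCheckpoint`

        private boolean commitNeeded = false;
        private long latestChangelogOffset;     // last offset written to the changelog
        private long stateMgrOffset;            // offset the modelled state manager holds
        long checkpointFileOffset;              // offset left in the modelled .checkpoint file

        Task(boolean eosEnabled, boolean applyProposedFix, long restoredOffset) {
            this.eosEnabled = eosEnabled;
            this.applyProposedFix = applyProposedFix;
            this.latestChangelogOffset = restoredOffset;
            this.stateMgrOffset = restoredOffset;
            this.checkpointFileOffset = restoredOffset;
        }

        // Processing a record writes to the changelog and marks the task as needing a commit.
        void process(long changelogOffset) {
            latestChangelogOffset = changelogOffset;
            commitNeeded = true;
        }

        // Models maybeWriteCheckpoint(enforceCheckpoint) as quoted in the report.
        void maybeWriteCheckpoint(boolean enforceCheckpoint) {
            boolean refresh = applyProposedFix ? (commitNeeded || enforceCheckpoint) : commitNeeded;
            if (refresh) {
                stateMgrOffset = latestChangelogOffset; // stands in for stateMgr.updateChangelogOffsets(...)
            }
            if (enforceCheckpoint) {
                checkpointFileOffset = stateMgrOffset;  // simplified super.maybeWriteCheckpoint(...)
            }
        }

        // Models the RUNNING branch of postCommit(enforceCheckpoint); committing clears commitNeeded.
        void postCommit(boolean enforceCheckpoint) {
            if (enforceCheckpoint || !eosEnabled) {
                maybeWriteCheckpoint(enforceCheckpoint);
            }
            commitNeeded = false;
        }

        // A regular (non-enforced) commit.
        void commit() {
            postCommit(false);
        }

        // Models TaskManager.shutdown: commit all tasks first, then close cleanly with an enforced checkpoint.
        void shutdown() {
            commit();
            postCommit(true);
        }
    }

    // Start, process, commit, process, shut down; return the offset in the modelled .checkpoint file.
    static long checkpointAfterCleanShutdown(boolean eosEnabled, boolean applyProposedFix) {
        Task task = new Task(eosEnabled, applyProposedFix, 0L);
        task.process(10L);
        task.commit();
        task.process(20L);
        task.shutdown();
        return task.checkpointFileOffset;
    }

    public static void main(String[] args) {
        System.out.println("at-least-once, original test: " + checkpointAfterCleanShutdown(false, false)); // 20
        System.out.println("EOS, original test:           " + checkpointAfterCleanShutdown(true, false));  // 0
        System.out.println("EOS, proposed test:           " + checkpointAfterCleanShutdown(true, true));   // 20
    }
}
```

In this model the EOS task's checkpoint stays at the offset it was restored from: the periodic commit never calls {{maybeWriteCheckpoint}}, and the enforced checkpoint at shutdown runs only after the shutdown commit has cleared {{commitNeeded}}. With the proposed test, the enforced checkpoint refreshes the offsets before writing the file.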