[ 
https://issues.apache.org/jira/browse/KAFKA-13249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13249.
-----------------------------------
    Fix Version/s: 3.1.0
       Resolution: Fixed

> Checkpoints do not contain latest offsets on shutdown when using EOS
> --------------------------------------------------------------------
>
>                 Key: KAFKA-13249
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13249
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.0.0, 2.7.0, 2.8.0
>            Reporter: Oliver Hutchison
>            Assignee: Oliver Hutchison
>            Priority: Major
>             Fix For: 3.1.0
>
>
> When using EOS, the {{.checkpoint}} file created when a stateful streams app 
> is shut down does not always contain changelog offsets that represent the 
> latest state of the state store. The offsets are often behind the end of 
> the changelog, sometimes by a significant margin.
> This forces a state restore when the streams app restarts after shutting 
> down cleanly, because streams concludes (based on the stale offsets in the 
> checkpoint) that the state store is not up to date with the changelog. 
> In our case this increases the time for a clean restart of a single 
> instance streams app from around 10 seconds to sometimes over 2 minutes.
> I suspect the bug appears because of an assumption about the {{commitNeeded}} 
> field in the following method in {{StreamTask}}:
> {code:java}
> protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
>   // commitNeeded indicates we may have processed some records since last commit
>   // and hence we need to refresh checkpointable offsets regardless whether
>   // we should checkpoint or not
>   if (commitNeeded) {
>     stateMgr.updateChangelogOffsets(checkpointableOffsets());
>   }
>   super.maybeWriteCheckpoint(enforceCheckpoint);
> }
> {code}
> In the steady state case of a simple single instance, single thread streams 
> app that simply starts, runs and then shuts down, the {{if (commitNeeded)}} 
> test always fails when running with EOS, so the latest checkpoint offsets 
> never get updated into the {{stateMgr}}.
> Tracing back to the callers of {{maybeWriteCheckpoint}}, it's easy to see this 
> is the case, as there's only one place in the code that calls 
> {{maybeWriteCheckpoint}} during this steady state: the {{postCommit(final 
> boolean enforceCheckpoint)}} method, specifically the call in the {{RUNNING}} 
> state.
> {code:java}
> case RUNNING:
>   if (enforceCheckpoint || !eosEnabled) {
>     maybeWriteCheckpoint(enforceCheckpoint);
>   }
>   log.debug("Finalized commit for {} task with eos {} enforce checkpoint {}",
>     state(), eosEnabled, enforceCheckpoint);
>   break;
> {code}
> We can see from this code that {{maybeWriteCheckpoint}} will only ever be 
> called if {{enforceCheckpoint=true}}, because we know {{eosEnabled=true}} as 
> we're running with EOS.
> So then where does {{postCommit}} get called with {{enforceCheckpoint=true}}? 
> Again looking only at the steady state case we find that it's only called 
> from {{TaskManager.tryCloseCleanAllActiveTasks}} which is only called from 
> {{TaskManager.shutdown}}.
> The thing about the call in {{tryCloseCleanAllActiveTasks}} is that it 
> happens *after* all active tasks have committed. This means that 
> {{StreamTask.commitNeeded=false}} for all tasks, so it follows that the test 
> back in {{maybeWriteCheckpoint}} always fails and we don't end up getting the 
> latest offsets stored into the state manager.
> I think the fix is to simply change the test in {{maybeWriteCheckpoint}} to 
> be {{if (commitNeeded || enforceCheckpoint) { ...}} as we know we must always 
> update the changelog offsets before we write the checkpoint.
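
The call sequence described in the report can be reproduced with a small toy model. This is only a sketch and not Kafka Streams code: ToyTask, ToyStateManager, the "changelog-0" key and the record counts are hypothetical stand-ins, and the model assumes only what the report states (under EOS a regular commit skips the checkpoint, and shutdown commits before calling postCommit with enforceCheckpoint=true).

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class CheckpointSketch {

    /** Hypothetical stand-in for the state manager; not the real Kafka class. */
    static final class ToyStateManager {
        private final Map<String, Long> changelogOffsets = new HashMap<>();
        private final Map<String, Long> checkpointFile = new HashMap<>();

        void updateChangelogOffsets(final Map<String, Long> offsets) {
            changelogOffsets.putAll(offsets);
        }

        // Writes whatever offsets the manager currently holds, fresh or stale.
        void checkpoint() {
            checkpointFile.clear();
            checkpointFile.putAll(changelogOffsets);
        }

        // Returns -1 when no offset was ever checkpointed for the partition.
        long checkpointed(final String partition) {
            return checkpointFile.getOrDefault(partition, -1L);
        }
    }

    /** Hypothetical stand-in for a task running with EOS enabled. */
    static final class ToyTask {
        private final ToyStateManager stateMgr = new ToyStateManager();
        private final boolean eosEnabled = true;
        private final boolean applyProposedFix;
        private boolean commitNeeded = false;
        private long latestOffset = -1L;

        ToyTask(final boolean applyProposedFix) {
            this.applyProposedFix = applyProposedFix;
        }

        void process(final int records) {
            latestOffset += records;
            commitNeeded = true;
        }

        void commit() {
            commitNeeded = false;
        }

        // Mirrors the RUNNING branch quoted in the report.
        void postCommit(final boolean enforceCheckpoint) {
            if (enforceCheckpoint || !eosEnabled) {
                maybeWriteCheckpoint(enforceCheckpoint);
            }
        }

        void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
            final boolean refresh = applyProposedFix
                ? (commitNeeded || enforceCheckpoint) // condition proposed in the report
                : commitNeeded;                       // condition quoted in the report
            if (refresh) {
                stateMgr.updateChangelogOffsets(
                    Collections.singletonMap("changelog-0", latestOffset));
            }
            stateMgr.checkpoint();
        }
    }

    /** Runs: process, regular commit, then a clean shutdown; returns the checkpointed offset. */
    static long checkpointAfterCleanShutdown(final boolean applyProposedFix) {
        final ToyTask task = new ToyTask(applyProposedFix);
        task.process(100);       // offsets 0..99 processed
        task.commit();
        task.postCommit(false);  // regular commit: EOS skips the checkpoint
        task.commit();           // shutdown commits all tasks first ...
        task.postCommit(true);   // ... and only then enforces the checkpoint
        return task.stateMgr.checkpointed("changelog-0");
    }

    public static void main(final String[] args) {
        System.out.println("original condition: " + checkpointAfterCleanShutdown(false));
        System.out.println("proposed condition: " + checkpointAfterCleanShutdown(true));
    }
}
```

In this toy model the original condition leaves the checkpoint without the latest offset (reported as -1), while the proposed condition checkpoints offset 99. In a real app the stale value would be whatever offsets the state manager last held rather than -1.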



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
