Oliver Hutchison created KAFKA-13249:
----------------------------------------

             Summary: Checkpoints do not contain latest offsets on shutdown when using EOS
                 Key: KAFKA-13249
                 URL: https://issues.apache.org/jira/browse/KAFKA-13249
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.8.0
            Reporter: Oliver Hutchison


When using EOS, the `.checkpoint` file created when a stateful streams app is 
shut down does not always contain the changelog offsets that represent the 
latest state of the state store. The offsets can often be behind the end of 
the changelog, sometimes quite significantly.

This leads to a state restore being required when the streams app restarts 
after shutting down cleanly, because streams concludes (based on the stale 
offsets in the checkpoint) that the state store is not up to date with the 
changelog. 

This increases the time it takes to do a clean restart of a single-instance 
streams app from around 10 seconds to sometimes over 2 minutes in our case.

I suspect the bug appears because of a race condition in the following method 
in `StreamTask`:

{code:java}
    protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
        // commitNeeded indicates we may have processed some records since last commit
        // and hence we need to refresh checkpointable offsets regardless whether we should checkpoint or not
        if (commitNeeded) {
            stateMgr.updateChangelogOffsets(checkpointableOffsets());
        }

        super.maybeWriteCheckpoint(enforceCheckpoint);
    }
{code}

In the steady-state case for a simple single-instance, single-thread streams 
app, where the app simply starts, runs and then shuts down, the 
`if (commitNeeded)` test always evaluates to false when running with EOS. As a 
result, the latest checkpointable offsets are never passed to the `stateMgr`. 

Tracing back through the callers of `maybeWriteCheckpoint`, it's easy to see 
this is the case, as there's only one place in the code that calls 
`maybeWriteCheckpoint` during this steady state: the 
`postCommit(final boolean enforceCheckpoint)` method, specifically the branch 
for the `RUNNING` state.

{code:java}
            case RUNNING:
                if (enforceCheckpoint || !eosEnabled) {
                    maybeWriteCheckpoint(enforceCheckpoint);
                }
                log.debug("Finalized commit for {} task with eos {} enforce checkpoint {}", state(), eosEnabled, enforceCheckpoint);
                break;
{code}

We can see from this code that `maybeWriteCheckpoint` will only ever be called 
if `enforceCheckpoint=true`, because we know `eosEnabled=true` as we're running 
with EOS. 

So then where does `postCommit` get called with `enforceCheckpoint=true`? Again 
looking only at the steady state case we find that it's only called from 
`TaskManager.tryCloseCleanAllActiveTasks` which is only called from 
`TaskManager.shutdown`. 

The thing about the call in `tryCloseCleanAllActiveTasks` is that it happens 
*after* all active tasks have committed. This means that 
`StreamTask.commitNeeded=false` for all tasks, which in turn means that the 
test back up in `maybeWriteCheckpoint` always evaluates to false, so we don't 
end up getting the latest offsets stored into the state manager.
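
The sequence above can be reproduced with a small stand-alone model. This is 
only a sketch: `FakeTask`, the partition name and the offsets are made up for 
illustration, and the assumptions that a regular commit clears `commitNeeded` 
and that the checkpoint file is only written when enforced are simplifications, 
not the real Kafka Streams classes.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical model of the call sequence described above.
// These are NOT the real Kafka Streams classes; all names are illustrative.
class StaleCheckpointSketch {

    static final String CHANGELOG = "store-changelog-0"; // hypothetical partition name

    static final class FakeTask {
        final boolean eosEnabled;
        boolean commitNeeded = false;
        long changelogEndOffset;                                    // what the store really reflects
        final Map<String, Long> stateMgrOffsets = new HashMap<>();  // offsets known to the state manager
        final Map<String, Long> checkpointFile = new HashMap<>();   // contents of the .checkpoint file

        FakeTask(boolean eosEnabled, long restoredOffset) {
            this.eosEnabled = eosEnabled;
            this.changelogEndOffset = restoredOffset;
            stateMgrOffsets.put(CHANGELOG, restoredOffset);
        }

        void process() {
            changelogEndOffset++;
            commitNeeded = true;
        }

        // Same shape as the maybeWriteCheckpoint method quoted in this report.
        void maybeWriteCheckpoint(boolean enforceCheckpoint) {
            if (commitNeeded) {
                stateMgrOffsets.put(CHANGELOG, changelogEndOffset);
            }
            // Assumption: the file is only written when the checkpoint is enforced.
            if (enforceCheckpoint) {
                checkpointFile.putAll(stateMgrOffsets);
            }
        }

        // Same shape as the RUNNING branch of postCommit quoted in this report.
        void postCommit(boolean enforceCheckpoint) {
            if (enforceCheckpoint || !eosEnabled) {
                maybeWriteCheckpoint(enforceCheckpoint);
            }
        }

        // Assumption: a regular commit runs postCommit(false), then clears commitNeeded.
        void commit() {
            postCommit(false);
            commitNeeded = false;
        }

        // Assumption: shutdown commits first, then enforces a checkpoint.
        void shutdown() {
            commit();
            postCommit(true);
        }
    }

    // Returns how far the checkpointed offset trails the real changelog end offset.
    static long checkpointLag(boolean eosEnabled) {
        FakeTask task = new FakeTask(eosEnabled, 100L);
        for (int i = 0; i < 8; i++) {
            task.process();
        }
        task.shutdown();
        return task.changelogEndOffset - task.checkpointFile.get(CHANGELOG);
    }

    public static void main(String[] args) {
        System.out.println("lag without EOS: " + checkpointLag(false));
        System.out.println("lag with EOS:    " + checkpointLag(true));
    }
}
```

In this model the non-EOS run checkpoints the end offset, while the EOS run 
checkpoints the offset the task started with, because nothing refreshes the 
state manager's offsets before the enforced checkpoint is written.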

I think the fix is to simply change the test in `maybeWriteCheckpoint` to be 
`if (commitNeeded || enforceCheckpoint) { ...`, as we know we must always update 
the changelog offsets before we write the checkpoint.
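
A minimal sketch of the proposed condition, using a hypothetical stand-in class 
rather than the real `StreamTask`, shows the intended effect: an enforced 
checkpoint refreshes the offsets even when `commitNeeded` is already false. The 
field names and the `-1` "not written" marker are assumptions made for this 
example, and this is not the actual patch.

```java
// Hypothetical sketch of the proposed condition change; not the actual Kafka patch.
class EnforcedCheckpointFixSketch {

    long stateMgrOffset;           // offset the state manager would checkpoint
    long checkpointableOffset;     // latest offset the task could checkpoint
    boolean commitNeeded;
    long writtenCheckpoint = -1L;  // -1 means no checkpoint has been written

    EnforcedCheckpointFixSketch(long stateMgrOffset, long checkpointableOffset, boolean commitNeeded) {
        this.stateMgrOffset = stateMgrOffset;
        this.checkpointableOffset = checkpointableOffset;
        this.commitNeeded = commitNeeded;
    }

    void maybeWriteCheckpoint(boolean enforceCheckpoint) {
        // Proposed test: also refresh offsets whenever a checkpoint is enforced.
        if (commitNeeded || enforceCheckpoint) {
            stateMgrOffset = checkpointableOffset;
        }
        // Assumption: the file is only written when the checkpoint is enforced.
        if (enforceCheckpoint) {
            writtenCheckpoint = stateMgrOffset;
        }
    }

    public static void main(String[] args) {
        // Shutdown case from the report: all tasks already committed, checkpoint enforced.
        EnforcedCheckpointFixSketch task = new EnforcedCheckpointFixSketch(100L, 108L, false);
        task.maybeWriteCheckpoint(true);
        System.out.println("checkpointed offset: " + task.writtenCheckpoint);
    }
}
```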







--
This message was sent by Atlassian Jira
(v8.3.4#803005)
