[ https://issues.apache.org/jira/browse/KAFKA-16017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax updated KAFKA-16017:
------------------------------------
Fix Version/s: 3.5.3, 3.4.2, 3.6.2
> Checkpointed offset is incorrect when task is revived and restoring
> --------------------------------------------------------------------
>
> Key: KAFKA-16017
> URL: https://issues.apache.org/jira/browse/KAFKA-16017
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.3.1
> Reporter: Bruno Cadonna
> Assignee: Bruno Cadonna
> Priority: Major
> Fix For: 3.4.2, 3.7.0, 3.6.2, 3.5.3
>
>
> Streams checkpoints the wrong offset when a task is revived after a
> {{TaskCorruptedException}} and the task is then migrated to another stream
> thread during restoration.
> This might happen in the following situation if the Streams application
> runs under exactly-once semantics (EOS):
> 1. Streams encounters a network error, which triggers a
> {{TaskCorruptedException}}.
> 2. The task that encountered the exception is closed dirty and revived. The
> state store directory is wiped out and a rebalance is triggered.
> 3. Until the sync of the rebalance is received, the revived task restores its state.
> 4. When the sync is received, the revived task is revoked and a new rebalance
> is triggered. During the revocation, the task is closed cleanly and a
> checkpoint file is written.
> 5. With the next rebalance, the task moves back to the stream thread from
> which it was revoked, reads the checkpoint, and starts restoring. (It might
> be enough for the task to move to any stream thread on the same Streams
> client, since those threads share the same state directory.)
> 6. The state of the task is missing some records.
> To mitigate the issue, one can restart the stream thread and delete the
> state on disk. After that, the state is restored completely from the
> changelog topic and no longer misses any records.
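A minimal Java sketch of the "delete the state on disk" step follows. It assumes the usual Kafka Streams layout {{<state.dir>/<application.id>/<task id>}} and that the stream thread owning the task has been stopped before the directory is touched. The names {{WipeTaskState}} and {{wipeTaskDirectory}} are hypothetical. When the whole application instance can be stopped instead, the public {{KafkaStreams#cleanUp()}} method (callable only before {{start()}} or after {{close()}}) removes the local state for the application ID without hand-written file handling.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

public class WipeTaskState {

    // Hypothetical helper. Assumption: task state lives in <stateDir>/<applicationId>/<taskId>
    // and nothing (in particular no stream thread) is using that directory right now.
    static void wipeTaskDirectory(Path stateDir, String applicationId, String taskId) throws IOException {
        Path taskDir = stateDir.resolve(applicationId).resolve(taskId);
        if (!Files.exists(taskDir)) {
            return; // nothing to delete
        }
        try (Stream<Path> paths = Files.walk(taskDir)) {
            // Reverse order visits children before their parent directories,
            // so every directory is already empty when it is deleted.
            paths.sorted(Comparator.reverseOrder()).forEach(path -> {
                try {
                    Files.delete(path);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
    }

    public static void main(String[] args) throws IOException {
        // Build a throwaway layout instead of touching a real state directory.
        Path stateDir = Files.createTempDirectory("streams-state");
        Path taskDir = stateDir.resolve("my-app").resolve("0_1");
        Files.createDirectories(taskDir.resolve("store"));
        Files.writeString(taskDir.resolve("data.bin"), "example");

        wipeTaskDirectory(stateDir, "my-app", "0_1");
        System.out.println(Files.exists(taskDir)); // false
    }
}
```

Deleting only the task directory keeps other tasks' state intact, which is narrower than {{cleanUp()}}, but it relies on the directory layout assumption stated above.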
> The root cause is that the checkpoint written in step 4 contains the offset
> that the record collector stored when it sent the records to the changelog
> topic. However, since the state directory was wiped out in step 2, the state
> no longer contains those records. It only contains the records restored in
> step 3, which might be fewer. This mismatch arises because the offsets in the
> record collector are not cleared when the record collector is closed.
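The root cause can be illustrated with a small, self-contained Java model. {{SketchCollector}} is a hypothetical stand-in for the record collector, not the real {{RecordCollectorImpl}}, and the offsets 100 and 60 are invented. The model assumes, as described above, that offsets remembered by the collector take precedence over the restored offset when the checkpoint is assembled; passing {{true}} shows the effect of clearing the offsets on close.

```java
import java.util.HashMap;
import java.util.Map;

public class StaleCheckpointSketch {

    // Hypothetical, simplified stand-in for a record collector. It only remembers the
    // last offset it wrote to each changelog partition; it is not RecordCollectorImpl.
    static final class SketchCollector {
        private final Map<String, Long> offsets = new HashMap<>();
        private final boolean clearOffsetsOnClose;

        SketchCollector(boolean clearOffsetsOnClose) {
            this.clearOffsetsOnClose = clearOffsetsOnClose;
        }

        void send(String changelogPartition, long offset) {
            offsets.put(changelogPartition, offset);
        }

        Map<String, Long> offsets() {
            return offsets;
        }

        void close() {
            // The behavior the issue identifies as missing: forget offsets on close.
            if (clearOffsetsOnClose) {
                offsets.clear();
            }
        }
    }

    // Replays steps 1-4 for one changelog partition and returns the offset that would be
    // written to the checkpoint. The offsets 100 and 60 are made-up example values.
    static long checkpointedOffset(boolean clearOffsetsOnClose) {
        String changelog = "store-changelog-1"; // hypothetical partition name
        SketchCollector collector = new SketchCollector(clearOffsetsOnClose);

        collector.send(changelog, 100L); // before step 1: records sent up to offset 100
        collector.close();               // step 2: dirty close, task revived, state wiped
        long restoredOffset = 60L;       // step 3: restoration only reaches offset 60

        // Step 4: on clean close, offsets known to the collector override the
        // restored offset when the checkpoint is assembled.
        Map<String, Long> checkpoint = new HashMap<>();
        checkpoint.put(changelog, restoredOffset);
        checkpoint.putAll(collector.offsets());
        collector.close();
        return checkpoint.get(changelog);
    }

    public static void main(String[] args) {
        System.out.println("without clearing: " + checkpointedOffset(false)); // 100
        System.out.println("with clearing:    " + checkpointedOffset(true));  // 60
    }
}
```

Without clearing, the checkpoint claims offset 100 although the wiped and partially restored state only reflects records up to offset 60; with clearing, the checkpoint falls back to the restored offset.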
> I created a repro at https://github.com/cadonna/kafka/tree/KAFKA-16017.
> The repro can be started with:
> {code}
> ./gradlew streams:test -x checkstyleMain -x checkstyleTest -x spotbugsMain -x spotbugsTest --tests RestoreIntegrationTest.test --info > test.log
> {code}
> The repro writes records into a state store and tries to retrieve them again
> (https://github.com/cadonna/kafka/blob/355bdfe33df403e73deaac0918aae7d2c736342c/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java#L582).
> It will throw an {{IllegalStateException}} if it cannot find a record in the
> state
> (https://github.com/cadonna/kafka/blob/355bdfe33df403e73deaac0918aae7d2c736342c/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java#L594).
> Once the offsets in the record collector are cleared on close
> (https://github.com/cadonna/kafka/blob/355bdfe33df403e73deaac0918aae7d2c736342c/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L332
> and
> https://github.com/cadonna/kafka/blob/355bdfe33df403e73deaac0918aae7d2c736342c/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L349),
> the {{IllegalStateException}} no longer occurs.
> In the logs, check for the following messages:
> - {{Restore batch end offset is}}, which shows the offsets restored into the state.
> - {{task [0_1] Writing checkpoint:}}, which shows the written checkpoints.
> - {{task [0_1] Checkpointable offsets}}, which shows the offsets recorded when
> sending records to the changelog topic
> {{RestoreIntegrationTesttest-stateStore-changelog-1}}.
> In each case, look at the last occurrence before the {{IllegalStateException}}
> is thrown.
> You will see that the restored offsets are lower than the offsets written to
> the checkpoint. The offsets written to the checkpoint come from the offsets
> stored when the records were sent to the changelog topic.
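Finding those log lines by hand is tedious, so here is a hedged Java sketch that scans the {{test.log}} produced by the repro command. It matches only on the message fragments quoted above, because the full log line format is not specified, and it treats the first line mentioning {{IllegalStateException}} as the failure point. The class name {{CheckpointLogScan}} is hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CheckpointLogScan {

    // Message fragments quoted in the issue; the surrounding log format is not
    // specified, so this sketch matches on plain substrings.
    static final List<String> MARKERS = List.of(
            "Restore batch end offset is",
            "task [0_1] Writing checkpoint:",
            "task [0_1] Checkpointable offsets");

    // For each marker, returns the last line containing it that appears before the first
    // line mentioning IllegalStateException (or before the end of the log if there is none).
    static Map<String, String> lastBeforeFailure(List<String> lines) {
        int end = lines.size();
        for (int i = 0; i < lines.size(); i++) {
            if (lines.get(i).contains("IllegalStateException")) {
                end = i;
                break;
            }
        }
        Map<String, String> result = new LinkedHashMap<>();
        for (int i = 0; i < end; i++) {
            String line = lines.get(i);
            for (String marker : MARKERS) {
                if (line.contains(marker)) {
                    result.put(marker, line); // later matches overwrite earlier ones
                }
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        Path log = Paths.get(args.length > 0 ? args[0] : "test.log");
        lastBeforeFailure(Files.readAllLines(log)).forEach((marker, line) ->
                System.out.println(marker + " -> " + line));
    }
}
```

Comparing the {{Restore batch end offset is}} line it reports with the {{Writing checkpoint:}} line should show the gap between restored and checkpointed offsets.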
--
This message was sent by Atlassian Jira
(v8.20.10#820010)