[
https://issues.apache.org/jira/browse/KAFKA-20663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18086433#comment-18086433
]
Nicholas Telford commented on KAFKA-20663:
------------------------------------------
The root cause is actually a failure to properly unlock the tasks for our
"startup stores". The user reported seeing errors like:
{noformat}
Some task directories still locked while closing state, this indicates unclean
shutdown: {2_3=Thread[#3,main,5,main], 4_4=Thread[#3,main,5,main]}
{noformat}
before the restart. The smoking gun here is {{Thread\[#3,main,5,main]}}, which
identifies the *main* thread, not an assigned StreamThread.
The offending code is in
{{[StateDirectory#initializeStartupStores|https://github.com/apache/kafka/blob/4.3.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java#L274]}},
where we call {{temporaryStateManager.close()}} {*}directly{*}, instead of via
{{StateManagerUtil.closeStateManager}}, so the task is never unlocked!
Because the main thread never releases the lock, the assigned StreamThread
cannot unlock the task when it later closes it (the lock is still owned by the
main thread), so the task is not closed properly. Critically, we never call
{{StateManager#close}}, so none of the stores on the Task get closed.
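To make the ownership problem concrete, here is a minimal toy model of thread-owned task locks. It is only a sketch: {{TaskLockSketch}}, its methods, and the thread name are hypothetical and are not the real {{StateDirectory}} implementation. It only assumes, as described above, that a lock can be released solely by the thread that took it.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Toy model of thread-owned task locks; NOT the real StateDirectory. */
public class TaskLockSketch {
    // task id -> thread that currently owns the lock
    private final Map<String, Thread> lockedTasks = new ConcurrentHashMap<>();

    /** Returns true if the calling thread now owns (or already owned) the lock. */
    public boolean lock(String taskId) {
        Thread me = Thread.currentThread();
        Thread owner = lockedTasks.putIfAbsent(taskId, me);
        return owner == null || owner == me;
    }

    /** Releases the lock only if the calling thread is the owner. */
    public boolean unlock(String taskId) {
        return lockedTasks.remove(taskId, Thread.currentThread());
    }

    /** Returns the owning thread, or null if the task is not locked. */
    public Thread owner(String taskId) {
        return lockedTasks.get(taskId);
    }

    /** Tries to unlock the task from a different (hypothetical) worker thread. */
    public static boolean workerCanUnlock(TaskLockSketch locks, String taskId) {
        boolean[] result = new boolean[1];
        Thread worker = new Thread(() -> result[0] = locks.unlock(taskId), "StreamThread-1");
        worker.start();
        try {
            worker.join(); // join makes the worker's write to result[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for worker", e);
        }
        return result[0];
    }

    public static void main(String[] args) {
        TaskLockSketch locks = new TaskLockSketch();
        locks.lock("2_3"); // the starting thread locks the task and never unlocks it
        boolean released = workerCanUnlock(locks, "2_3");
        System.out.println("released by worker: " + released);
        System.out.println("still owned by: " + locks.owner("2_3").getName());
    }
}
```

In this sketch the worker's unlock attempt is rejected and the lock stays with the thread that took it, which mirrors the "still locked" message quoted above.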
Now that we no longer periodically force a RocksDB flush, stores with low
write throughput will only have their memtables forcibly flushed to disk
during {{RocksDBStore#close}}. And since we're not calling {{close}} due to the
bad task lock, newer data and offsets aren't flushed to disk, causing a stale
offset to be seen on restart.
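The effect of a skipped {{close}} can be illustrated with a small toy model. This is a sketch under stated assumptions, not RocksDB or Kafka Streams code: {{LazyFlushStoreSketch}}, its nested {{Disk}} class, and the offsets used are all hypothetical. It models a store whose buffered writes and changelog offset only become durable when {{close()}} is called.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy store whose buffered writes only become durable on close(); NOT RocksDB. */
public class LazyFlushStoreSketch {
    /** Stands in for on-disk state that survives a restart. */
    public static final class Disk {
        final Map<String, String> data = new HashMap<>();
        long changelogOffset = -1L; // -1 means no offset has been persisted yet
    }

    private final Disk disk;
    private final Map<String, String> memtable = new HashMap<>();
    private long pendingOffset;

    public LazyFlushStoreSketch(Disk disk) {
        this.disk = disk;
        this.pendingOffset = disk.changelogOffset; // resume from what is durable
    }

    /** Buffers the write and its changelog offset in memory only. */
    public void put(String key, String value, long changelogOffset) {
        memtable.put(key, value);
        pendingOffset = changelogOffset;
    }

    /** Flushes buffered data and the latest offset to the simulated disk. */
    public void close() {
        disk.data.putAll(memtable);
        disk.changelogOffset = pendingOffset;
        memtable.clear();
    }

    /** Runs two "application runs" and returns the offset found on disk afterwards. */
    public static long persistedOffsetAfter(boolean secondRunClosesStore) {
        Disk disk = new Disk();

        LazyFlushStoreSketch firstRun = new LazyFlushStoreSketch(disk);
        firstRun.put("k", "v1", 100L);
        firstRun.close(); // clean close: offset 100 is now durable

        LazyFlushStoreSketch secondRun = new LazyFlushStoreSketch(disk);
        secondRun.put("k", "v2", 250L);
        if (secondRunClosesStore) {
            secondRun.close();
        }
        // Without close(), the write at offset 250 never reaches the simulated disk.
        return disk.changelogOffset;
    }

    public static void main(String[] args) {
        System.out.println("with close:    " + persistedOffsetAfter(true));
        System.out.println("without close: " + persistedOffsetAfter(false));
    }
}
```

When the second run skips {{close()}}, the next start sees the older offset from the first run, which is the stale-offset situation described above.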
The fix is simple: switch to {{StateManagerUtil.closeStateManager(..., true,
..., temporaryStateManager)}} in {{StateDirectory#initializeStartupStores}}.
> KIP-1035: stale persisted changelog offset causes
> OffsetOutOfRangeException/TaskCorruptedException on restart
> -------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-20663
> URL: https://issues.apache.org/jira/browse/KAFKA-20663
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 4.3.0
> Reporter: Bill Bejeck
> Assignee: Bill Bejeck
> Priority: Blocker
> Fix For: 4.3.1, 4.4.0
>
>
> In 4.3, KIP-1035 moved the changelog offset into RocksDB and removed the
> forced flush on commit, so the persisted offset is now only made durable by
> an organic memtable flush or a clean close. When that offset goes stale —
> after an unclean exit, or a clean shutdown followed by changelog
> truncation/compaction while the instance is down — and the changelog
> log-start offset has advanced past it, the restore consumer seeks out of
> range and throws OffsetOutOfRangeException, which Streams converts to a
> TaskCorruptedException (full local-state wipe and rebuild). This happens far
> more often than in 4.2 (where the forced flush kept the offset within roughly
> commit.interval.ms), affecting both at-least-once and exactly-once and
> hitting windowed/segmented stores hardest.