nicktelford opened a new pull request, #22490:
URL: https://github.com/apache/kafka/pull/22490
## Summary

`initializeStartupStores()` acquires a `StateDirectory` task lock via `StateManagerUtil.registerStateStores()`, but then closes the temporary state manager by calling `temporaryStateManager.close()` directly, i.e. `ProcessorStateManager.close()`. That method closes the RocksDB stores but has no knowledge of the `StateDirectory` lock and never calls `stateDirectory.unlock()`. Only `StateManagerUtil.closeStateManager()` calls `stateDirectory.unlock()`, and it was never invoked on the startup path.

As a result, the main thread permanently holds the `StateDirectory` task lock for every startup task. At shutdown, when the stream thread calls `closeStateManager()` for those tasks, `stateDirectory.lock()` returns `false` (the main thread owns the lock, but the stream thread is the caller), so `db.close()` is never called. With the WAL disabled, the RocksDB memtables, including the offsets written by the periodic (30-second) `maybeCheckpoint()`, are lost when the JVM exits. The SST files retain only the offset from the last data-CF auto-flush, which for an inactive segment can be many hours old. If that stale offset falls outside the changelog retention window, the next restart fails with an `OffsetOutOfRangeException`.

A secondary symptom is the logged error `"Some task directories still locked while closing state, this indicates unclean shutdown"`. `StateDirectory.close()` is called from the shutdown helper thread, not the main thread, so `unlockStartupStores()` cannot release the main-thread-owned locks (`unlock()` requires the calling thread to be the lock owner).

The fix replaces `temporaryStateManager.close()` with `StateManagerUtil.closeStateManager()`, which is the correct counterpart to `registerStateStores()`; every other call site in the codebase already pairs them. `closeStateManager()` calls `ProcessorStateManager.close()` (preserving existing behaviour, including `maybeDowngradeOffsets()`) and then releases the `StateDirectory` lock in a nested `finally` block.
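To make the thread-ownership rule concrete, here is a minimal sketch of a lock table in which each task lock belongs to the thread that acquired it. `OwnedTaskLocks` and `LeakedStartupLockDemo` are hypothetical classes, not Kafka code, and the rules they implement are assumptions based on the description above: an owner may re-lock its own task, a non-owner's `lock()` returns `false`, and a non-owner's `unlock()` is a no-op. The demo replays the startup leak: one thread locks a task and never unlocks it, a second thread then fails to lock it, and a third thread's `unlock()` leaves it locked.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BooleanSupplier;

// Hypothetical, simplified model of thread-owned task locks.
// NOT Kafka's StateDirectory; it only mirrors the ownership rule described in the PR.
class OwnedTaskLocks {
    private final Map<String, Thread> lockedTasksToOwner = new HashMap<>();

    // Succeeds if the task is unlocked, or already held by the calling thread.
    synchronized boolean lock(final String taskId) {
        final Thread owner = lockedTasksToOwner.get(taskId);
        if (owner == null) {
            lockedTasksToOwner.put(taskId, Thread.currentThread());
            return true;
        }
        return owner == Thread.currentThread();
    }

    // Releases the lock only when called by the owning thread; otherwise does nothing.
    synchronized void unlock(final String taskId) {
        if (lockedTasksToOwner.get(taskId) == Thread.currentThread()) {
            lockedTasksToOwner.remove(taskId);
        }
    }

    synchronized boolean isLocked(final String taskId) {
        return lockedTasksToOwner.containsKey(taskId);
    }
}

class LeakedStartupLockDemo {
    // Runs `action` on a fresh thread, waits for it, and returns its result.
    static boolean onOtherThread(final BooleanSupplier action) {
        final boolean[] result = new boolean[1];
        final Thread t = new Thread(() -> result[0] = action.getAsBoolean());
        t.start();
        try {
            t.join(); // join() makes the write to result[0] visible here
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result[0];
    }

    public static void main(final String[] args) {
        final OwnedTaskLocks locks = new OwnedTaskLocks();

        // The "main thread" locks a startup task and never unlocks it (the leak).
        locks.lock("0_0");

        // A "stream thread" tries to lock the task at shutdown and fails,
        // so in the real code path db.close() would be skipped.
        final boolean streamThreadGotLock = onOtherThread(() -> locks.lock("0_0"));

        // A "shutdown helper thread" tries to release the lock: a no-op, since it is not the owner.
        onOtherThread(() -> {
            locks.unlock("0_0");
            return true;
        });

        System.out.println("stream thread acquired lock: " + streamThreadGotLock);
        System.out.println("still locked after helper unlock: " + locks.isLocked("0_0"));
    }
}
```

Running `main` prints `stream thread acquired lock: false` and then `still locked after helper unlock: true`, corresponding to the skipped `db.close()` and the "still locked" error respectively.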
## Test plan

- Existing `StateDirectoryTest` suite passes.
- Add a test asserting `lockedTasksToOwner` is empty after `initializeStartupStores()` returns.

🤖 Generated with [Claude Code](https://claude.ai/claude-code)
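The fix and the proposed test can be sketched against a similar simplified model. `TaskLocks`, `TempStateManager`, and `StartupStoresSketch` are hypothetical stand-ins, not Kafka classes: `register()` plays the role of `registerStateStores()`, and `closeAndUnlock()` plays the role of `closeStateManager()`, first calling `lock()` (which, per the summary, fails for a non-owner and skips the close) and then closing and unlocking. It uses a single `finally` as a simplification of the nested `finally` described for the real method. The last check mirrors the test plan: the lock map is empty once the fixed startup path returns.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins; names loosely follow the PR but these are NOT Kafka classes.
class TaskLocks {
    private final Map<String, Thread> lockedTasksToOwner = new HashMap<>();

    // Succeeds if the task is unlocked, or already held by the calling thread.
    synchronized boolean lock(final String taskId) {
        final Thread owner = lockedTasksToOwner.get(taskId);
        if (owner == null) {
            lockedTasksToOwner.put(taskId, Thread.currentThread());
            return true;
        }
        return owner == Thread.currentThread();
    }

    // Only the owning thread can release a lock.
    synchronized void unlock(final String taskId) {
        if (lockedTasksToOwner.get(taskId) == Thread.currentThread()) {
            lockedTasksToOwner.remove(taskId);
        }
    }

    synchronized boolean isEmpty() {
        return lockedTasksToOwner.isEmpty();
    }
}

// Stands in for ProcessorStateManager: close() closes stores but knows nothing about locks.
class TempStateManager {
    boolean closed = false;

    void close() {
        closed = true;
    }
}

class StartupStoresSketch {
    // Analogue of registerStateStores(): acquires the task lock before handing out a state manager.
    static TempStateManager register(final TaskLocks locks, final String taskId) {
        if (!locks.lock(taskId)) {
            throw new IllegalStateException("could not lock task " + taskId);
        }
        return new TempStateManager();
    }

    // Analogue of closeStateManager(): close only if this thread can lock the task,
    // then always release the lock, even if close() throws.
    static void closeAndUnlock(final TaskLocks locks, final String taskId, final TempStateManager sm) {
        if (!locks.lock(taskId)) {
            return; // another thread owns the lock, so the stores are never closed
        }
        try {
            sm.close();
        } finally {
            locks.unlock(taskId);
        }
    }

    // Before the fix: only the state manager is closed, so the lock leaks.
    static void initializeBuggy(final TaskLocks locks, final String taskId) {
        final TempStateManager sm = register(locks, taskId);
        sm.close();
    }

    // After the fix: register and close are paired, so the lock is released.
    static void initializeFixed(final TaskLocks locks, final String taskId) {
        final TempStateManager sm = register(locks, taskId);
        closeAndUnlock(locks, taskId, sm);
    }

    public static void main(final String[] args) {
        final TaskLocks buggy = new TaskLocks();
        initializeBuggy(buggy, "0_0");
        System.out.println("buggy: locks empty = " + buggy.isEmpty());

        final TaskLocks fixed = new TaskLocks();
        initializeFixed(fixed, "0_0");
        System.out.println("fixed: locks empty = " + fixed.isEmpty());
    }
}
```

Running `main` prints `buggy: locks empty = false` followed by `fixed: locks empty = true`. Pairing acquisition and release in one helper, with the release in `finally`, is what keeps the lock from outliving the temporary state manager.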
