chia7712 commented on code in PR #16614:
URL: https://github.com/apache/kafka/pull/16614#discussion_r1683591221
##########
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##########

```diff
@@ -348,7 +348,8 @@ public void truncateFromEndAsyncFlush(long endOffset) {
         // - We still flush the change in #assign synchronously, meaning that it's guaranteed that the checkpoint file always has no missing entries.
         // * Even when stale epochs are restored from the checkpoint file after the unclean shutdown, it will be handled by
         //   another truncateFromEnd call on log loading procedure, so it won't be a problem
-        scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFileForTruncation);
+        List<EpochEntry> entries = new ArrayList<>(epochs.values());
+        scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, () -> checkpoint.writeForTruncation(entries));
```

Review Comment:

> Since the issue is only in the test, I am wondering if we could fix the test directly. For example, perhaps we could introduce a NoOpScheduler and use it in the test, since the test doesn't depend on the leader epoch entries to be actually flushed to disk.

This is another good approach.

> This approach introduces a new correctness issue. With this change, it's possible for older epoch entries to overwrite the newer epoch entries in the leader epoch file. Consider the following sequence: we take a snapshot of the epoch entries here; a new epoch entry is added and is flushed to disk; the scheduler then writes the snapshot to disk. This can lead to the case where the leader epoch file doesn't contain all entries up to the recovery point.

Sorry for the possible correctness issue. @FrankYang0529 and I discussed the approach offline when I noticed that deadlock, and I suggested changing the production code directly. It seems to me this PR does NOT change the execution order, because `writeToFileForTruncation` does not hold a single lock across both the "snapshot" and the "flush":

```java
private void writeToFileForTruncation() {
    // phase 1: create the snapshot while holding the read lock
    List<EpochEntry> entries;
    lock.readLock().lock();
    try {
        entries = new ArrayList<>(epochs.values());
    } finally {
        lock.readLock().unlock();
    }
    // phase 2: flush by holding the write lock
    checkpoint.writeForTruncation(entries);
}
```

Hence, the issue you mentioned can happen even if we revert this PR. For example:

1. `writeToFileForTruncation` (run by the scheduler) takes a snapshot of the epoch entries in phase 1 (see the comment in the code above).
2. A new epoch entry is added and is flushed to disk.
3. `writeToFileForTruncation` (run by the scheduler) then writes the snapshot to disk in phase 2 (see the comment in the code above).

In summary, there are two follow-ups:

1. Rewrite `testLogRecoveryMetrics` to use a `NoOpScheduler` (a rough sketch is at the end of this comment).
2. Add `writeToFileForTruncation` back, but without the "snapshot" step. For example:

```java
private void writeToFileForTruncation() {
    lock.readLock().lock();
    try {
        checkpoint.writeForTruncation(epochs.values());
    } finally {
        lock.readLock().unlock();
    }
}
```

@junrao WDYT?
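For follow-up 1, here is a minimal sketch of what such a `NoOpScheduler` could look like. It assumes the `org.apache.kafka.server.util.Scheduler` interface exposes `startup`, `schedule`, and `resizeThreadPool` roughly as shown below; the exact set of abstract methods should be checked against trunk, and any others can be implemented as no-ops in the same way. Because `schedule` simply drops the task, the async leader-epoch-cache flush never runs, so `testLogRecoveryMetrics` no longer races with it.

```java
import java.util.concurrent.ScheduledFuture;

import org.apache.kafka.server.util.Scheduler;

/**
 * A scheduler that silently drops every submitted task. Intended for tests
 * (e.g. testLogRecoveryMetrics) that must not have background flushes
 * interfere with their assertions.
 *
 * NOTE: this is a sketch; the Scheduler interface shape is assumed here and
 * should be verified against the current codebase.
 */
public class NoOpScheduler implements Scheduler {

    @Override
    public void startup() {
        // nothing to start; no threads are created
    }

    @Override
    public ScheduledFuture<?> schedule(String name, Runnable task, long delayMs, long periodMs) {
        // drop the task; assuming scheduleOnce(...) delegates here, the async
        // leader-epoch-cache flush is never executed
        return null;
    }

    @Override
    public void resizeThreadPool(int newSize) {
        // no thread pool to resize
    }
}
```

Returning `null` keeps the sketch short; if the test (or production code under test) ever calls methods on the returned future, a dummy completed future would be the safer choice.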