chia7712 commented on code in PR #16614:
URL: https://github.com/apache/kafka/pull/16614#discussion_r1683591221


##########
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##########
@@ -348,7 +348,8 @@ public void truncateFromEndAsyncFlush(long endOffset) {
                 // - We still flush the change in #assign synchronously, meaning that it's guaranteed that the checkpoint file always has no missing entries.
                 //   * Even when stale epochs are restored from the checkpoint file after the unclean shutdown, it will be handled by
                 //     another truncateFromEnd call on log loading procedure, so it won't be a problem
-                scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFileForTruncation);
+                List<EpochEntry> entries = new ArrayList<>(epochs.values());
+                scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, () -> checkpoint.writeForTruncation(entries));

Review Comment:
   > Since the issue is only in the test, I am wondering if we could fix the 
test directly. For example, perhaps we could introduce a NoOpScheduler and use 
it in the test, since the test doesn't depend on the leader epoch entries to be 
actually flushed to disk.
   
   this is another good approach.
   
   > This approach introduces a new correctness issue. With this change, it's 
possible for older epoch entries to overwrite the newer epoch entries in the 
leader epoch file. Consider the following sequence: we take a snapshot of the 
epoch entries here; a new epoch entry is added and is flushed to disk; the 
scheduler then writes the snapshot to disk. This can lead to the case where the 
leader epoch file doesn't contain all entries up to the recovery point.
   
   Sorry for causing the possible correctness issue. @FrankYang0529 and I had discussed this approach offline when I noticed the deadlock, and I suggested changing the production code directly. It seems to me this PR does NOT change the execution order, because `writeToFileForTruncation` does not hold a single lock across both the "snapshot" and the "flush":
   ```java
       private void writeToFileForTruncation() {
           // phase 1: create snapshot by holding read lock
           List<EpochEntry> entries;
           lock.readLock().lock();
           try {
               entries = new ArrayList<>(epochs.values());
           } finally {
               lock.readLock().unlock();
           }
        // phase 2: flush without holding the cache's lock
           checkpoint.writeForTruncation(entries);
       }
   ```
   Hence, the issue you mentioned can happen even if we revert this PR. For example:
   1. `writeToFileForTruncation` (run by the scheduler) takes a snapshot of the epoch entries in phase 1 (see the comment in the code above)
   2. a new epoch entry is added and is flushed to disk
   3. `writeToFileForTruncation` (run by the scheduler) then writes the stale snapshot to disk in phase 2 (see the comment in the code above)
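   To make the interleaving concrete, here is a minimal, self-contained sketch (the class and variable names are made up for illustration, and `checkpointFile` just stands in for the on-disk checkpoint file; the steps run sequentially rather than on real threads). It replays the three steps above and shows the newer entry getting lost:

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.TreeMap;

   public class StaleSnapshotDemo {
       // Replays the 3-step interleaving from the comment above, sequentially.
       static List<String> run() {
           TreeMap<Integer, String> epochs = new TreeMap<>();
           List<String> checkpointFile;

           epochs.put(0, "epoch-0");

           // step 1: the scheduler task snapshots the entries
           // (the read lock would be held only for this copy)
           List<String> snapshot = new ArrayList<>(epochs.values());

           // step 2: another thread assigns a new epoch and flushes it synchronously
           epochs.put(1, "epoch-1");
           checkpointFile = new ArrayList<>(epochs.values()); // file now holds both entries

           // step 3: the scheduler task finally writes its stale snapshot
           checkpointFile = snapshot; // epoch-1 is lost from the file

           return checkpointFile;
       }

       public static void main(String[] args) {
           System.out.println(run()); // prints [epoch-0]
       }
   }
   ```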
   
   In summary, there are two follow-ups:
   
   1. rewrite `testLogRecoveryMetrics` to use a `NoOpScheduler`
   2. add `writeToFileForTruncation` back, but without the separate "snapshot" phase, holding the read lock for the whole write. For example:
   ```java
       private void writeToFileForTruncation() {
           lock.readLock().lock();
           try {
               checkpoint.writeForTruncation(epochs.values());
           } finally {
               lock.readLock().unlock();
           }
       }
   ```
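   For follow-up 1, the `NoOpScheduler` could be as simple as the sketch below. Note the `Scheduler` interface here is a simplified stand-in for illustration (the real `org.apache.kafka.server.util.Scheduler` has more methods); the idea is just that the flush task is dropped, so the test never touches disk and cannot hit the deadlock:

   ```java
   import java.util.concurrent.ScheduledFuture;

   // Simplified stand-in for Kafka's Scheduler interface (illustrative only).
   interface Scheduler {
       ScheduledFuture<?> scheduleOnce(String name, Runnable task);
   }

   // Never runs the scheduled task, so the async flush is skipped entirely.
   class NoOpScheduler implements Scheduler {
       @Override
       public ScheduledFuture<?> scheduleOnce(String name, Runnable task) {
           return null; // deliberately drop the task
       }
   }
   ```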
   
   @junrao WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
