junrao commented on code in PR #15993:
URL: https://github.com/apache/kafka/pull/15993#discussion_r1609025456


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -2095,7 +2101,8 @@ object UnifiedLog extends Logging {
   }
 
   /**
-   * If the recordVersion is >= RecordVersion.V2, then create and return a 
LeaderEpochFileCache.
+   * If the recordVersion is >= RecordVersion.V2, then create a new 
LeaderEpochFileCache instance
+   * or update current cache if any with the new checkpoint and return it.

Review Comment:
   How about changing it to the following?
   
   "If the recordVersion is >= RecordVersion.V2, create a new 
LeaderEpochFileCache instance. Loading the epoch entries from the backing 
checkpoint file or the provided currentCache if not empty."



##########
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##########
@@ -42,10 +43,15 @@
  * <p>
  * Leader Epoch = epoch assigned to each leader by the controller.
  * Offset = offset of the first message in each epoch.
+ * <p>
+ * Note that {@link #truncateFromStart},{@link #truncateFromEnd} flushes the 
epoch-entry changes to checkpoint asynchronously.

Review Comment:
   flushes => flush



##########
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##########
@@ -390,7 +424,27 @@ public LeaderEpochFileCache writeTo(LeaderEpochCheckpoint 
leaderEpochCheckpoint)
         lock.readLock().lock();
         try {
             leaderEpochCheckpoint.write(epochEntries());
-            return new LeaderEpochFileCache(topicPartition, 
leaderEpochCheckpoint);
+            // We instantiate LeaderEpochFileCache after writing 
leaderEpochCheckpoint,
+            // hence it is guaranteed that the new cache is consistent with 
the latest epoch entries.
+            return new LeaderEpochFileCache(topicPartition, 
leaderEpochCheckpoint, scheduler);
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Returns a new LeaderEpochFileCache which contains same
+     * epoch entries with replacing backing checkpoint

Review Comment:
   checkpoint => checkpoint file



##########
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##########
@@ -313,14 +345,14 @@ public void truncateFromEnd(long endOffset) {
             if (endOffset >= 0 && epochEntry.isPresent() && 
epochEntry.get().startOffset >= endOffset) {
                 List<EpochEntry> removedEntries = removeFromEnd(x -> 
x.startOffset >= endOffset);
 
-                // We intentionally don't force flushing change to the device 
here because:
+                // We flush the change to the device in the background because:

Review Comment:
   It would be useful to explain in the comment that the reason async flush 
works is because the stale epochs always have more entries and no missing 
entries.



##########
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##########
@@ -55,16 +61,40 @@ public class LeaderEpochFileCache {
     /**
      * @param topicPartition the associated topic partition
      * @param checkpoint     the checkpoint file
+     * @param scheduler      the scheduler to use for async I/O operations
      */
     @SuppressWarnings("this-escape")
-    public LeaderEpochFileCache(TopicPartition topicPartition, 
LeaderEpochCheckpoint checkpoint) {
+    public LeaderEpochFileCache(TopicPartition topicPartition, 
LeaderEpochCheckpoint checkpoint, Scheduler scheduler) {
         this.checkpoint = checkpoint;
         this.topicPartition = topicPartition;
+        this.scheduler = scheduler;
         LogContext logContext = new LogContext("[LeaderEpochCache " + 
topicPartition + "] ");
         log = logContext.logger(LeaderEpochFileCache.class);
         checkpoint.read().forEach(this::assign);
     }
 
+    /**
+     * Instantiates a new LeaderEpochFileCache with replacing checkpoint with 
given one
+     * without restoring the cache from the checkpoint, with retaining the 
current epoch entries.

Review Comment:
   How about following?
   
   "Instantiate a new LeaderEpochFileCache with provided epoch entries instead 
of from the backing checkpoint file. The provided epoch entries are expected to 
no less fresher than the checkpoint file."



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to