junrao commented on code in PR #15993:
URL: https://github.com/apache/kafka/pull/15993#discussion_r1625162518


##########
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##########
@@ -42,10 +45,15 @@
  * <p>
  * Leader Epoch = epoch assigned to each leader by the controller.
  * Offset = offset of the first message in each epoch.
+ * <p>
+ * Note that {@link #truncateFromStart} and {@link #truncateFromEnd} flush the epoch-entry changes to the checkpoint asynchronously.

Review Comment:
   Perhaps rename truncateFromStart and truncateFromEnd to something like truncateFromStartAsyncFlush and truncateFromEndAsyncFlush to make this clear?



##########
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##########
@@ -386,11 +442,39 @@ public OptionalInt epochForOffset(long offset) {
         }
     }
 
-    public LeaderEpochFileCache writeTo(LeaderEpochCheckpoint leaderEpochCheckpoint) {
+    /**
+     * Returns a new LeaderEpochFileCache that contains the same
+     * epoch entries, backed by the given checkpoint file instead.
+     * @param leaderEpochCheckpoint the new checkpoint file
+     * @return a new LeaderEpochFileCache instance
+     */
+    public LeaderEpochFileCache withCheckpoint(LeaderEpochCheckpointFile leaderEpochCheckpoint) {
+        lock.readLock().lock();
+        try {
+            return new LeaderEpochFileCache(epochEntries(),
+                                            topicPartition,
+                                            leaderEpochCheckpoint,
+                                            scheduler);
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Returns the leader epoch entries within the range of the given start[exclusive] and end[inclusive] offset
+     * @param startOffset The start offset of the epoch entries (exclusive).

Review Comment:
   From the caller's perspective, the start offset is inclusive and the end offset is exclusive.
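The inclusive-start/exclusive-end convention the reviewer describes can be sketched as follows. This is a simplified illustration only: `EpochEntry` and `entriesInRange` here are hypothetical stand-ins, not the PR's actual code.

```java
import java.util.ArrayList;
import java.util.List;

public class EpochRangeSketch {
    // Simplified stand-in for Kafka's EpochEntry (epoch plus its first offset).
    record EpochEntry(int epoch, long startOffset) {}

    // Returns entries whose startOffset lies in [startOffset, endOffset):
    // start inclusive, end exclusive, matching the caller's perspective
    // described in the review comment.
    static List<EpochEntry> entriesInRange(List<EpochEntry> entries,
                                           long startOffset, long endOffset) {
        List<EpochEntry> result = new ArrayList<>();
        for (EpochEntry e : entries) {
            if (e.startOffset() >= startOffset && e.startOffset() < endOffset) {
                result.add(e);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<EpochEntry> entries = List.of(
                new EpochEntry(0, 0L),
                new EpochEntry(1, 100L),
                new EpochEntry(2, 200L));
        // start=100 is included, end=200 is excluded, so only epoch 1 matches
        System.out.println(entriesInRange(entries, 100L, 200L).size());
    }
}
```

Documenting the bounds this way in the javadoc would avoid the ambiguity the comment points out.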



##########
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##########
@@ -305,22 +341,23 @@ public Map.Entry<Integer, Long> endOffsetFor(int requestedEpoch, long logEndOffs
 
     /**
     * Removes all epoch entries from the store with start offsets greater than or equal to the passed offset.
+     * <p>
+     * Checkpoint-flushing is done asynchronously.
      */
     public void truncateFromEnd(long endOffset) {
         lock.writeLock().lock();
         try {
-            Optional<EpochEntry> epochEntry = latestEntry();
-            if (endOffset >= 0 && epochEntry.isPresent() && epochEntry.get().startOffset >= endOffset) {
-                List<EpochEntry> removedEntries = removeFromEnd(x -> x.startOffset >= endOffset);
-
-                // We intentionally don't force flushing change to the device here because:
+            List<EpochEntry> removedEntries = truncateFromEnd(epochs, endOffset);
+            if (!removedEntries.isEmpty()) {
+                // We flush the change to the device in the background because:
                 // - To avoid fsync latency
                 //   * fsync latency could be huge on a disk glitch, which is not rare in spinning drives
                 //   * This method is called by ReplicaFetcher threads, which could block replica fetching
                 //     then causing ISR shrink or high produce response time degradation in remote scope on high fsync latency.
-                // - Even when stale epochs remained in LeaderEpoch file due to the unclean shutdown, it will be handled by
-                //   another truncateFromEnd call on log loading procedure so it won't be a problem
-                writeToFile(false);
+                // - We still flush the change in #assign synchronously, meaning that it's guaranteed that the checkpoint file always has no missing entries.
+                //   * Even when stale epochs are restored from the checkpoint file after the unclean shutdown, it will be handled by
+                //     another truncateFromEnd call on log loading procedure, so it won't be a problem
+                scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFileForTruncation);

Review Comment:
   Hmm, why do we need to add a trailing - in "leader-epoch-cache-flush-"?
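The deferred-flush pattern in the diff above (handing the checkpoint write to a scheduler thread so the caller never waits on fsync) can be sketched roughly as below. This is a minimal sketch under stated assumptions: a plain `ExecutorService` stands in for Kafka's `Scheduler`, and the method and field names are illustrative, not the PR's exact code.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AsyncFlushSketch {
    // Stand-in for Kafka's Scheduler; a single-threaded executor serializes flushes.
    private final ExecutorService scheduler = Executors.newSingleThreadExecutor();
    private volatile boolean flushed = false;

    void truncateFromEnd(long endOffset) {
        // ... mutate the in-memory epoch cache under a write lock (elided) ...
        // Flush to the checkpoint file off the caller's thread, so a slow
        // fsync cannot stall the calling (e.g. replica fetcher) thread.
        scheduler.submit(this::writeToFileForTruncation);
    }

    private void writeToFileForTruncation() {
        // Stand-in for the real checkpoint write + fsync.
        flushed = true;
    }

    boolean awaitFlush() {
        // Drain the background task for demonstration purposes only.
        scheduler.shutdown();
        try {
            scheduler.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return flushed;
    }

    public static void main(String[] args) {
        AsyncFlushSketch cache = new AsyncFlushSketch();
        cache.truncateFromEnd(100L);
        System.out.println(cache.awaitFlush()); // prints "true" once the flush ran
    }
}
```

The trade-off the diff's comments describe is visible here: the caller returns immediately, and durability is deferred to the background task, with recovery-time truncation covering the unclean-shutdown window.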



##########
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##########
@@ -128,15 +162,17 @@ private void maybeTruncateNonMonotonicEntries(EpochEntry newEntry) {
         }
     }
 
-    private List<EpochEntry> removeFromEnd(Predicate<EpochEntry> predicate) {
+    private static List<EpochEntry> removeFromEnd(

Review Comment:
   Could we fold this method into the caller since it only has 1 line? Ditto 
for `removeFromStart`.



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -608,25 +613,23 @@ public boolean isCancelled() {
     }
 
     /**
-     * Returns the leader epoch checkpoint by truncating with the given start[exclusive] and end[inclusive] offset
+     * Returns the leader epoch entries within the range of the given start[exclusive] and end[inclusive] offset.
+     * <p>
+     * Visible for testing.
      *
      * @param log         The actual log from where to take the leader-epoch checkpoint
-     * @param startOffset The start offset of the checkpoint file (exclusive in the truncation).
+     * @param startOffset The start offset of the epoch entries (exclusive).

Review Comment:
   From the caller's perspective, the start offset is inclusive and the end offset is exclusive.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
