junrao commented on code in PR #15993:
URL: https://github.com/apache/kafka/pull/15993#discussion_r1612067765


##########
storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/CheckpointFileWithFailureHandler.java:
##########
@@ -41,9 +47,21 @@ public CheckpointFileWithFailureHandler(File file, int 
version, CheckpointFile.E
         checkpointFile = new CheckpointFile<>(file, version, formatter);
     }
 
-    public void write(Collection<T> entries, boolean sync) {
+    public void write(Collection<T> entries) {
+        try {
+            checkpointFile.write(entries);
+        } catch (IOException e) {
+            String msg = "Error while writing to checkpoint file " + 
file.getAbsolutePath();
+            logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e);
+            throw new KafkaStorageException(msg, e);
+        }
+    }
+
+    public void writeIfDirExists(Collection<T> entries) {
         try {
-            checkpointFile.write(entries, sync);
+            checkpointFile.write(entries);
+        } catch (FileNotFoundException | NoSuchFileException e) {
+            log.warn("Failed to write to checkpoint file {}", 
file.getAbsolutePath(), e);

Review Comment:
   Perhaps add in the `warn` that "This is ok if the topic/partition is being 
deleted."



##########
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##########
@@ -55,16 +61,40 @@ public class LeaderEpochFileCache {
     /**
      * @param topicPartition the associated topic partition
      * @param checkpoint     the checkpoint file
+     * @param scheduler      the scheduler to use for async I/O operations
      */
     @SuppressWarnings("this-escape")
-    public LeaderEpochFileCache(TopicPartition topicPartition, 
LeaderEpochCheckpoint checkpoint) {
+    public LeaderEpochFileCache(TopicPartition topicPartition, 
LeaderEpochCheckpoint checkpoint, Scheduler scheduler) {
         this.checkpoint = checkpoint;
         this.topicPartition = topicPartition;
+        this.scheduler = scheduler;
         LogContext logContext = new LogContext("[LeaderEpochCache " + 
topicPartition + "] ");
         log = logContext.logger(LeaderEpochFileCache.class);
         checkpoint.read().forEach(this::assign);
     }
 
+    /**
+     * Instantiate a new LeaderEpochFileCache with provided epoch entries 
instead of from the backing checkpoint file.
+     * The provided epoch entries are expected to no less fresher than the 
checkpoint file.

Review Comment:
   to no less fresher => to be no less fresh



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to