jsancio commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r581136883



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##########
@@ -92,4 +100,22 @@ public static Path createTempFile(Path logDir, 
OffsetAndEpoch snapshotId) throws
 
         return Optional.of(new SnapshotPath(path, new 
OffsetAndEpoch(endOffset, epoch), partial));
     }
+
+    /**
+     * Delete this snapshot from the filesystem.
+     */
+    public static CompletableFuture<Boolean> deleteSnapshotIfExists(Path 
logDir, OffsetAndEpoch snapshotId) throws IOException {
+        Path path = snapshotPath(logDir, snapshotId);
+        Path destination = Snapshots.deleteRename(path, snapshotId);
+        Utils.atomicMoveWithFallback(path, destination);
+
+        return CompletableFuture.supplyAsync(() -> {

Review comment:
       This looks like the Jira and commit that introduced a delay when 
deleting segments: https://issues.apache.org/jira/browse/KAFKA-636. It looks 
like the delays is there to reduce the probability that the file is deleted 
while Kafka is still reading the file.
   
   Even though `CompletableFuture::supplyAsync` run async it will attempt to 
run immediately.

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##########
@@ -92,4 +100,22 @@ public static Path createTempFile(Path logDir, 
OffsetAndEpoch snapshotId) throws
 
         return Optional.of(new SnapshotPath(path, new 
OffsetAndEpoch(endOffset, epoch), partial));
     }
+
+    /**
+     * Delete this snapshot from the filesystem.
+     */
+    public static CompletableFuture<Boolean> deleteSnapshotIfExists(Path 
logDir, OffsetAndEpoch snapshotId) throws IOException {
+        Path path = snapshotPath(logDir, snapshotId);
+        Path destination = Snapshots.deleteRename(path, snapshotId);
+        Utils.atomicMoveWithFallback(path, destination);
+
+        return CompletableFuture.supplyAsync(() -> {
+            try {
+                return Files.deleteIfExists(destination);
+            } catch (IOException e) {
+                throw new RuntimeException("Error deleting snapshot file " + 
destination + ":" + e.getMessage());
+            }
+        });

Review comment:
       This uses the default Executor in Java. I think we should instead use 
the same async scheduler that is used for the `KafkaMetadataLog`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to