hachikuji commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r582471506



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -299,7 +302,16 @@ final class KafkaMetadataLog private (
       // If snapshotIds contains a snapshot id, the KafkaRaftClient and Listener can expect that the snapshot exists
       // on the file system, so we should first remove snapshotId and then delete snapshot file.
       expiredSnapshotIdsIter.remove()
-      Snapshots.deleteSnapshotIfExists(log.dir.toPath, snapshotId)
+
+      val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
+      val destination = Snapshots.deleteRename(path, snapshotId)
+      try {
+        Utils.atomicMoveWithFallback(path, destination)
+      } catch {
+        case e: IOException =>
+          warn("Error renaming snapshot file: " + path + " to :" + destination + ", " + e.getMessage)

Review comment:
       nit: Could this be error level? Can we use `$` substitutions? Also, 
let's include the full exception instead of just the message.
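A minimal sketch of what the suggested change could look like, assuming an error-level logger that accepts the `Throwable` itself. `SnapshotRenameLogging`, its in-memory `error` recorder, and the simplified `atomicMoveWithFallback` are hypothetical stand-ins for Kafka's `Logging` trait and `Utils.atomicMoveWithFallback`, so the snippet runs on its own.

```scala
import java.io.IOException
import java.nio.file.{AtomicMoveNotSupportedException, Files, Path, StandardCopyOption}
import scala.collection.mutable.ArrayBuffer

object SnapshotRenameLogging {
  // Hypothetical error-level logger (stand-in for a real Logging trait):
  // it keeps the message together with the full Throwable, not just e.getMessage.
  val errors: ArrayBuffer[(String, Throwable)] = ArrayBuffer.empty
  def error(msg: => String, e: => Throwable): Unit = errors += ((msg, e))

  // Simplified stand-in for Utils.atomicMoveWithFallback: attempt an atomic
  // rename and fall back to a non-atomic replacing move if it is unsupported.
  def atomicMoveWithFallback(source: Path, target: Path): Unit = {
    try Files.move(source, target, StandardCopyOption.ATOMIC_MOVE)
    catch {
      case _: AtomicMoveNotSupportedException =>
        Files.move(source, target, StandardCopyOption.REPLACE_EXISTING)
    }
    ()
  }

  // Rename the snapshot for deletion; on failure, log at error level using
  // `$` interpolation and pass the exception so its stack trace is kept.
  def renameForDeletion(path: Path, destination: Path): Unit =
    try atomicMoveWithFallback(path, destination)
    catch {
      case e: IOException =>
        error(s"Error renaming snapshot file from $path to $destination", e)
    }
}
```

Passing `e` as a separate argument, rather than concatenating `e.getMessage`, lets the logging backend print the exception type and stack trace.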

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -341,7 +342,7 @@ object KafkaMetadataLog {
         }
       }
 
-    val replicatedLog = new KafkaMetadataLog(log, snapshotIds, topicPartition, maxFetchSizeInBytes)
+    val replicatedLog = new KafkaMetadataLog(log, scheduler, snapshotIds, topicPartition, maxFetchSizeInBytes)

Review comment:
       Not something we need to solve here, but I think we should put some 
thought into consolidating file management. Right now it's a little awkward to 
divide responsibility between `KafkaMetadataLog` and `Log`. For example, I 
think we are trying to say that `KafkaMetadataLog` is responsible for 
snapshots. That is mostly true, but we are relying on `Log.loadSegmentFiles` 
for the deletion of orphaned `.delete` snapshots.
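One way to picture the consolidation is for the metadata log to clean up its own orphaned snapshots on startup instead of relying on `Log.loadSegmentFiles`. This is a hypothetical sketch: `OrphanedSnapshotCleanup` and `deleteOrphanedSnapshots` are invented names, and the `.checkpoint.deleted` suffix is an assumption about how renamed snapshots are named.

```scala
import java.nio.file.{Files, Path}

object OrphanedSnapshotCleanup {
  // Assumed suffix for snapshots that were renamed for deletion; the real value
  // would come from the snapshot-naming code, not from this sketch.
  val AssumedDeletedSnapshotSuffix = ".checkpoint.deleted"

  // Removes snapshot files that were renamed for deletion but never actually
  // deleted (e.g. the process stopped before the scheduled delete ran).
  // Returns the paths that were removed.
  def deleteOrphanedSnapshots(logDir: Path, suffix: String = AssumedDeletedSnapshotSuffix): Seq[Path] = {
    val stream = Files.list(logDir)
    val candidates =
      try {
        val it = stream.iterator()
        val matches = Seq.newBuilder[Path]
        while (it.hasNext) {
          val file = it.next()
          if (Files.isRegularFile(file) && file.getFileName.toString.endsWith(suffix))
            matches += file
        }
        matches.result()
      } finally stream.close()
    // Delete only after the directory listing is closed.
    candidates.foreach(p => Files.deleteIfExists(p))
    candidates
  }
}
```

Keeping this next to the code that performs the rename would put the whole snapshot lifecycle (create, rename, delete, recover) in one class, which is the division of responsibility the comment is asking about.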

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -299,7 +290,16 @@ final class KafkaMetadataLog private (
       // If snapshotIds contains a snapshot id, the KafkaRaftClient and Listener can expect that the snapshot exists
       // on the file system, so we should first remove snapshotId and then delete snapshot file.
       expiredSnapshotIdsIter.remove()
-      Snapshots.deleteSnapshotIfExists(log.dir.toPath, snapshotId)
+
+      val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
+      val destination = Snapshots.deleteRename(path, snapshotId)
+      try {
+        Utils.atomicMoveWithFallback(path, destination)
+      } catch {
+        case e: IOException =>
+          warn("Error renaming snapshot file: " + path + " to :" + destination + ", " + e.getMessage)
+      }
+      scheduler.schedule("delete-snapshot-file", () => Snapshots.deleteSnapshotIfExists(log.dir.toPath, snapshotId))

Review comment:
       Similar to `Log.deleteSegmentFiles`, we should probably use a delay 
here. I think it would be ok to either hard-code this to the default of 60s, or 
use `file.delete.delay.ms` from the default configuration.
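A minimal sketch of deferring the delete, assuming the hard-coded 60s option. A plain `java.util.concurrent.ScheduledExecutorService` stands in for the log's `scheduler`, and `DelayedSnapshotDeletion` / `scheduleSnapshotDeletion` are hypothetical names.

```scala
import java.nio.file.{Files, Path}
import java.util.concurrent.{Callable, ScheduledExecutorService, ScheduledFuture, TimeUnit}

object DelayedSnapshotDeletion {
  // 60 seconds, matching the file.delete.delay.ms default cited in the review.
  val DefaultFileDeleteDelayMs: Long = 60000L

  // Schedules removal of an already-renamed snapshot file after `delayMs`
  // rather than immediately, mirroring deferred segment deletion.
  def scheduleSnapshotDeletion(
      scheduler: ScheduledExecutorService,
      renamedSnapshot: Path,
      delayMs: Long = DefaultFileDeleteDelayMs
  ): ScheduledFuture[java.lang.Boolean] =
    scheduler.schedule(
      new Callable[java.lang.Boolean] {
        // Returns true if the file existed and was deleted.
        override def call(): java.lang.Boolean = Files.deleteIfExists(renamedSnapshot)
      },
      delayMs,
      TimeUnit.MILLISECONDS
    )
}
```

Taking the delay as a parameter keeps both options open: callers can pass the configured `file.delete.delay.ms` value or rely on the 60s default.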



