hachikuji commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r582471506

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -299,7 +302,16 @@ final class KafkaMetadataLog private (
       // If snapshotIds contains a snapshot id, the KafkaRaftClient and Listener can expect that the snapshot exists
       // on the file system, so we should first remove snapshotId and then delete snapshot file.
       expiredSnapshotIdsIter.remove()
-      Snapshots.deleteSnapshotIfExists(log.dir.toPath, snapshotId)
+
+      val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
+      val destination = Snapshots.deleteRename(path, snapshotId)
+      try {
+        Utils.atomicMoveWithFallback(path, destination)
+      } catch {
+        case e: IOException =>
+          warn("Error renaming snapshot file: " + path + " to :" + destination + ", " + e.getMessage)

Review comment:
nit: Could this be error level? Can we use `$` substitutions? Also, let's include the full exception instead of just the message.

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -341,7 +342,7 @@ object KafkaMetadataLog {
       }
     }

-    val replicatedLog = new KafkaMetadataLog(log, snapshotIds, topicPartition, maxFetchSizeInBytes)
+    val replicatedLog = new KafkaMetadataLog(log, scheduler, snapshotIds, topicPartition, maxFetchSizeInBytes)

Review comment:
Not something we need to solve here, but I think we should put some thought into consolidating file management. Right now it's a little awkward to divide responsibility between `KafkaMetadataLog` and `Log`. For example, I think we are trying to say that `KafkaMetadataLog` is responsible for snapshots. That is mostly true, but we are relying on `Log.loadSegmentFiles` for the deletion of orphaned `.delete` snapshots.

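To make the logging nit in the first comment above concrete, here is a minimal sketch of what it seems to ask for: build the message with `$` substitution and hand the whole exception to the logger, not just `e.getMessage`. The object `SnapshotRenameLogging` and both helpers are hypothetical stand-ins written for this illustration; in the PR itself the change would presumably be a call along the lines of `error(s"...", e)`, assuming the logging trait in use offers an overload that takes a message and a `Throwable`.

```scala
import java.io.{PrintWriter, StringWriter}
import java.nio.file.Path

// Hypothetical helper object, not part of Kafka.
object SnapshotRenameLogging {
  // Message built with string interpolation instead of `+` concatenation.
  def renameFailureMessage(path: Path, destination: Path): String =
    s"Error renaming snapshot file $path to $destination"

  // Stand-in for an error(msg, throwable) logger call: renders the message
  // followed by the full stack trace, so the cause is not reduced to getMessage.
  def formatError(msg: String, e: Throwable): String = {
    val trace = new StringWriter()
    e.printStackTrace(new PrintWriter(trace, true))
    s"ERROR $msg\n$trace"
  }
}
```

Passing the `Throwable` through keeps the exception class and stack trace in the log, which the concatenated `e.getMessage` version drops.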
##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -299,7 +290,16 @@ final class KafkaMetadataLog private (
       // If snapshotIds contains a snapshot id, the KafkaRaftClient and Listener can expect that the snapshot exists
       // on the file system, so we should first remove snapshotId and then delete snapshot file.
       expiredSnapshotIdsIter.remove()
-      Snapshots.deleteSnapshotIfExists(log.dir.toPath, snapshotId)
+
+      val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
+      val destination = Snapshots.deleteRename(path, snapshotId)
+      try {
+        Utils.atomicMoveWithFallback(path, destination)
+      } catch {
+        case e: IOException =>
+          warn("Error renaming snapshot file: " + path + " to :" + destination + ", " + e.getMessage)
+      }
+      scheduler.schedule("delete-snapshot-file", () => Snapshots.deleteSnapshotIfExists(log.dir.toPath, snapshotId))

Review comment:
Similar to `Log.deleteSegmentFiles`, we should probably use a delay here. I think it would be ok to either hard-code this to the default of 60s, or use `file.delete.delay.ms` from the default configuration.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
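As an illustration of the delay suggested in the last review comment, the sketch below schedules the file removal to run after a delay instead of immediately. It uses the JDK's `ScheduledExecutorService` as a stand-in for the scheduler in the PR; the object `DelayedSnapshotDelete`, the method `scheduleDelete`, and the constant `DefaultFileDeleteDelayMs` are hypothetical names for this example, and the 60000 ms value only mirrors the 60s default mentioned in the comment.

```scala
import java.nio.file.{Files, Path}
import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture, TimeUnit}

// Hypothetical helper object, not part of Kafka.
object DelayedSnapshotDelete {
  // Assumed default: the 60s mentioned in the review comment.
  val DefaultFileDeleteDelayMs: Long = 60000L

  // Schedules deletion of an already renamed file after delayMs milliseconds.
  // Readers that still hold the file open get that long to finish.
  def scheduleDelete(
    executor: ScheduledExecutorService,
    file: Path,
    delayMs: Long = DefaultFileDeleteDelayMs
  ): ScheduledFuture[_] =
    executor.schedule(
      new Runnable {
        def run(): Unit = {
          Files.deleteIfExists(file)
          ()
        }
      },
      delayMs,
      TimeUnit.MILLISECONDS
    )
}
```

Reading the delay from `file.delete.delay.ms` instead of hard-coding it would amount to passing that configured value as `delayMs`.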