FrankYang0529 commented on code in PR #15616:
URL: https://github.com/apache/kafka/pull/15616#discussion_r1567531204


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##########
@@ -800,8 +802,23 @@ private Void deleteTypeIfExists(StorageAction<Boolean, 
IOException> delete, Stri
         try {
             if (delete.execute())
                 LOGGER.info("Deleted {} {}.", fileType, 
file.getAbsolutePath());
-            else if (logIfMissing)
-                LOGGER.info("Failed to delete {} {} because it does not 
exist.", fileType, file.getAbsolutePath());
+            else {
+                if (logIfMissing) {
+                    LOGGER.info("Failed to delete {} {} because it does not 
exist.", fileType, file.getAbsolutePath());
+                }
+
+                // During alter log dir, the log segment may be moved to a new 
directory, so async delete may fail.
+                // Fallback to delete the file in the new directory to avoid 
orphan file.
+                Pattern dirPattern = 
Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-(delete|future)");
+                Matcher dirMatcher = dirPattern.matcher(file.getParent());
+                if (dirMatcher.matches()) {
+                    String topicPartitionAbsolutePath = dirMatcher.group(1) + 
"-" + dirMatcher.group(2);
+                    File fallbackFile = new File(topicPartitionAbsolutePath, 
file.getName());
+                    if (fallbackFile.exists() && fallbackFile.delete()) {

Review Comment:
   Sorry, after reading next comment, I find that I misunderstood this comment. 
One is file name and another is folder name. Yeah, it's better to check file 
name ends with `.deleted` before falling back deletion, so we don't 
accidentally delete other files. 



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##########
@@ -800,8 +802,23 @@ private Void deleteTypeIfExists(StorageAction<Boolean, 
IOException> delete, Stri
         try {
             if (delete.execute())
                 LOGGER.info("Deleted {} {}.", fileType, 
file.getAbsolutePath());
-            else if (logIfMissing)
-                LOGGER.info("Failed to delete {} {} because it does not 
exist.", fileType, file.getAbsolutePath());
+            else {
+                if (logIfMissing) {
+                    LOGGER.info("Failed to delete {} {} because it does not 
exist.", fileType, file.getAbsolutePath());
+                }
+
+                // During alter log dir, the log segment may be moved to a new 
directory, so async delete may fail.
+                // Fallback to delete the file in the new directory to avoid 
orphan file.
+                Pattern dirPattern = 
Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-(delete|future)");
+                Matcher dirMatcher = dirPattern.matcher(file.getParent());
+                if (dirMatcher.matches()) {
+                    String topicPartitionAbsolutePath = dirMatcher.group(1) + 
"-" + dirMatcher.group(2);
+                    File fallbackFile = new File(topicPartitionAbsolutePath, 
file.getName());
+                    if (fallbackFile.exists() && fallbackFile.delete()) {
+                        LOGGER.warn("Fallback to delete {} {}.", fileType, 
fallbackFile.getAbsolutePath());

Review Comment:
   Yes, updated it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to