cadonna commented on a change in pull request #9262:
URL: https://github.com/apache/kafka/pull/9262#discussion_r491864886



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -306,59 +305,69 @@ public synchronized void clean() {
      */
     public synchronized void cleanRemovedTasks(final long cleanupDelayMs) {
         try {
-            cleanRemovedTasks(cleanupDelayMs, false);
+            cleanRemovedTasksCalledByCleanerThread(cleanupDelayMs);
         } catch (final Exception cannotHappen) {
             throw new IllegalStateException("Should have swallowed 
exception.", cannotHappen);
         }
     }
 
-    private synchronized void cleanRemovedTasks(final long cleanupDelayMs,
-                                                final boolean manualUserCall) 
throws Exception {
-        final File[] taskDirs = listAllTaskDirectories();
-        if (taskDirs == null || taskDirs.length == 0) {
-            return; // nothing to do
-        }
-
-        for (final File taskDir : taskDirs) {
+    private void cleanRemovedTasksCalledByCleanerThread(final long 
cleanupDelayMs) {
+        for (final File taskDir : listAllTaskDirectories()) {

Review comment:
       I saw the behavior @ableegoldman mentions and then I changed the code. 
So it is safe.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -306,59 +305,69 @@ public synchronized void clean() {
      */
     public synchronized void cleanRemovedTasks(final long cleanupDelayMs) {
         try {
-            cleanRemovedTasks(cleanupDelayMs, false);
+            cleanRemovedTasksCalledByCleanerThread(cleanupDelayMs);
         } catch (final Exception cannotHappen) {
             throw new IllegalStateException("Should have swallowed 
exception.", cannotHappen);
         }
     }
 
-    private synchronized void cleanRemovedTasks(final long cleanupDelayMs,
-                                                final boolean manualUserCall) 
throws Exception {
-        final File[] taskDirs = listAllTaskDirectories();
-        if (taskDirs == null || taskDirs.length == 0) {
-            return; // nothing to do
-        }
-
-        for (final File taskDir : taskDirs) {
+    private void cleanRemovedTasksCalledByCleanerThread(final long 
cleanupDelayMs) {
+        for (final File taskDir : listAllTaskDirectories()) {
             final String dirName = taskDir.getName();
             final TaskId id = TaskId.parse(dirName);
             if (!locks.containsKey(id)) {
-                Exception exception = null;
                 try {
                     if (lock(id)) {
                         final long now = time.milliseconds();
                         final long lastModifiedMs = taskDir.lastModified();
                         if (now > lastModifiedMs + cleanupDelayMs) {
                             log.info("{} Deleting obsolete state directory {} 
for task {} as {}ms has elapsed (cleanup delay is {}ms).",
                                 logPrefix(), dirName, id, now - 
lastModifiedMs, cleanupDelayMs);
-
-                            Utils.delete(taskDir, 
Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
-                        } else if (manualUserCall) {

Review comment:
       I agree with @ableegoldman. Now the code is easier to follow. 

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -306,59 +305,69 @@ public synchronized void clean() {
      */
     public synchronized void cleanRemovedTasks(final long cleanupDelayMs) {
         try {
-            cleanRemovedTasks(cleanupDelayMs, false);
+            cleanRemovedTasksCalledByCleanerThread(cleanupDelayMs);
         } catch (final Exception cannotHappen) {
             throw new IllegalStateException("Should have swallowed 
exception.", cannotHappen);
         }
     }
 
-    private synchronized void cleanRemovedTasks(final long cleanupDelayMs,
-                                                final boolean manualUserCall) 
throws Exception {
-        final File[] taskDirs = listAllTaskDirectories();
-        if (taskDirs == null || taskDirs.length == 0) {
-            return; // nothing to do
-        }
-
-        for (final File taskDir : taskDirs) {
+    private void cleanRemovedTasksCalledByCleanerThread(final long 
cleanupDelayMs) {
+        for (final File taskDir : listAllTaskDirectories()) {
             final String dirName = taskDir.getName();
             final TaskId id = TaskId.parse(dirName);
             if (!locks.containsKey(id)) {
-                Exception exception = null;
                 try {
                     if (lock(id)) {
                         final long now = time.milliseconds();
                         final long lastModifiedMs = taskDir.lastModified();
                         if (now > lastModifiedMs + cleanupDelayMs) {
                             log.info("{} Deleting obsolete state directory {} 
for task {} as {}ms has elapsed (cleanup delay is {}ms).",
                                 logPrefix(), dirName, id, now - 
lastModifiedMs, cleanupDelayMs);
-
-                            Utils.delete(taskDir, 
Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
-                        } else if (manualUserCall) {
-                            log.info("{} Deleting state directory {} for task 
{} as user calling cleanup.",
-                                logPrefix(), dirName, id);
-
                             Utils.delete(taskDir, 
Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
                         }
                     }
-                } catch (final OverlappingFileLockException | IOException e) {
-                    exception = e;
+                } catch (final OverlappingFileLockException | IOException 
exception) {
+                    log.warn("{} Swallowed the following exception during 
deletion of obsolete state directory {} for task {}: {}",
+                        logPrefix(), dirName, id, exception);
                 } finally {
                     try {
                         unlock(id);
-
-                        // for manual user call, stream threads are not 
running so it is safe to delete
-                        // the whole directory
-                        if (manualUserCall) {
-                            Utils.delete(taskDir);
-                        }
-                    } catch (final IOException e) {
-                        exception = e;
+                    } catch (final IOException exception) {
+                        log.warn("{} Swallowed the following exception during 
unlocking after " +
+                                "deletion of obsolete state directory for task 
{}: {}",
+                            logPrefix(), dirName, exception);
                     }
                 }
+            }
+        }
+    }
 
-                if (exception != null && manualUserCall) {
-                    log.error("{} Failed to release the state directory 
lock.", logPrefix());
+    private void cleanRemovedTasksCalledByUser() throws Exception {
+        for (final File taskDir : listAllTaskDirectories()) {
+            final String dirName = taskDir.getName();
+            final TaskId id = TaskId.parse(dirName);
+            if (!locks.containsKey(id)) {
+                try {
+                    if (lock(id)) {
+                        log.info("{} Deleting state directory {} for task {} 
as user calling cleanup.",
+                            logPrefix(), dirName, id);
+                        Utils.delete(taskDir, Collections.singletonList(new 
File(taskDir, LOCK_FILE_NAME)));
+                    }
+                } catch (final OverlappingFileLockException | IOException 
exception) {
+                    log.error("{} Failed to delete state directory {} for task 
{} with exception: {}",
+                        logPrefix(), dirName, id, exception);
                     throw exception;
+                } finally {
+                    try {
+                        unlock(id);
+                        // for manual user call, stream threads are not 
running so it is safe to delete
+                        // the whole directory
+                        Utils.delete(taskDir);
+                    } catch (final IOException exception) {
+                        log.error("{} Failed to release lock on state 
directory {} for task {} with exception: {}",
+                            logPrefix(), dirName, id, exception);
+                        throw exception;

Review comment:
       According to the Java Language Specification, it is not undefined. See 
https://docs.oracle.com/javase/specs/jls/se8/html/jls-14.html#jls-14.20.2. 
There it says:
   
   > If the catch block completes abruptly for reason R, then the finally block 
is executed. Then there is a choice:
   > 
   >    - If the finally block completes normally, then the try statement 
completes abruptly for reason R.
   > 
   >    - If the finally block completes abruptly for reason S, then the try 
statement completes abruptly for reason S (and reason R is discarded).
   > 
   
   That means, the last thrown exception is propagated. That was also the 
behavior of the refactored code. Since the goal of this PR is actually just to 
fix the log message without changing the behavior, I would leave this change to 
a future PR if we feel that the behavior should change.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -306,59 +305,69 @@ public synchronized void clean() {
      */
     public synchronized void cleanRemovedTasks(final long cleanupDelayMs) {
         try {
-            cleanRemovedTasks(cleanupDelayMs, false);
+            cleanRemovedTasksCalledByCleanerThread(cleanupDelayMs);
         } catch (final Exception cannotHappen) {
             throw new IllegalStateException("Should have swallowed 
exception.", cannotHappen);
         }
     }
 
-    private synchronized void cleanRemovedTasks(final long cleanupDelayMs,
-                                                final boolean manualUserCall) 
throws Exception {
-        final File[] taskDirs = listAllTaskDirectories();
-        if (taskDirs == null || taskDirs.length == 0) {
-            return; // nothing to do
-        }
-
-        for (final File taskDir : taskDirs) {
+    private void cleanRemovedTasksCalledByCleanerThread(final long 
cleanupDelayMs) {
+        for (final File taskDir : listAllTaskDirectories()) {
             final String dirName = taskDir.getName();
             final TaskId id = TaskId.parse(dirName);
             if (!locks.containsKey(id)) {
-                Exception exception = null;
                 try {
                     if (lock(id)) {
                         final long now = time.milliseconds();
                         final long lastModifiedMs = taskDir.lastModified();
                         if (now > lastModifiedMs + cleanupDelayMs) {
                             log.info("{} Deleting obsolete state directory {} 
for task {} as {}ms has elapsed (cleanup delay is {}ms).",
                                 logPrefix(), dirName, id, now - 
lastModifiedMs, cleanupDelayMs);
-
-                            Utils.delete(taskDir, 
Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
-                        } else if (manualUserCall) {
-                            log.info("{} Deleting state directory {} for task 
{} as user calling cleanup.",
-                                logPrefix(), dirName, id);
-
                             Utils.delete(taskDir, 
Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
                         }
                     }
-                } catch (final OverlappingFileLockException | IOException e) {
-                    exception = e;
+                } catch (final OverlappingFileLockException | IOException 
exception) {
+                    log.warn("{} Swallowed the following exception during 
deletion of obsolete state directory {} for task {}: {}",
+                        logPrefix(), dirName, id, exception);

Review comment:
       Good catch!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to