cadonna commented on a change in pull request #9262: URL: https://github.com/apache/kafka/pull/9262#discussion_r491864886
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ########## @@ -306,59 +305,69 @@ public synchronized void clean() { */ public synchronized void cleanRemovedTasks(final long cleanupDelayMs) { try { - cleanRemovedTasks(cleanupDelayMs, false); + cleanRemovedTasksCalledByCleanerThread(cleanupDelayMs); } catch (final Exception cannotHappen) { throw new IllegalStateException("Should have swallowed exception.", cannotHappen); } } - private synchronized void cleanRemovedTasks(final long cleanupDelayMs, - final boolean manualUserCall) throws Exception { - final File[] taskDirs = listAllTaskDirectories(); - if (taskDirs == null || taskDirs.length == 0) { - return; // nothing to do - } - - for (final File taskDir : taskDirs) { + private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) { + for (final File taskDir : listAllTaskDirectories()) { Review comment: I saw the behavior @ableegoldman mentions and then I changed the code. So it is safe. 
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ########## @@ -306,59 +305,69 @@ public synchronized void clean() { */ public synchronized void cleanRemovedTasks(final long cleanupDelayMs) { try { - cleanRemovedTasks(cleanupDelayMs, false); + cleanRemovedTasksCalledByCleanerThread(cleanupDelayMs); } catch (final Exception cannotHappen) { throw new IllegalStateException("Should have swallowed exception.", cannotHappen); } } - private synchronized void cleanRemovedTasks(final long cleanupDelayMs, - final boolean manualUserCall) throws Exception { - final File[] taskDirs = listAllTaskDirectories(); - if (taskDirs == null || taskDirs.length == 0) { - return; // nothing to do - } - - for (final File taskDir : taskDirs) { + private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) { + for (final File taskDir : listAllTaskDirectories()) { final String dirName = taskDir.getName(); final TaskId id = TaskId.parse(dirName); if (!locks.containsKey(id)) { - Exception exception = null; try { if (lock(id)) { final long now = time.milliseconds(); final long lastModifiedMs = taskDir.lastModified(); if (now > lastModifiedMs + cleanupDelayMs) { log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms).", logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs); - - Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME))); - } else if (manualUserCall) { Review comment: I agree with @ableegoldman. Now the code is easier to follow. 
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ########## @@ -306,59 +305,69 @@ public synchronized void clean() { */ public synchronized void cleanRemovedTasks(final long cleanupDelayMs) { try { - cleanRemovedTasks(cleanupDelayMs, false); + cleanRemovedTasksCalledByCleanerThread(cleanupDelayMs); } catch (final Exception cannotHappen) { throw new IllegalStateException("Should have swallowed exception.", cannotHappen); } } - private synchronized void cleanRemovedTasks(final long cleanupDelayMs, - final boolean manualUserCall) throws Exception { - final File[] taskDirs = listAllTaskDirectories(); - if (taskDirs == null || taskDirs.length == 0) { - return; // nothing to do - } - - for (final File taskDir : taskDirs) { + private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) { + for (final File taskDir : listAllTaskDirectories()) { final String dirName = taskDir.getName(); final TaskId id = TaskId.parse(dirName); if (!locks.containsKey(id)) { - Exception exception = null; try { if (lock(id)) { final long now = time.milliseconds(); final long lastModifiedMs = taskDir.lastModified(); if (now > lastModifiedMs + cleanupDelayMs) { log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms).", logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs); - - Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME))); - } else if (manualUserCall) { - log.info("{} Deleting state directory {} for task {} as user calling cleanup.", - logPrefix(), dirName, id); - Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME))); } } - } catch (final OverlappingFileLockException | IOException e) { - exception = e; + } catch (final OverlappingFileLockException | IOException exception) { + log.warn("{} Swallowed the following exception during deletion of obsolete state directory {} for task {}: {}", + 
logPrefix(), dirName, id, exception); } finally { try { unlock(id); - - // for manual user call, stream threads are not running so it is safe to delete - // the whole directory - if (manualUserCall) { - Utils.delete(taskDir); - } - } catch (final IOException e) { - exception = e; + } catch (final IOException exception) { + log.warn("{} Swallowed the following exception during unlocking after " + + "deletion of obsolete state directory for task {}: {}", + logPrefix(), dirName, exception); } } + } + } + } - if (exception != null && manualUserCall) { - log.error("{} Failed to release the state directory lock.", logPrefix()); + private void cleanRemovedTasksCalledByUser() throws Exception { + for (final File taskDir : listAllTaskDirectories()) { + final String dirName = taskDir.getName(); + final TaskId id = TaskId.parse(dirName); + if (!locks.containsKey(id)) { + try { + if (lock(id)) { + log.info("{} Deleting state directory {} for task {} as user calling cleanup.", + logPrefix(), dirName, id); + Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME))); + } + } catch (final OverlappingFileLockException | IOException exception) { + log.error("{} Failed to delete state directory {} for task {} with exception: {}", + logPrefix(), dirName, id, exception); throw exception; + } finally { + try { + unlock(id); + // for manual user call, stream threads are not running so it is safe to delete + // the whole directory + Utils.delete(taskDir); + } catch (final IOException exception) { + log.error("{} Failed to release lock on state directory {} for task {} with exception: {}", + logPrefix(), dirName, id, exception); + throw exception; Review comment: According to the Java Language Specification, it is not undefined. See https://docs.oracle.com/javase/specs/jls/se8/html/jls-14.html#jls-14.20.2. There it says: > If the catch block completes abruptly for reason R, then the finally block is executed. 
Then there is a choice: > > - If the finally block completes normally, then the try statement completes abruptly for reason R. > > - If the finally block completes abruptly for reason S, then the try statement completes abruptly for reason S (and reason R is discarded). > That means the last thrown exception is propagated. That was also the behavior of the code before this refactoring. Since the goal of this PR is actually just to fix the log message without changing the behavior, I would leave this change to a future PR if we feel that the behavior should change. ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ########## @@ -306,59 +305,69 @@ public synchronized void clean() { */ public synchronized void cleanRemovedTasks(final long cleanupDelayMs) { try { - cleanRemovedTasks(cleanupDelayMs, false); + cleanRemovedTasksCalledByCleanerThread(cleanupDelayMs); } catch (final Exception cannotHappen) { throw new IllegalStateException("Should have swallowed exception.", cannotHappen); } } - private synchronized void cleanRemovedTasks(final long cleanupDelayMs, - final boolean manualUserCall) throws Exception { - final File[] taskDirs = listAllTaskDirectories(); - if (taskDirs == null || taskDirs.length == 0) { - return; // nothing to do - } - - for (final File taskDir : taskDirs) { + private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) { + for (final File taskDir : listAllTaskDirectories()) { final String dirName = taskDir.getName(); final TaskId id = TaskId.parse(dirName); if (!locks.containsKey(id)) { - Exception exception = null; try { if (lock(id)) { final long now = time.milliseconds(); final long lastModifiedMs = taskDir.lastModified(); if (now > lastModifiedMs + cleanupDelayMs) { log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms).", logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs); - - Utils.delete(taskDir, 
Collections.singletonList(new File(taskDir, LOCK_FILE_NAME))); - } else if (manualUserCall) { - log.info("{} Deleting state directory {} for task {} as user calling cleanup.", - logPrefix(), dirName, id); - Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME))); } } - } catch (final OverlappingFileLockException | IOException e) { - exception = e; + } catch (final OverlappingFileLockException | IOException exception) { + log.warn("{} Swallowed the following exception during deletion of obsolete state directory {} for task {}: {}", + logPrefix(), dirName, id, exception); Review comment: Good catch! ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org