This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.6 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push: new e4540fe KAFKA-10564: only process non-empty task directories when internally cleaning obsolete state stores (#9373) e4540fe is described below commit e4540fec7cf310b47ddcb81ec27d2e5c2046c7ff Author: Michael Bingham <michael.b.bing...@gmail.com> AuthorDate: Wed Oct 7 16:48:35 2020 -0600 KAFKA-10564: only process non-empty task directories when internally cleaning obsolete state stores (#9373) Avoid continuous repeated logging by not trying to clean empty task directories, which are no longer fully deleted during internal cleanup as of https://issues.apache.org/jira/browse/KAFKA-6647. Reviewers: A. Sophie Blee-Goldman <sop...@confluent.io>, Guozhang Wang <wangg...@gmail.com> --- .../processor/internals/StateDirectory.java | 2 +- .../processor/internals/StateDirectoryTest.java | 33 ++++++++++++++++++++++ 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java index de9fbea..35f937a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java @@ -315,7 +315,7 @@ public class StateDirectory { } private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) { - for (final File taskDir : listAllTaskDirectories()) { + for (final File taskDir : listNonEmptyTaskDirectories()) { final String dirName = taskDir.getName(); final TaskId id = TaskId.parse(dirName); if (!locks.containsKey(id)) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java index cb98789..24c9ab0 100644 --- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java @@ -50,9 +50,11 @@ import java.util.stream.Collectors; import static org.apache.kafka.common.utils.Utils.mkSet; import static org.apache.kafka.streams.processor.internals.StateDirectory.LOCK_FILE_NAME; import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME; +import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.endsWith; import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -286,6 +288,7 @@ public class StateDirectoryTest { } } + @Test public void shouldCleanupStateDirectoriesWhenLastModifiedIsLessThanNowMinusCleanupDelay() { final File dir = directory.directoryForTask(new TaskId(2, 0)); @@ -305,6 +308,36 @@ public class StateDirectoryTest { } @Test + public void shouldCleanupObsoleteStateDirectoriesOnlyOnce() { + final File dir = directory.directoryForTask(new TaskId(2, 0)); + assertTrue(new File(dir, "store").mkdir()); + assertEquals(1, directory.listAllTaskDirectories().length); + assertEquals(1, directory.listNonEmptyTaskDirectories().length); + + try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StateDirectory.class)) { + directory.cleanRemovedTasks(0); + assertTrue(dir.exists()); + assertEquals(1, directory.listAllTaskDirectories().length); + assertEquals(0, directory.listNonEmptyTaskDirectories().length); + assertThat( + appender.getMessages(), + hasItem(containsString("Deleting obsolete state directory")) + ); + } + + try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StateDirectory.class)) { + 
directory.cleanRemovedTasks(0); + assertTrue(dir.exists()); + assertEquals(1, directory.listAllTaskDirectories().length); + assertEquals(0, directory.listNonEmptyTaskDirectories().length); + assertThat( + appender.getMessages(), + not(hasItem(containsString("Deleting obsolete state directory"))) + ); + } + } + + @Test public void shouldNotRemoveNonTaskDirectoriesAndFiles() { final File otherDir = TestUtils.tempDirectory(stateDir.toPath(), "foo"); directory.cleanRemovedTasks(0);