This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new f2963c2  KAFKA-6647: Do not delete the lock file while holding the 
lock (#8267)
f2963c2 is described below

commit f2963c212eb96d00d2910ae72ac96c0416143922
Author: Guozhang Wang <wangg...@gmail.com>
AuthorDate: Sat Mar 14 13:49:08 2020 -0700

    KAFKA-6647: Do not delete the lock file while holding the lock (#8267)
    
    1. Inside StateDirectory#cleanRemovedTasks, skip deleting the lock file 
(and hence the parent directory) until the lock has been released. After the 
lock is released, only go ahead and delete the parent directory if 
manualUserCall == true. That is, the deletion is triggered from 
KafkaStreams#cleanUp, and users are responsible for making sure that the 
Streams instance is not started and hence that there are no other threads 
trying to grab that lock.
    
    2. As a result, during scheduled cleanup the corresponding task.dir would 
not be empty but would be left with only the lock file, so effectively we 
still achieve the goal of freeing disk space. For callers of 
listTaskDirectories like KIP-441 (cc @ableegoldman to take a look) I've 
introduced a new listNonEmptyTaskDirectories which excludes such dummy 
task.dirs with only the lock file left.
    
    3. Also fixed KAFKA-8999 along the way, so that an exception hit while 
traversing the directory is rethrown instead of being ignored.
    
    Reviewers: A. Sophie Blee-Goldman <sop...@confluent.io>, John Roesler 
<vvcep...@apache.org>
---
 .../java/org/apache/kafka/common/utils/Utils.java  |  41 ++++--
 .../org/apache/kafka/streams/KafkaStreams.java     |   4 +-
 .../processor/internals/StateDirectory.java        | 137 +++++++++++++--------
 .../streams/processor/internals/TaskManager.java   |   2 +-
 .../processor/internals/StateDirectoryTest.java    |  44 +++++--
 .../processor/internals/TaskManagerTest.java       |   3 +-
 6 files changed, 157 insertions(+), 74 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java 
b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 0d16c0a..e9d4cc4 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -736,29 +736,56 @@ public final class Utils {
     /**
      * Recursively delete the given file/directory and any subfiles (if any 
exist)
      *
-     * @param file The root file at which to begin deleting
+     * @param rootFile The root file at which to begin deleting
      */
-    public static void delete(final File file) throws IOException {
-        if (file == null)
+    public static void delete(final File rootFile) throws IOException {
+        delete(rootFile, Collections.emptyList());
+    }
+
+    /**
+     * Recursively delete the subfiles (if any exist) of the passed in root 
file that are not included
+     * in the list to keep
+     *
+     * @param rootFile The root file at which to begin deleting
+     * @param filesToKeep The subfiles to keep (note that if a subfile is to 
be kept, so are all its parent
+     *                    files in its pat)h; if empty we would also delete 
the root file
+     */
+    public static void delete(final File rootFile, final List<File> 
filesToKeep) throws IOException {
+        if (rootFile == null)
             return;
-        Files.walkFileTree(file.toPath(), new SimpleFileVisitor<Path>() {
+        Files.walkFileTree(rootFile.toPath(), new SimpleFileVisitor<Path>() {
             @Override
             public FileVisitResult visitFileFailed(Path path, IOException exc) 
throws IOException {
                 // If the root path did not exist, ignore the error; otherwise 
throw it.
-                if (exc instanceof NoSuchFileException && 
path.toFile().equals(file))
+                if (exc instanceof NoSuchFileException && 
path.toFile().equals(rootFile))
                     return FileVisitResult.TERMINATE;
                 throw exc;
             }
 
             @Override
             public FileVisitResult visitFile(Path path, BasicFileAttributes 
attrs) throws IOException {
-                Files.delete(path);
+                if (!filesToKeep.contains(path.toFile())) {
+                    Files.delete(path);
+                }
                 return FileVisitResult.CONTINUE;
             }
 
             @Override
             public FileVisitResult postVisitDirectory(Path path, IOException 
exc) throws IOException {
-                Files.delete(path);
+                // KAFKA-8999: if there's an exception thrown previously 
already, we should throw it
+                if (exc != null) {
+                    throw exc;
+                }
+
+                if (rootFile.toPath().equals(path)) {
+                    // only delete the parent directory if there's nothing to 
keep
+                    if (filesToKeep.isEmpty()) {
+                        Files.delete(path);
+                    }
+                } else if (!filesToKeep.contains(path.toFile())) {
+                    Files.delete(path);
+                }
+
                 return FileVisitResult.CONTINUE;
             }
         });
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 2563718..67d185d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -716,11 +716,11 @@ public class KafkaStreams implements AutoCloseable {
         }
         final ProcessorTopology globalTaskTopology = 
internalTopologyBuilder.buildGlobalStateTopology();
         final long cacheSizePerThread = totalCacheSize / (threads.length + 
(globalTaskTopology == null ? 0 : 1));
-        final boolean createStateDirectory = 
taskTopology.hasPersistentLocalStore() ||
+        final boolean hasPersistentStores = 
taskTopology.hasPersistentLocalStore() ||
                 (globalTaskTopology != null && 
globalTaskTopology.hasPersistentGlobalStore());
 
         try {
-            stateDirectory = new StateDirectory(config, time, 
createStateDirectory);
+            stateDirectory = new StateDirectory(config, time, 
hasPersistentStores);
         } catch (final ProcessorStateException fatal) {
             throw new StreamsException(fatal);
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index f5c4c31..68a9a79 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -33,9 +33,12 @@ import java.nio.channels.OverlappingFileLockException;
 import java.nio.file.NoSuchFileException;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.regex.Pattern;
 
+import static 
org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
+
 /**
  * Manages the directories where the state of Tasks owned by a {@link 
StreamThread} are
  * stored. Handles creation/locking/unlocking/cleaning of the Task 
Directories. This class is not
@@ -48,11 +51,12 @@ public class StateDirectory {
     static final String LOCK_FILE_NAME = ".lock";
     private static final Logger log = 
LoggerFactory.getLogger(StateDirectory.class);
 
+    private final Time time;
+    private final String appId;
     private final File stateDir;
-    private final boolean createStateDirectory;
+    private final boolean hasPersistentStores;
     private final HashMap<TaskId, FileChannel> channels = new HashMap<>();
     private final HashMap<TaskId, LockAndOwner> locks = new HashMap<>();
-    private final Time time;
 
     private FileChannel globalStateChannel;
     private FileLock globalStateLock;
@@ -70,22 +74,27 @@ public class StateDirectory {
     /**
      * Ensures that the state base directory as well as the application's 
sub-directory are created.
      *
+     * @param config              streams application configuration to read 
the root state directory path
+     * @param time                system timer used to execute periodic 
cleanup procedure
+     * @param hasPersistentStores only when the application's topology does 
have stores persisted on local file
+     *                            system, we would go ahead and auto-create 
the corresponding application / task / store
+     *                            directories whenever necessary; otherwise no 
directories would be created.
+     *
      * @throws ProcessorStateException if the base state directory or 
application state directory does not exist
-     *                                 and could not be created when 
createStateDirectory is enabled.
+     *                                 and could not be created when 
hasPersistentStores is enabled.
      */
-    public StateDirectory(final StreamsConfig config,
-                          final Time time,
-                          final boolean createStateDirectory) {
+    public StateDirectory(final StreamsConfig config, final Time time, final 
boolean hasPersistentStores) {
         this.time = time;
-        this.createStateDirectory = createStateDirectory;
+        this.hasPersistentStores = hasPersistentStores;
+        this.appId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
         final String stateDirName = 
config.getString(StreamsConfig.STATE_DIR_CONFIG);
         final File baseDir = new File(stateDirName);
-        if (this.createStateDirectory && !baseDir.exists() && 
!baseDir.mkdirs()) {
+        if (this.hasPersistentStores && !baseDir.exists() && 
!baseDir.mkdirs()) {
             throw new ProcessorStateException(
                 String.format("base state directory [%s] doesn't exist and 
couldn't be created", stateDirName));
         }
-        stateDir = new File(baseDir, 
config.getString(StreamsConfig.APPLICATION_ID_CONFIG));
-        if (this.createStateDirectory && !stateDir.exists() && 
!stateDir.mkdir()) {
+        stateDir = new File(baseDir, appId);
+        if (this.hasPersistentStores && !stateDir.exists() && 
!stateDir.mkdir()) {
             throw new ProcessorStateException(
                 String.format("state directory [%s] doesn't exist and couldn't 
be created", stateDir.getPath()));
         }
@@ -98,13 +107,22 @@ public class StateDirectory {
      */
     public File directoryForTask(final TaskId taskId) {
         final File taskDir = new File(stateDir, taskId.toString());
-        if (createStateDirectory && !taskDir.exists() && !taskDir.mkdir()) {
+        if (hasPersistentStores && !taskDir.exists() && !taskDir.mkdir()) {
             throw new ProcessorStateException(
                 String.format("task directory [%s] doesn't exist and couldn't 
be created", taskDir.getPath()));
         }
         return taskDir;
     }
 
+    private boolean taskDirEmpty(final File taskDir) {
+        final File[] storeDirs = taskDir.listFiles(pathname ->
+            !pathname.getName().equals(LOCK_FILE_NAME) &&
+                !pathname.getName().equals(CHECKPOINT_FILE_NAME));
+
+        // if the task is stateless, storeDirs would be null
+        return storeDirs == null || storeDirs.length == 0;
+    }
+
     /**
      * Get or create the directory for the global stores.
      * @return directory for the global stores
@@ -112,7 +130,7 @@ public class StateDirectory {
      */
     File globalStateDir() {
         final File dir = new File(stateDir, "global");
-        if (createStateDirectory && !dir.exists() && !dir.mkdir()) {
+        if (hasPersistentStores && !dir.exists() && !dir.mkdir()) {
             throw new ProcessorStateException(
                 String.format("global state directory [%s] doesn't exist and 
couldn't be created", dir.getPath()));
         }
@@ -125,12 +143,12 @@ public class StateDirectory {
 
     /**
      * Get the lock for the {@link TaskId}s directory if it is available
-     * @param taskId
+     * @param taskId task id
      * @return true if successful
-     * @throws IOException
+     * @throws IOException if the file cannot be created or file handle cannot 
be grabbed, should be considered as fatal
      */
     synchronized boolean lock(final TaskId taskId) throws IOException {
-        if (!createStateDirectory) {
+        if (!hasPersistentStores) {
             return true;
         }
 
@@ -174,7 +192,7 @@ public class StateDirectory {
     }
 
     synchronized boolean lockGlobalState() throws IOException {
-        if (!createStateDirectory) {
+        if (!hasPersistentStores) {
             return true;
         }
 
@@ -236,18 +254,21 @@ public class StateDirectory {
     }
 
     public synchronized void clean() {
+        // remove task dirs
         try {
             cleanRemovedTasks(0, true);
         } catch (final Exception e) {
             // this is already logged within cleanRemovedTasks
             throw new StreamsException(e);
         }
+        // remove global dir
         try {
             if (stateDir.exists()) {
                 Utils.delete(globalStateDir().getAbsoluteFile());
             }
         } catch (final IOException e) {
-            log.error("{} Failed to delete global state directory due to an 
unexpected exception", logPrefix(), e);
+            log.error("{} Failed to delete global state directory of {} due to 
an unexpected exception",
+                appId, logPrefix(), e);
             throw new StreamsException(e);
         }
     }
@@ -269,7 +290,7 @@ public class StateDirectory {
 
     private synchronized void cleanRemovedTasks(final long cleanupDelayMs,
                                                 final boolean manualUserCall) 
throws Exception {
-        final File[] taskDirs = listTaskDirectories();
+        final File[] taskDirs = listAllTaskDirectories();
         if (taskDirs == null || taskDirs.length == 0) {
             return; // nothing to do
         }
@@ -278,61 +299,73 @@ public class StateDirectory {
             final String dirName = taskDir.getName();
             final TaskId id = TaskId.parse(dirName);
             if (!locks.containsKey(id)) {
+                Exception exception = null;
                 try {
                     if (lock(id)) {
                         final long now = time.milliseconds();
                         final long lastModifiedMs = taskDir.lastModified();
-                        if (now > lastModifiedMs + cleanupDelayMs || 
manualUserCall) {
-                            if (!manualUserCall) {
-                                log.info(
-                                    "{} Deleting obsolete state directory {} 
for task {} as {}ms has elapsed (cleanup delay is {}ms).",
-                                    logPrefix(),
-                                    dirName,
-                                    id,
-                                    now - lastModifiedMs,
-                                    cleanupDelayMs);
-                            } else {
-                                log.info(
-                                        "{} Deleting state directory {} for 
task {} as user calling cleanup.",
-                                        logPrefix(),
-                                        dirName,
-                                        id);
-                            }
-                            Utils.delete(taskDir);
+                        if (now > lastModifiedMs + cleanupDelayMs) {
+                            log.info("{} Deleting obsolete state directory {} 
for task {} as {}ms has elapsed (cleanup delay is {}ms).",
+                                logPrefix(), dirName, id, now - 
lastModifiedMs, cleanupDelayMs);
+
+                            Utils.delete(taskDir, 
Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
+                        } else if (manualUserCall) {
+                            log.info("{} Deleting state directory {} for task 
{} as user calling cleanup.",
+                                logPrefix(), dirName, id);
+
+                            Utils.delete(taskDir, 
Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
                         }
                     }
-                } catch (final OverlappingFileLockException e) {
-                    // locked by another thread
-                    if (manualUserCall) {
-                        log.error("{} Failed to get the state directory 
lock.", logPrefix(), e);
-                        throw e;
-                    }
-                } catch (final IOException e) {
-                    log.error("{} Failed to delete the state directory.", 
logPrefix(), e);
-                    if (manualUserCall) {
-                        throw e;
-                    }
+                } catch (final OverlappingFileLockException | IOException e) {
+                    exception = e;
                 } finally {
                     try {
                         unlock(id);
-                    } catch (final IOException e) {
-                        log.error("{} Failed to release the state directory 
lock.", logPrefix());
+
+                        // for manual user call, stream threads are not 
running so it is safe to delete
+                        // the whole directory
                         if (manualUserCall) {
-                            throw e;
+                            Utils.delete(taskDir);
                         }
+                    } catch (final IOException e) {
+                        exception = e;
                     }
                 }
+
+                if (exception != null && manualUserCall) {
+                    log.error("{} Failed to release the state directory 
lock.", logPrefix());
+                    throw exception;
+                }
             }
         }
     }
 
     /**
+     * List all of the task directories that are non-empty
+     * @return The list of all the non-empty local directories for stream tasks
+     */
+    File[] listNonEmptyTaskDirectories() {
+        final File[] taskDirectories = !stateDir.exists() ? new File[0] :
+            stateDir.listFiles(pathname -> {
+                if (!pathname.isDirectory() || 
!PATH_NAME.matcher(pathname.getName()).matches()) {
+                    return false;
+                } else {
+                    return !taskDirEmpty(pathname);
+                }
+            });
+
+        return taskDirectories == null ? new File[0] : taskDirectories;
+    }
+
+    /**
      * List all of the task directories
      * @return The list of all the existing local directories for stream tasks
      */
-    File[] listTaskDirectories() {
-        return !stateDir.exists() ? new File[0] :
-                stateDir.listFiles(pathname -> pathname.isDirectory() && 
PATH_NAME.matcher(pathname.getName()).matches());
+    File[] listAllTaskDirectories() {
+        final File[] taskDirectories = !stateDir.exists() ? new File[0] :
+            stateDir.listFiles(pathname -> pathname.isDirectory() && 
PATH_NAME.matcher(pathname.getName()).matches());
+
+        return taskDirectories == null ? new File[0] : taskDirectories;
     }
 
     private FileChannel getOrCreateFileChannel(final TaskId taskId,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 9385ca1..21fd96c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -182,7 +182,7 @@ public class TaskManager {
 
         final HashSet<TaskId> tasks = new HashSet<>();
 
-        final File[] stateDirs = 
taskCreator.stateDirectory().listTaskDirectories();
+        final File[] stateDirs = 
taskCreator.stateDirectory().listNonEmptyTaskDirectories();
         if (stateDirs != null) {
             for (final File dir : stateDirs) {
                 try {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index 1f7163f..bfcaa44 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -32,6 +32,7 @@ import java.nio.channels.FileChannel;
 import java.nio.channels.OverlappingFileLockException;
 import java.nio.file.StandardOpenOption;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.Properties;
@@ -204,18 +205,29 @@ public class StateDirectoryTest {
     public void shouldCleanUpTaskStateDirectoriesThatAreNotCurrentlyLocked() 
throws Exception {
         final TaskId task0 = new TaskId(0, 0);
         final TaskId task1 = new TaskId(1, 0);
+        final TaskId task2 = new TaskId(2, 0);
         try {
+            assertTrue(new File(directory.directoryForTask(task0), 
"store").mkdir());
+            assertTrue(new File(directory.directoryForTask(task1), 
"store").mkdir());
+            assertTrue(new File(directory.directoryForTask(task2), 
"store").mkdir());
+
             directory.lock(task0);
             directory.lock(task1);
-            directory.directoryForTask(new TaskId(2, 0));
 
-            List<File> files = 
Arrays.asList(Objects.requireNonNull(appDir.listFiles()));
+            List<File> files = 
Arrays.asList(Objects.requireNonNull(directory.listAllTaskDirectories()));
+            assertEquals(3, files.size());
+
+
+            files = 
Arrays.asList(Objects.requireNonNull(directory.listNonEmptyTaskDirectories()));
             assertEquals(3, files.size());
 
-            time.sleep(1000);
+            time.sleep(5000);
             directory.cleanRemovedTasks(0);
 
-            files = Arrays.asList(Objects.requireNonNull(appDir.listFiles()));
+            files = 
Arrays.asList(Objects.requireNonNull(directory.listAllTaskDirectories()));
+            assertEquals(3, files.size());
+
+            files = 
Arrays.asList(Objects.requireNonNull(directory.listNonEmptyTaskDirectories()));
             assertEquals(2, files.size());
             assertTrue(files.contains(new File(appDir, task0.toString())));
             assertTrue(files.contains(new File(appDir, task1.toString())));
@@ -228,13 +240,19 @@ public class StateDirectoryTest {
     @Test
     public void 
shouldCleanupStateDirectoriesWhenLastModifiedIsLessThanNowMinusCleanupDelay() {
         final File dir = directory.directoryForTask(new TaskId(2, 0));
+        assertTrue(new File(dir, "store").mkdir());
+
         final int cleanupDelayMs = 60000;
         directory.cleanRemovedTasks(cleanupDelayMs);
         assertTrue(dir.exists());
+        assertEquals(1, directory.listAllTaskDirectories().length);
+        assertEquals(1, directory.listNonEmptyTaskDirectories().length);
 
         time.sleep(cleanupDelayMs + 1000);
         directory.cleanRemovedTasks(cleanupDelayMs);
-        assertFalse(dir.exists());
+        assertTrue(dir.exists());
+        assertEquals(1, directory.listAllTaskDirectories().length);
+        assertEquals(0, directory.listNonEmptyTaskDirectories().length);
     }
 
     @Test
@@ -245,15 +263,21 @@ public class StateDirectoryTest {
     }
 
     @Test
-    public void shouldListAllTaskDirectories() {
+    public void shouldOnlyListNonEmptyTaskDirectories() {
         TestUtils.tempDirectory(stateDir.toPath(), "foo");
         final File taskDir1 = directory.directoryForTask(new TaskId(0, 0));
         final File taskDir2 = directory.directoryForTask(new TaskId(0, 1));
 
-        final List<File> dirs = Arrays.asList(directory.listTaskDirectories());
-        assertEquals(2, dirs.size());
-        assertTrue(dirs.contains(taskDir1));
-        assertTrue(dirs.contains(taskDir2));
+        final File storeDir = new File(taskDir1, "store");
+        assertTrue(storeDir.mkdir());
+
+        assertEquals(Arrays.asList(taskDir1, taskDir2), 
Arrays.asList(directory.listAllTaskDirectories()));
+        assertEquals(Collections.singletonList(taskDir1), 
Arrays.asList(directory.listNonEmptyTaskDirectories()));
+
+        directory.cleanRemovedTasks(0L);
+
+        assertEquals(Arrays.asList(taskDir1, taskDir2), 
Arrays.asList(directory.listAllTaskDirectories()));
+        assertEquals(Collections.emptyList(), 
Arrays.asList(directory.listNonEmptyTaskDirectories()));
     }
 
     @Test
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index b95cec3..bc2f319 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -222,7 +222,7 @@ public class TaskManagerTest {
         assertTrue((new File(taskFolders[3], 
StateManagerUtil.CHECKPOINT_FILE_NAME)).createNewFile());
 
         
expect(activeTaskCreator.stateDirectory()).andReturn(stateDirectory).once();
-        
expect(stateDirectory.listTaskDirectories()).andReturn(taskFolders).once();
+        
expect(stateDirectory.listNonEmptyTaskDirectories()).andReturn(taskFolders).once();
 
         EasyMock.replay(activeTaskCreator, stateDirectory);
 
@@ -672,7 +672,6 @@ public class TaskManagerTest {
         expect(activeTaskCreator.createTasks(EasyMock.anyObject(),
                                                   
EasyMock.eq(taskId0Assignment)))
                 .andReturn(Collections.singletonList(streamTask));
-
     }
 
     private void mockTopologyBuilder() {

Reply via email to