ableegoldman commented on a change in pull request #10862: URL: https://github.com/apache/kafka/pull/10862#discussion_r649497973
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ########## @@ -312,7 +319,7 @@ private boolean taskDirIsEmpty(final File taskDir) { */ File globalStateDir() { final File dir = new File(stateDir, "global"); - if (hasPersistentStores && !dir.exists() && !dir.mkdir()) { + if (hasPersistentStores && ((dir.exists() && !dir.isDirectory()) || (!dir.exists() && !dir.mkdir()))) { throw new ProcessorStateException( String.format("global state directory [%s] doesn't exist and couldn't be created", dir.getPath())); Review comment: ditto here, please add a separate check and exception ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ########## @@ -126,7 +126,7 @@ public StateDirectory(final StreamsConfig config, final Time time, final boolean throw new ProcessorStateException( String.format("base state directory [%s] doesn't exist and couldn't be created", stateDirName)); } - if (!stateDir.exists() && !stateDir.mkdir()) { + if ((stateDir.exists() && !stateDir.isDirectory()) || (!stateDir.exists() && !stateDir.mkdir())) { Review comment: Please split this up into a separate check for `if (stateDir.exists() && !stateDir.isDirectory())` and then throw an accurate exception, e.g. `state directory could not be created as there is an existing file with the same name` ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ########## @@ -230,18 +230,25 @@ public UUID initializeProcessId() { public File getOrCreateDirectoryForTask(final TaskId taskId) { final File taskParentDir = getTaskDirectoryParentName(taskId); final File taskDir = new File(taskParentDir, StateManagerUtil.toTaskDirString(taskId)); - if (hasPersistentStores && !taskDir.exists()) { - synchronized (taskDirCreationLock) { - // to avoid a race condition, we need to check again if the directory does not exist: - // otherwise, two threads might pass 
the outer `if` (and enter the `then` block), - // one blocks on `synchronized` while the other creates the directory, - // and the blocking one fails when trying to create it after it's unblocked - if (!taskParentDir.exists() && !taskParentDir.mkdir()) { - throw new ProcessorStateException( + if (hasPersistentStores) { + if (!taskDir.exists()) { + synchronized (taskDirCreationLock) { + // to avoid a race condition, we need to check again if the directory does not exist: + // otherwise, two threads might pass the outer `if` (and enter the `then` block), + // one blocks on `synchronized` while the other creates the directory, + // and the blocking one fails when trying to create it after it's unblocked + if (!taskParentDir.exists() && !taskParentDir.mkdir()) { + throw new ProcessorStateException( String.format("Parent [%s] of task directory [%s] doesn't exist and couldn't be created", - taskParentDir.getPath(), taskDir.getPath())); + taskParentDir.getPath(), taskDir.getPath())); + } + if (!taskDir.exists() && !taskDir.mkdir()) { + throw new ProcessorStateException( + String.format("task directory [%s] doesn't exist and couldn't be created", taskDir.getPath())); + } } - if (!taskDir.exists() && !taskDir.mkdir()) { + } else { + if (!taskDir.isDirectory()) { Review comment: Same here, this exception message does not apply to the case this is trying to catch -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org