ableegoldman commented on a change in pull request #10862:
URL: https://github.com/apache/kafka/pull/10862#discussion_r649497973



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -312,7 +319,7 @@ private boolean taskDirIsEmpty(final File taskDir) {
      */
     File globalStateDir() {
         final File dir = new File(stateDir, "global");
-        if (hasPersistentStores && !dir.exists() && !dir.mkdir()) {
+        if (hasPersistentStores && ((dir.exists() && !dir.isDirectory()) || 
(!dir.exists() && !dir.mkdir()))) {
             throw new ProcessorStateException(
                 String.format("global state directory [%s] doesn't exist and 
couldn't be created", dir.getPath()));

Review comment:
       ditto here, please add a separate check and exception

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -126,7 +126,7 @@ public StateDirectory(final StreamsConfig config, final 
Time time, final boolean
                 throw new ProcessorStateException(
                     String.format("base state directory [%s] doesn't exist and 
couldn't be created", stateDirName));
             }
-            if (!stateDir.exists() && !stateDir.mkdir()) {
+            if ((stateDir.exists() && !stateDir.isDirectory()) || 
(!stateDir.exists() && !stateDir.mkdir())) {

Review comment:
       Please split this up into a separate check for `if ((stateDir.exists() 
&& !stateDir.isDirectory())` and then throw an accurate exception, eg `state 
directory could not be created as there is an existing file with the same name`

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -230,18 +230,25 @@ public UUID initializeProcessId() {
     public File getOrCreateDirectoryForTask(final TaskId taskId) {
         final File taskParentDir = getTaskDirectoryParentName(taskId);
         final File taskDir = new File(taskParentDir, 
StateManagerUtil.toTaskDirString(taskId));
-        if (hasPersistentStores && !taskDir.exists()) {
-            synchronized (taskDirCreationLock) {
-                // to avoid a race condition, we need to check again if the 
directory does not exist:
-                // otherwise, two threads might pass the outer `if` (and enter 
the `then` block),
-                // one blocks on `synchronized` while the other creates the 
directory,
-                // and the blocking one fails when trying to create it after 
it's unblocked
-                if (!taskParentDir.exists() && !taskParentDir.mkdir()) {
-                    throw new ProcessorStateException(
+        if (hasPersistentStores) {
+            if (!taskDir.exists()) {
+                synchronized (taskDirCreationLock) {
+                    // to avoid a race condition, we need to check again if 
the directory does not exist:
+                    // otherwise, two threads might pass the outer `if` (and 
enter the `then` block),
+                    // one blocks on `synchronized` while the other creates 
the directory,
+                    // and the blocking one fails when trying to create it 
after it's unblocked
+                    if (!taskParentDir.exists() && !taskParentDir.mkdir()) {
+                        throw new ProcessorStateException(
                             String.format("Parent [%s] of task directory [%s] 
doesn't exist and couldn't be created",
-                                    taskParentDir.getPath(), 
taskDir.getPath()));
+                                taskParentDir.getPath(), taskDir.getPath()));
+                    }
+                    if (!taskDir.exists() && !taskDir.mkdir()) {
+                        throw new ProcessorStateException(
+                            String.format("task directory [%s] doesn't exist 
and couldn't be created", taskDir.getPath()));
+                    }
                 }
-                if (!taskDir.exists() && !taskDir.mkdir()) {
+            } else {
+                if (!taskDir.isDirectory()) {

Review comment:
       Same here, this exception message does not apply to the case this is 
trying to catch




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to