showuon commented on a change in pull request #9904:
URL: https://github.com/apache/kafka/pull/9904#discussion_r560617655



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -94,27 +94,32 @@ public StateDirectory(final StreamsConfig config, final 
Time time, final boolean
         this.appId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
         final String stateDirName = 
config.getString(StreamsConfig.STATE_DIR_CONFIG);
         final File baseDir = new File(stateDirName);
-        if (this.hasPersistentStores && !baseDir.exists() && 
!baseDir.mkdirs()) {
-            throw new ProcessorStateException(
-                String.format("base state directory [%s] doesn't exist and 
couldn't be created", stateDirName));
-        }
         stateDir = new File(baseDir, appId);
-        if (this.hasPersistentStores && !stateDir.exists() && 
!stateDir.mkdir()) {
-            throw new ProcessorStateException(
-                String.format("state directory [%s] doesn't exist and couldn't 
be created", stateDir.getPath()));
-        }
-        if (hasPersistentStores && stateDirName.startsWith("/tmp")) {
-            log.warn("Using /tmp directory in the state.dir property can cause 
failures with writing the checkpoint file" +
-                " due to the fact that this directory can be cleared by the 
OS");
-        }
-        final Path basePath = Paths.get(baseDir.getPath());
-        final Path statePath = Paths.get(stateDir.getPath());
-        final Set<PosixFilePermission> perms = 
PosixFilePermissions.fromString("rwxr-x---");
-        try {
-            Files.setPosixFilePermissions(basePath, perms);
-            Files.setPosixFilePermissions(statePath, perms);
-        } catch (final IOException e) {
-            log.error("Error changing permissions for the state or base 
directory {} ", stateDir.getPath(), e);
+
+        if (this.hasPersistentStores) {
+            if (!baseDir.exists() && !baseDir.mkdirs()) {
+                throw new ProcessorStateException(
+                    String.format("base state directory [%s] doesn't exist and 
couldn't be created", stateDirName));
+            }
+            if (!stateDir.exists() && !stateDir.mkdir()) {
+                throw new ProcessorStateException(
+                    String.format("state directory [%s] doesn't exist and 
couldn't be created", stateDir.getPath()));
+            }
+            if (stateDirName.startsWith("/tmp")) {
+                log.warn("Using /tmp directory in the state.dir property can 
cause failures with writing the checkpoint file" +
+                    " due to the fact that this directory can be cleared by 
the OS");
+            }
+
+            // change the dir permission to "rwxr-x---" to avoid world readable
+            final Path basePath = Paths.get(baseDir.getPath());
+            final Path statePath = Paths.get(stateDir.getPath());
+            final Set<PosixFilePermission> perms = 
PosixFilePermissions.fromString("rwxr-x---");
+            try {
+                Files.setPosixFilePermissions(basePath, perms);
+                Files.setPosixFilePermissions(statePath, perms);
+            } catch (final IOException e) {
+                log.warn("Error changing permissions for the state or base 
directory {} ", stateDir.getPath(), e);

Review comment:
       Agree! revert back to `log.error`. Thanks.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to