ableegoldman commented on a change in pull request #8962: URL: https://github.com/apache/kafka/pull/8962#discussion_r448680662
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -136,7 +143,59 @@ private boolean taskDirEmpty(final File taskDir) {
                 !pathname.getName().equals(CHECKPOINT_FILE_NAME));
         // if the task is stateless, storeDirs would be null
-        return storeDirs == null || storeDirs.length == 0;
+        if (storeDirs == null || storeDirs.length == 0) {
+            return true;
+        }
+
+        final List<File> baseSubDirectories = new LinkedList<>();
+        for (final File file : storeDirs) {
+            if (file.isDirectory()) {
+                baseSubDirectories.add(file);
+            } else {
+                return false;
+            }
+        }
+
+        for (final File dir : baseSubDirectories) {
+            final boolean isEmpty;
+            if (dir.getName().equals(ROCKSDB_DIRECTORY_NAME)) {
+                isEmpty = taskSubDirectoriesEmpty(dir, true);
+            } else {
+                isEmpty = taskSubDirectoriesEmpty(dir, false);
+            }
+            if (!isEmpty) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    // BFS through the task directory to look for any files that are not more subdirectories
+    private boolean taskSubDirectoriesEmpty(final File baseDir, final boolean sstOnly) {
+        final Queue<File> subDirectories = new LinkedList<>();
+        subDirectories.offer(baseDir);
+
+        final Set<File> visited = new HashSet<>();
+        while (!subDirectories.isEmpty()) {
+            final File dir = subDirectories.poll();
+            if (!visited.contains(dir)) {
+                final File[] files = dir.listFiles();
+                if (files == null) {
+                    continue;
+                }
+                for (final File file : files) {
+                    if (file.isDirectory()) {
+                        subDirectories.offer(file);
+                    } else if (sstOnly && file.getName().endsWith(ROCKSDB_SST_SUFFIX)) {

Review comment:
But yeah: the extra TaskCorruptedException shouldn't have any correctness implications, and I don't _think_ it will have any bad side effects at all except for confusing ourselves and users. If this seems too risky then we can absolutely take it out. It's just annoying since we pretty much always hit this on startup.
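To make the check under discussion concrete, here is a minimal standalone sketch of a breadth-first emptiness scan. It is not the PR's code. The class name `DirScan`, the method `subTreeEmpty`, the `".sst"` suffix value, and the choice to report "not empty" on the first matching file are all assumptions, since the hunk above is cut off before the body of that branch. The sketch also omits the `visited` set and so assumes a directory tree without symlink cycles.

```java
import java.io.File;
import java.util.ArrayDeque;
import java.util.Queue;

final class DirScan {
    // Assumed value; the PR refers to this constant as ROCKSDB_SST_SUFFIX.
    static final String SST_SUFFIX = ".sst";

    // Returns true if the tree under baseDir holds no "relevant" regular files.
    // With sstOnly set, only files ending in SST_SUFFIX count as relevant;
    // otherwise any regular file does.
    static boolean subTreeEmpty(final File baseDir, final boolean sstOnly) {
        final Queue<File> pending = new ArrayDeque<>();
        pending.offer(baseDir);

        while (!pending.isEmpty()) {
            final File dir = pending.poll();
            final File[] files = dir.listFiles();
            if (files == null) {
                // Not a directory, or an I/O error: nothing to inspect here.
                continue;
            }
            for (final File file : files) {
                if (file.isDirectory()) {
                    pending.offer(file);
                } else if (!sstOnly || file.getName().endsWith(SST_SUFFIX)) {
                    return false;
                }
            }
        }
        return true;
    }
}
```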
For example, in the EosBetaUpgradeIntegrationTest (where I noticed this), we start a second client, which then gets a standby task while the first client revokes the active task. The second client initializes the standby but doesn't get to process any data before the follow-up rebalance is triggered, at which point it receives the active task. So basically every task hits TaskCorruptedException. Of course, this is an edge case where we happen to be within the acceptable recovery lag.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org