ableegoldman commented on a change in pull request #10342:
URL: https://github.com/apache/kafka/pull/10342#discussion_r599076729



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -444,22 +408,30 @@ public synchronized void clean() {
      * Remove the directories for any {@link TaskId}s that are no-longer
      * owned by this {@link StreamThread} and aren't locked by either
      * another process or another {@link StreamThread}
-     * @param cleanupDelayMs only remove directories if they haven't been modified for at least
-     *                       this amount of time (milliseconds)
+     * @param cleanupDelayMs        only remove directories if they haven't been modified for at least
+     *                              this amount of time (milliseconds)
+     * @param currentThreadNames    the names of all non-DEAD stream threads so we can clean up any
+     *                              orphaned task directories
      */
-    public synchronized void cleanRemovedTasks(final long cleanupDelayMs) {
+    public synchronized void cleanRemovedTasks(final long cleanupDelayMs, final Set<String> currentThreadNames) {
         try {
-            cleanRemovedTasksCalledByCleanerThread(cleanupDelayMs);
+            cleanRemovedTasksCalledByCleanerThread(cleanupDelayMs, currentThreadNames);
         } catch (final Exception cannotHappen) {
             throw new IllegalStateException("Should have swallowed exception.", cannotHappen);
         }
     }
 
-    private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) {
+    private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs, final Set<String> currentThreadNames) {
         for (final File taskDir : listNonEmptyTaskDirectories()) {
             final String dirName = taskDir.getName();
             final TaskId id = TaskId.parse(dirName);
-            if (!locks.containsKey(id)) {
+
+            final String owningThread = lockedTasksToStreamThreadOwner.get(id);
+            if (owningThread != null && !currentThreadNames.contains(owningThread)) {
+                log.warn("Deleting lock for task directory {} since the thread owning the lock is gone: {}", id, owningThread);

Review comment:
   See [KAFKA-10563](https://issues.apache.org/jira/browse/KAFKA-10563). The
short answer is that yes, this should never happen. The long answer is that it
may be possible in some exceptional case, or if we introduce a bug somewhere.
Since users may choose to replace a thread that died and failed during its
cleanup, we must make sure the task is not permanently blocked from ever being
picked up by another thread.
   There are probably better ways to guard against this possibility, so I'm
not considering this PR to fully address
[KAFKA-10563](https://issues.apache.org/jira/browse/KAFKA-10563), but I did
want to put in at least some way to escape the situation of an orphaned task
directory.
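
   To make the escape hatch concrete, here is a minimal standalone sketch of
the idea. It is not the code in this PR: `OrphanedLockSketch`, `lock`, and
`releaseOrphanedLocks` are hypothetical names, task ids are plain strings
rather than `TaskId`, and the map only stands in for
`lockedTasksToStreamThreadOwner`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative only: a simplified stand-in for the lock bookkeeping in StateDirectory.
public class OrphanedLockSketch {
    // Hypothetical equivalent of lockedTasksToStreamThreadOwner: task id -> owning thread name.
    private final Map<String, String> lockedTasksToOwner = new HashMap<>();

    // Returns true if the calling thread now holds (or already held) the lock for the task.
    public synchronized boolean lock(final String taskId, final String threadName) {
        final String previousOwner = lockedTasksToOwner.putIfAbsent(taskId, threadName);
        return previousOwner == null || previousOwner.equals(threadName);
    }

    // Drops every lock whose owner is not among the currently live threads,
    // and returns the ids of the tasks that were freed.
    public synchronized List<String> releaseOrphanedLocks(final Set<String> currentThreadNames) {
        final List<String> released = new ArrayList<>();
        lockedTasksToOwner.entrySet().removeIf(entry -> {
            final boolean orphaned = !currentThreadNames.contains(entry.getValue());
            if (orphaned) {
                released.add(entry.getKey());
            }
            return orphaned;
        });
        return released;
    }
}
```

   In this sketch, a lock held by a thread that is no longer in the live set
is released on the next cleanup pass, so a replacement thread can acquire the
task instead of being blocked indefinitely.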




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

