[ https://issues.apache.org/jira/browse/KAFKA-17162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17913543#comment-17913543 ]
Ao Li commented on KAFKA-17162:
-------------------------------

Thanks for proposing the fix. I made a clean clone on my laptop (MacBook Pro M3 Max) and on my desktop (i9-11900 with Ubuntu 24.04) and ran `./gradlew :streams:test --tests DefaultTaskManagerTest.shouldReturnFromAwaitOnInterruption`. The test fails on both machines. My JDK versions are openjdk 16.0.2 2021-07-20 (Ubuntu) and openjdk 21.0.5 2024-10-15 (macOS).

I've pushed a commit to the branch that adjusts the sleep durations: https://github.com/aoli-al/kafka/commit/30c485f244c3e0eeafec0a4d5f42ddc4d5019333 (you can pull the changes from the branch https://github.com/aoli-al/kafka/tree/KAFKA-83). Please let me know whether this makes the failure easier to reproduce.

> DefaultTaskManagerTest may leak AwaitingRunnable thread
> -------------------------------------------------------
>
>                 Key: KAFKA-17162
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17162
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams, unit tests
>    Affects Versions: 3.9.0
>            Reporter: Ao Li
>            Priority: Minor
>
> `DefaultTaskManagerTest#shouldReturnFromAwaitOnInterruption` fails with the
> following patch applied:
> {code}
> ```
> diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
> index 5d2db3c279..b87a82b85b 100644
> --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
> +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
> @@ -348,6 +348,10 @@ public class DefaultTaskManager implements TaskManager {
>      }
>
>      private <T> T returnWithTasksLocked(final Supplier<T> action) {
> +        try {
> +            Thread.sleep(1000);
> +        } catch (final Exception e) {
> +        }
>          tasksLock.lock();
>          try {
>              return action.get();
> diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
> index 98065eae7d..0d8dde7156 100644
> --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
> +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
> @@ -114,6 +114,10 @@ public class DefaultTaskManagerTest {
>          @Override
>          public void run() {
>              while (!shutdownRequested.get()) {
> +                try {
> +                    Thread.sleep(1000);
> +                } catch (final Exception e) {
> +                }
>                  try {
>                      taskManager.awaitProcessableTasks();
>                  } catch (final InterruptedException ignored) {
> @@ -151,6 +155,8 @@ public class DefaultTaskManagerTest {
>          assertTrue(awaitingRunnable.awaitDone.await(VERIFICATION_TIMEOUT, TimeUnit.MILLISECONDS));
>
>          awaitingRunnable.shutdown();
> +        Thread.sleep(5000);
> +        assertFalse(awaitingThread.isAlive());
>      }
>
>      @Test
> ```
> {code}
> `awaitingThread` is never terminated because it is still waiting for the signal:
> {code}
> "Thread-3" #25 [26371] prio=5 os_prio=31 cpu=9.68ms elapsed=74.89s tid=0x00000001250d8600 nid=26371 waiting on condition [0x0000000173d4e000]
>    java.lang.Thread.State: WAITING (parking)
>     at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
>     - parking to wait for <0x00000007dcd49b88> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(java.base@21.0.2/LockSupport.java:371)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionNode.block(java.base@21.0.2/AbstractQueuedSynchronizer.java:519)
>     at java.util.concurrent.ForkJoinPool.unmanagedBlock(java.base@21.0.2/ForkJoinPool.java:3780)
>     at java.util.concurrent.ForkJoinPool.managedBlock(java.base@21.0.2/ForkJoinPool.java:3725)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(java.base@21.0.2/AbstractQueuedSynchronizer.java:1707)
>     at org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager.lambda$awaitProcessableTasks$1(DefaultTaskManager.java:142)
>     at org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager$$Lambda/0x0000007001305428.get(Unknown Source)
>     at org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager.returnWithTasksLocked(DefaultTaskManager.java:357)
>     at org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager.awaitProcessableTasks(DefaultTaskManager.java:129)
>     at org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManagerTest$AwaitingRunnable.run(DefaultTaskManagerTest.java:122)
>     at java.lang.Thread.runWith(java.base@21.0.2/Thread.java:1596)
>     at java.lang.Thread.run(java.base@21.0.2/Thread.java:1583)
> {code}
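The dump is consistent with a lost wakeup. `AwaitingRunnable` checks `shutdownRequested`. The shutdown signal is then delivered before the runnable acquires the lock and parks in the condition inside `awaitProcessableTasks()`, so nothing ever wakes it. The injected sleeps widen exactly that window. The minimal sketch below reproduces the pattern deterministically with two latches instead of sleeps. It is not Kafka's code and not the fix adopted for this ticket: `LostWakeupSketch`, `Waiter`, `awaitOnce` and `terminates` are hypothetical names. It assumes a single `ReentrantLock`/`Condition` pair and a flag that the signaller sets while holding that lock.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class LostWakeupSketch {

    // Hypothetical stand-in for a task manager: one lock, one condition, one shutdown flag.
    static final class Waiter {
        final ReentrantLock lock = new ReentrantLock();
        final Condition condition = lock.newCondition();
        final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
        private final boolean recheckUnderLock;

        Waiter(final boolean recheckUnderLock) {
            this.recheckUnderLock = recheckUnderLock;
        }

        // Loosely analogous to awaitProcessableTasks(): park until signalled.
        void awaitOnce() throws InterruptedException {
            lock.lock();
            try {
                // Without this re-check, a signal sent before we took the lock is lost
                // and await() parks indefinitely (WAITING (parking), as in the dump).
                if (recheckUnderLock && shutdownRequested.get()) {
                    return;
                }
                condition.await();
            } finally {
                lock.unlock();
            }
        }

        // Set the flag and signal while holding the same lock the waiter checks under.
        void shutdown() {
            lock.lock();
            try {
                shutdownRequested.set(true);
                condition.signalAll();
            } finally {
                lock.unlock();
            }
        }
    }

    // Returns true if the waiting thread exits within timeoutMs after shutdown().
    static boolean terminates(final boolean recheckUnderLock, final long timeoutMs)
            throws InterruptedException {
        final Waiter waiter = new Waiter(recheckUnderLock);
        final CountDownLatch passedFlagCheck = new CountDownLatch(1);
        final CountDownLatch shutdownSent = new CountDownLatch(1);

        final Thread awaitingThread = new Thread(() -> {
            try {
                while (!waiter.shutdownRequested.get()) {
                    passedFlagCheck.countDown();
                    // Force the ordering the injected sleeps make likely:
                    // the signal arrives after the flag check but before await().
                    shutdownSent.await();
                    waiter.awaitOnce();
                }
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        awaitingThread.setDaemon(true); // a leaked thread must not keep the JVM alive
        awaitingThread.start();

        passedFlagCheck.await();
        waiter.shutdown();
        shutdownSent.countDown();

        // Bounded join instead of a fixed Thread.sleep(...) before isAlive().
        awaitingThread.join(timeoutMs);
        return !awaitingThread.isAlive();
    }

    public static void main(final String[] args) throws InterruptedException {
        System.out.println("without re-check: terminated=" + terminates(false, 2000));
        System.out.println("with re-check:    terminated=" + terminates(true, 2000));
    }
}
```

The first call should normally report `terminated=false` after the 2-second timeout. A spurious wakeup could in principle free the thread. The second call reports `terminated=true`, because the flag is checked and set under the same lock, so check-then-await cannot interleave with set-then-signal. A timed `await(...)` or interrupting the thread are other common ways to bound such waits.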