[ 
https://issues.apache.org/jira/browse/KAFKA-17162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17913537#comment-17913537
 ] 

Matthias J. Sax commented on KAFKA-17162:
-----------------------------------------

I could not reproduce this... applied your patch to `trunk` but the test passed.

I agree that we should `join()` the threads that we `start()` though – if there 
is any issue that a thread stays in `WAITING` state, `join()` should surface 
the issue. Opened a PR for it, but test did pass locally with `join()` and also 
with `join()` + your patch.

If we cannot reproduce this, I would propose to just close the ticket.

> DefaultTaskManagerTest may leak AwaitingRunnable thread
> -------------------------------------------------------
>
>                 Key: KAFKA-17162
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17162
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams, unit tests
>    Affects Versions: 3.9.0
>            Reporter: Ao Li
>            Priority: Minor
>
> The `DefaultTaskManagerTest#shouldReturnFromAwaitOnInterruption` will fail 
> with the following patch:
> {code}
> ```
> diff --git 
> a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
>  
> b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
> index 5d2db3c279..b87a82b85b 100644
> --- 
> a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
> +++ 
> b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
> @@ -348,6 +348,10 @@ public class DefaultTaskManager implements TaskManager {
>      }
>  
>      private <T> T returnWithTasksLocked(final Supplier<T> action) {
> +        try {
> +            Thread.sleep(1000);
> +        } catch (final Exception e) {
> +        }
>          tasksLock.lock();
>          try {
>              return action.get();
> diff --git 
> a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
>  
> b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
> index 98065eae7d..0d8dde7156 100644
> --- 
> a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
> +++ 
> b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
> @@ -114,6 +114,10 @@ public class DefaultTaskManagerTest {
>          @Override
>          public void run() {
>              while (!shutdownRequested.get()) {
> +                try {
> +                    Thread.sleep(1000);
> +                } catch (final Exception e) {
> +                }
>                  try {
>                      taskManager.awaitProcessableTasks();
>                  } catch (final InterruptedException ignored) {
> @@ -151,6 +155,8 @@ public class DefaultTaskManagerTest {
>          assertTrue(awaitingRunnable.awaitDone.await(VERIFICATION_TIMEOUT, 
> TimeUnit.MILLISECONDS));
>  
>          awaitingRunnable.shutdown();
> +        Thread.sleep(5000);
> +        assertFalse(awaitingThread.isAlive());
>      }
>  
>      @Test
> ```
> {code}
>  awatingThread is left unclosed because it was waiting for the signal
> {code}
> "Thread-3" #25 [26371] prio=5 os_prio=31 cpu=9.68ms elapsed=74.89s 
> tid=0x00000001250d8600 nid=26371 waiting on condition  [0x0000000173d4e000]
>    java.lang.Thread.State: WAITING (parking)
>         at jdk.internal.misc.Unsafe.park(java.base@21.0.2/Native Method)
>         - parking to wait for  <0x00000007dcd49b88> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at 
> java.util.concurrent.locks.LockSupport.park(java.base@21.0.2/LockSupport.java:371)
>         at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionNode.block(java.base@21.0.2/AbstractQueuedSynchronizer.java:519)
>         at 
> java.util.concurrent.ForkJoinPool.unmanagedBlock(java.base@21.0.2/ForkJoinPool.java:3780)
>         at 
> java.util.concurrent.ForkJoinPool.managedBlock(java.base@21.0.2/ForkJoinPool.java:3725)
>         at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(java.base@21.0.2/AbstractQueuedSynchronizer.java:1707)
>         at 
> org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager.lambda$awaitProcessableTasks$1(DefaultTaskManager.java:142)
>         at 
> org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager$$Lambda/0x0000007001305428.get(Unknown
>  Source)
>         at 
> org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager.returnWithTasksLocked(DefaultTaskManager.java:357)
>         at 
> org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager.awaitProcessableTasks(DefaultTaskManager.java:129)
>         at 
> org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManagerTest$AwaitingRunnable.run(DefaultTaskManagerTest.java:122)
>         at java.lang.Thread.runWith(java.base@21.0.2/Thread.java:1596)
>         at java.lang.Thread.run(java.base@21.0.2/Thread.java:1583)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to