kirktrue commented on code in PR #14670: URL: https://github.com/apache/kafka/pull/14670#discussion_r1385620217
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -247,12 +247,10 @@ private void closeInternal(final Duration timeout) {
         closeTimeout = timeout;
         wakeup();
 
-        if (timeoutMs > 0) {
-            try {
-                join(timeoutMs);
-            } catch (InterruptedException e) {
-                log.error("Interrupted while waiting for consumer network thread to complete", e);
-            }
+        try {

Review Comment:
   Because one of the unit tests fails without it 😄
   
   In the existing `KafkaConsumer.close()`, the coordinator eventually calls this method:
   
   ```java
   private void closeHeartbeatThread() {
       HeartbeatThread thread;
       synchronized (this) {
           if (heartbeatThread == null)
               return;
           heartbeatThread.close();
           thread = heartbeatThread;
           heartbeatThread = null;
       }
       try {
           thread.join();
       } catch (InterruptedException e) {
           log.warn("Interrupted while waiting for consumer heartbeat thread to close");
           throw new InterruptException(e);
       }
   }
   ```
   
   By blocking via `Thread.join()`, it allows the coordinator to close fully before its underlying resources (client, etc.) are closed. In contrast, our background thread was only waiting up to the timeout, and then proceeded with closing the other resources even though the background thread may not have completed its cleanup.
   
   If you'd prefer, I can save this change for a separate PR?
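   To illustrate the ordering issue described above, here is a minimal standalone sketch. It is not Kafka code: `JoinBeforeClose`, `Resource`, and `closeWith` are hypothetical names, and the sleep stands in for whatever cleanup the background thread performs. It only shows that a bounded `join(timeoutMs)` can return while the worker is still running, whereas an unbounded `join()` cannot.

   ```java
   import java.util.concurrent.atomic.AtomicBoolean;

   // Hypothetical demo class; none of these names come from the Kafka code base.
   class JoinBeforeClose {

       // Stand-in for a shared resource (e.g. a network client) that the
       // worker still needs while it finishes its cleanup.
       static final class Resource {
           final AtomicBoolean closed = new AtomicBoolean(false);
           final AtomicBoolean usedAfterClose = new AtomicBoolean(false);

           void use() {
               if (closed.get())
                   usedAfterClose.set(true);
           }

           void close() {
               closed.set(true);
           }
       }

       // Starts a worker whose cleanup takes cleanupMs, waits for it, then
       // closes the resource. A joinTimeoutMs > 0 gives a bounded wait;
       // otherwise the caller blocks until the worker has finished.
       // Returns true if the worker touched the resource after it was closed.
       static boolean closeWith(long cleanupMs, long joinTimeoutMs) {
           Resource resource = new Resource();
           Thread worker = new Thread(() -> {
               try {
                   Thread.sleep(cleanupMs); // simulated cleanup work
               } catch (InterruptedException e) {
                   Thread.currentThread().interrupt();
                   return;
               }
               resource.use(); // final cleanup step still needs the resource
           });
           worker.start();
           try {
               if (joinTimeoutMs > 0)
                   worker.join(joinTimeoutMs); // may return while worker is alive
               else
                   worker.join();              // returns only after worker exits
               resource.close();
               worker.join(); // let the worker finish so the result is stable
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new IllegalStateException("Interrupted while waiting for worker", e);
           }
           return resource.usedAfterClose.get();
       }

       public static void main(String[] args) {
           System.out.println("bounded join, used after close: " + closeWith(300, 50));
           System.out.println("unbounded join, used after close: " + closeWith(300, 0));
       }
   }
   ```

   With these illustrative timings, the bounded case should normally report that the resource was used after being closed and the unbounded case should not. The trade-off is that an unbounded `join()` makes the caller wait as long as the worker's cleanup takes.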
-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org