This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 9078451 MINOR: Add num threads logging upon shutdown (#11652) 9078451 is described below commit 9078451e37a5e8bc8cd0c9865837fe5379e8f2d0 Author: Guozhang Wang <wangg...@gmail.com> AuthorDate: Thu Jan 6 11:28:27 2022 -0800 MINOR: Add num threads logging upon shutdown (#11652) 1. Add num of threads logging upon shutdown. 2. Prefix the shutdown thread with client id. Reviewers: John Roesler <vvcep...@apache.org> --- .../org/apache/kafka/streams/KafkaStreams.java | 29 ++++++++++++++++++---- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index a8f58d8..6b7e214 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -1294,7 +1294,9 @@ public class KafkaStreams implements AutoCloseable { globalStreamThread.start(); } - processStreamThread(StreamThread::start); + final int numThreads = processStreamThread(StreamThread::start); + + log.info("Started {} stream threads", numThreads); final Long cleanupDelay = applicationConfigs.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG); stateDirCleaner.scheduleAtFixedRate(() -> { @@ -1339,20 +1341,31 @@ public class KafkaStreams implements AutoCloseable { return new Thread(() -> { // notify all the threads to stop; avoid deadlocks by stopping any // further state reports from the thread since we're shutting down - processStreamThread(StreamThread::shutdown); + int numStreamThreads = processStreamThread(StreamThread::shutdown); + + log.info("Shutting down {} stream threads", numStreamThreads); + topologyMetadata.wakeupThreads(); - processStreamThread(thread -> { + numStreamThreads = processStreamThread(thread -> { try { if (!thread.isRunning()) { + log.debug("Shutdown {} complete", thread.getName()); + thread.join(); } } catch (final InterruptedException ex) { + log.warn("Shutdown {} interrupted", thread.getName()); + Thread.currentThread().interrupt(); } }); + log.info("Shutdown {} stream threads complete", numStreamThreads); + if (globalStreamThread != null) { + log.info("Shutting down the global stream threads"); + globalStreamThread.shutdown(); } @@ -1360,9 +1373,13 @@ public class KafkaStreams implements AutoCloseable { try { globalStreamThread.join(); } catch (final InterruptedException e) { + log.warn("Shutdown the global stream thread interrupted"); + Thread.currentThread().interrupt(); } globalStreamThread = null; + + log.info("Shutdown global stream threads complete"); } stateDirectory.close(); @@ -1375,7 +1392,7 @@ public class KafkaStreams implements AutoCloseable { } else { setState(State.ERROR); } - }, "kafka-streams-close-thread"); + }, clientId + "-CloseThread"); } private boolean close(final long timeoutMs) { @@ -1624,9 +1641,11 @@ public class KafkaStreams implements AutoCloseable { * threads lock when looping threads. * @param consumer handler */ - protected void processStreamThread(final Consumer<StreamThread> consumer) { + protected int processStreamThread(final Consumer<StreamThread> consumer) { final List<StreamThread> copy = new ArrayList<>(threads); for (final StreamThread thread : copy) consumer.accept(thread); + + return copy.size(); } /**