mjsax commented on code in PR #14864:
URL: https://github.com/apache/kafka/pull/14864#discussion_r1412009752


##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1791,6 +1794,74 @@ protected int processStreamThread(final 
Consumer<StreamThread> consumer) {
         return copy.size();
     }
 
+    /**
+     * Returns the internal clients' assigned {@code client instance ids}.
+     *
+     * @return the internal clients' assigned instance ids used for metrics 
collection.
+     *
+     * @throws IllegalStateException If {@code KafkaStreams} is not running.
+     * @throws TimeoutException Indicates that a request timed out.
+     */
+    public ClientInstanceIds clientInstanceIds(final Duration timeout) {
+        if (state().hasNotStarted()) {
+            throw new IllegalStateException("KafkaStreams has not been 
started, you can retry after calling start().");
+        }
+        if (state().isShuttingDown() || state.hasCompletedShutdown()) {
+            throw new IllegalStateException("KafkaStreams has been stopped (" 
+ state + ").");
+        }
+
+        final ClientInstanceIdsImpl clientInstanceIds = new 
ClientInstanceIdsImpl();
+
+        final Map<String, KafkaFuture<Uuid>> streamThreadFutures = new 
HashMap<>();
+        for (final StreamThread streamThread : threads) {
+            
streamThreadFutures.putAll(streamThread.clientInstanceIds(timeout));
+        }
+
+        KafkaFuture<Uuid> globalThreadFuture = null;
+        if (globalStreamThread != null) {
+            globalThreadFuture = 
globalStreamThread.globalConsumerInstanceId(timeout);
+        }
+
+        try {
+            
clientInstanceIds.setAdminInstanceId(adminClient.clientInstanceId(timeout));
+        } catch (final TimeoutException timeoutException) {
+            log.warn("Could not get admin client-instance-id due to timeout.");
+        }
+
+        for (final Map.Entry<String, KafkaFuture<Uuid>> streamThreadFuture : 
streamThreadFutures.entrySet()) {
+            try {
+                clientInstanceIds.addConsumerInstanceId(
+                    streamThreadFuture.getKey(),
+                    streamThreadFuture.getValue().get()
+                );
+            } catch (final ExecutionException exception) {
+                if (exception.getCause() instanceof TimeoutException) {
+                    log.warn("Could not get global consumer client-instance-id 
due to timeout.");
+                } else {
+                    log.error("Could not get global consumer 
client-instance-id", exception);
+                }
+            } catch (final InterruptedException error) {
+                log.error("Could not get global consumer client-instance-id", 
error);
+            }
+        }
+
+        if (globalThreadFuture != null) {
+            try {
+                
clientInstanceIds.addConsumerInstanceId(globalStreamThread.getName(), 
globalThreadFuture.get());
+            } catch (final ExecutionException exception) {
+                if (exception.getCause() instanceof TimeoutException) {
+                    log.warn("Could not get global consumer client-instance-id 
due to timeout.");
+                } else {
+                    log.error("Could not get global consumer 
client-instance-id", exception);
+                }
+            } catch (final InterruptedException error) {
+                log.error("Could not get global consumer client-instance-id", 
error);
+            }
+        }
+
+        return clientInstanceIds;

Review Comment:
   Does it buy us much? -- I actually would like to prefer somewhat better 
error handing, and better error messages. Did a larger rewrite of this. Let me 
know what you think.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to