mjsax commented on code in PR #17521: URL: https://github.com/apache/kafka/pull/17521#discussion_r1804091406
########## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ########## @@ -1907,65 +1905,30 @@ public synchronized ClientInstanceIds clientInstanceIds(final Duration timeout) // (3) collect client instance ids from threads - // (3a) collect consumers from StreamsThread - for (final Map.Entry<String, KafkaFuture<Uuid>> consumerFuture : consumerFutures.entrySet()) { + // (3a) collect consumers and producer from StreamsThread + for (final Map.Entry<String, KafkaFuture<Uuid>> clientFuture : clientFutures.entrySet()) { final Uuid instanceId = getOrThrowException( - consumerFuture.getValue(), + clientFuture.getValue(), remainingTime.remainingMs(), () -> String.format( - "Could not retrieve consumer instance id for %s.", - consumerFuture.getKey() + "Could not retrieve consumer/producer instance id for %s.", + clientFuture.getKey() ) ); remainingTime.update(time.milliseconds()); // could be `null` if telemetry is disabled on the consumer itself if (instanceId != null) { clientInstanceIds.addConsumerInstanceId( - consumerFuture.getKey(), + clientFuture.getKey(), instanceId ); } else { - log.debug(String.format("Telemetry is disabled for %s.", consumerFuture.getKey())); + log.debug(String.format("Telemetry is disabled for %s.", clientFuture.getKey())); } } - // (3b) collect producers from StreamsThread Review Comment: This case is now covered with 3(a) step -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org