wcarlson5 commented on code in PR #14922: URL: https://github.com/apache/kafka/pull/14922#discussion_r1418054625
##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1802,7 +1805,7 @@ protected int processStreamThread(final Consumer<StreamThread> consumer) {
      * @throws TimeoutException Indicates that a request timed out.
      * @throws StreamsException For any other error that might occur.
      */
-    public ClientInstanceIds clientInstanceIds(final Duration timeout) {
+    public synchronized ClientInstanceIds clientInstanceIds(final Duration timeout) {

Review Comment:
   I think this should take care of most of the threading issues, but it does make it easy to introduce bugs in the future.

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -1477,6 +1528,27 @@ public Object getStateLock() {
         return stateLock;
     }
 
+    public Map<String, KafkaFuture<Uuid>> consumerClientInstanceIds(final Duration timeout) {

Review Comment:
   Can we add a comment here that this isn't thread safe? Just for the next person who tries to use it and introduces a nasty race condition. The fact that it returns futures might make people think it is.

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -713,6 +727,48 @@ boolean runLoop() {
         return true;
     }
 
+    // visible for testing
+    void maybeGetClientInstanceIds() {

Review Comment:
   That is actually okay, as each time the call is made it overwrites `mainConsumerInstanceIdFuture` anyways, so there is only ever one future to complete. Not that this isn't an issue, but with the caller being synchronized it won't be one for this feature.
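
To make the concern in the review comments concrete, here is a minimal sketch of the pattern under discussion. It is not the PR's code: `WorkerSketch`, `FacadeSketch`, `requestInstanceId`, and `maybeCompleteInstanceId` are hypothetical stand-ins for `StreamThread`, `KafkaStreams`, `consumerClientInstanceIds`, and `maybeGetClientInstanceIds`, and `CompletableFuture<String>` stands in for `KafkaFuture<Uuid>` so the example needs only the JDK. The assumption is that each request overwrites a single shared future field, as the last comment describes.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for StreamThread; not the actual Kafka class.
class WorkerSketch {
    // Overwritten on every request, so only the most recent future can ever be completed.
    private volatile CompletableFuture<String> instanceIdFuture;

    /**
     * NOT thread-safe: each call replaces the shared future, so a future handed
     * to an earlier, still-waiting caller would never complete.
     * Callers must serialize access (see FacadeSketch).
     */
    CompletableFuture<String> requestInstanceId() {
        final CompletableFuture<String> future = new CompletableFuture<>();
        instanceIdFuture = future;
        return future;
    }

    // Meant to be called from the worker's own loop; completes the pending future, if any.
    void maybeCompleteInstanceId(final String id) {
        final CompletableFuture<String> pending = instanceIdFuture;
        if (pending != null && !pending.isDone()) {
            pending.complete(id);
        }
    }
}

// Hypothetical stand-in for KafkaStreams; not the actual Kafka class.
class FacadeSketch {
    private final WorkerSketch worker;

    FacadeSketch(final WorkerSketch worker) {
        this.worker = worker;
    }

    // synchronized serializes callers, so at most one request is in flight
    // and the overwrite in WorkerSketch cannot orphan another caller's future.
    synchronized String clientInstanceId(final Duration timeout) {
        try {
            return worker.requestInstanceId().get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (final ExecutionException | TimeoutException e) {
            // The sketch wraps checked exceptions in an unchecked one for brevity.
            throw new IllegalStateException("could not get instance id", e);
        }
    }
}

class InstanceIdSketch {
    public static void main(final String[] args) {
        // Two unserialized requests: the second overwrites the first.
        final WorkerSketch worker = new WorkerSketch();
        final CompletableFuture<String> first = worker.requestInstanceId();
        final CompletableFuture<String> second = worker.requestInstanceId();
        worker.maybeCompleteInstanceId("abc");
        System.out.println("first done: " + first.isDone());
        System.out.println("second: " + second.getNow(null));
    }
}
```

The sketch shows why returning futures does not imply thread safety: without the `synchronized` caller, the first future above is silently abandoned. That is the reason a "not thread-safe" note on the worker-side method is worth having even though the current caller is safe.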