wcarlson5 commented on code in PR #14922:
URL: https://github.com/apache/kafka/pull/14922#discussion_r1418054625


##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1802,7 +1805,7 @@ protected int processStreamThread(final Consumer<StreamThread> consumer) {
      * @throws TimeoutException Indicates that a request timed out.
      * @throws StreamsException For any other error that might occur.
      */
-    public ClientInstanceIds clientInstanceIds(final Duration timeout) {
+    public synchronized ClientInstanceIds clientInstanceIds(final Duration timeout) {

Review Comment:
   I think this should take care of most of the threading issues, but it still 
makes it easy to introduce bugs in the future.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -1477,6 +1528,27 @@ public Object getStateLock() {
         return stateLock;
     }
 
+    public Map<String, KafkaFuture<Uuid>> consumerClientInstanceIds(final Duration timeout) {

Review Comment:
   Can we add a comment here that this isn't thread safe? Just for the next 
person who tries to use it and introduces a nasty race condition. The fact that 
it returns futures might make people think it is.
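
   To illustrate the concern, here is a rough sketch of how a method that returns 
a future can still be unsafe to call concurrently. It is not the code in this PR: 
`InstanceIdHolder`, `requestInstanceId`, and `completePending` are hypothetical 
names, and `CompletableFuture<String>` stands in for `KafkaFuture<Uuid>`. The 
Javadoc shows the kind of comment being requested.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the thread-owned state; not the actual StreamThread class.
class InstanceIdHolder {
    // Plain field with no lock: every request overwrites it.
    private CompletableFuture<String> pendingFuture;

    /**
     * Not thread-safe: overwrites the single pending future without synchronization.
     * Callers must serialize access externally, e.g. through a synchronized caller.
     */
    CompletableFuture<String> requestInstanceId() {
        pendingFuture = new CompletableFuture<>();
        return pendingFuture;
    }

    // Completes whichever future is currently pending, if any.
    void completePending(final String id) {
        if (pendingFuture != null) {
            pendingFuture.complete(id);
            pendingFuture = null;
        }
    }
}
```

   If two requests arrive before `completePending` runs, only the second future 
is ever completed; the first caller would wait until its timeout.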



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -713,6 +727,48 @@ boolean runLoop() {
         return true;
     }
 
+    // visible for testing
+    void maybeGetClientInstanceIds() {

Review Comment:
   That is actually okay, as each time the call is made it overwrites 
`mainConsumerInstanceIdFuture` anyway, so there is only ever one future to 
complete.
   
   Not that this isn't an issue in general, but with the caller being 
synchronized it won't be a problem for this feature.
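
   As a minimal sketch of why the synchronized caller is enough here, assuming 
the caller waits for the result before returning: `SketchWorker` and 
`SketchFacade` are hypothetical names, and the worker loop is a simplification, 
not the actual `StreamThread` run loop.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical worker thread that owns a single pending future.
class SketchWorker extends Thread {
    private volatile CompletableFuture<String> pending;
    private volatile boolean running = true;

    SketchWorker() {
        setDaemon(true);
    }

    // Overwrites any earlier pending future; safe only if callers are serialized.
    CompletableFuture<String> request() {
        final CompletableFuture<String> future = new CompletableFuture<>();
        pending = future;
        return future;
    }

    @Override
    public void run() {
        while (running) {
            final CompletableFuture<String> future = pending;
            if (future != null) {
                pending = null;
                future.complete("worker-instance-id");
            }
            try {
                Thread.sleep(1);
            } catch (final InterruptedException e) {
                return;
            }
        }
    }

    void shutdown() {
        running = false;
    }
}

// Hypothetical facade playing the role of the synchronized public method.
class SketchFacade {
    private final SketchWorker worker;

    SketchFacade(final SketchWorker worker) {
        this.worker = worker;
    }

    // synchronized: a second request cannot be issued until the first has returned,
    // so the worker never has more than one outstanding future to complete.
    synchronized String instanceId(final long timeoutMs) {
        try {
            return worker.request().get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (final Exception e) {
            throw new IllegalStateException(e);
        }
    }
}
```

   Any new caller that reaches `request()` without going through the 
synchronized method would reintroduce the race.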



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
