mjsax commented on code in PR #14864:
URL: https://github.com/apache/kafka/pull/14864#discussion_r1409794850


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java:
##########
@@ -310,6 +317,25 @@ public void run() {
                     cache.resize(size);
                 }
                 stateConsumer.pollAndUpdate();
+
+                if (fetchDeadline != -1) {
+                    if (fetchDeadline > time.milliseconds()) {
+                        try {
+                            globalConsumerClientInstanceId = globalConsumer.clientInstanceId(Duration.ZERO);
+                            clientInstanceIdFuture.complete(globalConsumerClientInstanceId);
+                            fetchDeadline = -1;
+                        } catch (final TimeoutException swallow) {
+                            // swallow
+                        } catch (final Exception error) {
+                            clientInstanceIdFuture.completeExceptionally(error);
+                            fetchDeadline = -1;

Review Comment:
   Yes. If the user calls `KafkaStreams#clientInstanceIds()` again, we would set a new fetch deadline. `fetchDeadline == -1` means "nothing to be done", i.e., no call to `KafkaStreams#clientInstanceIds()` was made or is in flight waiting for completion.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java:
##########
@@ -310,6 +317,25 @@ public void run() {
                     cache.resize(size);
                 }
                 stateConsumer.pollAndUpdate();
+
+                if (fetchDeadline != -1) {
+                    if (fetchDeadline > time.milliseconds()) {
+                        try {
+                            globalConsumerClientInstanceId = globalConsumer.clientInstanceId(Duration.ZERO);
+                            clientInstanceIdFuture.complete(globalConsumerClientInstanceId);
+                            fetchDeadline = -1;
+                        } catch (final TimeoutException swallow) {
+                            // swallow

Review Comment:
   Yes, this happens further below, in the `else` branch of `if (fetchDeadline > time.milliseconds()) {` (from above).
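A hedged sketch of one loop iteration's deadline check, including the `else` branch referred to above. Everything here is hypothetical: `DeadlineCheck`, `checkOnce`, and the `tryFetch` supplier (which returns `null` while the simulated RPC is still pending) stand in for the real consumer call, and `java.util.concurrent.TimeoutException` stands in for Kafka's `org.apache.kafka.common.errors.TimeoutException`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical sketch of the per-iteration deadline check; names are illustrative only.
class DeadlineCheck {
    static final long NO_REQUEST = -1L;

    long fetchDeadline = NO_REQUEST;
    final CompletableFuture<String> future = new CompletableFuture<>();

    // tryFetch returns null while the simulated background RPC has not returned yet.
    void checkOnce(final long nowMs, final Supplier<String> tryFetch) {
        if (fetchDeadline == NO_REQUEST) {
            return; // nothing to be done
        }
        if (fetchDeadline > nowMs) {
            final String id = tryFetch.get();
            if (id != null) {
                future.complete(id);
                fetchDeadline = NO_REQUEST;
            }
            // otherwise: not ready yet, check again on the next loop iteration
        } else {
            // deadline passed: fail the caller's future and reset the sentinel
            future.completeExceptionally(new TimeoutException("instance id not available before deadline"));
            fetchDeadline = NO_REQUEST;
        }
    }
}
```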



##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1791,6 +1794,52 @@ protected int processStreamThread(final Consumer<StreamThread> consumer) {
         return copy.size();
     }
 
+    /**
+     * Returns the internal clients' assigned {@code client instance ids}.
+     *
+     * @return the internal clients' assigned instance ids used for metrics collection.
+     *
+     * @throws IllegalStateException If {@code KafkaStreams} is not running.
+     * @throws TimeoutException Indicates that a request timed out.
+     */
+    public ClientInstanceIds clientInstanceIds(final Duration timeout) {
+        if (state().hasNotStarted()) {
+            throw new IllegalStateException("KafkaStreams has not been started, you can retry after calling start().");
+        }
+        if (state().isShuttingDown() || state.hasCompletedShutdown()) {
+            throw new IllegalStateException("KafkaStreams has been stopped (" + state + ").");
+        }
+
+        final ClientInstanceIdsImpl clientInstanceIds = new ClientInstanceIdsImpl();
+
+        KafkaFuture<Uuid> globalThreadFuture = null;
+        if (globalStreamThread != null) {
+            globalThreadFuture = globalStreamThread.globalConsumerInstanceId(timeout);

Review Comment:
   How strictly we can obey the given `timeout` is somewhat tricky, given that we need to call `clientInstanceId()` for each client we have. The idea was to basically "fan out" all these calls and do them in parallel (note that `globalConsumerInstanceId` returns immediately and does not block; it hands the execution from the user thread to the `GlobalStreamThread`, which is why we return a Future). Thus, it should be ok to provide the same timeout to each call, as all of them are done in parallel?
   
   If you have any good suggestions on how it could be done better, let me know.
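One possible way to bound the total wait, sketched under assumptions (the `FanOut` class and `collect` helper are hypothetical, and plain `CompletableFuture<String>` stands in for `KafkaFuture<Uuid>`): start all requests first, compute a single deadline from `timeout`, then wait on each future only for the time remaining until that deadline. Because the requests already run in parallel, futures that have completed cost almost nothing to collect, and the caller waits roughly `timeout` in total rather than up to `timeout` per client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: wait on already-started (fanned-out) requests against ONE shared deadline.
class FanOut {
    static List<String> collect(final List<CompletableFuture<String>> futures, final long timeoutMs)
            throws InterruptedException, ExecutionException, TimeoutException {
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        final List<String> results = new ArrayList<>();
        for (final CompletableFuture<String> f : futures) {
            // only the time left until the shared deadline is spent on this future
            final long remaining = Math.max(0L, deadline - System.nanoTime());
            results.add(f.get(remaining, TimeUnit.NANOSECONDS));
        }
        return results;
    }
}
```

With a per-call timeout instead, N slow clients could in the worst case keep the caller waiting up to N times `timeout`; the shared deadline trades that for a hard overall bound.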



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java:
##########
@@ -310,6 +317,25 @@ public void run() {
                     cache.resize(size);
                 }
                 stateConsumer.pollAndUpdate();
+
+                if (fetchDeadline != -1) {
+                    if (fetchDeadline > time.milliseconds()) {
+                        try {
+                            globalConsumerClientInstanceId = globalConsumer.clientInstanceId(Duration.ZERO);

Review Comment:
   Yes, but this was intentional. The `GlobalStreamThread` does this call "on the side", so the idea is to call it with no timeout, just to trigger the background RPC without blocking the thread from doing its actual work at all. There won't be a busy wait, because the global thread does other useful work in the meantime before it cycles back here to check whether the RPC has returned.
   
   Thoughts?
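The pattern described here (do the real work, probe the pending request without waiting, repeat) can be modelled as below. This is a simplified, hypothetical sketch: `NonBlockingProbeLoop` and `runUntilObserved` are illustrative names, `CompletableFuture#getNow(null)` plays the role of the zero-timeout `clientInstanceId(Duration.ZERO)` call, and the `work` runnable stands in for `stateConsumer.pollAndUpdate()`. Since the probe never blocks, every iteration completes its unit of work.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: interleave real work with a zero-wait check of a pending request.
class NonBlockingProbeLoop {
    // Returns the iteration on which the result was first observed, or -1 if never observed.
    static int runUntilObserved(final CompletableFuture<String> rpc, final Runnable work, final int maxIterations) {
        for (int i = 1; i <= maxIterations; i++) {
            work.run();                         // the thread's actual work
            final String id = rpc.getNow(null); // zero-wait probe: never blocks
            if (id != null) {
                return i;
            }
        }
        return -1;
    }
}
```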



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java:
##########
@@ -454,4 +480,19 @@ public void shutdown() {
     public Map<MetricName, Metric> consumerMetrics() {
         return Collections.unmodifiableMap(globalConsumer.metrics());
     }
+
+    public KafkaFuture<Uuid> globalConsumerInstanceId(final Duration timeout) {

Review Comment:
   The user's thread calls `KafkaStreams#clientInstanceIds(...)`, which is the only place that calls this method. We could make `KafkaStreams#clientInstanceIds(...)` synchronized? In the end, `KafkaStreams` is supposed to be thread-safe.
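If `KafkaStreams#clientInstanceIds(...)` were `synchronized`, concurrent user threads would enter it one at a time, so a second caller could not overwrite the deadline or future of a call still in flight. The toy class below is hypothetical (`SerializedEntryPoint` only counts how many threads are inside the method at once, and the sleep stands in for fanning out and waiting); it sketches that effect rather than the real `KafkaStreams` code.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: a synchronized entry point never has two callers inside at once.
class SerializedEntryPoint {
    private final AtomicInteger inside = new AtomicInteger();
    private int maxConcurrent = 0; // guarded by "this"

    synchronized void clientInstanceIds() {
        final int concurrent = inside.incrementAndGet();
        maxConcurrent = Math.max(maxConcurrent, concurrent);
        try {
            Thread.sleep(5); // stand-in for fanning out the requests and waiting on them
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            inside.decrementAndGet();
        }
    }

    synchronized int maxConcurrent() {
        return maxConcurrent;
    }

    // Fires `callers` concurrent calls and reports the highest observed overlap.
    static int runConcurrently(final int callers) throws InterruptedException {
        final SerializedEntryPoint entry = new SerializedEntryPoint();
        final ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < callers; i++) {
            pool.submit(entry::clientInstanceIds);
        }
        pool.shutdown();
        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("callers did not finish in time");
        }
        return entry.maxConcurrent();
    }
}
```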



##########
streams/src/main/java/org/apache/kafka/streams/internals/ClientInstanceIdsImpl.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.internals;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.streams.ClientInstanceIds;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ClientInstanceIdsImpl implements ClientInstanceIds {
+    private final Map<String, Uuid> consumerInstanceIds = new HashMap<>();
+    private final Map<String, Uuid> producerInstanceIds = new HashMap<>();
+    private Uuid adminInstanceId;
+
+    public void addConsumerInstanceId(final String key, final Uuid instanceId) {
+        consumerInstanceIds.put(key, instanceId);
+    }
+
+    public void addProducerInstanceId(final String key, final Uuid instanceId) {
+        producerInstanceIds.put(key, instanceId);
+    }
+
+    public void setAdminInstanceId(final Uuid instanceId) {
+        adminInstanceId = instanceId;
+    }
+
+    @Override
+    public Uuid adminInstanceId() {
+        return adminInstanceId;
+    }
+
+    @Override
+    public Map<String, Uuid> consumerInstanceIds() {
+        return consumerInstanceIds;

Review Comment:
   Yes. Given that we create a `new ClientInstanceIdsImpl` on each call to `KafkaStreams#clientInstanceIds`, I thought it should be ok? Thoughts?
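An alternative, if handing out the internal mutable maps is still a concern, is to return read-only views. A hypothetical sketch (`ReadOnlyInstanceIds`, with `String` in place of `Uuid` to keep it self-contained) using `Collections.unmodifiableMap`; the view costs no copy, and callers get an `UnsupportedOperationException` instead of silently mutating the result.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical variant of a per-call result holder whose getters return read-only views.
class ReadOnlyInstanceIds {
    private final Map<String, String> consumerInstanceIds = new HashMap<>();

    void addConsumerInstanceId(final String key, final String instanceId) {
        consumerInstanceIds.put(key, instanceId);
    }

    Map<String, String> consumerInstanceIds() {
        // read-only view: reflects the internal map but rejects put/remove
        return Collections.unmodifiableMap(consumerInstanceIds);
    }
}
```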



##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1791,6 +1794,52 @@ protected int processStreamThread(final Consumer<StreamThread> consumer) {
         return copy.size();
     }
 
+    /**
+     * Returns the internal clients' assigned {@code client instance ids}.
+     *
+     * @return the internal clients' assigned instance ids used for metrics collection.
+     *
+     * @throws IllegalStateException If {@code KafkaStreams} is not running.
+     * @throws TimeoutException Indicates that a request timed out.
+     */
+    public ClientInstanceIds clientInstanceIds(final Duration timeout) {
+        if (state().hasNotStarted()) {
+            throw new IllegalStateException("KafkaStreams has not been started, you can retry after calling start().");
+        }
+        if (state().isShuttingDown() || state.hasCompletedShutdown()) {
+            throw new IllegalStateException("KafkaStreams has been stopped (" + state + ").");
+        }
+
+        final ClientInstanceIdsImpl clientInstanceIds = new ClientInstanceIdsImpl();
+
+        KafkaFuture<Uuid> globalThreadFuture = null;
+        if (globalStreamThread != null) {
+            globalThreadFuture = globalStreamThread.globalConsumerInstanceId(timeout);
+        }
+
+        try {
+            clientInstanceIds.setAdminInstanceId(adminClient.clientInstanceId(timeout));
+        } catch (final TimeoutException timeoutException) {
+            log.warn("Could not get admin client-instance-id due to timeout.");

Review Comment:
   Let's hear what others think; I'm not married to it. I want to ensure that it's not read as `admin-client instance-id`. Maybe it does not matter too much.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
