wcarlson5 commented on code in PR #14864:
URL: https://github.com/apache/kafka/pull/14864#discussion_r1412696799


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -713,6 +735,97 @@ boolean runLoop() {
         return true;
     }
 
+    // visible for testing
+    void maybeGetClientInstanceIds() {
+        // we pass in a timeout of zero into each `clientInstanceId()` call
+        // to just trigger the "get instance id" background RPC;
+        // we don't want to block the stream thread that can do useful work in the meantime

Review Comment:
   good comments :)
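For anyone reading along, here is a minimal, self-contained sketch of the pattern those comments describe. On every loop iteration the owning thread calls the lookup with a zero timeout, treats a timeout as "not ready yet", and gives up only once a deadline has passed. `InstanceIdLookup`, `NotReadyException` and `NonBlockingIdFetcher` are hypothetical stand-ins (for a client's `clientInstanceId(Duration)` and Kafka's `TimeoutException`), not the PR's code.

```java
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for org.apache.kafka.common.errors.TimeoutException.
class NotReadyException extends RuntimeException {
    NotReadyException(final String message) {
        super(message);
    }
}

// Hypothetical stand-in for a client's clientInstanceId(Duration) method.
interface InstanceIdLookup {
    UUID clientInstanceId(Duration timeout);
}

class NonBlockingIdFetcher {
    private final InstanceIdLookup client;
    private final CompletableFuture<UUID> result = new CompletableFuture<>();
    private final long deadlineMs;

    NonBlockingIdFetcher(final InstanceIdLookup client, final long deadlineMs) {
        this.client = client;
        this.deadlineMs = deadlineMs;
    }

    CompletableFuture<UUID> result() {
        return result;
    }

    // Called once per run-loop iteration; never blocks the calling thread.
    void maybeFetch(final long nowMs) {
        if (result.isDone()) {
            return;
        }
        if (nowMs > deadlineMs) {
            result.completeExceptionally(new NotReadyException("No client instance id before the deadline"));
            return;
        }
        try {
            // A zero timeout only triggers (or checks on) the background RPC.
            result.complete(client.clientInstanceId(Duration.ZERO));
        } catch (final NotReadyException notReadyYet) {
            // Not available yet: go back to useful work and retry next iteration.
        }
    }
}
```

The point of the design is that the waiting happens on the caller's future, not on the stream or global thread.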



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -1477,6 +1590,67 @@ public Object getStateLock() {
         return stateLock;
     }
 
+    public Map<String, KafkaFuture<Uuid>> consumerClientInstanceIds(final Duration timeout) {
+        boolean setDeadline = false;
+
+        final Map<String, KafkaFuture<Uuid>> result = new HashMap<>();
+
+        KafkaFutureImpl<Uuid> future = new KafkaFutureImpl<>();
+        if (mainConsumerClientInstanceId != null) {
+            future.complete(mainConsumerClientInstanceId);
+        } else {
+            mainConsumerInstanceIdFuture = future;
+            setDeadline = true;
+        }
+        result.put(getName() + "-consumer", future);
+
+        future = new KafkaFutureImpl<>();
+        if (restoreConsumerClientInstanceId != null) {
+            future.complete(restoreConsumerClientInstanceId);
+        } else {
+            restoreConsumerInstanceIdFuture = future;
+            setDeadline = true;
+        }
+        result.put(getName() + "-restore-consumer", future);
+
+        if (setDeadline) {
+            fetchDeadline = time.milliseconds() + timeout.toMillis();
+        }
+
+        return result;
+    }
+
+    public KafkaFuture<Map<String, KafkaFuture<Uuid>>> producersClientInstanceIds(final Duration timeout) {
+        final KafkaFutureImpl<Map<String, KafkaFuture<Uuid>>> result = new KafkaFutureImpl<>();
+
+        if (processingMode.equals(StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA)) {
+//            for (final TaskId taskId : taskManager.activeTaskIds()) {
+//                future = new KafkaFutureImpl<>();
+//                if (taskProducersClientInstanceIds.get(taskId) != null) {
+//                    future.complete(taskProducersClientInstanceIds.get(taskId));
+//                } else {
+//                    taskProducersInstanceIdsFuture.put(taskId, future);
+//                    setDeadline = true;
+//                }
+//                result.put(getName() + "-" + taskId + "-producer", future);
+//            };
+        } else {
+            final KafkaFutureImpl<Uuid> producerFuture = new KafkaFutureImpl<>();
+            if (threadProducerClientInstanceId != null) {
+                producerFuture.complete(threadProducerClientInstanceId);
+            } else {
+                threadProducerInstanceIdFuture = producerFuture;
+                if (fetchDeadline == -1) {
+                    fetchDeadline = time.milliseconds() + timeout.toMillis();
+                }
+            }
+
+            result.complete(Collections.singletonMap(getName() + "-producer", producerFuture));

Review Comment:
   Could we be lazy and just treat any unavailable tasks as if telemetry were disabled on the client itself?
   
   Not ideal, but it should keep things simple. Otherwise we might time out all the time with restoring tasks.
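A minimal sketch of the lazy approach, assuming per-task producer ids are kept in a map keyed by task. A task whose producer id is not known yet (for example, because it is still restoring) gets a future that is completed with `null` right away, the same value used when telemetry is disabled, so the caller never waits on it. `LazyProducerIds` is hypothetical, task ids are plain strings, and `CompletableFuture` stands in for `TaskId` and `KafkaFutureImpl`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

class LazyProducerIds {
    // Returns one already-completed future per active task; nothing is left pending.
    static Map<String, CompletableFuture<UUID>> producerInstanceIds(
            final String threadName,
            final List<String> activeTaskIds,
            final Map<String, UUID> knownProducerIds) {
        final Map<String, CompletableFuture<UUID>> result = new HashMap<>();
        for (final String taskId : activeTaskIds) {
            // A missing id (e.g. a restoring task) is reported as null,
            // exactly like a client with telemetry disabled.
            final UUID id = knownProducerIds.get(taskId);
            result.put(threadName + "-" + taskId + "-producer", CompletableFuture.completedFuture(id));
        }
        return result;
    }
}
```

The trade-off is that a restoring task's producer id is simply missing from that call's result instead of causing a timeout.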



##########
streams/src/main/java/org/apache/kafka/streams/internals/ClientInstanceIdsImpl.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.internals;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.streams.ClientInstanceIds;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ClientInstanceIdsImpl implements ClientInstanceIds {
+    private final Map<String, Uuid> consumerInstanceIds = new HashMap<>();
+    private final Map<String, Uuid> producerInstanceIds = new HashMap<>();
+    private Uuid adminInstanceId;
+
+    public void addConsumerInstanceId(final String key, final Uuid instanceId) {
+        consumerInstanceIds.put(key, instanceId);
+    }
+
+    public void addProducerInstanceId(final String key, final Uuid instanceId) {
+        producerInstanceIds.put(key, instanceId);
+    }
+
+    public void setAdminInstanceId(final Uuid instanceId) {
+        adminInstanceId = instanceId;
+    }
+
+    @Override
+    public Uuid adminInstanceId() {
+        return adminInstanceId;
+    }
+
+    @Override
+    public Map<String, Uuid> consumerInstanceIds() {
+        return consumerInstanceIds;

Review Comment:
   It should be fine, but if you want to be a stickler you could return an immutable copy.
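For the stickler option, a minimal sketch: return a read-only snapshot so callers can neither mutate the internal map nor see later `add...` calls through it. `ImmutableIdsSketch` is a hypothetical, cut-down version of `ClientInstanceIdsImpl` that uses `java.util.UUID` in place of Kafka's `Uuid`. `Collections.unmodifiableMap(new HashMap<>(...))` is used rather than `Map.copyOf`, because `Map.copyOf` rejects `null` values, and a `null` id can end up in the map when telemetry is disabled.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

class ImmutableIdsSketch {
    private final Map<String, UUID> consumerInstanceIds = new HashMap<>();

    void addConsumerInstanceId(final String key, final UUID instanceId) {
        consumerInstanceIds.put(key, instanceId);
    }

    // Copy first, then wrap: the result is read-only and detached from later additions.
    Map<String, UUID> consumerInstanceIds() {
        return Collections.unmodifiableMap(new HashMap<>(consumerInstanceIds));
    }
}
```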



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -1477,6 +1590,67 @@ public Object getStateLock() {
         return stateLock;
     }
 
+    public Map<String, KafkaFuture<Uuid>> consumerClientInstanceIds(final Duration timeout) {

Review Comment:
   I have the same thread-safety concerns here as I do with the global thread, only here they are even more complicated... :')



##########
streams/src/main/java/org/apache/kafka/streams/internals/ClientInstanceIdsImpl.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.internals;
+
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.streams.ClientInstanceIds;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ClientInstanceIdsImpl implements ClientInstanceIds {
+    private final Map<String, Uuid> consumerInstanceIds = new HashMap<>();
+    private final Map<String, Uuid> producerInstanceIds = new HashMap<>();
+    private Uuid adminInstanceId;
+
+    public void addConsumerInstanceId(final String key, final Uuid instanceId) {
+        consumerInstanceIds.put(key, instanceId);
+    }
+
+    public void addProducerInstanceId(final String key, final Uuid instanceId) {
+        producerInstanceIds.put(key, instanceId);
+    }
+
+    public void setAdminInstanceId(final Uuid instanceId) {
+        adminInstanceId = instanceId;
+    }
+
+    @Override
+    public Uuid adminInstanceId() {
+        if (adminInstanceId == null) {

Review Comment:
   I'm in favor of leaving it as you have it. It makes the most sense to me.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java:
##########
@@ -310,6 +317,30 @@ public void run() {
                     cache.resize(size);
                 }
                 stateConsumer.pollAndUpdate();
+
+                if (fetchDeadline != -1) {
+                    if (fetchDeadline >= time.milliseconds()) {
+                        try {
+                            // we pass in a timeout of zero, to just trigger the "get instance id" background RPC,
+                            // we don't want to block the global thread that can do useful work in the meantime
+                            globalConsumerClientInstanceId = globalConsumer.clientInstanceId(Duration.ZERO);
+                            clientInstanceIdFuture.complete(globalConsumerClientInstanceId);
+                            fetchDeadline = -1;
+                        } catch (final IllegalStateException disabledError) {
+                            clientInstanceIdFuture.complete(null);
+                            fetchDeadline = -1;
+                        } catch (final TimeoutException swallow) {

Review Comment:
   I'm a bit concerned about what happens if multiple calls to `KafkaStreams#clientInstanceIds()` are made in a row within the timeout. I think there is a chance of the earlier one being left hanging, depending on when `fetchDeadline` is reset.
   Actually, since `clientInstanceIdFuture` is overwritten, I think the first future can never be completed if a second call is made before it finishes.
   
   I think we might need a set of futures that are all completed and then cleared here.
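One way to do that, sketched under assumptions: each call to `clientInstanceIds()` registers its own future in a thread-safe queue instead of overwriting a single field, and the owning thread completes and drains every pending future at once. A `ConcurrentLinkedQueue` also gives a safe handoff between the caller's thread and the global thread. `PendingInstanceIdFutures` and its methods are hypothetical names, with `CompletableFuture` standing in for `KafkaFutureImpl`.

```java
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

class PendingInstanceIdFutures {
    private final Queue<CompletableFuture<UUID>> pending = new ConcurrentLinkedQueue<>();

    // Called from the user's thread; every call gets its own future.
    CompletableFuture<UUID> register() {
        final CompletableFuture<UUID> future = new CompletableFuture<>();
        pending.add(future);
        return future;
    }

    // Called from the owning (e.g. global) thread once the id is known.
    void completeAll(final UUID instanceId) {
        CompletableFuture<UUID> future;
        // poll() removes each future, so the queue is cleared as we go.
        while ((future = pending.poll()) != null) {
            future.complete(instanceId);
        }
    }

    // Called from the owning thread when the deadline passes.
    void failAll(final Throwable error) {
        CompletableFuture<UUID> future;
        while ((future = pending.poll()) != null) {
            future.completeExceptionally(error);
        }
    }

    boolean hasPending() {
        return !pending.isEmpty();
    }
}
```

This only addresses the overwritten-future problem; each call would presumably still need its own deadline handling.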



##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1791,6 +1794,74 @@ protected int processStreamThread(final Consumer<StreamThread> consumer) {
         return copy.size();
     }
 
+    /**
+     * Returns the internal clients' assigned {@code client instance ids}.
+     *
+     * @return the internal clients' assigned instance ids used for metrics collection.
+     *
+     * @throws IllegalStateException If {@code KafkaStreams} is not running.
+     * @throws TimeoutException Indicates that a request timed out.
+     */
+    public ClientInstanceIds clientInstanceIds(final Duration timeout) {
+        if (state().hasNotStarted()) {
+            throw new IllegalStateException("KafkaStreams has not been started, you can retry after calling start().");
+        }
+        if (state().isShuttingDown() || state.hasCompletedShutdown()) {
+            throw new IllegalStateException("KafkaStreams has been stopped (" + state + ").");
+        }
+
+        final ClientInstanceIdsImpl clientInstanceIds = new ClientInstanceIdsImpl();
+
+        final Map<String, KafkaFuture<Uuid>> streamThreadFutures = new HashMap<>();
+        for (final StreamThread streamThread : threads) {
+            streamThreadFutures.putAll(streamThread.clientInstanceIds(timeout));
+        }
+
+        KafkaFuture<Uuid> globalThreadFuture = null;
+        if (globalStreamThread != null) {
+            globalThreadFuture = globalStreamThread.globalConsumerInstanceId(timeout);
+        }
+
+        try {
+            clientInstanceIds.setAdminInstanceId(adminClient.clientInstanceId(timeout));
+        } catch (final TimeoutException timeoutException) {
+            log.warn("Could not get admin client-instance-id due to timeout.");
+        }
+
+        for (final Map.Entry<String, KafkaFuture<Uuid>> streamThreadFuture : streamThreadFutures.entrySet()) {
+            try {
+                clientInstanceIds.addConsumerInstanceId(
+                    streamThreadFuture.getKey(),
+                    streamThreadFuture.getValue().get()
+                );
+            } catch (final ExecutionException exception) {
+                if (exception.getCause() instanceof TimeoutException) {
+                    log.warn("Could not get global consumer client-instance-id due to timeout.");
+                } else {
+                    log.error("Could not get global consumer client-instance-id", exception);
+                }
+            } catch (final InterruptedException error) {
+                log.error("Could not get global consumer client-instance-id", error);
+            }
+        }
+
+        if (globalThreadFuture != null) {
+            try {
+                clientInstanceIds.addConsumerInstanceId(globalStreamThread.getName(), globalThreadFuture.get());
+            } catch (final ExecutionException exception) {
+                if (exception.getCause() instanceof TimeoutException) {
+                    log.warn("Could not get global consumer client-instance-id due to timeout.");
+                } else {
+                    log.error("Could not get global consumer client-instance-id", exception);
+                }
+            } catch (final InterruptedException error) {
+                log.error("Could not get global consumer client-instance-id", error);
+            }
+        }
+
+        return clientInstanceIds;

Review Comment:
   I would agree with using `KafkaFuture.allOf(...).get(duration)` if possible. It should make the logic easier to read, as well as give a more consistent timeout for all futures.
   
   I don't think the error handling will be any worse, as we still throw on the first exception anyway and `allOf` should do the same.
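A minimal sketch of the `allOf` approach. It uses the JDK's `CompletableFuture`, whose `allOf(...)` and `get(long, TimeUnit)` are analogous to `KafkaFuture.allOf(...)` and `KafkaFuture#get(long, TimeUnit)`; since there is no `get(Duration)` overload, the `Duration` is converted to milliseconds. `AllOfSketch.collectIds` is a hypothetical helper: it waits once, under a single shared timeout, for every future, then reads the results. Note that the `TimeoutException` here is `java.util.concurrent`'s, not Kafka's.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class AllOfSketch {
    static Map<String, UUID> collectIds(final Map<String, CompletableFuture<UUID>> futures,
                                        final Duration timeout)
            throws InterruptedException, ExecutionException, TimeoutException {
        // One wait with one timeout covers every future. This throws TimeoutException
        // if time runs out, or ExecutionException if any future failed.
        CompletableFuture.allOf(futures.values().toArray(new CompletableFuture<?>[0]))
            .get(timeout.toMillis(), TimeUnit.MILLISECONDS);

        // Every future has completed normally at this point, so join() does not block.
        final Map<String, UUID> ids = new HashMap<>();
        for (final Map.Entry<String, CompletableFuture<UUID>> entry : futures.entrySet()) {
            ids.put(entry.getKey(), entry.getValue().join());
        }
        return ids;
    }
}
```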



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java:
##########
@@ -136,8 +135,8 @@ StreamsProducer streamsProducerForTask(final TaskId taskId) {
     }
 
     StreamsProducer threadProducer() {
-        if (processingMode != EXACTLY_ONCE_V2) {
-            throw new IllegalStateException("Expected EXACTLY_ONCE_V2 to be enabled, but the processing mode was " + processingMode);
+        if (processingMode == EXACTLY_ONCE_ALPHA) {

Review Comment:
   I'm not sure I understand why this didn't always include ALOS.
   
   Just to make sure: this could always have returned the producer for ALOS, we just never needed it to before, and nothing in this PR changed to make it work with ALOS?
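To make the question concrete, here is a minimal sketch of the guard after this change. It assumes, as the diff suggests, that a single thread producer exists in every mode except `EXACTLY_ONCE_ALPHA`, which uses per-task producers. The enum and `ThreadProducerGuard` are hypothetical, cut-down stand-ins, with a string in place of `StreamsProducer`, and the exception message is illustrative only.

```java
enum ProcessingMode { AT_LEAST_ONCE, EXACTLY_ONCE_ALPHA, EXACTLY_ONCE_V2 }

class ThreadProducerGuard {
    private final ProcessingMode processingMode;
    private final String threadProducer;

    ThreadProducerGuard(final ProcessingMode processingMode) {
        this.processingMode = processingMode;
        // Assumption: only EOS-alpha uses per-task producers instead of one thread producer.
        this.threadProducer = processingMode == ProcessingMode.EXACTLY_ONCE_ALPHA ? null : "thread-producer";
    }

    String threadProducer() {
        // New guard: ALOS and EOS-v2 both pass; the old guard admitted only EOS-v2.
        if (processingMode == ProcessingMode.EXACTLY_ONCE_ALPHA) {
            throw new IllegalStateException(
                "Expected AT_LEAST_ONCE or EXACTLY_ONCE_V2, but the processing mode was " + processingMode);
        }
        return threadProducer;
    }
}
```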



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
