wcarlson5 commented on code in PR #14864: URL: https://github.com/apache/kafka/pull/14864#discussion_r1412696799
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -713,6 +735,97 @@ boolean runLoop() {
         return true;
     }
+    // visible for testing
+    void maybeGetClientInstanceIds() {
+        // we pass in a timeout of zero into each `clientInstanceId()` call
+        // to just trigger the "get instance id" background RPC;
+        // we don't want to block the stream thread that can do useful work in the meantime

Review Comment:
   good comments :)

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -1477,6 +1590,67 @@ public Object getStateLock() {
         return stateLock;
     }
+    public Map<String, KafkaFuture<Uuid>> consumerClientInstanceIds(final Duration timeout) {
+        boolean setDeadline = false;
+
+        final Map<String, KafkaFuture<Uuid>> result = new HashMap<>();
+
+        KafkaFutureImpl<Uuid> future = new KafkaFutureImpl<>();
+        if (mainConsumerClientInstanceId != null) {
+            future.complete(mainConsumerClientInstanceId);
+        } else {
+            mainConsumerInstanceIdFuture = future;
+            setDeadline = true;
+        }
+        result.put(getName() + "-consumer", future);
+
+        future = new KafkaFutureImpl<>();
+        if (restoreConsumerClientInstanceId != null) {
+            future.complete(restoreConsumerClientInstanceId);
+        } else {
+            restoreConsumerInstanceIdFuture = future;
+            setDeadline = true;
+        }
+        result.put(getName() + "-restore-consumer", future);
+
+        if (setDeadline) {
+            fetchDeadline = time.milliseconds() + timeout.toMillis();
+        }
+
+        return result;
+    }
+
+    public KafkaFuture<Map<String, KafkaFuture<Uuid>>> producersClientInstanceIds(final Duration timeout) {
+        final KafkaFutureImpl<Map<String, KafkaFuture<Uuid>>> result = new KafkaFutureImpl<>();
+
+        if (processingMode.equals(StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA)) {
+//            for (final TaskId taskId : taskManager.activeTaskIds()) {
+//                future = new KafkaFutureImpl<>();
+//                if (taskProducersClientInstanceIds.get(taskId) != null) {
+//                    future.complete(taskProducersClientInstanceIds.get(taskId));
+//                } else {
+//                    taskProducersInstanceIdsFuture.put(taskId, future);
+//                    setDeadline = true;
+//                }
+//                result.put(getName() + "-" + taskId + "-producer", future);
+//            };
+        } else {
+            final KafkaFutureImpl<Uuid> producerFuture = new KafkaFutureImpl<>();
+            if (threadProducerClientInstanceId != null) {
+                producerFuture.complete(threadProducerClientInstanceId);
+            } else {
+                threadProducerInstanceIdFuture = producerFuture;
+                if (fetchDeadline == -1) {
+                    fetchDeadline = time.milliseconds() + timeout.toMillis();
+                }
+            }
+
+            result.complete(Collections.singletonMap(getName() + "-producer", producerFuture));

Review Comment:
   Could we be lazy and just treat any unavailable tasks as if they were set up with telemetry disabled on the client itself? Not ideal, but it should keep things simple. Otherwise we might time out all the time with restoring tasks.

##########
streams/src/main/java/org/apache/kafka/streams/internals/ClientInstanceIdsImpl.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.internals;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.streams.ClientInstanceIds;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ClientInstanceIdsImpl implements ClientInstanceIds {
+    private final Map<String, Uuid> consumerInstanceIds = new HashMap<>();
+    private final Map<String, Uuid> producerInstanceIds = new HashMap<>();
+    private Uuid adminInstanceId;
+
+    public void addConsumerInstanceId(final String key, final Uuid instanceId) {
+        consumerInstanceIds.put(key, instanceId);
+    }
+
+    public void addProducerInstanceId(final String key, final Uuid instanceId) {
+        producerInstanceIds.put(key, instanceId);
+    }
+
+    public void setAdminInstanceId(final Uuid instanceId) {
+        adminInstanceId = instanceId;
+    }
+
+    @Override
+    public Uuid adminInstanceId() {
+        return adminInstanceId;
+    }
+
+    @Override
+    public Map<String, Uuid> consumerInstanceIds() {
+        return consumerInstanceIds;

Review Comment:
   It should be fine, but if you want to be a stickler, you could return an immutable copy.

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -1477,6 +1590,67 @@ public Object getStateLock() {
         return stateLock;
     }
+    public Map<String, KafkaFuture<Uuid>> consumerClientInstanceIds(final Duration timeout) {

Review Comment:
   I have the same thread-safety concerns here as I do with the global thread, only they are even more complicated... :')

##########
streams/src/main/java/org/apache/kafka/streams/internals/ClientInstanceIdsImpl.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.internals;
+
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.streams.ClientInstanceIds;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ClientInstanceIdsImpl implements ClientInstanceIds {
+    private final Map<String, Uuid> consumerInstanceIds = new HashMap<>();
+    private final Map<String, Uuid> producerInstanceIds = new HashMap<>();
+    private Uuid adminInstanceId;
+
+    public void addConsumerInstanceId(final String key, final Uuid instanceId) {
+        consumerInstanceIds.put(key, instanceId);
+    }
+
+    public void addProducerInstanceId(final String key, final Uuid instanceId) {
+        producerInstanceIds.put(key, instanceId);
+    }
+
+    public void setAdminInstanceId(final Uuid instanceId) {
+        adminInstanceId = instanceId;
+    }
+
+    @Override
+    public Uuid adminInstanceId() {
+        if (adminInstanceId == null) {

Review Comment:
   I'm in favor of leaving it as you have it. Makes the most sense to me.

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java:
##########
@@ -310,6 +317,30 @@ public void run() {
                     cache.resize(size);
                 }
                 stateConsumer.pollAndUpdate();
+
+                if (fetchDeadline != -1) {
+                    if (fetchDeadline >= time.milliseconds()) {
+                        try {
+                            // we pass in a timeout of zero, to just trigger the "get instance id" background RPC,
+                            // we don't want to block the global thread that can do useful work in the meantime
+                            globalConsumerClientInstanceId = globalConsumer.clientInstanceId(Duration.ZERO);
+                            clientInstanceIdFuture.complete(globalConsumerClientInstanceId);
+                            fetchDeadline = -1;
+                        } catch (final IllegalStateException disabledError) {
+                            clientInstanceIdFuture.complete(null);
+                            fetchDeadline = -1;
+                        } catch (final TimeoutException swallow) {

Review Comment:
   I'm a bit concerned about what happens if multiple calls to `KafkaStreams#clientInstanceIds()` are made in a row within the timeout. I think there is a chance of the earlier one being left hanging, depending on when `fetchDeadline` is reset. Actually, since `clientInstanceIdFuture` is overwritten, I think it can't be completed if a second call is made before the first future finishes. I think we might need a set of futures that are all completed and then cleared here.

##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1791,6 +1794,74 @@ protected int processStreamThread(final Consumer<StreamThread> consumer) {
         return copy.size();
     }
+    /**
+     * Returns the internal clients' assigned {@code client instance ids}.
+     *
+     * @return the internal clients' assigned instance ids used for metrics collection.
+     *
+     * @throws IllegalStateException If {@code KafkaStreams} is not running.
+     * @throws TimeoutException Indicates that a request timed out.
+     */
+    public ClientInstanceIds clientInstanceIds(final Duration timeout) {
+        if (state().hasNotStarted()) {
+            throw new IllegalStateException("KafkaStreams has not been started, you can retry after calling start().");
+        }
+        if (state().isShuttingDown() || state.hasCompletedShutdown()) {
+            throw new IllegalStateException("KafkaStreams has been stopped (" + state + ").");
+        }
+
+        final ClientInstanceIdsImpl clientInstanceIds = new ClientInstanceIdsImpl();
+
+        final Map<String, KafkaFuture<Uuid>> streamThreadFutures = new HashMap<>();
+        for (final StreamThread streamThread : threads) {
+            streamThreadFutures.putAll(streamThread.clientInstanceIds(timeout));
+        }
+
+        KafkaFuture<Uuid> globalThreadFuture = null;
+        if (globalStreamThread != null) {
+            globalThreadFuture = globalStreamThread.globalConsumerInstanceId(timeout);
+        }
+
+        try {
+            clientInstanceIds.setAdminInstanceId(adminClient.clientInstanceId(timeout));
+        } catch (final TimeoutException timeoutException) {
+            log.warn("Could not get admin client-instance-id due to timeout.");
+        }
+
+        for (final Map.Entry<String, KafkaFuture<Uuid>> streamThreadFuture : streamThreadFutures.entrySet()) {
+            try {
+                clientInstanceIds.addConsumerInstanceId(
+                    streamThreadFuture.getKey(),
+                    streamThreadFuture.getValue().get()
+                );
+            } catch (final ExecutionException exception) {
+                if (exception.getCause() instanceof TimeoutException) {
+                    log.warn("Could not get global consumer client-instance-id due to timeout.");
+                } else {
+                    log.error("Could not get global consumer client-instance-id", exception);
+                }
+            } catch (final InterruptedException error) {
+                log.error("Could not get global consumer client-instance-id", error);
+            }
+        }
+
+        if (globalThreadFuture != null) {
+            try {
+                clientInstanceIds.addConsumerInstanceId(globalStreamThread.getName(), globalThreadFuture.get());
+            } catch (final ExecutionException exception) {
+                if (exception.getCause() instanceof TimeoutException) {
+                    log.warn("Could not get global consumer client-instance-id due to timeout.");
+                } else {
+                    log.error("Could not get global consumer client-instance-id", exception);
+                }
+            } catch (final InterruptedException error) {
+                log.error("Could not get global consumer client-instance-id", error);
+            }
+        }
+
+        return clientInstanceIds;

Review Comment:
   I would agree with using `KafkaFuture.allOf(...).get(duration)` if possible. It should make the logic easier to read, as well as give a more consistent timeout for all futures. I don't think the error handling will be any worse, as we still throw on the first exception anyway, and `allOf` should do the same.

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java:
##########
@@ -136,8 +135,8 @@ StreamsProducer streamsProducerForTask(final TaskId taskId) {
     }
     StreamsProducer threadProducer() {
-        if (processingMode != EXACTLY_ONCE_V2) {
-            throw new IllegalStateException("Expected EXACTLY_ONCE_V2 to be enabled, but the processing mode was " + processingMode);
+        if (processingMode == EXACTLY_ONCE_ALPHA) {

Review Comment:
   I'm not sure I understand why this didn't always include ALOS. Just to make sure: this could always have returned the producer for ALOS, we just never needed it to before, and nothing changed in this PR to make it work with ALOS?
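A minimal sketch of the two future-handling suggestions in this review: keeping a set of pending futures that are all completed and then cleared (the `GlobalStreamThread` comment), and waiting on every future under one overall deadline with an `allOf(...)` call (the `KafkaStreams#clientInstanceIds()` comment). This is not the code from PR #14864. It assumes `java.util.concurrent.CompletableFuture` as a stand-in for Kafka's `KafkaFutureImpl`, a `String` in place of `Uuid`, and `java.util.concurrent.TimeoutException` rather than Kafka's `TimeoutException`; `InstanceIdFetcher` and `InstanceIdAggregator` are hypothetical names. The `Duration` is converted to the `get(long, TimeUnit)` form.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: CompletableFuture stands in for KafkaFutureImpl,
// and String stands in for Uuid. Not the code from the PR.
class InstanceIdFetcher {
    private final Object lock = new Object();
    // One future per caller, so a second request never overwrites the first.
    private final List<CompletableFuture<String>> pending = new ArrayList<>();
    private String cachedId; // null until the owning thread has fetched the id

    // Called from any user thread (e.g. via KafkaStreams#clientInstanceIds()).
    CompletableFuture<String> requestId() {
        synchronized (lock) {
            if (cachedId != null) {
                return CompletableFuture.completedFuture(cachedId);
            }
            final CompletableFuture<String> future = new CompletableFuture<>();
            pending.add(future);
            return future;
        }
    }

    // Called by the owning thread once the id is known:
    // complete every waiting future, then clear the set.
    void completeAll(final String id) {
        final List<CompletableFuture<String>> toComplete;
        synchronized (lock) {
            cachedId = id;
            toComplete = new ArrayList<>(pending);
            pending.clear();
        }
        // Completing outside the lock keeps dependent callbacks from running while it is held.
        toComplete.forEach(f -> f.complete(id));
    }

    // Called by the owning thread when the fetch deadline passes.
    void failAll(final Throwable error) {
        final List<CompletableFuture<String>> toFail;
        synchronized (lock) {
            toFail = new ArrayList<>(pending);
            pending.clear();
        }
        toFail.forEach(f -> f.completeExceptionally(error));
    }
}

final class InstanceIdAggregator {
    // Waits for all futures under one overall deadline, in the spirit of
    // allOf(...).get(...); throws TimeoutException if any is still pending.
    static List<String> awaitAll(final List<CompletableFuture<String>> futures, final Duration timeout)
            throws InterruptedException, ExecutionException, TimeoutException {
        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
            .get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        final List<String> ids = new ArrayList<>();
        for (final CompletableFuture<String> future : futures) {
            ids.add(future.join()); // already complete, so join() returns immediately
        }
        return ids;
    }
}
```

Because every caller registers its own future, a second call within the timeout can no longer orphan the first one, and the single `allOf(...)` deadline replaces the per-future `get()` calls. The lock also covers part of the thread-safety concern, since user threads and the owning thread only touch the pending set while holding it.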
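For the `ClientInstanceIdsImpl` comment about returning an immutable copy, one option is a read-only snapshot built with `Collections.unmodifiableMap(new HashMap<>(...))`. This is a hypothetical sketch: `ClientInstanceIdsSketch` is an invented name, `java.util.UUID` stands in for Kafka's `Uuid`, and only the consumer map is shown. `Map.copyOf` would be shorter, but it throws `NullPointerException` on null values, and the `GlobalStreamThread` hunk completes the global consumer's future with `null` when telemetry is disabled, so a null id may reach this map.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch of the "immutable copy" suggestion; java.util.UUID
// stands in for Kafka's Uuid, and only the consumer map is shown.
class ClientInstanceIdsSketch {
    private final Map<String, UUID> consumerInstanceIds = new HashMap<>();

    void addConsumerInstanceId(final String key, final UUID instanceId) {
        consumerInstanceIds.put(key, instanceId);
    }

    // Returns a read-only snapshot: later adds are not visible through it,
    // and callers cannot mutate the internal map. Unlike Map.copyOf, this
    // tolerates null values (e.g. an id left null because telemetry is disabled).
    Map<String, UUID> consumerInstanceIds() {
        return Collections.unmodifiableMap(new HashMap<>(consumerInstanceIds));
    }
}
```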