AndrewJSchofield commented on code in PR #14864:
URL: https://github.com/apache/kafka/pull/14864#discussion_r1409185585
##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1791,6 +1794,52 @@ protected int processStreamThread(final
Consumer<StreamThread> consumer) {
return copy.size();
}
+ /**
+ * Returns the internal clients' assigned {@code client instance ids}.
+ *
+ * @return the internal clients' assigned instance ids used for metrics
collection.
+ *
+ * @throws IllegalStateException If {@code KafkaStreams} is not running.
+ * @throws TimeoutException Indicates that a request timed out.
+ */
+ public ClientInstanceIds clientInstanceIds(final Duration timeout) {
+ if (state().hasNotStarted()) {
+ throw new IllegalStateException("KafkaStreams has not been
started, you can retry after calling start().");
+ }
+ if (state().isShuttingDown() || state.hasCompletedShutdown()) {
+ throw new IllegalStateException("KafkaStreams has been stopped ("
+ state + ").");
+ }
+
+ final ClientInstanceIdsImpl clientInstanceIds = new
ClientInstanceIdsImpl();
+
+ KafkaFuture<Uuid> globalThreadFuture = null;
+ if (globalStreamThread != null) {
+ globalThreadFuture =
globalStreamThread.globalConsumerInstanceId(timeout);
+ }
+
+ try {
+
clientInstanceIds.setAdminInstanceId(adminClient.clientInstanceId(timeout));
+ } catch (final TimeoutException timeoutException) {
+ log.warn("Could not get admin client-instance-id due to timeout.");
Review Comment:
I'd use `client instance id` personally.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java:
##########
@@ -310,6 +317,25 @@ public void run() {
cache.resize(size);
}
stateConsumer.pollAndUpdate();
+
+ if (fetchDeadline != -1) {
+ if (fetchDeadline > time.milliseconds()) {
+ try {
+ globalConsumerClientInstanceId =
globalConsumer.clientInstanceId(Duration.ZERO);
+
clientInstanceIdFuture.complete(globalConsumerClientInstanceId);
+ fetchDeadline = -1;
+ } catch (final TimeoutException swallow) {
+ // swallow
+ } catch (final Exception error) {
+
clientInstanceIdFuture.completeExceptionally(error);
+ fetchDeadline = -1;
Review Comment:
This resets essentially so that it could in principle try again in future.
Just an observation again. Maybe that's what you want to do.
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java:
##########
@@ -296,6 +302,63 @@ public Set<TopicPartition> partitions() {
assertFalse(new File(baseDirectoryName + File.separator + "testAppId"
+ File.separator + "global").exists());
}
+ @Test
+ public void shouldGetGlobalConsumerClientInstanceId() throws Exception {
+ initializeConsumer();
+ startAndSwallowError();
+
+ final Uuid instanceId = Uuid.randomUuid();
+ mockConsumer.setClientInstanceId(instanceId, Duration.ofMillis(1L));
+
+ try {
+ final KafkaFuture<Uuid> future =
globalStreamThread.globalConsumerInstanceId(Duration.ofMillis(2L));
+ final Uuid result = future.get();
+
+ assertThat(result, equalTo(instanceId));
+ } finally {
+ globalStreamThread.shutdown();
+ globalStreamThread.join();
+
Review Comment:
Extraneous blank line :)
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java:
##########
@@ -454,4 +480,19 @@ public void shutdown() {
public Map<MetricName, Metric> consumerMetrics() {
return Collections.unmodifiableMap(globalConsumer.metrics());
}
+
+ public KafkaFuture<Uuid> globalConsumerInstanceId(final Duration timeout) {
Review Comment:
What are the threading rules here? Might we end up with the
`clientInstanceId` and `fetchDeadline` being changed surprisingly by multiple
calls to this method.
##########
streams/src/main/java/org/apache/kafka/streams/internals/ClientInstanceIdsImpl.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.internals;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.streams.ClientInstanceIds;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ClientInstanceIdsImpl implements ClientInstanceIds {
+ private final Map<String, Uuid> consumerInstanceIds = new HashMap<>();
+ private final Map<String, Uuid> producerInstanceIds = new HashMap<>();
+ private Uuid adminInstanceId;
+
+ public void addConsumerInstanceId(final String key, final Uuid instanceId)
{
+ consumerInstanceIds.put(key, instanceId);
+ }
+
+ public void addProducerInstanceId(final String key, final Uuid instanceId)
{
+ producerInstanceIds.put(key, instanceId);
+ }
+
+ public void setAdminInstanceId(final Uuid instanceId) {
+ adminInstanceId = instanceId;
+ }
+
+ @Override
+ public Uuid adminInstanceId() {
+ return adminInstanceId;
+ }
+
+ @Override
+ public Map<String, Uuid> consumerInstanceIds() {
+ return consumerInstanceIds;
Review Comment:
Just an observation. You are returning a reference to the internal hashmap
from a public API.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java:
##########
@@ -454,4 +480,19 @@ public void shutdown() {
public Map<MetricName, Metric> consumerMetrics() {
return Collections.unmodifiableMap(globalConsumer.metrics());
}
+
+ public KafkaFuture<Uuid> globalConsumerInstanceId(final Duration timeout) {
+ if (globalConsumerClientInstanceId != null) {
+ final KafkaFutureImpl<Uuid> success = new KafkaFutureImpl<>();
+ success.complete(globalConsumerClientInstanceId);
+ return success;
+ }
+
+ // need to set `clientInstancIdFuture` before `fetchDeadline`
Review Comment:
Typo `clientInstancIdFuture`.
##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1791,6 +1794,52 @@ protected int processStreamThread(final
Consumer<StreamThread> consumer) {
return copy.size();
}
+ /**
+ * Returns the internal clients' assigned {@code client instance ids}.
+ *
+ * @return the internal clients' assigned instance ids used for metrics
collection.
+ *
+ * @throws IllegalStateException If {@code KafkaStreams} is not running.
+ * @throws TimeoutException Indicates that a request timed out.
+ */
+ public ClientInstanceIds clientInstanceIds(final Duration timeout) {
+ if (state().hasNotStarted()) {
+ throw new IllegalStateException("KafkaStreams has not been
started, you can retry after calling start().");
+ }
+ if (state().isShuttingDown() || state.hasCompletedShutdown()) {
+ throw new IllegalStateException("KafkaStreams has been stopped ("
+ state + ").");
+ }
+
+ final ClientInstanceIdsImpl clientInstanceIds = new
ClientInstanceIdsImpl();
+
+ KafkaFuture<Uuid> globalThreadFuture = null;
+ if (globalStreamThread != null) {
+ globalThreadFuture =
globalStreamThread.globalConsumerInstanceId(timeout);
Review Comment:
You are applying the same Duration to all of the calls, but the caller would
expect the cumulative duration not to exceed the parameter supplied.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java:
##########
@@ -310,6 +317,25 @@ public void run() {
cache.resize(size);
}
stateConsumer.pollAndUpdate();
+
+ if (fetchDeadline != -1) {
+ if (fetchDeadline > time.milliseconds()) {
+ try {
+ globalConsumerClientInstanceId =
globalConsumer.clientInstanceId(Duration.ZERO);
Review Comment:
That's a bit impatient.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java:
##########
@@ -310,6 +317,25 @@ public void run() {
cache.resize(size);
}
stateConsumer.pollAndUpdate();
+
+ if (fetchDeadline != -1) {
+ if (fetchDeadline > time.milliseconds()) {
+ try {
+ globalConsumerClientInstanceId =
globalConsumer.clientInstanceId(Duration.ZERO);
+
clientInstanceIdFuture.complete(globalConsumerClientInstanceId);
+ fetchDeadline = -1;
+ } catch (final TimeoutException swallow) {
+ // swallow
Review Comment:
So, you've got a TimeoutException but the future is not completed. Surely
there comes a point at which the future times out too.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]