mjsax commented on code in PR #14864:
URL: https://github.com/apache/kafka/pull/14864#discussion_r1409794850
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java:
##########
@@ -310,6 +317,25 @@ public void run() {
cache.resize(size);
}
stateConsumer.pollAndUpdate();
+
+ if (fetchDeadline != -1) {
+ if (fetchDeadline > time.milliseconds()) {
+ try {
+ globalConsumerClientInstanceId =
globalConsumer.clientInstanceId(Duration.ZERO);
+
clientInstanceIdFuture.complete(globalConsumerClientInstanceId);
+ fetchDeadline = -1;
+ } catch (final TimeoutException swallow) {
+ // swallow
+ } catch (final Exception error) {
+
clientInstanceIdFuture.completeExceptionally(error);
+ fetchDeadline = -1;
Review Comment:
Yes. If the user calls `KafkaStreams#clientInstanceIds()` again, we would
set a new fetch deadline -- if `fetchDeadline == -1` it means "nothing to be
done", i.e., no call to `KafkaStreams#clientInstanceIds()` was made or is
in-flight waiting for completion.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java:
##########
@@ -310,6 +317,25 @@ public void run() {
cache.resize(size);
}
stateConsumer.pollAndUpdate();
+
+ if (fetchDeadline != -1) {
+ if (fetchDeadline > time.milliseconds()) {
+ try {
+ globalConsumerClientInstanceId =
globalConsumer.clientInstanceId(Duration.ZERO);
+
clientInstanceIdFuture.complete(globalConsumerClientInstanceId);
+ fetchDeadline = -1;
+ } catch (final TimeoutException swallow) {
+ // swallow
Review Comment:
Yes, this happens further below, in the `else` branch of the
`if (fetchDeadline > time.milliseconds())` check from above.
##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1791,6 +1794,52 @@ protected int processStreamThread(final
Consumer<StreamThread> consumer) {
return copy.size();
}
+ /**
+ * Returns the internal clients' assigned {@code client instance ids}.
+ *
+ * @return the internal clients' assigned instance ids used for metrics
collection.
+ *
+ * @throws IllegalStateException If {@code KafkaStreams} is not running.
+ * @throws TimeoutException Indicates that a request timed out.
+ */
+ public ClientInstanceIds clientInstanceIds(final Duration timeout) {
+ if (state().hasNotStarted()) {
+ throw new IllegalStateException("KafkaStreams has not been
started, you can retry after calling start().");
+ }
+ if (state().isShuttingDown() || state.hasCompletedShutdown()) {
+ throw new IllegalStateException("KafkaStreams has been stopped ("
+ state + ").");
+ }
+
+ final ClientInstanceIdsImpl clientInstanceIds = new
ClientInstanceIdsImpl();
+
+ KafkaFuture<Uuid> globalThreadFuture = null;
+ if (globalStreamThread != null) {
+ globalThreadFuture =
globalStreamThread.globalConsumerInstanceId(timeout);
Review Comment:
How strictly we can obey the given `timeout` is somewhat tricky, given that we
need to call `clientInstanceId()` for each client we have. -- The idea was to
basically "fan-out" all these calls and do them in parallel (note that
`globalConsumerInstanceId` will return immediately and not block, but hand the
execution from the user thread to the `GlobalStreamThread`; that's why we
return a Future) -- thus it should be ok to provide the same timeout to each
call (as all of them are done in parallel)?
If you have a good suggestion for how it could be done better, let me know.
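To make the fan-out idea concrete, here is a minimal sketch that uses only `java.util.concurrent` and no Kafka clients. `FanOutSketch`, `fetchAll`, and the map of suppliers are hypothetical names for illustration, not code from this PR. All lookups are started first, and the caller then waits against one shared deadline, so the total wait should stay close to `timeout` instead of growing with the number of clients.

```java
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

public class FanOutSketch {

    // Hypothetical helper: starts every lookup up front, then waits for all of
    // them against a single deadline derived from the caller's timeout.
    public static Map<String, String> fetchAll(final Map<String, Supplier<String>> lookups,
                                               final Duration timeout) {
        final long deadlineNanos = System.nanoTime() + timeout.toNanos();

        // Fan out: each lookup runs asynchronously (on the common pool here).
        final Map<String, CompletableFuture<String>> futures = new LinkedHashMap<>();
        for (final Map.Entry<String, Supplier<String>> entry : lookups.entrySet()) {
            futures.put(entry.getKey(), CompletableFuture.supplyAsync(entry.getValue()));
        }

        // Collect: each wait only gets the time remaining until the shared deadline.
        final Map<String, String> results = new LinkedHashMap<>();
        for (final Map.Entry<String, CompletableFuture<String>> entry : futures.entrySet()) {
            final String key = entry.getKey();
            final long remainingNanos = Math.max(0L, deadlineNanos - System.nanoTime());
            try {
                results.put(key, entry.getValue().get(remainingNanos, TimeUnit.NANOSECONDS));
            } catch (final TimeoutException e) {
                throw new IllegalStateException("Timed out waiting for " + key, e);
            } catch (final ExecutionException e) {
                throw new IllegalStateException("Lookup failed for " + key, e.getCause());
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for " + key, e);
            }
        }
        return results;
    }
}
```

The sketch reports failures as unchecked exceptions only to keep it short; how errors should surface to the caller is a separate design choice.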
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java:
##########
@@ -310,6 +317,25 @@ public void run() {
cache.resize(size);
}
stateConsumer.pollAndUpdate();
+
+ if (fetchDeadline != -1) {
+ if (fetchDeadline > time.milliseconds()) {
+ try {
+ globalConsumerClientInstanceId =
globalConsumer.clientInstanceId(Duration.ZERO);
Review Comment:
Yes, but this was intentional. The `GlobalStreamThread` does this call "on
the side", and thus the idea is to just call it with no timeout to just trigger
the background RPC and not block the thread from doing its actual work at
all. -- There won't be a busy wait, because the global thread will do other
useful work in the meantime before it cycles back here, to check if the RPC
returned.
Thoughts?
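As a rough illustration of the pattern (not the PR's code), here is a self-contained sketch with no Kafka dependency. `DeadlinePollSketch`, `request`, `maybeComplete`, the injected clock, and the `probe` supplier are hypothetical; `probe` stands in for the zero-timeout `clientInstanceId` call and returns an empty `Optional` where the real call would throw a `TimeoutException`. Completing the future with a timeout error in the `else` branch is an assumption about what the expired-deadline path does.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

public class DeadlinePollSketch {
    private static final long NO_REQUEST = -1L;

    private final LongSupplier clock;               // stand-in for time.milliseconds()
    private final Supplier<Optional<String>> probe; // stand-in for a zero-timeout id lookup
    private long fetchDeadline = NO_REQUEST;
    private CompletableFuture<String> future = new CompletableFuture<>();

    public DeadlinePollSketch(final LongSupplier clock, final Supplier<Optional<String>> probe) {
        this.clock = clock;
        this.probe = probe;
    }

    // Called from the user thread: registers a request and returns immediately.
    public synchronized CompletableFuture<String> request(final long timeoutMs) {
        future = new CompletableFuture<>();
        fetchDeadline = clock.getAsLong() + timeoutMs;
        return future;
    }

    // Called once per iteration of the worker loop, after its regular work.
    public synchronized void maybeComplete() {
        if (fetchDeadline == NO_REQUEST) {
            return; // nothing to be done
        }
        if (fetchDeadline > clock.getAsLong()) {
            try {
                final Optional<String> id = probe.get();
                if (id.isPresent()) {
                    future.complete(id.get());
                    fetchDeadline = NO_REQUEST;
                }
                // Not ready yet: keep the deadline and retry on the next iteration.
            } catch (final RuntimeException error) {
                future.completeExceptionally(error);
                fetchDeadline = NO_REQUEST;
            }
        } else {
            // Assumption: an expired deadline fails the pending future.
            future.completeExceptionally(new TimeoutException("Could not retrieve the id in time."));
            fetchDeadline = NO_REQUEST;
        }
    }
}
```

Because `maybeComplete` never waits, the worker loop goes straight back to its regular work, and the time between two probes is simply the duration of one loop iteration.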
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java:
##########
@@ -454,4 +480,19 @@ public void shutdown() {
public Map<MetricName, Metric> consumerMetrics() {
return Collections.unmodifiableMap(globalConsumer.metrics());
}
+
+ public KafkaFuture<Uuid> globalConsumerInstanceId(final Duration timeout) {
Review Comment:
The user's thread would call `KafkaStreams#clientInstanceIds(...)`, which is
the only place calling this method. -- We could make
`KafkaStreams#clientInstanceIds(...)` synchronized? In the end, `KafkaStreams`
is supposed to be thread safe.
##########
streams/src/main/java/org/apache/kafka/streams/internals/ClientInstanceIdsImpl.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.internals;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.streams.ClientInstanceIds;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ClientInstanceIdsImpl implements ClientInstanceIds {
+ private final Map<String, Uuid> consumerInstanceIds = new HashMap<>();
+ private final Map<String, Uuid> producerInstanceIds = new HashMap<>();
+ private Uuid adminInstanceId;
+
+ public void addConsumerInstanceId(final String key, final Uuid instanceId)
{
+ consumerInstanceIds.put(key, instanceId);
+ }
+
+ public void addProducerInstanceId(final String key, final Uuid instanceId)
{
+ producerInstanceIds.put(key, instanceId);
+ }
+
+ public void setAdminInstanceId(final Uuid instanceId) {
+ adminInstanceId = instanceId;
+ }
+
+ @Override
+ public Uuid adminInstanceId() {
+ return adminInstanceId;
+ }
+
+ @Override
+ public Map<String, Uuid> consumerInstanceIds() {
+ return consumerInstanceIds;
Review Comment:
Yes, given that we create a `new ClientInstanceIdsImpl` on each call to
`KafkaStreams#clientInstanceIds`, I thought it should be ok? Thoughts?
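If we wanted to guard against callers mutating the returned map anyway, one option would be to hand out an unmodifiable view. A minimal sketch, using plain `String` ids instead of `Uuid` so it runs without Kafka on the classpath; `InstanceIdsSketch` is a hypothetical stand-in, not the class in this PR:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class InstanceIdsSketch {
    private final Map<String, String> consumerInstanceIds = new HashMap<>();

    public void addConsumerInstanceId(final String key, final String instanceId) {
        consumerInstanceIds.put(key, instanceId);
    }

    // Returns a read-only view: reads see the current entries, while writes
    // through the view throw UnsupportedOperationException.
    public Map<String, String> consumerInstanceIds() {
        return Collections.unmodifiableMap(consumerInstanceIds);
    }
}
```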
##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1791,6 +1794,52 @@ protected int processStreamThread(final
Consumer<StreamThread> consumer) {
return copy.size();
}
+ /**
+ * Returns the internal clients' assigned {@code client instance ids}.
+ *
+ * @return the internal clients' assigned instance ids used for metrics
collection.
+ *
+ * @throws IllegalStateException If {@code KafkaStreams} is not running.
+ * @throws TimeoutException Indicates that a request timed out.
+ */
+ public ClientInstanceIds clientInstanceIds(final Duration timeout) {
+ if (state().hasNotStarted()) {
+ throw new IllegalStateException("KafkaStreams has not been
started, you can retry after calling start().");
+ }
+ if (state().isShuttingDown() || state.hasCompletedShutdown()) {
+ throw new IllegalStateException("KafkaStreams has been stopped ("
+ state + ").");
+ }
+
+ final ClientInstanceIdsImpl clientInstanceIds = new
ClientInstanceIdsImpl();
+
+ KafkaFuture<Uuid> globalThreadFuture = null;
+ if (globalStreamThread != null) {
+ globalThreadFuture =
globalStreamThread.globalConsumerInstanceId(timeout);
+ }
+
+ try {
+
clientInstanceIds.setAdminInstanceId(adminClient.clientInstanceId(timeout));
+ } catch (final TimeoutException timeoutException) {
+ log.warn("Could not get admin client-instance-id due to timeout.");
Review Comment:
Let's hear what others think. Not married to it. Want to ensure that it's not
read as `admin-client instance-id`. Maybe it does not matter too much.