mjsax commented on code in PR #14864:
URL: https://github.com/apache/kafka/pull/14864#discussion_r1412011082
##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1791,6 +1795,146 @@ protected int processStreamThread(final Consumer<StreamThread> consumer) {
         return copy.size();
     }
+    /**
+     * Returns the internal clients' assigned {@code client instance ids}.
+     *
+     * @return The internal clients' assigned instance ids used for metrics collection.
+     *
+     * @throws IllegalArgumentException If {@code timeout} is negative.
Review Comment:
Added this (in alignment with `consumer/producer/admin$clientInstanceId()`) --
the KIP needs to be updated accordingly.
##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1791,6 +1795,146 @@ protected int processStreamThread(final Consumer<StreamThread> consumer) {
         return copy.size();
     }
+    /**
+     * Returns the internal clients' assigned {@code client instance ids}.
+     *
+     * @return The internal clients' assigned instance ids used for metrics collection.
+     *
+     * @throws IllegalArgumentException If {@code timeout} is negative.
+     * @throws IllegalStateException If {@code KafkaStreams} is not running.
+     * @throws TimeoutException Indicates that a request timed out.
+     * @throws StreamsException For any other error that might occur.
+     */
+    public ClientInstanceIds clientInstanceIds(final Duration timeout) {
+        if (timeout.isNegative()) {
+            throw new IllegalArgumentException("The timeout cannot be negative.");
+        }
+        if (state().hasNotStarted()) {
+            throw new IllegalStateException("KafkaStreams has not been started, you can retry after calling start().");
+        }
+        if (state().isShuttingDown() || state.hasCompletedShutdown()) {
+            throw new IllegalStateException("KafkaStreams has been stopped (" + state + ").");
+        }
+
+        final ClientInstanceIdsImpl clientInstanceIds = new ClientInstanceIdsImpl();
+
+        // (1) fan-out calls to threads
+
+        // StreamThread for main/restore consumers and producer(s)
+        final Map<String, KafkaFuture<Uuid>> consumerFutures = new HashMap<>();
+        final Map<String, KafkaFuture<Map<String, KafkaFuture<Uuid>>>> producerFutures = new HashMap<>();
+        for (final StreamThread streamThread : threads) {
+            consumerFutures.putAll(streamThread.consumerClientInstanceIds(timeout));
+            producerFutures.put(streamThread.getName(), streamThread.producersClientInstanceIds(timeout));
+        }
+        // GlobalThread
+        KafkaFuture<Uuid> globalThreadFuture = null;
+        if (globalStreamThread != null) {
+            globalThreadFuture = globalStreamThread.globalConsumerInstanceId(timeout);
+        }
+
+        // (2) get admin client instance id in a blocking fashion, while Stream/GlobalThreads work in parallel
+        try {
+            clientInstanceIds.setAdminInstanceId(adminClient.clientInstanceId(timeout));
+        } catch (final IllegalStateException telemetryDisabledError) {
Review Comment:
This is new, based on other PRs from KIP-714 --
`adminClient.clientInstanceId` throws if telemetry is disabled -- for this case,
we might not want to throw, but return a "partial" result...
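To illustrate what a "partial" result could mean here, a minimal sketch using only JDK types (`UUID` stands in for Kafka's `Uuid`; `PartialResultSketch` and `collect` are hypothetical names, not part of the PR). It assumes a disabled client signals this by throwing `IllegalStateException`, as described above, and simply leaves that client out of the result:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.function.Supplier;

// Hypothetical illustration only; not the PR's implementation.
final class PartialResultSketch {

    // Collects the instance id of every client that can provide one.
    // A client with telemetry disabled is skipped instead of failing the whole call.
    static Map<String, UUID> collect(final Map<String, Supplier<UUID>> clients) {
        final Map<String, UUID> ids = new HashMap<>();
        for (final Map.Entry<String, Supplier<UUID>> client : clients.entrySet()) {
            try {
                ids.put(client.getKey(), client.getValue().get());
            } catch (final IllegalStateException telemetryDisabled) {
                // swallow: this client contributes nothing to the (partial) result
            }
        }
        return ids;
    }
}
```

Any exception other than `IllegalStateException` still propagates in this sketch, so only the "telemetry disabled" case is treated as non-fatal.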
##########
streams/src/main/java/org/apache/kafka/streams/internals/ClientInstanceIdsImpl.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.internals;
+
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.streams.ClientInstanceIds;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ClientInstanceIdsImpl implements ClientInstanceIds {
+    private final Map<String, Uuid> consumerInstanceIds = new HashMap<>();
+    private final Map<String, Uuid> producerInstanceIds = new HashMap<>();
+    private Uuid adminInstanceId;
+
+    public void addConsumerInstanceId(final String key, final Uuid instanceId) {
+        consumerInstanceIds.put(key, instanceId);
+    }
+
+    public void addProducerInstanceId(final String key, final Uuid instanceId) {
+        producerInstanceIds.put(key, instanceId);
+    }
+
+    public void setAdminInstanceId(final Uuid instanceId) {
+        adminInstanceId = instanceId;
+    }
+
+    @Override
+    public Uuid adminInstanceId() {
+        if (adminInstanceId == null) {
Review Comment:
This part is new and we might need to discuss... We can either throw as the
code does right now, return `null`, or change to `Optional`.
Or, we merge everything together in a single `Map<>` and give up the
splitting of different clients completely (originally it was proposed to also
split consumer and restore consumer, but during the KIP discussion a merge of both
was preferred).
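For comparison, the three accessor options could be sketched side by side as follows. This uses JDK types only (`UUID` instead of Kafka's `Uuid`); the class name, method names, and the exception message are hypothetical and chosen just for illustration:

```java
import java.util.Optional;
import java.util.UUID;

// Hypothetical illustration of the three options; not the PR's code.
final class AdminIdHolderSketch {
    // stays null if the admin client has telemetry disabled
    private UUID adminInstanceId;

    void setAdminInstanceId(final UUID instanceId) {
        adminInstanceId = instanceId;
    }

    // Option 1: throw if no id is available
    UUID adminInstanceIdOrThrow() {
        if (adminInstanceId == null) {
            throw new IllegalStateException("No admin client instance id available.");
        }
        return adminInstanceId;
    }

    // Option 2: return null and leave the check to the caller
    UUID adminInstanceIdOrNull() {
        return adminInstanceId;
    }

    // Option 3: make the absence explicit in the return type
    Optional<UUID> adminInstanceIdOptional() {
        return Optional.ofNullable(adminInstanceId);
    }
}
```

The trade-off is mostly about the caller: option 1 forces a try/catch, option 2 a null check that is easy to forget, and option 3 changes the public signature.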
##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1791,6 +1795,146 @@ protected int processStreamThread(final Consumer<StreamThread> consumer) {
         return copy.size();
     }
+    /**
+     * Returns the internal clients' assigned {@code client instance ids}.
+     *
+     * @return The internal clients' assigned instance ids used for metrics collection.
+     *
+     * @throws IllegalArgumentException If {@code timeout} is negative.
+     * @throws IllegalStateException If {@code KafkaStreams} is not running.
+     * @throws TimeoutException Indicates that a request timed out.
+     * @throws StreamsException For any other error that might occur.
+     */
+    public ClientInstanceIds clientInstanceIds(final Duration timeout) {
+        if (timeout.isNegative()) {
+            throw new IllegalArgumentException("The timeout cannot be negative.");
+        }
+        if (state().hasNotStarted()) {
+            throw new IllegalStateException("KafkaStreams has not been started, you can retry after calling start().");
+        }
+        if (state().isShuttingDown() || state.hasCompletedShutdown()) {
+            throw new IllegalStateException("KafkaStreams has been stopped (" + state + ").");
+        }
+
+        final ClientInstanceIdsImpl clientInstanceIds = new ClientInstanceIdsImpl();
+
+        // (1) fan-out calls to threads
+
+        // StreamThread for main/restore consumers and producer(s)
+        final Map<String, KafkaFuture<Uuid>> consumerFutures = new HashMap<>();
+        final Map<String, KafkaFuture<Map<String, KafkaFuture<Uuid>>>> producerFutures = new HashMap<>();
Review Comment:
For producers, it's more complex, because for EOSv1 we depend on tasks, and
thus on rebalances... EOSv1 is still not fully covered, but I wanted to give a
heads-up on why it's different from consumers.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java:
##########
@@ -310,6 +317,30 @@ public void run() {
                     cache.resize(size);
                 }
                 stateConsumer.pollAndUpdate();
+
+                if (fetchDeadline != -1) {
+                    if (fetchDeadline >= time.milliseconds()) {
Review Comment:
Minor change from `>` to `>=`, which I believe is more correct (similar
elsewhere).
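One way to read the difference, as a small sketch with hypothetical names (`DeadlineSketch`, `mayStillFetch`): with an inclusive check, a deadline equal to the current time still allows one attempt. This would matter, for example, when a timeout of zero is passed and the clock has not advanced between setting and checking the deadline; that motivation is an assumption on my side, not something stated in the PR.

```java
// Hypothetical illustration of the inclusive deadline check.
final class DeadlineSketch {
    // sentinel meaning "no fetch requested", mirroring the -1 used in the diff
    static final long NO_DEADLINE = -1L;

    // Returns true if a fetch was requested and the deadline has not passed yet.
    // The comparison is inclusive: deadline == now still counts as "in time".
    static boolean mayStillFetch(final long fetchDeadline, final long nowMs) {
        return fetchDeadline != NO_DEADLINE && fetchDeadline >= nowMs;
    }
}
```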
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -1477,6 +1590,67 @@ public Object getStateLock() {
         return stateLock;
     }
+    public Map<String, KafkaFuture<Uuid>> consumerClientInstanceIds(final Duration timeout) {
+        boolean setDeadline = false;
+
+        final Map<String, KafkaFuture<Uuid>> result = new HashMap<>();
+
+        KafkaFutureImpl<Uuid> future = new KafkaFutureImpl<>();
+        if (mainConsumerClientInstanceId != null) {
+            future.complete(mainConsumerClientInstanceId);
+        } else {
+            mainConsumerInstanceIdFuture = future;
+            setDeadline = true;
+        }
+        result.put(getName() + "-consumer", future);
+
+        future = new KafkaFutureImpl<>();
+        if (restoreConsumerClientInstanceId != null) {
+            future.complete(restoreConsumerClientInstanceId);
+        } else {
+            restoreConsumerInstanceIdFuture = future;
+            setDeadline = true;
+        }
+        result.put(getName() + "-restore-consumer", future);
+
+        if (setDeadline) {
+            fetchDeadline = time.milliseconds() + timeout.toMillis();
+        }
+
+        return result;
+    }
+
+    public KafkaFuture<Map<String, KafkaFuture<Uuid>>> producersClientInstanceIds(final Duration timeout) {
+        final KafkaFutureImpl<Map<String, KafkaFuture<Uuid>>> result = new KafkaFutureImpl<>();
+
+        if (processingMode.equals(StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA)) {
+//            for (final TaskId taskId : taskManager.activeTaskIds()) {
+//                future = new KafkaFutureImpl<>();
+//                if (taskProducersClientInstanceIds.get(taskId) != null) {
+//                    future.complete(taskProducersClientInstanceIds.get(taskId));
+//                } else {
+//                    taskProducersInstanceIdsFuture.put(taskId, future);
+//                    setDeadline = true;
+//                }
+//                result.put(getName() + "-" + taskId + "-producer", future);
+//            };
+        } else {
+            final KafkaFutureImpl<Uuid> producerFuture = new KafkaFutureImpl<>();
+            if (threadProducerClientInstanceId != null) {
+                producerFuture.complete(threadProducerClientInstanceId);
+            } else {
+                threadProducerInstanceIdFuture = producerFuture;
+                if (fetchDeadline == -1) {
+                    fetchDeadline = time.milliseconds() + timeout.toMillis();
+                }
+            }
+
+            result.complete(Collections.singletonMap(getName() + "-producer", producerFuture));
Review Comment:
For the thread-producer (i.e., ALOS / EOSv2) we can complete the future we return
right away because we already created all nested futures (completion of the
outer future means that all inner futures got added to the map, and we have only
one inner future for the thread-producer case).
For EOSv1 we need to forward the request to create the nested futures (one
per task) to the TaskManager, i.e., we need the StreamThread to not just
complete, but actually first create the nested futures (to avoid race
conditions with rebalancing and task creation/removal) -- for this case the
outer future will not be completed here, but only after the TM has created all
task futures.
My intention is to take the created map as a snapshot, i.e., if a rebalance
happens in between, we won't add new futures to the map (for newly assigned
tasks), and for removed tasks, we will cancel the future.
Not sure about tasks which are not active but still restoring right now.
EOSv1 is becoming messy. Any good ideas how we could simplify it?
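To make the intended snapshot semantics concrete, here is a minimal sketch. It uses the JDK's `CompletableFuture` and `UUID` rather than `KafkaFuture` and `Uuid`, plain strings as task ids, and hypothetical names (`TaskFutureSnapshotSketch`, `onRebalance`); how the real TaskManager would be notified is not covered:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration of "take the map as a snapshot"; not the PR's code.
final class TaskFutureSnapshotSketch {
    // one future per task that was active when the snapshot was taken
    private final Map<String, CompletableFuture<UUID>> snapshot = new HashMap<>();

    TaskFutureSnapshotSketch(final Set<String> activeTaskIds) {
        for (final String taskId : activeTaskIds) {
            snapshot.put(taskId, new CompletableFuture<>());
        }
    }

    // Applies a new assignment: futures of removed tasks are cancelled,
    // newly assigned tasks are intentionally NOT added to the snapshot.
    void onRebalance(final Set<String> newActiveTaskIds) {
        for (final Map.Entry<String, CompletableFuture<UUID>> entry : snapshot.entrySet()) {
            if (!newActiveTaskIds.contains(entry.getKey())) {
                entry.getValue().cancel(false);
            }
        }
    }

    Map<String, CompletableFuture<UUID>> futures() {
        return snapshot;
    }
}
```

A caller that iterates over the snapshot would then have to treat a cancelled future as "task no longer owned" rather than as an error.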
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -1477,6 +1590,67 @@ public Object getStateLock() {
         return stateLock;
     }
+    public Map<String, KafkaFuture<Uuid>> consumerClientInstanceIds(final Duration timeout) {
+        boolean setDeadline = false;
+
+        final Map<String, KafkaFuture<Uuid>> result = new HashMap<>();
+
+        KafkaFutureImpl<Uuid> future = new KafkaFutureImpl<>();
+        if (mainConsumerClientInstanceId != null) {
+            future.complete(mainConsumerClientInstanceId);
+        } else {
+            mainConsumerInstanceIdFuture = future;
+            setDeadline = true;
+        }
+        result.put(getName() + "-consumer", future);
+
+        future = new KafkaFutureImpl<>();
+        if (restoreConsumerClientInstanceId != null) {
+            future.complete(restoreConsumerClientInstanceId);
+        } else {
+            restoreConsumerInstanceIdFuture = future;
+            setDeadline = true;
+        }
+        result.put(getName() + "-restore-consumer", future);
+
+        if (setDeadline) {
+            fetchDeadline = time.milliseconds() + timeout.toMillis();
+        }
+
+        return result;
+    }
+
+    public KafkaFuture<Map<String, KafkaFuture<Uuid>>> producersClientInstanceIds(final Duration timeout) {
+        final KafkaFutureImpl<Map<String, KafkaFuture<Uuid>>> result = new KafkaFutureImpl<>();
Review Comment:
The `result` future will complete only after the `Map` it returns is fully
populated. For EOSv1, we cannot do this synchronously, thus we need this
indirection. For ALOS and EOSv2, the map is created synchronously (it's just a
singleton for the thread producer).
Cf. other comment below.
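The outer/inner future indirection can be illustrated roughly as follows, again with JDK `CompletableFuture` and `UUID` in place of the Kafka types and with hypothetical names (`NestedFutureSketch`, `threadProducer`, `perTaskProducers`). The outer future says "the map of futures is fully populated"; each inner future says "this producer's id is known":

```java
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration of the nested-future indirection; not the PR's code.
final class NestedFutureSketch {

    // Thread-producer case: the map is known synchronously (a singleton),
    // so the outer future can be completed right away. The inner future is
    // still pending until the instance id has actually been fetched.
    static CompletableFuture<Map<String, CompletableFuture<UUID>>> threadProducer(final String threadName) {
        final CompletableFuture<UUID> inner = new CompletableFuture<>();
        final Map<String, CompletableFuture<UUID>> futures =
            Collections.singletonMap(threadName + "-producer", inner);
        return CompletableFuture.completedFuture(futures);
    }

    // Per-task case: the set of producers is not known synchronously,
    // so the outer future is handed back incomplete and must be completed
    // later by whoever creates the per-task futures.
    static CompletableFuture<Map<String, CompletableFuture<UUID>>> perTaskProducers() {
        return new CompletableFuture<>();
    }
}
```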
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -713,6 +735,97 @@ boolean runLoop() {
         return true;
     }
+    // visible for testing
+    void maybeGetClientInstanceIds() {
+        // we pass in a timeout of zero into each `clientInstanceId()` call
+        // to just trigger the "get instance id" background RPC;
+        // we don't want to block the stream thread that can do useful work in the meantime
+
+        if (fetchDeadline != -1) {
+            if (!mainConsumerInstanceIdFuture.isDone()) {
+                if (fetchDeadline >= time.milliseconds()) {
+                    try {
+                        mainConsumerClientInstanceId = mainConsumer.clientInstanceId(Duration.ZERO);
+                        mainConsumerInstanceIdFuture.complete(mainConsumerClientInstanceId);
+                        maybeResetFetchDeadline();
+                    } catch (final IllegalStateException disabledError) {
+                        mainConsumerInstanceIdFuture.complete(null);
Review Comment:
We might want to add logging, similar to the admin case... (same for others
below) -- forgot to do this.
I was also wondering if the KS code should log the client instance id on
successful retrieval? I asked a similar question on another KIP-714 PR for clients.
##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1791,6 +1795,146 @@ protected int processStreamThread(final Consumer<StreamThread> consumer) {
         return copy.size();
     }
+    /**
+     * Returns the internal clients' assigned {@code client instance ids}.
+     *
+     * @return The internal clients' assigned instance ids used for metrics collection.
+     *
+     * @throws IllegalArgumentException If {@code timeout} is negative.
+     * @throws IllegalStateException If {@code KafkaStreams} is not running.
+     * @throws TimeoutException Indicates that a request timed out.
+     * @throws StreamsException For any other error that might occur.
+     */
+    public ClientInstanceIds clientInstanceIds(final Duration timeout) {
+        if (timeout.isNegative()) {
+            throw new IllegalArgumentException("The timeout cannot be negative.");
+        }
+        if (state().hasNotStarted()) {
+            throw new IllegalStateException("KafkaStreams has not been started, you can retry after calling start().");
+        }
+        if (state().isShuttingDown() || state.hasCompletedShutdown()) {
+            throw new IllegalStateException("KafkaStreams has been stopped (" + state + ").");
+        }
+
+        final ClientInstanceIdsImpl clientInstanceIds = new ClientInstanceIdsImpl();
+
+        // (1) fan-out calls to threads
+
+        // StreamThread for main/restore consumers and producer(s)
+        final Map<String, KafkaFuture<Uuid>> consumerFutures = new HashMap<>();
+        final Map<String, KafkaFuture<Map<String, KafkaFuture<Uuid>>>> producerFutures = new HashMap<>();
+        for (final StreamThread streamThread : threads) {
+            consumerFutures.putAll(streamThread.consumerClientInstanceIds(timeout));
+            producerFutures.put(streamThread.getName(), streamThread.producersClientInstanceIds(timeout));
+        }
+        // GlobalThread
+        KafkaFuture<Uuid> globalThreadFuture = null;
+        if (globalStreamThread != null) {
+            globalThreadFuture = globalStreamThread.globalConsumerInstanceId(timeout);
+        }
+
+        // (2) get admin client instance id in a blocking fashion, while Stream/GlobalThreads work in parallel
+        try {
+            clientInstanceIds.setAdminInstanceId(adminClient.clientInstanceId(timeout));
+        } catch (final IllegalStateException telemetryDisabledError) {
+            // swallow
+            log.debug("Telemetry is disabled on the admin client.");
+        } catch (final TimeoutException timeoutException) {
Review Comment:
The previous version did not follow what was discussed on the KIP. If any
instance id cannot be retrieved, we throw an error, so we re-throw. Other
errors are just wrapped as `StreamsException` to avoid checked exceptions (this
part was not discussed on the KIP; just discovered it now).
Similar changes below to work through the futures.
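The pattern for working through a future could look like the sketch below. It is JDK-only: `SketchTimeoutException` and `SketchStreamsException` are hypothetical unchecked stand-ins for Kafka's `TimeoutException` and `StreamsException`, and `getOrThrow` is an illustrative helper, not a method from the PR:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration of "re-throw timeouts, wrap everything else".
final class FutureUnwrapSketch {

    static final class SketchTimeoutException extends RuntimeException {
        SketchTimeoutException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    static final class SketchStreamsException extends RuntimeException {
        SketchStreamsException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    // Waits for the future up to timeoutMs and converts checked exceptions
    // into unchecked ones, keeping timeouts distinguishable from other errors.
    static <T> T getOrThrow(final CompletableFuture<T> future, final long timeoutMs) {
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (final TimeoutException timeout) {
            throw new SketchTimeoutException("Could not retrieve instance id in time.", timeout);
        } catch (final InterruptedException interrupted) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new SketchStreamsException("Interrupted while waiting.", interrupted);
        } catch (final ExecutionException error) {
            // unwrap so callers see the original failure as the cause
            throw new SketchStreamsException("Could not retrieve instance id.", error.getCause());
        }
    }
}
```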
##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1791,6 +1795,146 @@ protected int processStreamThread(final Consumer<StreamThread> consumer) {
         return copy.size();
     }
+    /**
+     * Returns the internal clients' assigned {@code client instance ids}.
+     *
+     * @return The internal clients' assigned instance ids used for metrics collection.
+     *
+     * @throws IllegalArgumentException If {@code timeout} is negative.
+     * @throws IllegalStateException If {@code KafkaStreams} is not running.
+     * @throws TimeoutException Indicates that a request timed out.
+     * @throws StreamsException For any other error that might occur.
Review Comment:
This is also newly added. KIP needs an update.