chia7712 commented on code in PR #17367:
URL: https://github.com/apache/kafka/pull/17367#discussion_r1835288947


##########
clients/src/test/java/org/apache/kafka/clients/admin/internals/PartitionLeaderStrategyIntegrationTest.java:
##########
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.message.ListOffsetsResponseData;
+import org.apache.kafka.common.message.MetadataRequestData;
+import org.apache.kafka.common.message.MetadataResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalInt;
+import java.util.Set;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkSet;

Review Comment:
   this method is removed already, so please use `Set.of` instead



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -407,6 +408,7 @@ public class KafkaAdminClient extends AdminClient {
     private final ExponentialBackoff retryBackoff;
     private final boolean clientTelemetryEnabled;
     private final MetadataRecoveryStrategy metadataRecoveryStrategy;
+    private final Map<TopicPartition, Integer> partitionLeaderCache;

Review Comment:
   Do we need to manage the memory usage of this cache if it's being used to 
optimize long-running admin operations? Maybe we could set a size limit, or 
clean it up when the metadata expires (reusing the existing 
`metadata.max.age.ms` configuration)?



##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/PartitionLeaderStrategy.java:
##########
@@ -195,4 +199,92 @@ public LookupResult<TopicPartition> handleResponse(
         return new LookupResult<>(failed, mapped);
     }
 
+    /**
+     * This subclass of {@link AdminApiFuture} starts with a pre-fetched map 
for keys to broker ids which can be
+     * used to optimise the request. The map is kept up to date as metadata is 
fetching as this request is processed.
+     * This is useful for situations in which {@link PartitionLeaderStrategy} 
is used
+     * repeatedly, such as a sequence of identical calls to
+     * {@link org.apache.kafka.clients.admin.Admin#listOffsets(Map, 
org.apache.kafka.clients.admin.ListOffsetsOptions)}.
+     */
+    public static class PartitionLeaderFuture<V> implements 
AdminApiFuture<TopicPartition, V> {
+        private final Set<TopicPartition> requestKeys;
+        private final Map<TopicPartition, Integer> partitionLeaderCache;
+        private final Map<TopicPartition, KafkaFuture<V>> futures;
+
+        public PartitionLeaderFuture(Set<TopicPartition> requestKeys, 
Map<TopicPartition, Integer> partitionLeaderCache) {
+            this.requestKeys = requestKeys;
+            this.partitionLeaderCache = partitionLeaderCache;
+            this.futures = requestKeys.stream().collect(Collectors.toMap(
+                Function.identity(),
+                k -> new KafkaFutureImpl<>()
+            ));
+        }
+
+        @Override
+        public Set<TopicPartition> lookupKeys() {
+            return futures.keySet();
+        }
+
+        @Override
+        public Set<TopicPartition> uncachedLookupKeys() {
+            Set<TopicPartition> keys = new HashSet<>();
+            requestKeys.forEach(tp -> {

Review Comment:
   Could you please replace `requestKeys.forEach` by `futures.keySet().forEach`?



##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java:
##########
@@ -110,7 +110,15 @@ public AdminApiDriver(
             retryBackoffMaxMs,
             CommonClientConfigs.RETRY_BACKOFF_JITTER);
         this.log = logContext.logger(AdminApiDriver.class);
-        retryLookup(future.lookupKeys());
+
+        // For any lookup keys for which we do not have cached information, we 
will need to look up
+        // metadata. For all cached keys, they can proceed straight to the 
fulfillment map.
+        // Note that the cache is only used on the initial calls, and any 
errors that result
+        // in additional lookups use the full set of lookup keys.
+        retryLookup(future.uncachedLookupKeys());
+        future.cachedKeyBrokerIdMapping().forEach((key, brokerId) -> {

Review Comment:
   `{` is unnecessary



##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/PartitionLeaderStrategy.java:
##########
@@ -195,4 +199,92 @@ public LookupResult<TopicPartition> handleResponse(
         return new LookupResult<>(failed, mapped);
     }
 
+    /**
+     * This subclass of {@link AdminApiFuture} starts with a pre-fetched map 
for keys to broker ids which can be
+     * used to optimise the request. The map is kept up to date as metadata is 
fetching as this request is processed.
+     * This is useful for situations in which {@link PartitionLeaderStrategy} 
is used
+     * repeatedly, such as a sequence of identical calls to
+     * {@link org.apache.kafka.clients.admin.Admin#listOffsets(Map, 
org.apache.kafka.clients.admin.ListOffsetsOptions)}.
+     */
+    public static class PartitionLeaderFuture<V> implements 
AdminApiFuture<TopicPartition, V> {
+        private final Set<TopicPartition> requestKeys;
+        private final Map<TopicPartition, Integer> partitionLeaderCache;
+        private final Map<TopicPartition, KafkaFuture<V>> futures;
+
+        public PartitionLeaderFuture(Set<TopicPartition> requestKeys, 
Map<TopicPartition, Integer> partitionLeaderCache) {
+            this.requestKeys = requestKeys;
+            this.partitionLeaderCache = partitionLeaderCache;
+            this.futures = requestKeys.stream().collect(Collectors.toMap(

Review Comment:
   Maybe we can use `toUnmodifiableMap` instead?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to