lucasbru commented on code in PR #14962:
URL: https://github.com/apache/kafka/pull/14962#discussion_r1422495787


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java:
##########
@@ -55,16 +56,13 @@
  * <p>
  * The manager checks the state of the {@link TopicMetadataRequestState} 
before sending a new one to
  * prevent sending it without backing off from previous attempts.
- * It also checks the state of inflight requests to avoid overwhelming the 
broker with duplicate requests.
- * The {@code inflightRequests} are memorized by topic name. If all topics are 
requested, then we use {@code Optional
- * .empty()} as the key.
- * Once a request is completed successfully, its corresponding entry is 
removed.
+ * Once a request is completed successfully or times out, its corresponding 
entry is removed.
  * </p>
  */
 
 public class TopicMetadataRequestManager implements RequestManager {
     private final boolean allowAutoTopicCreation;
-    private final Map<Optional<String>, TopicMetadataRequestState> 
inflightRequests;
+    private final List<TopicMetadataRequestState> inflightRequests;

Review Comment:
   Why did you decide to remove the reuse of requests



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java:
##########
@@ -73,79 +71,127 @@ public class TopicMetadataRequestManager implements 
RequestManager {
     public TopicMetadataRequestManager(final LogContext context, final 
ConsumerConfig config) {
         logContext = context;
         log = logContext.logger(getClass());
-        inflightRequests = new HashMap<>();
+        inflightRequests = new LinkedList<>();
         retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
         retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
         allowAutoTopicCreation = 
config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG);
     }
 
     @Override
     public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
-        List<NetworkClientDelegate.UnsentRequest> requests = 
inflightRequests.values().stream()
+        // Prune any requests which have timed out
+        List<TopicMetadataRequestState> expiredRequests = 
inflightRequests.stream()
+                .filter(req -> req.isExpired(currentTimeMs))
+                .collect(Collectors.toList());
+        expiredRequests.forEach(TopicMetadataRequestState::expire);
+
+        List<NetworkClientDelegate.UnsentRequest> requests = 
inflightRequests.stream()
             .map(req -> req.send(currentTimeMs))
             .filter(Optional::isPresent)
             .map(Optional::get)
             .collect(Collectors.toList());
+
         return requests.isEmpty() ? EMPTY : new 
NetworkClientDelegate.PollResult(0, requests);
     }
 
     /**
-     * return the future of the metadata request. Return the existing future 
if a request for the same topic is already
-     * inflight.
+     * Return the future of the metadata request.
      *
-     * @param topic to be requested. If empty, return the metadata for all 
topics.
      * @return the future of the metadata request.
      */
-    public CompletableFuture<Map<String, List<PartitionInfo>>> 
requestTopicMetadata(final Optional<String> topic) {
-        if (inflightRequests.containsKey(topic)) {
-            return inflightRequests.get(topic).future;
-        }
+    public CompletableFuture<Map<String, List<PartitionInfo>>> 
requestAllTopicsMetadata(final long expirationTimeMs) {
+        TopicMetadataRequestState newRequest = new TopicMetadataRequestState(
+                logContext,
+                expirationTimeMs,
+                retryBackoffMs,
+                retryBackoffMaxMs);
+        inflightRequests.add(newRequest);
+        return newRequest.future;
+    }
 
+    /**
+     * Return the future of the metadata request.
+     *
+     * @param topic to be requested.
+     * @return the future of the metadata request.
+     */
+    public CompletableFuture<Map<String, List<PartitionInfo>>> 
requestTopicMetadata(final String topic, final long expirationTimeMs) {
         TopicMetadataRequestState newRequest = new TopicMetadataRequestState(
                 logContext,
                 topic,
+                expirationTimeMs,
                 retryBackoffMs,
                 retryBackoffMaxMs);
-        inflightRequests.put(topic, newRequest);
+        inflightRequests.add(newRequest);
         return newRequest.future;
     }
 
     // Visible for testing
     List<TopicMetadataRequestState> inflightRequests() {
-        return new ArrayList<>(inflightRequests.values());
+        return inflightRequests;
     }
 
     class TopicMetadataRequestState extends RequestState {
-        private final Optional<String> topic;
+        private final String topic;
+        private final boolean allTopics;
+        private final long expirationTimeMs;
         CompletableFuture<Map<String, List<PartitionInfo>>> future;
 
         public TopicMetadataRequestState(final LogContext logContext,
-                                         final Optional<String> topic,
+                                         final long expirationTimeMs,
+                                         final long retryBackoffMs,
+                                         final long retryBackoffMaxMs) {
+            super(logContext, TopicMetadataRequestState.class.getSimpleName(), 
retryBackoffMs,
+                    retryBackoffMaxMs);
+            future = new CompletableFuture<>();
+            this.topic = null;
+            this.allTopics = true;
+            this.expirationTimeMs = expirationTimeMs;
+        }
+
+        public TopicMetadataRequestState(final LogContext logContext,
+                                         final String topic,
+                                         final long expirationTimeMs,
                                          final long retryBackoffMs,
                                          final long retryBackoffMaxMs) {
             super(logContext, TopicMetadataRequestState.class.getSimpleName(), 
retryBackoffMs,
                 retryBackoffMaxMs);
             future = new CompletableFuture<>();
             this.topic = topic;
+            this.allTopics = false;
+            this.expirationTimeMs = expirationTimeMs;
         }
 
         /**
          * prepare the metadata request and return an
          * {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}
 if needed.
          */
         private Optional<NetworkClientDelegate.UnsentRequest> send(final long 
currentTimeMs) {
+            if (currentTimeMs >= expirationTimeMs) {

Review Comment:
   Do we need to do this? This is that thing again, the original code always 
sends the request, even if the timeout is expired, because the timeout only 
refers to the amount of time we are blocking. I.e. we should execute all 
operations that are non-blocking.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to