Copilot commented on code in PR #22493:
URL: https://github.com/apache/kafka/pull/22493#discussion_r3374763096


##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -5151,6 +5163,26 @@ void maybeRetry(long currentTimeMs, Throwable throwable) {
                     super.maybeRetry(currentTimeMs, throwable);
                 }
             }
+
+            @Override
+            boolean handleNodeUnavailable(long currentTimeMs) {
+                OptionalInt brokerId = spec.scope.destinationBrokerId();
+                if (brokerId.isPresent()
+                        && metadataManager.isReady()
+                        && metadataManager.nodeById(brokerId.getAsInt()) == null) {
+                    // The fulfillment target broker is no longer present in the cluster metadata.
+                    // This happens when a stale entry in the partition leader cache points at a
+                    // broker that has since left the cluster. Send the keys back to the lookup
+                    // stage so the leader can be re-resolved, rather than waiting for the request
+                    // deadline to expire without ever issuing another lookup.
+                    log.debug("Broker {} for {} is no longer in the cluster metadata; retrying lookup.",
+                        brokerId.getAsInt(), spec.name);
+                    driver.retryLookup(currentTimeMs, spec);
+                    maybeSendRequests(driver, currentTimeMs);
+                    return true;
+                }
+                return false;

Review Comment:
   `handleNodeUnavailable` calls `driver.retryLookup(...)` for every 
driver-based request whose destination broker is missing from metadata, 
regardless of the lookup strategy. For `StaticBrokerStrategy` (e.g., 
`describeProducers` when `options.brokerId` is set), `retryLookup` is 
effectively a no-op (it remaps straight back to the same fulfillment broker), 
so this path will repeatedly recreate calls and increment the driver/call 
`tries` without ever sending a request. That can cause premature failure via 
`maxRetries` ("Exceeded maxRetries") rather than waiting for 
`default.api.timeout.ms` / the request deadline, which is a behavior 
regression for APIs that intentionally target a specific broker id.
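
To make the failure mode above concrete, here is a minimal toy model in Java. It is not Kafka code: `RetryBudgetSketch`, `simulate`, and all of its parameters are hypothetical names, and it assumes that each "node unavailable" pass bumps the retry counter by exactly one and that the pinned broker never reappears in metadata.

```java
/** Toy model only; none of these names exist in Kafka. */
final class RetryBudgetSketch {
    enum Outcome { EXCEEDED_MAX_RETRIES, TIMED_OUT }

    /**
     * Simulates polling a call whose pinned destination broker is missing
     * from metadata for the whole run.
     *
     * @param retryOnMissingBroker true models the new handleNodeUnavailable
     *                             path, false models leaving the call pending
     */
    static Outcome simulate(boolean retryOnMissingBroker,
                            int maxRetries,
                            long deadlineMs,
                            long pollIntervalMs) {
        int tries = 0;
        for (long now = 0; now < deadlineMs; now += pollIntervalMs) {
            if (retryOnMissingBroker) {
                // Assumption: the key is remapped to the same missing broker,
                // a new call is created, and the counter grows by one.
                tries++;
                if (tries > maxRetries) {
                    return Outcome.EXCEEDED_MAX_RETRIES;
                }
            }
            // Otherwise the call simply stays pending until the deadline.
        }
        return Outcome.TIMED_OUT;
    }

    public static void main(String[] args) {
        // Retry budget of 5, 60 s deadline, 100 ms between passes.
        System.out.println(simulate(true, 5, 60_000L, 100L));
        System.out.println(simulate(false, 5, 60_000L, 100L));
    }
}
```

Under these assumptions the retrying variant fails once its retry budget is spent, long before the deadline, while the pending variant only ends at the deadline.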



##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java:
##########
@@ -188,6 +188,19 @@ private void retryLookup(Collection<K> keys) {
         keys.forEach(this::unmap);
     }
 
+    /**
+     * Send the keys of a fulfillment request back to the Lookup stage. This is invoked when a
+     * fulfillment request cannot be routed because its target broker is no longer present in the
+     * cluster metadata, for example when a stale entry in the partition leader cache pointed at a
+     * broker that has since left the cluster. Re-running the lookup gives us a chance to discover
+     * the current leader. Without this, such a request would remain unassignable until the request
+     * deadline expires.
+     */
+    public void retryLookup(long currentTimeMs, RequestSpec<K> spec) {
+        clearInflightRequest(currentTimeMs, spec);
+        retryLookup(spec.keys);
+    }

Review Comment:
   `retryLookup(long, RequestSpec)` always clears the inflight request and then 
calls `unmap` for each key, but `unmap` may immediately remap back into the 
fulfillment stage when the lookup strategy’s 
`lookupScope(key).destinationBrokerId()` is present (e.g., 
`StaticBrokerStrategy`). In that case, callers like 
`KafkaAdminClient.Call#handleNodeUnavailable` can end up in a loop that 
recreates fulfillment specs/calls and consumes retry counters without ever 
making progress. Consider making this method return whether it actually moved 
any keys into the lookup stage (destination broker id empty) so callers can 
decide whether to take corrective action or leave the call pending.
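
One way the suggested return value could look, shown on a simplified stand-in rather than the real `AdminApiDriver`: `ToyDriver`, its two stage collections, and the `lookupScope` function are hypothetical and only mimic the behavior described above, where a key with a fixed destination broker remaps straight back into the fulfillment stage.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
import java.util.Set;
import java.util.function.Function;

/** Toy stand-in for a two-stage driver; NOT the real AdminApiDriver. */
final class ToyDriver<K> {
    // Hypothetical: maps a key to a fixed destination broker, if the strategy pins one.
    private final Function<K, OptionalInt> lookupScope;
    private final Set<K> lookupStage = new HashSet<>();
    private final Map<K, Integer> fulfillmentStage = new HashMap<>();

    ToyDriver(Function<K, OptionalInt> lookupScope) {
        this.lookupScope = lookupScope;
    }

    /** Places a key in the fulfillment stage for the given broker. */
    void map(K key, int brokerId) {
        lookupStage.remove(key);
        fulfillmentStage.put(key, brokerId);
    }

    /** Returns true only if the key ended up in the lookup stage. */
    private boolean unmap(K key) {
        fulfillmentStage.remove(key);
        OptionalInt destination = lookupScope.apply(key);
        if (destination.isPresent()) {
            // Pinned destination: the key goes straight back to fulfillment.
            fulfillmentStage.put(key, destination.getAsInt());
            return false;
        }
        lookupStage.add(key);
        return true;
    }

    /** Returns true if at least one key was actually moved to the lookup stage. */
    boolean retryLookup(Collection<K> keys) {
        boolean moved = false;
        for (K key : keys) {
            moved |= unmap(key); // non-short-circuit: every key is unmapped
        }
        return moved;
    }

    boolean inLookupStage(K key) {
        return lookupStage.contains(key);
    }
}

final class RetryLookupDemo {
    public static void main(String[] args) {
        ToyDriver<String> dynamic = new ToyDriver<>(key -> OptionalInt.empty());
        dynamic.map("topic-0", 3);
        System.out.println(dynamic.retryLookup(List.of("topic-0"))); // true

        ToyDriver<String> pinned = new ToyDriver<>(key -> OptionalInt.of(3));
        pinned.map("topic-0", 3);
        System.out.println(pinned.retryLookup(List.of("topic-0"))); // false
    }
}
```

A caller in the position of `handleNodeUnavailable` could then return `false` when nothing moved, leaving the call pending instead of recreating it.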



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
