Copilot commented on code in PR #22493:
URL: https://github.com/apache/kafka/pull/22493#discussion_r3374763096
##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -5151,6 +5163,26 @@ void maybeRetry(long currentTimeMs, Throwable throwable) {
super.maybeRetry(currentTimeMs, throwable);
}
}
+
+ @Override
+ boolean handleNodeUnavailable(long currentTimeMs) {
+ OptionalInt brokerId = spec.scope.destinationBrokerId();
+ if (brokerId.isPresent()
+ && metadataManager.isReady()
+ && metadataManager.nodeById(brokerId.getAsInt()) == null) {
+ // The fulfillment target broker is no longer present in the cluster metadata.
+ // This happens when a stale entry in the partition leader cache points at a
+ // broker that has since left the cluster. Send the keys back to the lookup
+ // stage so the leader can be re-resolved, rather than waiting for the request
+ // deadline to expire without ever issuing another lookup.
+ log.debug("Broker {} for {} is no longer in the cluster metadata; retrying lookup.",
+ brokerId.getAsInt(), spec.name);
+ driver.retryLookup(currentTimeMs, spec);
+ maybeSendRequests(driver, currentTimeMs);
+ return true;
+ }
+ return false;
Review Comment:
`handleNodeUnavailable` calls `driver.retryLookup(...)` for any driver-based
request whose destination broker is missing from metadata, regardless of the
lookup strategy. For `StaticBrokerStrategy` (e.g., `describeProducers` when
`options.brokerId` is set), `retryLookup` is effectively a no-op: it remaps the
keys straight back to the same fulfillment broker. This path will therefore
repeatedly recreate calls and increment the driver/call `tries` without ever
sending a request. That can cause premature failure via `maxRetries`
("Exceeded maxRetries") instead of waiting for `default.api.timeout.ms` / the
deadline, which is a behavior regression for APIs that intentionally target a
specific broker id.
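
The failure mode described in this comment can be illustrated with a small, self-contained model. This is only a sketch: `StaticBrokerRetryLoopSketch`, `Outcome`, and `drive` are hypothetical names, not Kafka classes, and the `tries > maxRetries` check is an assumption about how the retry limit is enforced.

```java
import java.util.OptionalInt;
import java.util.Set;

// Simplified, hypothetical model of the retry loop; these are not Kafka's classes.
final class StaticBrokerRetryLoopSketch {

    /** Outcome of driving one call until it is either sent or gives up. */
    static final class Outcome {
        final int tries;
        final int requestsSent;
        final boolean exceededMaxRetries;

        Outcome(int tries, int requestsSent, boolean exceededMaxRetries) {
            this.tries = tries;
            this.requestsSent = requestsSent;
            this.exceededMaxRetries = exceededMaxRetries;
        }
    }

    static Outcome drive(int staticBrokerId, Set<Integer> brokersInMetadata, int maxRetries) {
        int tries = 0;
        while (true) {
            // A static strategy resolves to the same broker on every "lookup".
            OptionalInt destination = OptionalInt.of(staticBrokerId);
            if (brokersInMetadata.contains(destination.getAsInt())) {
                return new Outcome(tries, 1, false);
            }
            // Broker missing: the call is recreated, a try is consumed, nothing is sent.
            tries++;
            // Assumption: the call fails once tries exceeds maxRetries.
            if (tries > maxRetries) {
                return new Outcome(tries, 0, true);
            }
        }
    }
}
```

In this model, a call pinned to a broker that is absent from metadata exhausts its retries without sending anything, independent of how much time remains before the deadline.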
##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java:
##########
@@ -188,6 +188,19 @@ private void retryLookup(Collection<K> keys) {
keys.forEach(this::unmap);
}
+ /**
+ * Send the keys of a fulfillment request back to the Lookup stage. This is invoked when a
+ * fulfillment request cannot be routed because its target broker is no longer present in the
+ * cluster metadata, for example when a stale entry in the partition leader cache pointed at a
+ * broker that has since left the cluster. Re-running the lookup gives us a chance to discover
+ * the current leader. Without this, such a request would remain unassignable until the request
+ * deadline expires.
+ */
+ public void retryLookup(long currentTimeMs, RequestSpec<K> spec) {
+ clearInflightRequest(currentTimeMs, spec);
+ retryLookup(spec.keys);
+ }
Review Comment:
`retryLookup(long, RequestSpec)` always clears the inflight request and then
calls `unmap` for each key, but `unmap` may immediately remap back into the
fulfillment stage when the lookup strategy’s
`lookupScope(key).destinationBrokerId()` is present (e.g.,
`StaticBrokerStrategy`). In that case, callers like
`KafkaAdminClient.Call#handleNodeUnavailable` can end up in a loop that
recreates fulfillment specs/calls and consumes retry counters without ever
making progress. Consider making this method return whether it actually moved
any keys into the lookup stage (destination broker id empty) so callers can
decide whether to take corrective action or leave the call pending.
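
One possible shape for that suggestion is sketched below. It is a simplified stand-in, not the real `AdminApiDriver`: the class `RetryLookupSketch`, its fields, and the `Function<K, OptionalInt>` standing in for `lookupScope(key).destinationBrokerId()` are all assumptions made for illustration.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.OptionalInt;
import java.util.Set;
import java.util.function.Function;

// Hypothetical, simplified stand-in for the driver; not the real AdminApiDriver.
final class RetryLookupSketch<K> {
    // Stands in for lookupScope(key).destinationBrokerId() (assumption).
    private final Function<K, OptionalInt> destinationFor;
    private final Set<K> lookupStage = new LinkedHashSet<>();
    private final Map<K, Integer> fulfillmentStage = new HashMap<>();

    RetryLookupSketch(Function<K, OptionalInt> destinationFor) {
        this.destinationFor = destinationFor;
    }

    /** Re-maps a key; returns true only if it landed in the lookup stage. */
    private boolean unmap(K key) {
        fulfillmentStage.remove(key);
        OptionalInt destination = destinationFor.apply(key);
        if (destination.isPresent()) {
            // Static destination: the key goes straight back to fulfillment.
            fulfillmentStage.put(key, destination.getAsInt());
            return false;
        }
        lookupStage.add(key);
        return true;
    }

    /** Returns true if at least one key actually moved into the lookup stage. */
    boolean retryLookup(Collection<K> keys) {
        boolean movedAny = false;
        for (K key : keys) {
            movedAny |= unmap(key); // non-short-circuit: every key is re-mapped
        }
        return movedAny;
    }

    Set<K> lookupStage() {
        return lookupStage;
    }

    Map<K, Integer> fulfillmentStage() {
        return fulfillmentStage;
    }
}
```

With a return value like this, a caller such as `handleNodeUnavailable` could return `false` and leave the call pending when nothing moved to the lookup stage, rather than recreating the call and consuming a retry.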