lucasbru commented on code in PR #22493:
URL: https://github.com/apache/kafka/pull/22493#discussion_r3387723156
##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -5151,6 +5163,27 @@ void maybeRetry(long currentTimeMs, Throwable throwable) {
super.maybeRetry(currentTimeMs, throwable);
}
}
+
+ @Override
+ boolean handleNodeUnavailable(long currentTimeMs) {
+ OptionalInt brokerId = spec.scope.destinationBrokerId();
+ // The fulfillment target broker is no longer present in the cluster metadata. This
+ // happens when a stale entry in the partition leader cache points at a broker that
+ // has since left the cluster. Send the keys back to the lookup stage so the leader
+ // can be re-resolved, rather than waiting for the request deadline to expire without
+ // ever issuing another lookup. maybeRetryLookup is a no-op (returns false) for
+ // strategies that target a fixed broker id, in which case we leave the call pending.
+ if (brokerId.isPresent()
+ && metadataManager.isReady()
+ && metadataManager.nodeById(brokerId.getAsInt()) == null
+ && driver.maybeRetryLookup(currentTimeMs, spec)) {
+ log.debug("Broker {} for {} is no longer in the cluster metadata; retrying lookup.",
+ brokerId.getAsInt(), spec.name);
+ maybeSendRequests(driver, currentTimeMs);
Review Comment:
https://github.com/apache/kafka/pull/22529/changes
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.