[
https://issues.apache.org/jira/browse/KAFKA-6916?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493716#comment-16493716
]
ASF GitHub Bot commented on KAFKA-6916:
---
hachikuji closed pull request #5050: KAFKA-6916: Refresh metadata in admin
client if broker connection fails
URL: https://github.com/apache/kafka/pull/5050
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index c935ef9d5e2..681000a3274 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -971,6 +971,29 @@ private void handleResponses(long now,
List responses) {
}
}
+/**
+ * Reassign calls that have not yet been sent. When metadata is
refreshed,
+ * all unsent calls are reassigned to handle controller change and
node changes.
+ * When a node is disconnected, all calls assigned to the node are
reassigned.
+ *
+ * @param now The current time in milliseconds
+ * @param disconnectedOnly Reassign only calls to nodes that were
disconnected
+ * in the last poll
+ */
+private void reassignUnsentCalls(long now, boolean disconnectedOnly) {
+ArrayList pendingCallsToSend = new ArrayList<>();
+for (Iterator>> iter =
callsToSend.entrySet().iterator(); iter.hasNext(); ) {
+Map.Entry> entry = iter.next();
+if (!disconnectedOnly ||
client.connectionFailed(entry.getKey())) {
+for (Call call : entry.getValue()) {
+pendingCallsToSend.add(call);
+}
+iter.remove();
+}
+}
+chooseNodesForPendingCalls(now, pendingCallsToSend.iterator());
+}
+
private boolean hasActiveExternalCalls(Collection calls) {
for (Call call : calls) {
if (!call.isInternal()) {
@@ -1055,6 +1078,7 @@ public void run() {
// Update the current time and handle the latest responses.
now = time.milliseconds();
+reassignUnsentCalls(now, true); // reassign calls to
disconnected nodes
handleResponses(now, responses);
}
int numTimedOut = 0;
@@ -1138,7 +1162,9 @@ private Call makeMetadataCall(long now) {
@Override
public void handleResponse(AbstractResponse abstractResponse) {
MetadataResponse response = (MetadataResponse)
abstractResponse;
-metadataManager.update(response.cluster(),
time.milliseconds());
+long now = time.milliseconds();
+metadataManager.update(response.cluster(), now);
+reassignUnsentCalls(now, false);
}
@Override
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
index e06aed202d7..85d3c28e8df 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
@@ -116,7 +116,7 @@ public void handleCompletedMetadataResponse(RequestHeader
requestHeader, long no
@Override
public void requestUpdate() {
-// Do nothing
+AdminMetadataManager.this.requestUpdate();
}
}
diff --git
a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 24daa8608e3..231b1e7ab26 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -219,6 +219,21 @@ class AdminClientIntegrationTest extends
IntegrationTestHarness with Logging {
waitForTopics(client, List(), topics)
}
+ @Test
+ def testMetadataRefresh(): Unit = {
+client = AdminClient.create(createConfig())
+val topics = Seq("mytopic")
+val newTopics = Seq(new NewTopic("mytopic", 3, 3))
+client.createTopics(newTopics.asJava).all.get()
+waitForTopics(client, expectedPresent = topics, expectedMissing = List())
+
+val controller = servers.find(_.config.brokerId ==