[jira] [Commented] (KAFKA-6916) AdminClient does not refresh metadata on broker failure

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6916?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16493716#comment-16493716
 ] 

ASF GitHub Bot commented on KAFKA-6916:
---

hachikuji closed pull request #5050: KAFKA-6916: Refresh metadata in admin 
client if broker connection fails
URL: https://github.com/apache/kafka/pull/5050
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index c935ef9d5e2..681000a3274 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -971,6 +971,29 @@ private void handleResponses(long now, List<ClientResponse> responses) {
             }
         }
 
+        /**
+         * Reassign calls that have not yet been sent. When metadata is refreshed,
+         * all unsent calls are reassigned to handle controller change and node changes.
+         * When a node is disconnected, all calls assigned to the node are reassigned.
+         *
+         * @param now The current time in milliseconds
+         * @param disconnectedOnly Reassign only calls to nodes that were disconnected
+         *                         in the last poll
+         */
+        private void reassignUnsentCalls(long now, boolean disconnectedOnly) {
+            ArrayList<Call> pendingCallsToSend = new ArrayList<>();
+            for (Iterator<Map.Entry<Node, List<Call>>> iter = callsToSend.entrySet().iterator(); iter.hasNext(); ) {
+                Map.Entry<Node, List<Call>> entry = iter.next();
+                if (!disconnectedOnly || client.connectionFailed(entry.getKey())) {
+                    for (Call call : entry.getValue()) {
+                        pendingCallsToSend.add(call);
+                    }
+                    iter.remove();
+                }
+            }
+            chooseNodesForPendingCalls(now, pendingCallsToSend.iterator());
+        }
+
         private boolean hasActiveExternalCalls(Collection<Call> calls) {
             for (Call call : calls) {
                 if (!call.isInternal()) {
@@ -1055,6 +1078,7 @@ public void run() {
 
                 // Update the current time and handle the latest responses.
                 now = time.milliseconds();
+                reassignUnsentCalls(now, true); // reassign calls to disconnected nodes
                 handleResponses(now, responses);
             }
             int numTimedOut = 0;
@@ -1138,7 +1162,9 @@ private Call makeMetadataCall(long now) {
                 @Override
                 public void handleResponse(AbstractResponse abstractResponse) {
                     MetadataResponse response = (MetadataResponse) abstractResponse;
-                    metadataManager.update(response.cluster(), time.milliseconds());
+                    long now = time.milliseconds();
+                    metadataManager.update(response.cluster(), now);
+                    reassignUnsentCalls(now, false);
                 }
 
                 @Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
index e06aed202d7..85d3c28e8df 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
@@ -116,7 +116,7 @@ public void handleCompletedMetadataResponse(RequestHeader requestHeader, long no
 
         @Override
         public void requestUpdate() {
-            // Do nothing
+            AdminMetadataManager.this.requestUpdate();
         }
     }
 
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 24daa8608e3..231b1e7ab26 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -219,6 +219,21 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     waitForTopics(client, List(), topics)
   }
 
+  @Test
+  def testMetadataRefresh(): Unit = {
+    client = AdminClient.create(createConfig())
+    val topics = Seq("mytopic")
+    val newTopics = Seq(new NewTopic("mytopic", 3, 3))
+    client.createTopics(newTopics.asJava).all.get()
+    waitForTopics(client, expectedPresent = topics, expectedMissing = List())
+
+    val controller = servers.find(_.config.brokerId == 

[jira] [Commented] (KAFKA-6916) AdminClient does not refresh metadata on broker failure

2018-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6916?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16482281#comment-16482281
 ] 

ASF GitHub Bot commented on KAFKA-6916:
---

rajinisivaram opened a new pull request #5050: KAFKA-6916: Refresh metadata in 
admin client if broker connection fails
URL: https://github.com/apache/kafka/pull/5050
 
 
   Refresh metadata if a broker connection fails, so that new calls are sent
only to nodes that are alive and controller requests go to the new controller
if the controller changes due to a broker failure. Also reassign calls that
could not be sent.
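
The reassignment of unsent calls described above can be illustrated with a minimal, self-contained sketch. It is not the code from the pull request: `UnsentCallQueue`, its methods, the integer node ids and the string call names are all hypothetical stand-ins, and the real client picks a target node from refreshed metadata rather than taking it as a parameter.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified stand-in for the admin client's queue of unsent calls. */
final class UnsentCallQueue {
    // Calls waiting to be sent, keyed by the id of the node they are assigned to.
    private final Map<Integer, List<String>> callsToSend = new HashMap<>();

    /** Queues a call for the given node. */
    void assign(int nodeId, String call) {
        callsToSend.computeIfAbsent(nodeId, id -> new ArrayList<>()).add(call);
    }

    /**
     * Moves unsent calls to targetNode. If disconnectedOnly is true, only calls
     * queued for nodes listed in failedNodes are moved; otherwise every unsent
     * call is moved (the case after a metadata refresh).
     */
    void reassign(Set<Integer> failedNodes, boolean disconnectedOnly, int targetNode) {
        List<String> pending = new ArrayList<>();
        Iterator<Map.Entry<Integer, List<String>>> iter = callsToSend.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<Integer, List<String>> entry = iter.next();
            if (!disconnectedOnly || failedNodes.contains(entry.getKey())) {
                pending.addAll(entry.getValue());
                iter.remove(); // drop the old assignment while iterating
            }
        }
        // Re-queue the collected calls only after iteration has finished.
        for (String call : pending) {
            assign(targetNode, call);
        }
    }

    /** Returns the calls currently queued for the given node (empty if none). */
    List<String> callsFor(int nodeId) {
        return callsToSend.getOrDefault(nodeId, new ArrayList<>());
    }
}
```

In this sketch, calling `reassign(failedNodes, true, newNode)` after a poll moves only the calls that were stuck on a failed node, while `reassign(failedNodes, false, newNode)` models the full reassignment that follows a metadata update.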
   




> AdminClient does not refresh metadata on broker failure
> ---
>
>                 Key: KAFKA-6916
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6916
>             Project: Kafka
>          Issue Type: Task
>          Components: admin
>    Affects Versions: 1.1.0, 1.0.1
>            Reporter: Rajini Sivaram
>            Assignee: Rajini Sivaram
>            Priority: Major
>             Fix For: 2.0.0
>
>
> There are intermittent test failures in DynamicBrokerReconfigurationTest when
> brokers are restarted. The test uses ephemeral ports, so the ports after a
> server restart differ from the ports before it. The tests rely on metadata
> refresh in producers, consumers and admin clients to obtain the new server
> ports when connections fail. This works with producers and consumers, but
> results in intermittent failures with the admin client because the refresh
> is not triggered.
> There are a couple of issues in AdminClient:
>  # Unlike producers and consumers, AdminClient does not request a metadata
> update when the connection to a broker fails. This is particularly bad if the
> controller goes down. The controller is used for various requests like
> createTopics and describeTopics. If the controller goes down and
> adminClient.describeTopics() is invoked, AdminClient sends the request to the
> old controller. If the connection fails, it keeps retrying with the same
> address. Metadata refresh is never triggered. The request times out after 2
> minutes by default, and metadata is not refreshed for 5 minutes by default. We
> should refresh metadata whenever the connection to a broker fails.
>  # Admin client requests are always retried on the same node. In the example
> above, if the controller goes down and a new controller is elected, the
> retried request should be sent to the new controller. Otherwise we are
> just blocking the call for 2 minutes with a lot of retries that can never
> succeed.
>  
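
The first issue in the description above comes down to when the next metadata fetch is scheduled. The sketch below is a simplified model of that decision, not Kafka's actual `AdminMetadataManager`: the class `MetadataRefreshTracker` and its method names are hypothetical, and the 5-minute maximum age mentioned in the issue is passed in as a plain parameter.

```java
/** Hypothetical, simplified model of when an admin client should next fetch metadata. */
final class MetadataRefreshTracker {
    private final long maxAgeMs;      // plays the role of the metadata maximum age
    private long lastUpdateMs;        // time of the last successful metadata update
    private boolean updateRequested;  // set when a broker connection fails

    MetadataRefreshTracker(long maxAgeMs, long nowMs) {
        this.maxAgeMs = maxAgeMs;
        this.lastUpdateMs = nowMs;
    }

    /** Called when a broker connection fails: ask for a refresh as soon as possible. */
    void requestUpdate() {
        updateRequested = true;
    }

    /** Called when a metadata response arrives. */
    void update(long nowMs) {
        lastUpdateMs = nowMs;
        updateRequested = false;
    }

    /** Milliseconds to wait before the next metadata fetch; 0 means fetch now. */
    long fetchDelayMs(long nowMs) {
        if (updateRequested) {
            return 0;
        }
        return Math.max(0, lastUpdateMs + maxAgeMs - nowMs);
    }
}
```

Without the `requestUpdate()` path, a connection failure ten seconds after the last update would still leave the client waiting out the rest of the maximum age. With it, the delay drops to zero and the next poll can fetch fresh metadata.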



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)