[ 
https://issues.apache.org/jira/browse/KAFKA-6546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518324#comment-16518324
 ] 

ASF GitHub Bot commented on KAFKA-6546:
---------------------------------------

rajinisivaram closed pull request #5189: KAFKA-6546: Use 
LISTENER_NOT_FOUND_ON_LEADER error for missing listener
URL: https://github.com/apache/kafka/pull/5189
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, since GitHub would not otherwise show it:

diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 7c87277eb8f..68f529dcb1c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -18,6 +18,7 @@
 
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.metrics.Sensor;
@@ -52,6 +53,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.stream.Collectors;
 
 /**
  * A network client for asynchronous request/response network i/o. This is an 
internal class used to implement the
@@ -915,6 +917,20 @@ public void 
handleAuthenticationFailure(AuthenticationException exception) {
         public void handleCompletedMetadataResponse(RequestHeader 
requestHeader, long now, MetadataResponse response) {
             this.metadataFetchInProgress = false;
             Cluster cluster = response.cluster();
+
+            // If any partition has leader with missing listeners, log a few 
for diagnosing broker configuration
+            // issues. This could be a transient issue if listeners were added 
dynamically to brokers.
+            List<TopicPartition> missingListenerPartitions = 
response.topicMetadata().stream().flatMap(topicMetadata ->
+                topicMetadata.partitionMetadata().stream()
+                    .filter(partitionMetadata -> partitionMetadata.error() == 
Errors.LISTENER_NOT_FOUND)
+                    .map(partitionMetadata -> new 
TopicPartition(topicMetadata.topic(), partitionMetadata.partition())))
+                .collect(Collectors.toList());
+            if (!missingListenerPartitions.isEmpty()) {
+                int count = missingListenerPartitions.size();
+                log.warn("{} partitions have leader brokers without a matching 
listener, including {}",
+                        count, missingListenerPartitions.subList(0, 
Math.min(10, count)));
+            }
+
             // check if any topics metadata failed to get updated
             Map<String, Errors> errors = response.errors();
             if (!errors.isEmpty())
diff --git 
a/clients/src/main/java/org/apache/kafka/common/errors/ListenerNotFoundException.java
 
b/clients/src/main/java/org/apache/kafka/common/errors/ListenerNotFoundException.java
new file mode 100644
index 00000000000..82c5d892f49
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/errors/ListenerNotFoundException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * The leader does not have an endpoint corresponding to the listener on which 
metadata was requested.
+ * This could indicate a broker configuration error or a transient error when 
listeners are updated
+ * dynamically and client requests are processed before all brokers have 
updated their listeners.
+ * This is currently used only for missing listeners on leader brokers, but 
may be used for followers
+ * in future.
+ */
+public class ListenerNotFoundException extends InvalidMetadataException {
+
+    private static final long serialVersionUID = 1L;
+
+    public ListenerNotFoundException(String message) {
+        super(message);
+    }
+
+    public ListenerNotFoundException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 5db1d314be3..9c522dff935 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -30,6 +30,7 @@
 import org.apache.kafka.common.errors.DelegationTokenExpiredException;
 import org.apache.kafka.common.errors.DelegationTokenNotFoundException;
 import org.apache.kafka.common.errors.DelegationTokenOwnerMismatchException;
+import org.apache.kafka.common.errors.ListenerNotFoundException;
 import org.apache.kafka.common.errors.FetchSessionIdNotFoundException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
@@ -624,7 +625,14 @@ public ApiException build(String message) {
             public ApiException build(String message) {
                 return new InvalidFetchSessionEpochException(message);
             }
-    });
+    }),
+    LISTENER_NOT_FOUND(72, "There is no listener on the leader broker that 
matches the listener on which metadata request was processed",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new ListenerNotFoundException(message);
+            }
+    }),;
 
     private interface ApiExceptionBuilder {
         ApiException build(String message);
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 7a39c1279e0..33f2f088163 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -989,8 +989,10 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   private def getTopicMetadata(allowAutoTopicCreation: Boolean, topics: 
Set[String], listenerName: ListenerName,
-                               errorUnavailableEndpoints: Boolean): 
Seq[MetadataResponse.TopicMetadata] = {
-    val topicResponses = metadataCache.getTopicMetadata(topics, listenerName, 
errorUnavailableEndpoints)
+                               errorUnavailableEndpoints: Boolean,
+                               errorUnavailableListeners: Boolean): 
Seq[MetadataResponse.TopicMetadata] = {
+    val topicResponses = metadataCache.getTopicMetadata(topics, listenerName,
+        errorUnavailableEndpoints, errorUnavailableListeners)
     if (topics.isEmpty || topicResponses.size == topics.size) {
       topicResponses
     } else {
@@ -1066,12 +1068,15 @@ class KafkaApis(val requestChannel: RequestChannel,
     // In version 0, we returned an error when brokers with replicas were 
unavailable,
     // while in higher versions we simply don't include the broker in the 
returned broker list
     val errorUnavailableEndpoints = requestVersion == 0
+    // In versions 5 and below, we returned LEADER_NOT_AVAILABLE if a matching 
listener was not found on the leader.
+    // From version 6 onwards, we return LISTENER_NOT_FOUND to enable 
diagnosis of configuration errors.
+    val errorUnavailableListeners = requestVersion >= 6
     val topicMetadata =
       if (authorizedTopics.isEmpty)
         Seq.empty[MetadataResponse.TopicMetadata]
       else
         getTopicMetadata(metadataRequest.allowAutoTopicCreation, 
authorizedTopics, request.context.listenerName,
-          errorUnavailableEndpoints)
+          errorUnavailableEndpoints, errorUnavailableListeners)
 
     val completeTopicMetadata = topicMetadata ++ 
unauthorizedForCreateTopicMetadata ++ unauthorizedForDescribeTopicMetadata
 
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala 
b/core/src/main/scala/kafka/server/MetadataCache.scala
index 43fe35287d3..b0603b87686 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -65,19 +65,29 @@ class MetadataCache(brokerId: Int) extends Logging {
   }
 
   // errorUnavailableEndpoints exists to support v0 MetadataResponses
-  private def getPartitionMetadata(topic: String, listenerName: ListenerName, 
errorUnavailableEndpoints: Boolean): 
Option[Iterable[MetadataResponse.PartitionMetadata]] = {
+  // If errorUnavailableListeners=true, return LISTENER_NOT_FOUND if listener 
is missing on the broker.
+  // Otherwise, return LEADER_NOT_AVAILABLE for broker unavailable and missing 
listener (Metadata response v5 and below).
+  private def getPartitionMetadata(topic: String, listenerName: ListenerName, 
errorUnavailableEndpoints: Boolean,
+                                   errorUnavailableListeners: Boolean): 
Option[Iterable[MetadataResponse.PartitionMetadata]] = {
     cache.get(topic).map { partitions =>
       partitions.map { case (partitionId, partitionState) =>
         val topicPartition = TopicAndPartition(topic, partitionId)
-        val maybeLeader = 
getAliveEndpoint(partitionState.basePartitionState.leader, listenerName)
+        val leaderBrokerId = partitionState.basePartitionState.leader
+        val maybeLeader = getAliveEndpoint(leaderBrokerId, listenerName)
         val replicas = 
partitionState.basePartitionState.replicas.asScala.map(_.toInt)
         val replicaInfo = getEndpoints(replicas, listenerName, 
errorUnavailableEndpoints)
         val offlineReplicaInfo = 
getEndpoints(partitionState.offlineReplicas.asScala.map(_.toInt), listenerName, 
errorUnavailableEndpoints)
 
         maybeLeader match {
           case None =>
-            debug(s"Error while fetching metadata for $topicPartition: leader 
not available")
-            new 
MetadataResponse.PartitionMetadata(Errors.LEADER_NOT_AVAILABLE, partitionId, 
Node.noNode(),
+            val error = if (!aliveBrokers.contains(brokerId)) { // we are 
already holding the read lock
+              debug(s"Error while fetching metadata for $topicPartition: 
leader not available")
+              Errors.LEADER_NOT_AVAILABLE
+            } else {
+              debug(s"Error while fetching metadata for $topicPartition: 
listener $listenerName not found on leader $leaderBrokerId")
+              if (errorUnavailableListeners) Errors.LISTENER_NOT_FOUND else 
Errors.LEADER_NOT_AVAILABLE
+            }
+            new MetadataResponse.PartitionMetadata(error, partitionId, 
Node.noNode(),
               replicaInfo.asJava, java.util.Collections.emptyList(), 
offlineReplicaInfo.asJava)
 
           case Some(leader) =>
@@ -112,10 +122,11 @@ class MetadataCache(brokerId: Int) extends Logging {
     }
 
   // errorUnavailableEndpoints exists to support v0 MetadataResponses
-  def getTopicMetadata(topics: Set[String], listenerName: ListenerName, 
errorUnavailableEndpoints: Boolean = false): 
Seq[MetadataResponse.TopicMetadata] = {
+  def getTopicMetadata(topics: Set[String], listenerName: ListenerName, 
errorUnavailableEndpoints: Boolean = false,
+                       errorUnavailableListeners: Boolean = false): 
Seq[MetadataResponse.TopicMetadata] = {
     inReadLock(partitionMetadataLock) {
       topics.toSeq.flatMap { topic =>
-        getPartitionMetadata(topic, listenerName, 
errorUnavailableEndpoints).map { partitionMetadata =>
+        getPartitionMetadata(topic, listenerName, errorUnavailableEndpoints, 
errorUnavailableListeners).map { partitionMetadata =>
           new MetadataResponse.TopicMetadata(Errors.NONE, topic, 
Topic.isInternal(topic), partitionMetadata.toBuffer.asJava)
         }
       }
diff --git 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 45b3fdc74bd..5f7c67cdc6a 100644
--- 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -824,10 +824,23 @@ class DynamicBrokerReconfigurationTest extends 
ZooKeeperTestHarness with SaslSet
     verifyAddListener("SASL_PLAINTEXT", SecurityProtocol.SASL_PLAINTEXT, 
Seq("GSSAPI"))
     //verifyRemoveListener("SASL_SSL", SecurityProtocol.SASL_SSL, 
Seq("SCRAM-SHA-512", "SCRAM-SHA-256", "PLAIN"))
     verifyRemoveListener("SASL_PLAINTEXT", SecurityProtocol.SASL_PLAINTEXT, 
Seq("GSSAPI"))
+
+    // Verify that a listener added to a subset of servers doesn't cause any 
issues
+    // when metadata is processed by the client.
+    addListener(servers.tail, "SCRAM_LISTENER", 
SecurityProtocol.SASL_PLAINTEXT, Seq("SCRAM-SHA-256"))
+    val bootstrap = TestUtils.bootstrapServers(servers.tail, new 
ListenerName("SCRAM_LISTENER"))
+    val producer = ProducerBuilder().bootstrapServers(bootstrap)
+      .securityProtocol(SecurityProtocol.SASL_PLAINTEXT)
+      .saslMechanism("SCRAM-SHA-256")
+      .maxRetries(1000)
+      .build()
+    val partitions = producer.partitionsFor(topic).asScala
+    assertEquals(0, partitions.count(p => p.leader != null && p.leader.id == 
servers.head.config.brokerId))
+    assertTrue("Did not find partitions with no leader", 
partitions.exists(_.leader == null))
   }
 
-  private def verifyAddListener(listenerName: String, securityProtocol: 
SecurityProtocol,
-                                saslMechanisms: Seq[String]): Unit = {
+  private def addListener(servers: Seq[KafkaServer], listenerName: String, 
securityProtocol: SecurityProtocol,
+                          saslMechanisms: Seq[String]): Unit = {
     val config = servers.head.config
     val existingListenerCount = config.listeners.size
     val listeners = config.listeners
@@ -867,14 +880,7 @@ class DynamicBrokerReconfigurationTest extends 
ZooKeeperTestHarness with SaslSet
       }
     }), "Listener not created")
 
-    if (saslMechanisms.nonEmpty)
-      saslMechanisms.foreach { mechanism =>
-        verifyListener(securityProtocol, Some(mechanism), 
s"add-listener-group-$securityProtocol-$mechanism")
-      }
-    else
-      verifyListener(securityProtocol, None, 
s"add-listener-group-$securityProtocol")
-
-    val brokerConfigs = describeConfig(adminClients.head).entries.asScala
+    val brokerConfigs = describeConfig(adminClients.head, 
servers).entries.asScala
     props.asScala.foreach { case (name, value) =>
       val entry = brokerConfigs.find(_.name == name).getOrElse(throw new 
IllegalArgumentException(s"Config not found $name"))
       if (DynamicBrokerConfig.isPasswordConfig(name) || name == unknownConfig)
@@ -884,6 +890,17 @@ class DynamicBrokerReconfigurationTest extends 
ZooKeeperTestHarness with SaslSet
     }
   }
 
+  private def verifyAddListener(listenerName: String, securityProtocol: 
SecurityProtocol,
+                                saslMechanisms: Seq[String]): Unit = {
+    addListener(servers, listenerName, securityProtocol, saslMechanisms)
+    if (saslMechanisms.nonEmpty)
+      saslMechanisms.foreach { mechanism =>
+        verifyListener(securityProtocol, Some(mechanism), 
s"add-listener-group-$securityProtocol-$mechanism")
+      }
+    else
+      verifyListener(securityProtocol, None, 
s"add-listener-group-$securityProtocol")
+  }
+
   private def verifyRemoveListener(listenerName: String, securityProtocol: 
SecurityProtocol,
                                    saslMechanisms: Seq[String]): Unit = {
     val saslMechanism = if (saslMechanisms.isEmpty) "" else saslMechanisms.head
@@ -1013,7 +1030,7 @@ class DynamicBrokerReconfigurationTest extends 
ZooKeeperTestHarness with SaslSet
     }
   }
 
-  private def describeConfig(adminClient: AdminClient): Config = {
+  private def describeConfig(adminClient: AdminClient, servers: 
Seq[KafkaServer] = this.servers): Config = {
     val configResources = servers.map { server =>
       new ConfigResource(ConfigResource.Type.BROKER, 
server.config.brokerId.toString)
     }
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala 
b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index 0ee73659780..82c14ee5827 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -110,7 +110,47 @@ class MetadataCacheTest {
   }
 
   @Test
-  def getTopicMetadataPartitionLeaderNotAvailable() {
+  def getTopicMetadataPartitionLeaderNotAvailable(): Unit = {
+    val securityProtocol = SecurityProtocol.PLAINTEXT
+    val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
+    val brokers = Set(new Broker(0, Seq(new EndPoint("foo", 9092, 
securityProtocol, listenerName)).asJava, null))
+    verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(brokers, 
listenerName,
+      leader = 1, Errors.LEADER_NOT_AVAILABLE, errorUnavailableListeners = 
false)
+    verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(brokers, 
listenerName,
+      leader = 1, Errors.LEADER_NOT_AVAILABLE, errorUnavailableListeners = 
true)
+  }
+
+  @Test
+  def getTopicMetadataPartitionListenerNotAvailableOnLeader(): Unit = {
+    val plaintextListenerName = 
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
+    val sslListenerName = 
ListenerName.forSecurityProtocol(SecurityProtocol.SSL)
+    val broker0Endpoints = Seq(
+      new EndPoint("host0", 9092, SecurityProtocol.PLAINTEXT, 
plaintextListenerName),
+      new EndPoint("host0", 9093, SecurityProtocol.SSL, sslListenerName))
+    val broker1Endpoints = Seq(new EndPoint("host1", 9092, 
SecurityProtocol.PLAINTEXT, plaintextListenerName))
+    val brokers = Set(new Broker(0, broker0Endpoints.asJava, null), new 
Broker(1, broker1Endpoints.asJava, null))
+    verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(brokers, 
sslListenerName,
+      leader = 1, Errors.LISTENER_NOT_FOUND, errorUnavailableListeners = true)
+  }
+
+  @Test
+  def 
getTopicMetadataPartitionListenerNotAvailableOnLeaderOldMetadataVersion(): Unit 
= {
+    val plaintextListenerName = 
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
+    val sslListenerName = 
ListenerName.forSecurityProtocol(SecurityProtocol.SSL)
+    val broker0Endpoints = Seq(
+      new EndPoint("host0", 9092, SecurityProtocol.PLAINTEXT, 
plaintextListenerName),
+      new EndPoint("host0", 9093, SecurityProtocol.SSL, sslListenerName))
+    val broker1Endpoints = Seq(new EndPoint("host1", 9092, 
SecurityProtocol.PLAINTEXT, plaintextListenerName))
+    val brokers = Set(new Broker(0, broker0Endpoints.asJava, null), new 
Broker(1, broker1Endpoints.asJava, null))
+    verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(brokers, 
sslListenerName,
+      leader = 1, Errors.LEADER_NOT_AVAILABLE, errorUnavailableListeners = 
false)
+  }
+
+  private def 
verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(brokers: Set[Broker],
+                                                                       
listenerName: ListenerName,
+                                                                       leader: 
Int,
+                                                                       
expectedError: Errors,
+                                                                       
errorUnavailableListeners: Boolean): Unit = {
     val topic = "topic"
 
     val cache = new MetadataCache(1)
@@ -118,11 +158,7 @@ class MetadataCacheTest {
     val zkVersion = 3
     val controllerId = 2
     val controllerEpoch = 1
-    val securityProtocol = SecurityProtocol.PLAINTEXT
-    val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
-    val brokers = Set(new Broker(0, Seq(new EndPoint("foo", 9092, 
securityProtocol, listenerName)).asJava, null))
 
-    val leader = 1
     val leaderEpoch = 1
     val partitionStates = Map(
       new TopicPartition(topic, 0) -> new 
UpdateMetadataRequest.PartitionState(controllerEpoch, leader, leaderEpoch, 
asList(0), zkVersion, asList(0), asList()))
@@ -132,7 +168,7 @@ class MetadataCacheTest {
       partitionStates.asJava, brokers.asJava).build()
     cache.updateCache(15, updateMetadataRequest)
 
-    val topicMetadatas = cache.getTopicMetadata(Set(topic), listenerName)
+    val topicMetadatas = cache.getTopicMetadata(Set(topic), listenerName, 
errorUnavailableListeners = errorUnavailableListeners)
     assertEquals(1, topicMetadatas.size)
 
     val topicMetadata = topicMetadatas.head
@@ -143,7 +179,7 @@ class MetadataCacheTest {
 
     val partitionMetadata = partitionMetadatas.get(0)
     assertEquals(0, partitionMetadata.partition)
-    assertEquals(Errors.LEADER_NOT_AVAILABLE, partitionMetadata.error)
+    assertEquals(expectedError, partitionMetadata.error)
     assertTrue(partitionMetadata.isr.isEmpty)
     assertEquals(1, partitionMetadata.replicas.size)
     assertEquals(0, partitionMetadata.replicas.get(0).id)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add ENDPOINT_NOT_FOUND_ON_LEADER error code for missing listener
> ----------------------------------------------------------------
>
>                 Key: KAFKA-6546
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6546
>             Project: Kafka
>          Issue Type: Improvement
>          Components: core
>            Reporter: Rajini Sivaram
>            Assignee: Rajini Sivaram
>            Priority: Major
>             Fix For: 2.1.0
>
>
> In 1.1, if an endpoint is available on the broker processing a metadata 
> request but the corresponding listener is not available on the leader of a 
> partition, LEADER_NOT_AVAILABLE is returned (earlier versions returned 
> UNKNOWN_SERVER_ERROR). This could indicate a broker misconfiguration in which some 
> brokers are not configured with all listeners, or it could indicate a 
> transient error while listeners are being added dynamically. We want to treat the 
> error as transient so that dynamic updates can be processed, but we should notify 
> clients of the actual error. This change should be made together with a MetadataRequest 
> version bump, so that LEADER_NOT_AVAILABLE is still returned to older clients.
> See
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration]
> and [https://github.com/apache/kafka/pull/4539] for details.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to