[ https://issues.apache.org/jira/browse/KAFKA-6546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518324#comment-16518324 ]
ASF GitHub Bot commented on KAFKA-6546: --------------------------------------- rajinisivaram closed pull request #5189: KAFKA-6546: Use LISTENER_NOT_FOUND_ON_LEADER error for missing listener URL: https://github.com/apache/kafka/pull/5189 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 7c87277eb8f..68f529dcb1c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.metrics.Sensor; @@ -52,6 +53,7 @@ import java.util.List; import java.util.Map; import java.util.Random; +import java.util.stream.Collectors; /** * A network client for asynchronous request/response network i/o. This is an internal class used to implement the @@ -915,6 +917,20 @@ public void handleAuthenticationFailure(AuthenticationException exception) { public void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, MetadataResponse response) { this.metadataFetchInProgress = false; Cluster cluster = response.cluster(); + + // If any partition has leader with missing listeners, log a few for diagnosing broker configuration + // issues. This could be a transient issue if listeners were added dynamically to brokers. 
+ List<TopicPartition> missingListenerPartitions = response.topicMetadata().stream().flatMap(topicMetadata -> + topicMetadata.partitionMetadata().stream() + .filter(partitionMetadata -> partitionMetadata.error() == Errors.LISTENER_NOT_FOUND) + .map(partitionMetadata -> new TopicPartition(topicMetadata.topic(), partitionMetadata.partition()))) + .collect(Collectors.toList()); + if (!missingListenerPartitions.isEmpty()) { + int count = missingListenerPartitions.size(); + log.warn("{} partitions have leader brokers without a matching listener, including {}", + count, missingListenerPartitions.subList(0, Math.min(10, count))); + } + // check if any topics metadata failed to get updated Map<String, Errors> errors = response.errors(); if (!errors.isEmpty()) diff --git a/clients/src/main/java/org/apache/kafka/common/errors/ListenerNotFoundException.java b/clients/src/main/java/org/apache/kafka/common/errors/ListenerNotFoundException.java new file mode 100644 index 00000000000..82c5d892f49 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/ListenerNotFoundException.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.errors; + +/** + * The leader does not have an endpoint corresponding to the listener on which metadata was requested. + * This could indicate a broker configuration error or a transient error when listeners are updated + * dynamically and client requests are processed before all brokers have updated their listeners. + * This is currently used only for missing listeners on leader brokers, but may be used for followers + * in future. + */ +public class ListenerNotFoundException extends InvalidMetadataException { + + private static final long serialVersionUID = 1L; + + public ListenerNotFoundException(String message) { + super(message); + } + + public ListenerNotFoundException(String message, Throwable cause) { + super(message, cause); + } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index 5db1d314be3..9c522dff935 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -30,6 +30,7 @@ import org.apache.kafka.common.errors.DelegationTokenExpiredException; import org.apache.kafka.common.errors.DelegationTokenNotFoundException; import org.apache.kafka.common.errors.DelegationTokenOwnerMismatchException; +import org.apache.kafka.common.errors.ListenerNotFoundException; import org.apache.kafka.common.errors.FetchSessionIdNotFoundException; import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.GroupIdNotFoundException; @@ -624,7 +625,14 @@ public ApiException build(String message) { public ApiException build(String message) { return new InvalidFetchSessionEpochException(message); } - }); + }), + LISTENER_NOT_FOUND(72, "There is no listener on the leader broker that matches the listener on which metadata request was processed", + new ApiExceptionBuilder() { + @Override + public 
ApiException build(String message) { + return new ListenerNotFoundException(message); + } + }),; private interface ApiExceptionBuilder { ApiException build(String message); diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 7a39c1279e0..33f2f088163 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -989,8 +989,10 @@ class KafkaApis(val requestChannel: RequestChannel, } private def getTopicMetadata(allowAutoTopicCreation: Boolean, topics: Set[String], listenerName: ListenerName, - errorUnavailableEndpoints: Boolean): Seq[MetadataResponse.TopicMetadata] = { - val topicResponses = metadataCache.getTopicMetadata(topics, listenerName, errorUnavailableEndpoints) + errorUnavailableEndpoints: Boolean, + errorUnavailableListeners: Boolean): Seq[MetadataResponse.TopicMetadata] = { + val topicResponses = metadataCache.getTopicMetadata(topics, listenerName, + errorUnavailableEndpoints, errorUnavailableListeners) if (topics.isEmpty || topicResponses.size == topics.size) { topicResponses } else { @@ -1066,12 +1068,15 @@ class KafkaApis(val requestChannel: RequestChannel, // In version 0, we returned an error when brokers with replicas were unavailable, // while in higher versions we simply don't include the broker in the returned broker list val errorUnavailableEndpoints = requestVersion == 0 + // In versions 5 and below, we returned LEADER_NOT_AVAILABLE if a matching listener was not found on the leader. + // From version 6 onwards, we return LISTENER_NOT_FOUND to enable diagnosis of configuration errors. 
+ val errorUnavailableListeners = requestVersion >= 6 val topicMetadata = if (authorizedTopics.isEmpty) Seq.empty[MetadataResponse.TopicMetadata] else getTopicMetadata(metadataRequest.allowAutoTopicCreation, authorizedTopics, request.context.listenerName, - errorUnavailableEndpoints) + errorUnavailableEndpoints, errorUnavailableListeners) val completeTopicMetadata = topicMetadata ++ unauthorizedForCreateTopicMetadata ++ unauthorizedForDescribeTopicMetadata diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala index 43fe35287d3..b0603b87686 100755 --- a/core/src/main/scala/kafka/server/MetadataCache.scala +++ b/core/src/main/scala/kafka/server/MetadataCache.scala @@ -65,19 +65,29 @@ class MetadataCache(brokerId: Int) extends Logging { } // errorUnavailableEndpoints exists to support v0 MetadataResponses - private def getPartitionMetadata(topic: String, listenerName: ListenerName, errorUnavailableEndpoints: Boolean): Option[Iterable[MetadataResponse.PartitionMetadata]] = { + // If errorUnavailableListeners=true, return LISTENER_NOT_FOUND if listener is missing on the broker. + // Otherwise, return LEADER_NOT_AVAILABLE for broker unavailable and missing listener (Metadata response v5 and below). 
+ private def getPartitionMetadata(topic: String, listenerName: ListenerName, errorUnavailableEndpoints: Boolean, + errorUnavailableListeners: Boolean): Option[Iterable[MetadataResponse.PartitionMetadata]] = { cache.get(topic).map { partitions => partitions.map { case (partitionId, partitionState) => val topicPartition = TopicAndPartition(topic, partitionId) - val maybeLeader = getAliveEndpoint(partitionState.basePartitionState.leader, listenerName) + val leaderBrokerId = partitionState.basePartitionState.leader + val maybeLeader = getAliveEndpoint(leaderBrokerId, listenerName) val replicas = partitionState.basePartitionState.replicas.asScala.map(_.toInt) val replicaInfo = getEndpoints(replicas, listenerName, errorUnavailableEndpoints) val offlineReplicaInfo = getEndpoints(partitionState.offlineReplicas.asScala.map(_.toInt), listenerName, errorUnavailableEndpoints) maybeLeader match { case None => - debug(s"Error while fetching metadata for $topicPartition: leader not available") - new MetadataResponse.PartitionMetadata(Errors.LEADER_NOT_AVAILABLE, partitionId, Node.noNode(), + val error = if (!aliveBrokers.contains(brokerId)) { // we are already holding the read lock + debug(s"Error while fetching metadata for $topicPartition: leader not available") + Errors.LEADER_NOT_AVAILABLE + } else { + debug(s"Error while fetching metadata for $topicPartition: listener $listenerName not found on leader $leaderBrokerId") + if (errorUnavailableListeners) Errors.LISTENER_NOT_FOUND else Errors.LEADER_NOT_AVAILABLE + } + new MetadataResponse.PartitionMetadata(error, partitionId, Node.noNode(), replicaInfo.asJava, java.util.Collections.emptyList(), offlineReplicaInfo.asJava) case Some(leader) => @@ -112,10 +122,11 @@ class MetadataCache(brokerId: Int) extends Logging { } // errorUnavailableEndpoints exists to support v0 MetadataResponses - def getTopicMetadata(topics: Set[String], listenerName: ListenerName, errorUnavailableEndpoints: Boolean = false): 
Seq[MetadataResponse.TopicMetadata] = { + def getTopicMetadata(topics: Set[String], listenerName: ListenerName, errorUnavailableEndpoints: Boolean = false, + errorUnavailableListeners: Boolean = false): Seq[MetadataResponse.TopicMetadata] = { inReadLock(partitionMetadataLock) { topics.toSeq.flatMap { topic => - getPartitionMetadata(topic, listenerName, errorUnavailableEndpoints).map { partitionMetadata => + getPartitionMetadata(topic, listenerName, errorUnavailableEndpoints, errorUnavailableListeners).map { partitionMetadata => new MetadataResponse.TopicMetadata(Errors.NONE, topic, Topic.isInternal(topic), partitionMetadata.toBuffer.asJava) } } diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index 45b3fdc74bd..5f7c67cdc6a 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -824,10 +824,23 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet verifyAddListener("SASL_PLAINTEXT", SecurityProtocol.SASL_PLAINTEXT, Seq("GSSAPI")) //verifyRemoveListener("SASL_SSL", SecurityProtocol.SASL_SSL, Seq("SCRAM-SHA-512", "SCRAM-SHA-256", "PLAIN")) verifyRemoveListener("SASL_PLAINTEXT", SecurityProtocol.SASL_PLAINTEXT, Seq("GSSAPI")) + + // Verify that a listener added to a subset of servers doesn't cause any issues + // when metadata is processed by the client. 
+ addListener(servers.tail, "SCRAM_LISTENER", SecurityProtocol.SASL_PLAINTEXT, Seq("SCRAM-SHA-256")) + val bootstrap = TestUtils.bootstrapServers(servers.tail, new ListenerName("SCRAM_LISTENER")) + val producer = ProducerBuilder().bootstrapServers(bootstrap) + .securityProtocol(SecurityProtocol.SASL_PLAINTEXT) + .saslMechanism("SCRAM-SHA-256") + .maxRetries(1000) + .build() + val partitions = producer.partitionsFor(topic).asScala + assertEquals(0, partitions.count(p => p.leader != null && p.leader.id == servers.head.config.brokerId)) + assertTrue("Did not find partitions with no leader", partitions.exists(_.leader == null)) } - private def verifyAddListener(listenerName: String, securityProtocol: SecurityProtocol, - saslMechanisms: Seq[String]): Unit = { + private def addListener(servers: Seq[KafkaServer], listenerName: String, securityProtocol: SecurityProtocol, + saslMechanisms: Seq[String]): Unit = { val config = servers.head.config val existingListenerCount = config.listeners.size val listeners = config.listeners @@ -867,14 +880,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet } }), "Listener not created") - if (saslMechanisms.nonEmpty) - saslMechanisms.foreach { mechanism => - verifyListener(securityProtocol, Some(mechanism), s"add-listener-group-$securityProtocol-$mechanism") - } - else - verifyListener(securityProtocol, None, s"add-listener-group-$securityProtocol") - - val brokerConfigs = describeConfig(adminClients.head).entries.asScala + val brokerConfigs = describeConfig(adminClients.head, servers).entries.asScala props.asScala.foreach { case (name, value) => val entry = brokerConfigs.find(_.name == name).getOrElse(throw new IllegalArgumentException(s"Config not found $name")) if (DynamicBrokerConfig.isPasswordConfig(name) || name == unknownConfig) @@ -884,6 +890,17 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet } } + private def verifyAddListener(listenerName: String, 
securityProtocol: SecurityProtocol, + saslMechanisms: Seq[String]): Unit = { + addListener(servers, listenerName, securityProtocol, saslMechanisms) + if (saslMechanisms.nonEmpty) + saslMechanisms.foreach { mechanism => + verifyListener(securityProtocol, Some(mechanism), s"add-listener-group-$securityProtocol-$mechanism") + } + else + verifyListener(securityProtocol, None, s"add-listener-group-$securityProtocol") + } + private def verifyRemoveListener(listenerName: String, securityProtocol: SecurityProtocol, saslMechanisms: Seq[String]): Unit = { val saslMechanism = if (saslMechanisms.isEmpty) "" else saslMechanisms.head @@ -1013,7 +1030,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet } } - private def describeConfig(adminClient: AdminClient): Config = { + private def describeConfig(adminClient: AdminClient, servers: Seq[KafkaServer] = this.servers): Config = { val configResources = servers.map { server => new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString) } diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala index 0ee73659780..82c14ee5827 100644 --- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala +++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala @@ -110,7 +110,47 @@ class MetadataCacheTest { } @Test - def getTopicMetadataPartitionLeaderNotAvailable() { + def getTopicMetadataPartitionLeaderNotAvailable(): Unit = { + val securityProtocol = SecurityProtocol.PLAINTEXT + val listenerName = ListenerName.forSecurityProtocol(securityProtocol) + val brokers = Set(new Broker(0, Seq(new EndPoint("foo", 9092, securityProtocol, listenerName)).asJava, null)) + verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(brokers, listenerName, + leader = 1, Errors.LEADER_NOT_AVAILABLE, errorUnavailableListeners = false) + verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(brokers, 
listenerName, + leader = 1, Errors.LEADER_NOT_AVAILABLE, errorUnavailableListeners = true) + } + + @Test + def getTopicMetadataPartitionListenerNotAvailableOnLeader(): Unit = { + val plaintextListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT) + val sslListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.SSL) + val broker0Endpoints = Seq( + new EndPoint("host0", 9092, SecurityProtocol.PLAINTEXT, plaintextListenerName), + new EndPoint("host0", 9093, SecurityProtocol.SSL, sslListenerName)) + val broker1Endpoints = Seq(new EndPoint("host1", 9092, SecurityProtocol.PLAINTEXT, plaintextListenerName)) + val brokers = Set(new Broker(0, broker0Endpoints.asJava, null), new Broker(1, broker1Endpoints.asJava, null)) + verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(brokers, sslListenerName, + leader = 1, Errors.LISTENER_NOT_FOUND, errorUnavailableListeners = true) + } + + @Test + def getTopicMetadataPartitionListenerNotAvailableOnLeaderOldMetadataVersion(): Unit = { + val plaintextListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT) + val sslListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.SSL) + val broker0Endpoints = Seq( + new EndPoint("host0", 9092, SecurityProtocol.PLAINTEXT, plaintextListenerName), + new EndPoint("host0", 9093, SecurityProtocol.SSL, sslListenerName)) + val broker1Endpoints = Seq(new EndPoint("host1", 9092, SecurityProtocol.PLAINTEXT, plaintextListenerName)) + val brokers = Set(new Broker(0, broker0Endpoints.asJava, null), new Broker(1, broker1Endpoints.asJava, null)) + verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(brokers, sslListenerName, + leader = 1, Errors.LEADER_NOT_AVAILABLE, errorUnavailableListeners = false) + } + + private def verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(brokers: Set[Broker], + listenerName: ListenerName, + leader: Int, + expectedError: Errors, + errorUnavailableListeners: Boolean): Unit = { val topic = "topic" val cache = 
new MetadataCache(1) @@ -118,11 +158,7 @@ class MetadataCacheTest { val zkVersion = 3 val controllerId = 2 val controllerEpoch = 1 - val securityProtocol = SecurityProtocol.PLAINTEXT - val listenerName = ListenerName.forSecurityProtocol(securityProtocol) - val brokers = Set(new Broker(0, Seq(new EndPoint("foo", 9092, securityProtocol, listenerName)).asJava, null)) - val leader = 1 val leaderEpoch = 1 val partitionStates = Map( new TopicPartition(topic, 0) -> new UpdateMetadataRequest.PartitionState(controllerEpoch, leader, leaderEpoch, asList(0), zkVersion, asList(0), asList())) @@ -132,7 +168,7 @@ class MetadataCacheTest { partitionStates.asJava, brokers.asJava).build() cache.updateCache(15, updateMetadataRequest) - val topicMetadatas = cache.getTopicMetadata(Set(topic), listenerName) + val topicMetadatas = cache.getTopicMetadata(Set(topic), listenerName, errorUnavailableListeners = errorUnavailableListeners) assertEquals(1, topicMetadatas.size) val topicMetadata = topicMetadatas.head @@ -143,7 +179,7 @@ class MetadataCacheTest { val partitionMetadata = partitionMetadatas.get(0) assertEquals(0, partitionMetadata.partition) - assertEquals(Errors.LEADER_NOT_AVAILABLE, partitionMetadata.error) + assertEquals(expectedError, partitionMetadata.error) assertTrue(partitionMetadata.isr.isEmpty) assertEquals(1, partitionMetadata.replicas.size) assertEquals(0, partitionMetadata.replicas.get(0).id) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add ENDPOINT_NOT_FOUND_ON_LEADER error code for missing listener > ---------------------------------------------------------------- > > Key: KAFKA-6546 > URL: https://issues.apache.org/jira/browse/KAFKA-6546 > Project: Kafka > Issue Type: Improvement > Components: core > Reporter: Rajini Sivaram > Assignee: Rajini Sivaram > Priority: Major > Fix For: 2.1.0 > > > In 1.1, if an endpoint is available on the broker processing a metadata > request, but the corresponding listener is not available on the leader of a > partition, LEADER_NOT_AVAILABLE is returned (earlier versions returned > UNKNOWN_SERVER_ERROR). This could indicate broker misconfiguration where some > brokers are not configured with all listeners, or it could indicate a > transient error when listeners are dynamically added. We want to treat the > error as a transient error to process dynamic updates, but we should notify > clients of the actual error. This change should be made when MetadataRequest > version is updated so that LEADER_NOT_AVAILABLE is returned to older clients. > See > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration] > and [https://github.com/apache/kafka/pull/4539] for details. -- This message was sent by Atlassian JIRA (v7.6.3#76005)