Lincong commented on a change in pull request #9435: URL: https://github.com/apache/kafka/pull/9435#discussion_r522441321
########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -1236,9 +1236,19 @@ class KafkaApis(val requestChannel: RequestChannel, val topicMetadata = if (authorizedTopics.isEmpty) Seq.empty[MetadataResponseTopic] - else - getTopicMetadata(metadataRequest.allowAutoTopicCreation, authorizedTopics, request.context.listenerName, - errorUnavailableEndpoints, errorUnavailableListeners) + else { + // KAFKA-10606: If this request is to get metadata for all topics, auto topic creation should not be allowed + // The special handling is necessary on broker side because allowAutoTopicCreation is hard coded to true + // for backward compatibility on client side. + val allowAutoTopicCreation = (!metadataRequest.isAllTopics) && metadataRequest.allowAutoTopicCreation Review comment: Hi @chia7712 , thank you for bringing up that issue and it's totally valid! In order to preserve its behavior (not including `UNKNOWN_TOPIC_OR_PARTITION ` in the response to fetch all topic metadata request), we implemented the below logic which filters out all entries in the response with `UNKNOWN_TOPIC_OR_PARTITION ` if the metadata request is to get metadata for all topisc. ``` val completeTopicMetadata = (if (metadataRequest.isAllTopics) { opicMetadata.filter(_.errorCode() != Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) } else { topicMetadata }) ++ unauthorizedForCreateTopicMetadata ++ unauthorizedForDescribeTopicMetadata ``` We have added a unit test as well. Hope to get some feedback from you soon! ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org