[ https://issues.apache.org/jira/browse/KAFKA-19234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17949112#comment-17949112 ]

Jun Rao commented on KAFKA-19234:
---------------------------------

It seems that the Fetch request does the right thing. If an old version of the fetch request includes a topic name that doesn't exist, it will go through the authorization check first. We can just follow that approach.

{code:java}
// Regular Kafka consumers need READ permission on each partition they are fetching.
val partitionDatas = new mutable.ArrayBuffer[(TopicIdPartition, FetchRequest.PartitionData)]
fetchContext.foreachPartition { (topicIdPartition, partitionData) =>
  if (topicIdPartition.topic == null)
    erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID)
  else
    partitionDatas += topicIdPartition -> partitionData
}
val authorizedTopics = authHelper.filterByAuthorized(request.context, READ, TOPIC, partitionDatas)(_._1.topicPartition.topic)
partitionDatas.foreach { case (topicIdPartition, data) =>
  if (!authorizedTopics.contains(topicIdPartition.topic))
    erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.TOPIC_AUTHORIZATION_FAILED)
  else if (!metadataCache.contains(topicIdPartition.topicPartition))
    erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
{code}

> broker should return UNAUTHORIZATION error for non-existing topic in produce request
> ------------------------------------------------------------------------------------
>
>                 Key: KAFKA-19234
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19234
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>            Reporter: Jun Rao
>            Priority: Major
>
> In [https://github.com/apache/kafka/pull/15968/files], we changed the
> authorization logic slightly. If a produce request includes a topic name that
> doesn't exist and is not authorized, it now returns an
> UNKNOWN_TOPIC_OR_PARTITION error. Since a topic name is sensitive information,
> it should return a TOPIC_AUTHORIZATION_FAILED error, as it did before that PR.
> {code:java}
> else if (!metadataCache.contains(topicPartition))
>   nonExistingTopicResponses += new TopicIdPartition(topicId, topicPartition) -> new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)