junrao commented on code in PR #19635:
URL: https://github.com/apache/kafka/pull/19635#discussion_r2082629710


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -411,10 +411,9 @@ class KafkaApis(val requestChannel: RequestChannel,
         }
 
         val topicPartition = new TopicPartition(topicName, partition.index())
-        if (topicName.isEmpty)
+        // To be compatible with the old version, only return UNKNOWN_TOPIC_ID if topic name is not set and request version is bigger than 12

Review Comment:
   Suggested rewording: "if topic name is not set and request version is bigger than 12" => "if the request version uses topicId, but the corresponding topic name can't be found".
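
   To make the intended condition concrete, here is a minimal, self-contained sketch of the check as described by the reworded comment. It is not the actual `KafkaApis` code: `ProduceTopicCheck`, `Outcome`, `check`, and `FirstTopicIdVersion` are hypothetical names, and the assumption that topic IDs are used from produce version 13 onward is taken from the test descriptions in this PR.

```scala
// Hypothetical model of the check; names are illustrative, not from KafkaApis.
object ProduceTopicCheck {
  // Assumption (from the tests in this PR): produce requests use topic IDs from version 13.
  val FirstTopicIdVersion: Short = 13

  sealed trait Outcome
  case object UnknownTopicId extends Outcome
  final case class Proceed(topicName: String) extends Outcome

  // resolvedName is the name sent in the request (older versions) or the name
  // looked up from the topic ID (newer versions); empty means no name was found.
  def check(version: Short, resolvedName: String): Outcome =
    if (version >= FirstTopicIdVersion && resolvedName.isEmpty) UnknownTopicId
    else Proceed(resolvedName) // older versions fall through to the usual authorization path
}
```

   With this shape, an empty name on version 12 or lower is not reported as UNKNOWN_TOPIC_ID; it continues to the authorization check, which is what the old-version test in this PR expects.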



##########
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala:
##########
@@ -923,8 +925,56 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
     sendRequests(requestKeyToRequest, false, topicNames)
   }
 
+  /**
+   * Test that the produce request fails with TOPIC_AUTHORIZATION_FAILED if the client doesn't have permission
+   * and topic name is used in the request. Even if the topic doesn't exist, we return TOPIC_AUTHORIZATION_FAILED to
+   * prevent leaking the topic name.
+   * This case covers produce request version from oldest to 12.
+   * The newer version is covered by testAuthorizationWithTopicNotExisting and testAuthorizationWithTopicExisting.
+   */
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testAuthorizationProduceVersionFromOldestTo12(withTopicExisting: Boolean): Unit = {
+    if (withTopicExisting) {
+      createTopicWithBrokerPrincipal(topic)
+    }
+
+    for (version <- ApiKeys.PRODUCE.oldestVersion to 12) {
+      val request = createProduceRequest(topic, Uuid.ZERO_UUID, version.toShort)
+      val response = connectAndReceive[AbstractResponse](request, listenerName = listenerName)
+      val errorCode = response.asInstanceOf[ProduceResponse]
+        .data()
+        .responses()
+        .find(topic, Uuid.ZERO_UUID)
+        .partitionResponses.asScala.find(_.index == part).get
+        .errorCode
+
+      assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(), errorCode, s"unexpected error for produce request version $version")
+    }
+  }
+
+  /**
+   * Test that the produce request fails with UNKNOWN_TOPIC_ID if both topic name and id are default values when request version >= 13.
+   * The produce request only supports topic id above version 13.
+   */
+  @Test
+  def testEmptyTopicNameAndIDForProduceVersionFrom13ToNewest(): Unit = {

Review Comment:
   This test is covered by `testAuthorizationWithTopicNotExisting`, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
