hachikuji commented on a change in pull request #9300:
URL: https://github.com/apache/kafka/pull/9300#discussion_r490604721



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1732,45 +1732,57 @@ class KafkaApis(val requestChannel: RequestChannel,
       sendResponseMaybeThrottle(controllerMutationQuota, request, 
createResponse, onComplete = None)
     }
 
+    // be sure to check authorization first, before checking if this is the 
controller, to avoid leaking
+    // information about the system (i.e. who is the controller) to principals 
unauthorized for that information
+
     val createTopicsRequest = request.body[CreateTopicsRequest]
     val results = new 
CreatableTopicResultCollection(createTopicsRequest.data.topics.size)
-    if (!controller.isActive) {
-      createTopicsRequest.data.topics.forEach { topic =>
-        results.add(new CreatableTopicResult().setName(topic.name)
-          .setErrorCode(Errors.NOT_CONTROLLER.code))
-      }
-      sendResponseCallback(results)
-    } else {
-      createTopicsRequest.data.topics.forEach { topic =>
-        results.add(new CreatableTopicResult().setName(topic.name))
-      }
-      val hasClusterAuthorization = authorize(request.context, CREATE, 
CLUSTER, CLUSTER_NAME,
-        logIfDenied = false)
-      val topics = createTopicsRequest.data.topics.asScala.map(_.name)
-      val authorizedTopics =
-        if (hasClusterAuthorization) topics.toSet
-        else filterByAuthorized(request.context, CREATE, TOPIC, 
topics)(identity)
-      val authorizedForDescribeConfigs = filterByAuthorized(request.context, 
DESCRIBE_CONFIGS, TOPIC,
-        topics, logIfDenied = false)(identity).map(name => name -> 
results.find(name)).toMap
+    createTopicsRequest.data.topics.forEach { topic =>
+      results.add(new CreatableTopicResult().setName(topic.name))
+    }
+    val hasClusterAuthorization = authorize(request.context, CREATE, CLUSTER, 
CLUSTER_NAME,
+      logIfDenied = false)
+    val topics = createTopicsRequest.data.topics.asScala.map(_.name)
+    val authorizedTopics =
+      if (hasClusterAuthorization) topics.toSet
+      else filterByAuthorized(request.context, CREATE, TOPIC, topics)(identity)
+    val authorizedForDescribeConfigs = filterByAuthorized(request.context, 
DESCRIBE_CONFIGS, TOPIC,

Review comment:
       Might not matter since `CreateTopics` requests are infrequent, but the 
two passes for authorization are a bit vexing. Feels like we are missing a good 
intermediate type between this handler and `AdminManager`. Maybe we can replace 
the 3 maps that we pass to `AdminManager.createTopic` with a single map which 
contains all the state we need for each topic.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1732,45 +1732,57 @@ class KafkaApis(val requestChannel: RequestChannel,
       sendResponseMaybeThrottle(controllerMutationQuota, request, 
createResponse, onComplete = None)
     }
 
+    // be sure to check authorization first, before checking if this is the 
controller, to avoid leaking
+    // information about the system (i.e. who is the controller) to principals 
unauthorized for that information
+
     val createTopicsRequest = request.body[CreateTopicsRequest]
     val results = new 
CreatableTopicResultCollection(createTopicsRequest.data.topics.size)
-    if (!controller.isActive) {
-      createTopicsRequest.data.topics.forEach { topic =>
-        results.add(new CreatableTopicResult().setName(topic.name)
-          .setErrorCode(Errors.NOT_CONTROLLER.code))
-      }
-      sendResponseCallback(results)
-    } else {
-      createTopicsRequest.data.topics.forEach { topic =>
-        results.add(new CreatableTopicResult().setName(topic.name))
-      }
-      val hasClusterAuthorization = authorize(request.context, CREATE, 
CLUSTER, CLUSTER_NAME,
-        logIfDenied = false)
-      val topics = createTopicsRequest.data.topics.asScala.map(_.name)
-      val authorizedTopics =
-        if (hasClusterAuthorization) topics.toSet
-        else filterByAuthorized(request.context, CREATE, TOPIC, 
topics)(identity)
-      val authorizedForDescribeConfigs = filterByAuthorized(request.context, 
DESCRIBE_CONFIGS, TOPIC,
-        topics, logIfDenied = false)(identity).map(name => name -> 
results.find(name)).toMap
+    createTopicsRequest.data.topics.forEach { topic =>
+      results.add(new CreatableTopicResult().setName(topic.name))
+    }
+    val hasClusterAuthorization = authorize(request.context, CREATE, CLUSTER, 
CLUSTER_NAME,
+      logIfDenied = false)
+    val topics = createTopicsRequest.data.topics.asScala.map(_.name)
+    val authorizedTopics =
+      if (hasClusterAuthorization) topics.toSet
+      else filterByAuthorized(request.context, CREATE, TOPIC, topics)(identity)
+    val authorizedForDescribeConfigs = filterByAuthorized(request.context, 
DESCRIBE_CONFIGS, TOPIC,
+      topics, logIfDenied = false)(identity).map(name => name -> 
results.find(name)).toMap
 
-      results.forEach { topic =>
-        if (results.findAll(topic.name).size > 1) {
-          topic.setErrorCode(Errors.INVALID_REQUEST.code)
-          topic.setErrorMessage("Found multiple entries for this topic.")
-        } else if (!authorizedTopics.contains(topic.name)) {
-          topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
-          topic.setErrorMessage("Authorization failed.")
-        }
-        if (!authorizedForDescribeConfigs.contains(topic.name)) {
-          topic.setTopicConfigErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
-        }
+    results.forEach { topic =>
+      if (results.findAll(topic.name).size > 1) {
+        topic.setErrorCode(Errors.INVALID_REQUEST.code)
+        topic.setErrorMessage("Found multiple entries for this topic.")
+      } else if (!authorizedTopics.contains(topic.name)) {
+        topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+        topic.setErrorMessage("Authorization failed.")
       }
-      val toCreate = mutable.Map[String, CreatableTopic]()
-      createTopicsRequest.data.topics.forEach { topic =>
-        if (results.find(topic.name).errorCode == Errors.NONE.code) {
-          toCreate += topic.name -> topic
-        }
+      if (!authorizedForDescribeConfigs.contains(topic.name)) {
+        topic.setTopicConfigErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
       }
+    }
+    val toCreate = mutable.Map[String, CreatableTopic]()
+    createTopicsRequest.data.topics.forEach { topic =>
+      if (results.find(topic.name).errorCode == Errors.NONE.code) {
+        toCreate += topic.name -> topic
+      }
+    }
+    if (!controller.isActive) {
+      // don't provide the information that this node is not the controller 
unless they were authorized
+      // to perform at least one of their requests
+      sendResponseCallback(
+        if (toCreate.isEmpty) {

Review comment:
       Hmm.. If a topic is unauthorized, I think it's more important to return 
the authorization failure. There's not any reason I can think of why 
`NOT_CONTROLLER` should take precedence. It would just cause the request to be 
unnecessarily retried against the new coordinator. Similarly for the other APIs.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to