chia7712 commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r586131042



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -154,17 +162,164 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    val responses = deleteTopics(request.body[DeleteTopicsRequest].data,
+      request.context.apiVersion,
+      authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+      names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, 
names)(n => n),
+      names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, 
names)(n => n))
+    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+      val responseData = new DeleteTopicsResponseData().
+        setResponses(new DeletableTopicResultCollection(responses.iterator)).
+        setThrottleTimeMs(throttleTimeMs)
+      new DeleteTopicsResponse(responseData)
+    })
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,
+                   apiVersion: Int,
+                   hasClusterAuth: Boolean,
+                   getDescribableTopics: Iterable[String] => Set[String],
+                   getDeletableTopics: Iterable[String] => Set[String]): 
util.List[DeletableTopicResult] = {
+    // Check if topic deletion is enabled at all.
+    if (!config.deleteTopicEnable) {
+      if (apiVersion < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")
+      } else {
+        throw new TopicDeletionDisabledException()
+      }
+    }
+    // The first step is to load up the names and IDs that have been provided 
by the
+    // request.  This is a bit messy because we support multiple ways of 
referring to
+    // topics (both by name and by id) and because we need to check for 
duplicates or
+    // other invalid inputs.
+    val responses = new util.ArrayList[DeletableTopicResult]
+    def appendResponse(name: String, id: Uuid, error: ApiError): Unit = {
+      responses.add(new DeletableTopicResult().
+        setName(name).
+        setTopicId(id).
+        setErrorCode(error.error.code).
+        setErrorMessage(error.message))
+    }
+    val providedNames = new util.HashSet[String]
+    val duplicateProvidedNames = new util.HashSet[String]
+    val providedIds = new util.HashSet[Uuid]
+    val duplicateProvidedIds = new util.HashSet[Uuid]
+    def addProvidedName(name: String): Unit = {
+      if (duplicateProvidedNames.contains(name) || !providedNames.add(name)) {
+        duplicateProvidedNames.add(name)
+        providedNames.remove(name)
+      }
+    }
+    request.topicNames.forEach(addProvidedName)
+    request.topics.forEach {
+      topic => if (topic.name == null) {
+        if (topic.topicId.equals(ZERO_UUID)) {
+          appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST,
+            "Neither topic name nor id were specified."))
+        } else if (duplicateProvidedIds.contains(topic.topicId) || 
!providedIds.add(topic.topicId)) {
+          duplicateProvidedIds.add(topic.topicId)
+          providedIds.remove(topic.topicId)
+        }
+      } else {
+        if (topic.topicId.equals(ZERO_UUID)) {
+          addProvidedName(topic.name)
+        } else {
+          appendResponse(topic.name, topic.topicId, new 
ApiError(INVALID_REQUEST,
+            "You may not specify both topic name and topic id."))
+        }
+      }
+    }
+    // Create error responses for duplicates.
+    duplicateProvidedNames.forEach(name => appendResponse(name, ZERO_UUID,
+      new ApiError(INVALID_REQUEST, "Duplicate topic name.")))
+    duplicateProvidedIds.forEach(id => appendResponse(null, id,
+      new ApiError(INVALID_REQUEST, "Duplicate topic id.")))
+    // At this point we have all the valid names and IDs that have been 
provided.
+    // However, the Authorizer needs topic names as inputs, not topic IDs.  So
+    // we need to resolve all IDs to names.
+    val toAuthenticate = new util.HashSet[String]
+    toAuthenticate.addAll(providedNames)
+    val idToName = new util.HashMap[Uuid, String]
+    controller.findTopicNames(providedIds).get().forEach { (id, nameOrError) =>
+      if (nameOrError.isError) {
+        appendResponse(null, id, nameOrError.error())
+      } else {
+        toAuthenticate.add(nameOrError.result())
+        idToName.put(id, nameOrError.result())
+      }
+    }
+    // Get the list of deletable topics (those we can delete) and the list of 
describeable
+    // topics.  If a topic can't be deleted or described, we have to act like 
it doesn't
+    // exist, even when it does.
+    val topicsToAuthenticate = toAuthenticate.asScala
+    val (describeable, deletable) = if (hasClusterAuth) {
+      (topicsToAuthenticate.toSet, topicsToAuthenticate.toSet)
+    } else {
+      (getDescribableTopics(topicsToAuthenticate), 
getDeletableTopics(topicsToAuthenticate))
+    }
+    // For each topic that was provided by ID, check if authentication failed.
+    // If so, remove it from the idToName map and create an error response for 
it.
+    val iterator = idToName.entrySet().iterator()
+    while (iterator.hasNext) {
+      val entry = iterator.next()
+      val id = entry.getKey
+      val name = entry.getValue
+      if (!deletable.contains(name)) {
+        if (describeable.contains(name)) {
+          appendResponse(name, id, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+        } else {
+          appendResponse(null, id, new ApiError(UNKNOWN_TOPIC_ID))

Review comment:
       Consensus was reached in #10223 that the error should be 
`TOPIC_AUTHORIZATION_FAILED` rather than `UNKNOWN_TOPIC_ID`.
   
   See `KafkaApis` 
(https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L1901).

##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -154,17 +162,164 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    val responses = deleteTopics(request.body[DeleteTopicsRequest].data,
+      request.context.apiVersion,
+      authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+      names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, 
names)(n => n),
+      names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, 
names)(n => n))
+    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+      val responseData = new DeleteTopicsResponseData().
+        setResponses(new DeletableTopicResultCollection(responses.iterator)).
+        setThrottleTimeMs(throttleTimeMs)
+      new DeleteTopicsResponse(responseData)
+    })
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,
+                   apiVersion: Int,
+                   hasClusterAuth: Boolean,
+                   getDescribableTopics: Iterable[String] => Set[String],
+                   getDeletableTopics: Iterable[String] => Set[String]): 
util.List[DeletableTopicResult] = {
+    // Check if topic deletion is enabled at all.
+    if (!config.deleteTopicEnable) {
+      if (apiVersion < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")
+      } else {
+        throw new TopicDeletionDisabledException()
+      }
+    }
+    // The first step is to load up the names and IDs that have been provided 
by the
+    // request.  This is a bit messy because we support multiple ways of 
referring to
+    // topics (both by name and by id) and because we need to check for 
duplicates or
+    // other invalid inputs.
+    val responses = new util.ArrayList[DeletableTopicResult]
+    def appendResponse(name: String, id: Uuid, error: ApiError): Unit = {
+      responses.add(new DeletableTopicResult().
+        setName(name).
+        setTopicId(id).
+        setErrorCode(error.error.code).
+        setErrorMessage(error.message))
+    }
+    val providedNames = new util.HashSet[String]
+    val duplicateProvidedNames = new util.HashSet[String]
+    val providedIds = new util.HashSet[Uuid]
+    val duplicateProvidedIds = new util.HashSet[Uuid]
+    def addProvidedName(name: String): Unit = {
+      if (duplicateProvidedNames.contains(name) || !providedNames.add(name)) {
+        duplicateProvidedNames.add(name)
+        providedNames.remove(name)
+      }
+    }
+    request.topicNames.forEach(addProvidedName)
+    request.topics.forEach {
+      topic => if (topic.name == null) {
+        if (topic.topicId.equals(ZERO_UUID)) {
+          appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST,
+            "Neither topic name nor id were specified."))
+        } else if (duplicateProvidedIds.contains(topic.topicId) || 
!providedIds.add(topic.topicId)) {
+          duplicateProvidedIds.add(topic.topicId)
+          providedIds.remove(topic.topicId)
+        }
+      } else {
+        if (topic.topicId.equals(ZERO_UUID)) {
+          addProvidedName(topic.name)
+        } else {
+          appendResponse(topic.name, topic.topicId, new 
ApiError(INVALID_REQUEST,
+            "You may not specify both topic name and topic id."))
+        }
+      }
+    }
+    // Create error responses for duplicates.
+    duplicateProvidedNames.forEach(name => appendResponse(name, ZERO_UUID,
+      new ApiError(INVALID_REQUEST, "Duplicate topic name.")))
+    duplicateProvidedIds.forEach(id => appendResponse(null, id,
+      new ApiError(INVALID_REQUEST, "Duplicate topic id.")))
+    // At this point we have all the valid names and IDs that have been 
provided.
+    // However, the Authorizer needs topic names as inputs, not topic IDs.  So
+    // we need to resolve all IDs to names.
+    val toAuthenticate = new util.HashSet[String]
+    toAuthenticate.addAll(providedNames)
+    val idToName = new util.HashMap[Uuid, String]
+    controller.findTopicNames(providedIds).get().forEach { (id, nameOrError) =>
+      if (nameOrError.isError) {
+        appendResponse(null, id, nameOrError.error())
+      } else {
+        toAuthenticate.add(nameOrError.result())
+        idToName.put(id, nameOrError.result())
+      }
+    }
+    // Get the list of deletable topics (those we can delete) and the list of 
describeable
+    // topics.  If a topic can't be deleted or described, we have to act like 
it doesn't
+    // exist, even when it does.
+    val topicsToAuthenticate = toAuthenticate.asScala
+    val (describeable, deletable) = if (hasClusterAuth) {
+      (topicsToAuthenticate.toSet, topicsToAuthenticate.toSet)
+    } else {
+      (getDescribableTopics(topicsToAuthenticate), 
getDeletableTopics(topicsToAuthenticate))
+    }
+    // For each topic that was provided by ID, check if authentication failed.
+    // If so, remove it from the idToName map and create an error response for 
it.
+    val iterator = idToName.entrySet().iterator()
+    while (iterator.hasNext) {
+      val entry = iterator.next()
+      val id = entry.getKey
+      val name = entry.getValue
+      if (!deletable.contains(name)) {
+        if (describeable.contains(name)) {
+          appendResponse(name, id, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+        } else {
+          appendResponse(null, id, new ApiError(UNKNOWN_TOPIC_ID))
+        }
+        iterator.remove()
+      }
+    }
+    // For each topic that was provided by name, check if authentication 
failed.
+    // If so, create an error response for it.  Otherwise, add it to the 
idToName map.
+    controller.findTopicIds(providedNames).get().forEach { (name, idOrError) =>
+      if (idOrError.isError) {
+        appendResponse(name, ZERO_UUID, idOrError.error)
+      } else if (deletable.contains(name)) {
+        val id = idOrError.result()
+        if (duplicateProvidedIds.contains(id) || idToName.put(id, name) != 
null) {
+          // This is kind of a weird case: what if we supply topic ID X and 
also a name
+          // that maps to ID X?  In that case, _if authorization succeeds_, we 
end up
+          // here.  If authorization doesn't succeed, we refrain from 
commenting on the
+          // situation since it would reveal topic ID mappings.
+          duplicateProvidedIds.add(id)
+          idToName.remove(id)
+          appendResponse(name, id, new ApiError(INVALID_REQUEST,
+            "The provided topic name maps to an ID that was already 
supplied."))
+        }
+      } else if (describeable.contains(name)) {
+        appendResponse(name, idOrError.result(), new 
ApiError(TOPIC_AUTHORIZATION_FAILED))
+      } else {
+        appendResponse(name, ZERO_UUID, new 
ApiError(UNKNOWN_TOPIC_OR_PARTITION))

Review comment:
       The topic ID is NOT sensitive (see the discussion in 
https://issues.apache.org/jira/browse/KAFKA-12369), so it is OK to return the 
topic ID. Also, the error code should be `TOPIC_AUTHORIZATION_FAILED` so that 
the client fails fast (`UNKNOWN_TOPIC_OR_PARTITION` is a retryable error).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to