chia7712 commented on a change in pull request #10184: URL: https://github.com/apache/kafka/pull/10184#discussion_r586131042
########## File path: core/src/main/scala/kafka/server/ControllerApis.scala ########## @@ -154,17 +162,164 @@ class ControllerApis(val requestChannel: RequestChannel, requestThrottleMs => createResponseCallback(requestThrottleMs)) } + def handleDeleteTopics(request: RequestChannel.Request): Unit = { + val responses = deleteTopics(request.body[DeleteTopicsRequest].data, + request.context.apiVersion, + authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME), + names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n), + names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n)) + requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => { + val responseData = new DeleteTopicsResponseData(). + setResponses(new DeletableTopicResultCollection(responses.iterator)). + setThrottleTimeMs(throttleTimeMs) + new DeleteTopicsResponse(responseData) + }) + } + + def deleteTopics(request: DeleteTopicsRequestData, + apiVersion: Int, + hasClusterAuth: Boolean, + getDescribableTopics: Iterable[String] => Set[String], + getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = { + // Check if topic deletion is enabled at all. + if (!config.deleteTopicEnable) { + if (apiVersion < 3) { + throw new InvalidRequestException("Topic deletion is disabled.") + } else { + throw new TopicDeletionDisabledException() + } + } + // The first step is to load up the names and IDs that have been provided by the + // request. This is a bit messy because we support multiple ways of referring to + // topics (both by name and by id) and because we need to check for duplicates or + // other invalid inputs. + val responses = new util.ArrayList[DeletableTopicResult] + def appendResponse(name: String, id: Uuid, error: ApiError): Unit = { + responses.add(new DeletableTopicResult(). + setName(name). + setTopicId(id). + setErrorCode(error.error.code). + setErrorMessage(error.message)) + } + val providedNames = new util.HashSet[String] + val duplicateProvidedNames = new util.HashSet[String] + val providedIds = new util.HashSet[Uuid] + val duplicateProvidedIds = new util.HashSet[Uuid] + def addProvidedName(name: String): Unit = { + if (duplicateProvidedNames.contains(name) || !providedNames.add(name)) { + duplicateProvidedNames.add(name) + providedNames.remove(name) + } + } + request.topicNames.forEach(addProvidedName) + request.topics.forEach { + topic => if (topic.name == null) { + if (topic.topicId.equals(ZERO_UUID)) { + appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST, + "Neither topic name nor id were specified.")) + } else if (duplicateProvidedIds.contains(topic.topicId) || !providedIds.add(topic.topicId)) { + duplicateProvidedIds.add(topic.topicId) + providedIds.remove(topic.topicId) + } + } else { + if (topic.topicId.equals(ZERO_UUID)) { + addProvidedName(topic.name) + } else { + appendResponse(topic.name, topic.topicId, new ApiError(INVALID_REQUEST, + "You may not specify both topic name and topic id.")) + } + } + } + // Create error responses for duplicates. + duplicateProvidedNames.forEach(name => appendResponse(name, ZERO_UUID, + new ApiError(INVALID_REQUEST, "Duplicate topic name."))) + duplicateProvidedIds.forEach(id => appendResponse(null, id, + new ApiError(INVALID_REQUEST, "Duplicate topic id."))) + // At this point we have all the valid names and IDs that have been provided. + // However, the Authorizer needs topic names as inputs, not topic IDs. So + // we need to resolve all IDs to names. + val toAuthenticate = new util.HashSet[String] + toAuthenticate.addAll(providedNames) + val idToName = new util.HashMap[Uuid, String] + controller.findTopicNames(providedIds).get().forEach { (id, nameOrError) => + if (nameOrError.isError) { + appendResponse(null, id, nameOrError.error()) + } else { + toAuthenticate.add(nameOrError.result()) + idToName.put(id, nameOrError.result()) + } + } + // Get the list of deletable topics (those we can delete) and the list of describeable + // topics. If a topic can't be deleted or described, we have to act like it doesn't + // exist, even when it does. + val topicsToAuthenticate = toAuthenticate.asScala + val (describeable, deletable) = if (hasClusterAuth) { + (topicsToAuthenticate.toSet, topicsToAuthenticate.toSet) + } else { + (getDescribableTopics(topicsToAuthenticate), getDeletableTopics(topicsToAuthenticate)) + } + // For each topic that was provided by ID, check if authentication failed. + // If so, remove it from the idToName map and create an error response for it. + val iterator = idToName.entrySet().iterator() + while (iterator.hasNext) { + val entry = iterator.next() + val id = entry.getKey + val name = entry.getValue + if (!deletable.contains(name)) { + if (describeable.contains(name)) { + appendResponse(name, id, new ApiError(TOPIC_AUTHORIZATION_FAILED)) + } else { + appendResponse(null, id, new ApiError(UNKNOWN_TOPIC_ID)) Review comment: It reaches consensus on #10223 that the error should be `TOPIC_AUTHORIZATION_FAILED` rather than `UNKNOWN_TOPIC_ID` see `KafkaApis` (https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L1901) ########## File path: core/src/main/scala/kafka/server/ControllerApis.scala ########## @@ -154,17 +162,164 @@ class ControllerApis(val requestChannel: RequestChannel, requestThrottleMs => createResponseCallback(requestThrottleMs)) } + def handleDeleteTopics(request: RequestChannel.Request): Unit = { + val responses = deleteTopics(request.body[DeleteTopicsRequest].data, + request.context.apiVersion, + authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME), + names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n), + names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n)) + requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => { + val responseData = new DeleteTopicsResponseData(). + setResponses(new DeletableTopicResultCollection(responses.iterator)). + setThrottleTimeMs(throttleTimeMs) + new DeleteTopicsResponse(responseData) + }) + } + + def deleteTopics(request: DeleteTopicsRequestData, + apiVersion: Int, + hasClusterAuth: Boolean, + getDescribableTopics: Iterable[String] => Set[String], + getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = { + // Check if topic deletion is enabled at all. + if (!config.deleteTopicEnable) { + if (apiVersion < 3) { + throw new InvalidRequestException("Topic deletion is disabled.") + } else { + throw new TopicDeletionDisabledException() + } + } + // The first step is to load up the names and IDs that have been provided by the + // request. This is a bit messy because we support multiple ways of referring to + // topics (both by name and by id) and because we need to check for duplicates or + // other invalid inputs. + val responses = new util.ArrayList[DeletableTopicResult] + def appendResponse(name: String, id: Uuid, error: ApiError): Unit = { + responses.add(new DeletableTopicResult(). + setName(name). + setTopicId(id). + setErrorCode(error.error.code). + setErrorMessage(error.message)) + } + val providedNames = new util.HashSet[String] + val duplicateProvidedNames = new util.HashSet[String] + val providedIds = new util.HashSet[Uuid] + val duplicateProvidedIds = new util.HashSet[Uuid] + def addProvidedName(name: String): Unit = { + if (duplicateProvidedNames.contains(name) || !providedNames.add(name)) { + duplicateProvidedNames.add(name) + providedNames.remove(name) + } + } + request.topicNames.forEach(addProvidedName) + request.topics.forEach { + topic => if (topic.name == null) { + if (topic.topicId.equals(ZERO_UUID)) { + appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST, + "Neither topic name nor id were specified.")) + } else if (duplicateProvidedIds.contains(topic.topicId) || !providedIds.add(topic.topicId)) { + duplicateProvidedIds.add(topic.topicId) + providedIds.remove(topic.topicId) + } + } else { + if (topic.topicId.equals(ZERO_UUID)) { + addProvidedName(topic.name) + } else { + appendResponse(topic.name, topic.topicId, new ApiError(INVALID_REQUEST, + "You may not specify both topic name and topic id.")) + } + } + } + // Create error responses for duplicates. + duplicateProvidedNames.forEach(name => appendResponse(name, ZERO_UUID, + new ApiError(INVALID_REQUEST, "Duplicate topic name."))) + duplicateProvidedIds.forEach(id => appendResponse(null, id, + new ApiError(INVALID_REQUEST, "Duplicate topic id."))) + // At this point we have all the valid names and IDs that have been provided. + // However, the Authorizer needs topic names as inputs, not topic IDs. So + // we need to resolve all IDs to names. + val toAuthenticate = new util.HashSet[String] + toAuthenticate.addAll(providedNames) + val idToName = new util.HashMap[Uuid, String] + controller.findTopicNames(providedIds).get().forEach { (id, nameOrError) => + if (nameOrError.isError) { + appendResponse(null, id, nameOrError.error()) + } else { + toAuthenticate.add(nameOrError.result()) + idToName.put(id, nameOrError.result()) + } + } + // Get the list of deletable topics (those we can delete) and the list of describeable + // topics. If a topic can't be deleted or described, we have to act like it doesn't + // exist, even when it does. + val topicsToAuthenticate = toAuthenticate.asScala + val (describeable, deletable) = if (hasClusterAuth) { + (topicsToAuthenticate.toSet, topicsToAuthenticate.toSet) + } else { + (getDescribableTopics(topicsToAuthenticate), getDeletableTopics(topicsToAuthenticate)) + } + // For each topic that was provided by ID, check if authentication failed. + // If so, remove it from the idToName map and create an error response for it. + val iterator = idToName.entrySet().iterator() + while (iterator.hasNext) { + val entry = iterator.next() + val id = entry.getKey + val name = entry.getValue + if (!deletable.contains(name)) { + if (describeable.contains(name)) { + appendResponse(name, id, new ApiError(TOPIC_AUTHORIZATION_FAILED)) + } else { + appendResponse(null, id, new ApiError(UNKNOWN_TOPIC_ID)) + } + iterator.remove() + } + } + // For each topic that was provided by name, check if authentication failed. + // If so, create an error response for it. Otherwise, add it to the idToName map. + controller.findTopicIds(providedNames).get().forEach { (name, idOrError) => + if (idOrError.isError) { + appendResponse(name, ZERO_UUID, idOrError.error) + } else if (deletable.contains(name)) { + val id = idOrError.result() + if (duplicateProvidedIds.contains(id) || idToName.put(id, name) != null) { + // This is kind of a weird case: what if we supply topic ID X and also a name + // that maps to ID X? In that case, _if authorization succeeds_, we end up + // here. If authorization doesn't succeed, we refrain from commenting on the + // situation since it would reveal topic ID mappings. + duplicateProvidedIds.add(id) + idToName.remove(id) + appendResponse(name, id, new ApiError(INVALID_REQUEST, + "The provided topic name maps to an ID that was already supplied.")) + } + } else if (describeable.contains(name)) { + appendResponse(name, idOrError.result(), new ApiError(TOPIC_AUTHORIZATION_FAILED)) + } else { + appendResponse(name, ZERO_UUID, new ApiError(UNKNOWN_TOPIC_OR_PARTITION)) Review comment: topic id is NOT sensitive (see discussion https://issues.apache.org/jira/browse/KAFKA-12369) so it is ok to return topic id. Also, the error code should be `TOPIC_AUTHORIZATION_FAILED` as it can produce quick failure (`UNKNOWN_TOPIC_OR_PARTITION ` is a retryable error). ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org