chia7712 commented on a change in pull request #10184: URL: https://github.com/apache/kafka/pull/10184#discussion_r582599257
########## File path: core/src/main/scala/kafka/server/ControllerApis.scala ########## @@ -195,6 +201,147 @@ class ControllerApis(val requestChannel: RequestChannel, requestThrottleMs => createResponseCallback(requestThrottleMs)) } + def handleDeleteTopics(request: RequestChannel.Request): Unit = { + val responses = deleteTopics(request.body[DeleteTopicsRequest].data(), + request.context.apiVersion(), + authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME), + names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n), + names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n)) + requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => { + val responseData = new DeleteTopicsResponseData(). + setResponses(new DeletableTopicResultCollection(responses.iterator())). + setThrottleTimeMs(throttleTimeMs) + new DeleteTopicsResponse(responseData) + }) + } + + def deleteTopics(request: DeleteTopicsRequestData, + apiVersion: Int, + hasClusterAuth: Boolean, + getDescribableTopics: Iterable[String] => Set[String], + getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = { + if (!config.deleteTopicEnable) { + if (apiVersion < 3) { + throw new InvalidRequestException("Topic deletion is disabled.") + } else { + throw new TopicDeletionDisabledException() + } + } + val responses = new util.ArrayList[DeletableTopicResult] + val duplicatedTopicNames = new util.HashSet[String] + val topicNamesToResolve = new util.HashSet[String] + val topicIdsToResolve = new util.HashSet[Uuid] + val duplicatedTopicIds = new util.HashSet[Uuid] + + def appendResponse(name: String, id: Uuid, error: ApiError): Unit = { + responses.add(new DeletableTopicResult(). + setName(name). + setTopicId(id). + setErrorCode(error.error().code()). + setErrorMessage(error.message())) + } + + def maybeAppendToTopicNamesToResolve(name: String): Unit = { + if (duplicatedTopicNames.contains(name) || !topicNamesToResolve.add(name)) { + appendResponse(name, ZERO_UUID, new ApiError(INVALID_REQUEST, "Duplicate topic name.")) + topicNamesToResolve.remove(name) + duplicatedTopicNames.add(name) + } + } + + def maybeAppendToIdsToResolve(id: Uuid): Unit = { + if (duplicatedTopicIds.contains(id) || !topicIdsToResolve.add(id)) { + appendResponse(null, id, new ApiError(INVALID_REQUEST, "Duplicate topic ID.")) + topicIdsToResolve.remove(id) + duplicatedTopicIds.add(id) + } + } + + request.topicNames().iterator().asScala.foreach(maybeAppendToTopicNamesToResolve(_)) + + request.topics().iterator().asScala.foreach { + case topic => if (topic.name() == null) { Review comment: the default value of `topic.name` is empty string so does it need to check empty string also? ########## File path: core/src/main/scala/kafka/server/ControllerApis.scala ########## @@ -18,36 +18,42 @@ package kafka.server import java.util +import java.util.Collections +import java.util.concurrent.ExecutionException import kafka.network.RequestChannel import kafka.raft.RaftManager import kafka.server.QuotaFactory.QuotaManagers import kafka.utils.Logging import org.apache.kafka.clients.admin.AlterConfigOp -import org.apache.kafka.common.acl.AclOperation.{ALTER, ALTER_CONFIGS, CLUSTER_ACTION, CREATE, DESCRIBE} +import org.apache.kafka.common.Uuid.ZERO_UUID +import org.apache.kafka.common.acl.AclOperation.{ALTER, ALTER_CONFIGS, CLUSTER_ACTION, CREATE, DELETE, DESCRIBE} import org.apache.kafka.common.config.ConfigResource -import org.apache.kafka.common.errors.ApiException +import org.apache.kafka.common.errors.{ApiException, InvalidRequestException, TopicDeletionDisabledException} import org.apache.kafka.common.internals.FatalExitError import org.apache.kafka.common.message.ApiVersionsResponseData.{ApiVersion, SupportedFeatureKey} import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult +import org.apache.kafka.common.message.DeleteTopicsResponseData.{DeletableTopicResult, DeletableTopicResultCollection} import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseBroker -import org.apache.kafka.common.message.{ApiVersionsResponseData, BeginQuorumEpochResponseData, BrokerHeartbeatResponseData, BrokerRegistrationResponseData, CreateTopicsResponseData, DescribeQuorumResponseData, EndQuorumEpochResponseData, FetchResponseData, MetadataResponseData, UnregisterBrokerResponseData, VoteResponseData} +import org.apache.kafka.common.message.{ApiVersionsResponseData, BeginQuorumEpochResponseData, BrokerHeartbeatResponseData, BrokerRegistrationResponseData, CreateTopicsResponseData, DeleteTopicsRequestData, DeleteTopicsResponseData, DescribeQuorumResponseData, EndQuorumEpochResponseData, FetchResponseData, MetadataResponseData, UnregisterBrokerResponseData, VoteResponseData} +import org.apache.kafka.common.protocol.Errors.{INVALID_REQUEST, TOPIC_AUTHORIZATION_FAILED, UNKNOWN_TOPIC_ID, UNKNOWN_TOPIC_OR_PARTITION} import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors} import org.apache.kafka.common.record.BaseRecords import org.apache.kafka.common.requests._ import org.apache.kafka.common.resource.Resource import org.apache.kafka.common.resource.Resource.CLUSTER_NAME import org.apache.kafka.common.resource.ResourceType.{CLUSTER, TOPIC} import org.apache.kafka.common.utils.Time -import org.apache.kafka.common.Node +import org.apache.kafka.common.{Node, Uuid} import org.apache.kafka.controller.Controller import org.apache.kafka.metadata.{ApiMessageAndVersion, BrokerHeartbeatReply, BrokerRegistrationReply, FeatureMap, FeatureMapAndEpoch, VersionRange} import org.apache.kafka.server.authorizer.Authorizer import scala.collection.mutable import scala.jdk.CollectionConverters._ + Review comment: unintentional? ########## File path: core/src/main/scala/kafka/server/ControllerApis.scala ########## @@ -195,6 +201,147 @@ class ControllerApis(val requestChannel: RequestChannel, requestThrottleMs => createResponseCallback(requestThrottleMs)) } + def handleDeleteTopics(request: RequestChannel.Request): Unit = { + val responses = deleteTopics(request.body[DeleteTopicsRequest].data(), + request.context.apiVersion(), + authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME), + names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n), + names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n)) + requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => { + val responseData = new DeleteTopicsResponseData(). + setResponses(new DeletableTopicResultCollection(responses.iterator())). + setThrottleTimeMs(throttleTimeMs) + new DeleteTopicsResponse(responseData) + }) + } + + def deleteTopics(request: DeleteTopicsRequestData, + apiVersion: Int, + hasClusterAuth: Boolean, + getDescribableTopics: Iterable[String] => Set[String], + getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = { + if (!config.deleteTopicEnable) { + if (apiVersion < 3) { + throw new InvalidRequestException("Topic deletion is disabled.") + } else { + throw new TopicDeletionDisabledException() + } + } + val responses = new util.ArrayList[DeletableTopicResult] + val duplicatedTopicNames = new util.HashSet[String] + val topicNamesToResolve = new util.HashSet[String] + val topicIdsToResolve = new util.HashSet[Uuid] + val duplicatedTopicIds = new util.HashSet[Uuid] + + def appendResponse(name: String, id: Uuid, error: ApiError): Unit = { + responses.add(new DeletableTopicResult(). + setName(name). + setTopicId(id). + setErrorCode(error.error().code()). + setErrorMessage(error.message())) + } + + def maybeAppendToTopicNamesToResolve(name: String): Unit = { + if (duplicatedTopicNames.contains(name) || !topicNamesToResolve.add(name)) { + appendResponse(name, ZERO_UUID, new ApiError(INVALID_REQUEST, "Duplicate topic name.")) + topicNamesToResolve.remove(name) + duplicatedTopicNames.add(name) + } + } + + def maybeAppendToIdsToResolve(id: Uuid): Unit = { + if (duplicatedTopicIds.contains(id) || !topicIdsToResolve.add(id)) { + appendResponse(null, id, new ApiError(INVALID_REQUEST, "Duplicate topic ID.")) + topicIdsToResolve.remove(id) + duplicatedTopicIds.add(id) + } + } + + request.topicNames().iterator().asScala.foreach(maybeAppendToTopicNamesToResolve(_)) Review comment: `foreach(maybeAppendToTopicNamesToResolve(_))` -> `foreach(maybeAppendToTopicNamesToResolve)` ########## File path: core/src/main/scala/kafka/server/ControllerApis.scala ########## @@ -195,6 +201,147 @@ class ControllerApis(val requestChannel: RequestChannel, requestThrottleMs => createResponseCallback(requestThrottleMs)) } + def handleDeleteTopics(request: RequestChannel.Request): Unit = { + val responses = deleteTopics(request.body[DeleteTopicsRequest].data(), + request.context.apiVersion(), + authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME), + names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n), + names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n)) + requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => { + val responseData = new DeleteTopicsResponseData(). + setResponses(new DeletableTopicResultCollection(responses.iterator())). + setThrottleTimeMs(throttleTimeMs) + new DeleteTopicsResponse(responseData) + }) + } + + def deleteTopics(request: DeleteTopicsRequestData, + apiVersion: Int, + hasClusterAuth: Boolean, + getDescribableTopics: Iterable[String] => Set[String], + getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = { + if (!config.deleteTopicEnable) { + if (apiVersion < 3) { + throw new InvalidRequestException("Topic deletion is disabled.") + } else { + throw new TopicDeletionDisabledException() + } + } + val responses = new util.ArrayList[DeletableTopicResult] + val duplicatedTopicNames = new util.HashSet[String] + val topicNamesToResolve = new util.HashSet[String] + val topicIdsToResolve = new util.HashSet[Uuid] + val duplicatedTopicIds = new util.HashSet[Uuid] + + def appendResponse(name: String, id: Uuid, error: ApiError): Unit = { + responses.add(new DeletableTopicResult(). + setName(name). + setTopicId(id). + setErrorCode(error.error().code()). + setErrorMessage(error.message())) + } + + def maybeAppendToTopicNamesToResolve(name: String): Unit = { + if (duplicatedTopicNames.contains(name) || !topicNamesToResolve.add(name)) { + appendResponse(name, ZERO_UUID, new ApiError(INVALID_REQUEST, "Duplicate topic name.")) + topicNamesToResolve.remove(name) + duplicatedTopicNames.add(name) + } + } + + def maybeAppendToIdsToResolve(id: Uuid): Unit = { + if (duplicatedTopicIds.contains(id) || !topicIdsToResolve.add(id)) { + appendResponse(null, id, new ApiError(INVALID_REQUEST, "Duplicate topic ID.")) + topicIdsToResolve.remove(id) + duplicatedTopicIds.add(id) + } + } + + request.topicNames().iterator().asScala.foreach(maybeAppendToTopicNamesToResolve(_)) + + request.topics().iterator().asScala.foreach { + case topic => if (topic.name() == null) { Review comment: `case` is redundant ########## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ########## @@ -541,6 +559,57 @@ static void validateNewTopicNames(Map<String, ApiError> topicErrors, return configChanges; } + Map<String, ResultOrError<Uuid>> findTopicIds(long offset, Collection<String> names) { + Map<String, ResultOrError<Uuid>> results = new HashMap<>(); + for (String name : names) { + if (name == null) { + results.put(null, new ResultOrError<>( + new ApiError(INVALID_REQUEST, "Invalid null topic name."))); + } else { + Uuid id = topicsByName.get(name, offset); + if (id == null) { + results.put(null, new ResultOrError<>( Review comment: Why it pass `null` as topic name? ########## File path: metadata/src/main/java/org/apache/kafka/controller/Controller.java ########## @@ -69,6 +70,31 @@ */ CompletableFuture<Void> unregisterBroker(int brokerId); + /** + * Find the ids for topic names. + * + * @param topicNames The topic names to resolve. + * @return A future yielding a map from topic name to id. + */ + CompletableFuture<Map<String, ResultOrError<Uuid>>> findTopicIds(Collection<String> topicNames); + + /** + * Find the names for topic ids. + * + * @param topicIds The topic ids to resolve. + * @return A future yielding a map from topic name to id. Review comment: Should it be `id to topic name`? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org