dengziming commented on a change in pull request #9684:
URL: https://github.com/apache/kafka/pull/9684#discussion_r550836862



##########
File path: core/src/main/scala/kafka/zk/KafkaZkClient.scala
##########
@@ -621,10 +621,10 @@ class KafkaZkClient private[zk] (zooKeeperClient: 
ZooKeeperClient, isSecure: Boo
         case Code.NONODE => None
         case _ => throw getDataResponse.resultException.get
       }
-    }.map(_.get)
-      .map(topicIdAssignment => (topicIdAssignment.topic,
-        topicIdAssignment.topicId.getOrElse(
-          throw new IllegalStateException("Topic " + topicIdAssignment.topic + 
" does not have a topic ID."))))
+    }.filter(_.isDefined)

Review comment:
       nit: getDataResponses.map(xxx).filter(_.isDefined).map(_.get) can be 
replaced by getDataResponses.flatMap(xxx)

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1981,29 +1986,39 @@ class KafkaApis(val requestChannel: RequestChannel,
     val results = new 
DeletableTopicResultCollection(deleteTopicRequest.data.topicNames.size)
     val toDelete = mutable.Set[String]()
     if (!controller.isActive) {
-      deleteTopicRequest.data.topicNames.forEach { topic =>
+      deleteTopicRequest.topics().forEach { topic =>
         results.add(new DeletableTopicResult()
-          .setName(topic)
+          .setName(topic.name())
+          .setTopicId(topic.topicId())
           .setErrorCode(Errors.NOT_CONTROLLER.code))
       }
       sendResponseCallback(results)
     } else if (!config.deleteTopicEnable) {
       val error = if (request.context.apiVersion < 3) Errors.INVALID_REQUEST 
else Errors.TOPIC_DELETION_DISABLED
-      deleteTopicRequest.data.topicNames.forEach { topic =>
+      deleteTopicRequest.topics().forEach { topic =>
         results.add(new DeletableTopicResult()
-          .setName(topic)
+          .setName(topic.name())
+          .setTopicId(topic.topicId())
           .setErrorCode(error.code))
       }
       sendResponseCallback(results)
     } else {
-      deleteTopicRequest.data.topicNames.forEach { topic =>
+      deleteTopicRequest.topics().forEach { topic =>
+        val name = if (topic.name() != null) topic.name() 

Review comment:
       According to the KIP, we should use the topicId first and fall back to 
the topic name only if the topicId is ZERO, but here you use the topic name even 
when the topicId is not ZERO?

##########
File path: clients/src/main/resources/common/message/DeleteTopicsRequest.json
##########
@@ -23,10 +23,17 @@
   //
   // Version 5 adds ErrorMessage in the response and may return a 
THROTTLING_QUOTA_EXCEEDED error
   // in the response if the topics deletion is throttled (KIP-599).
-  "validVersions": "0-5",
+  //
+  // Version 6 reorganizes topics, adds topic IDs and allows topic names to be 
null.
+  "validVersions": "0-6",
   "flexibleVersions": "4+",
   "fields": [
-    { "name": "TopicNames", "type": "[]string", "versions": "0+", 
"entityType": "topicName",
+    { "name": "Topics", "type": "[]DeleteTopicState", "versions": "6+", 
"about": "The name or topic ID of the topic",
+      "fields": [
+        {"name": "Name", "type": "string", "versions": "6+", 
"nullableVersions": "6+", "about": "The topic name"},

Review comment:
       Why is the topic name nullable but the topicId not? Do you mean we will 
fill all delete requests with a topicId before sending a request?

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -1694,6 +1722,79 @@ void handleFailure(Throwable throwable) {
             }
         };
     }
+   
+    private Call getDeleteTopicsWithIdsCall(final DeleteTopicsOptions options,
+                                     final Map<Uuid, KafkaFutureImpl<Void>> 
futures,
+                                     final List<Uuid> topicIds,
+                                     final Map<Uuid, 
ThrottlingQuotaExceededException> quotaExceededExceptions,
+                                     final long now,
+                                     final long deadline) {
+        return new Call("deleteTopics", deadline, new 
ControllerNodeProvider()) {
+            @Override
+            DeleteTopicsRequest.Builder createRequest(int timeoutMs) {
+                return new DeleteTopicsRequest.Builder(
+                        new DeleteTopicsRequestData()
+                                .setTopics(topicIds.stream().map(
+                                    topic -> new 
DeleteTopicState().setTopicId(topic)).collect(Collectors.toList()))
+                                .setTimeoutMs(timeoutMs));
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                // Check for controller change
+                handleNotControllerError(abstractResponse);
+                // Handle server responses for particular topics.
+                final DeleteTopicsResponse response = (DeleteTopicsResponse) 
abstractResponse;
+                final List<Uuid> retryTopics = new ArrayList<>();
+                final Map<Uuid, ThrottlingQuotaExceededException> 
retryTopicQuotaExceededExceptions = new HashMap<>();
+                for (DeletableTopicResult result : 
response.data().responses()) {
+                    KafkaFutureImpl<Void> future = 
futures.get(result.topicId());
+                    if (future == null) {
+                        log.warn("Server response mentioned unknown topic {}", 
result.name());

Review comment:
       Here we'd better log result.topicId() instead of result.name(), since the futures are looked up by topicId.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1896,6 +1896,11 @@ class KafkaApis(val requestChannel: RequestChannel,
               .setTopicConfigErrorCode(Errors.NONE.code)
           }
         }
+        val topicIds = 
zkClient.getTopicIdsForTopics(results.asScala.map(result => 
result.name()).toSet)

Review comment:
       Could we use controllerContext here, since the create-topics request is 
handled at the active controller?

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
##########
@@ -39,8 +45,25 @@ public Builder(DeleteTopicsRequestData data) {
 
         @Override
         public DeleteTopicsRequest build(short version) {
+            if (version >= 6 && data.topicNames().size() != 0) {
+                data.setTopics(groupByTopic(data.topicNames()));
+            } else if (version >= 6) {
+                for (DeleteTopicState topic : data.topics()) {
+                    if (topic.name().equals("")) {

Review comment:
       If topic.name() == null, an NPE will be thrown here.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -1623,6 +1625,32 @@ public DeleteTopicsResult deleteTopics(final 
Collection<String> topicNames,
         return new DeleteTopicsResult(new HashMap<>(topicFutures));
     }
 
+    @Override
+    public DeleteTopicsWithIdsResult deleteTopicsWithIds(final 
Collection<Uuid> topicIds,
+                                           final DeleteTopicsOptions options) {
+        final Map<Uuid, KafkaFutureImpl<Void>> topicFutures = new 
HashMap<>(topicIds.size());
+        final List<Uuid> validTopicIds = new ArrayList<>(topicIds.size());
+        for (Uuid topicId : topicIds) {
+            if (topicId.equals(Uuid.ZERO_UUID)) {
+                KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
+                future.completeExceptionally(new InvalidTopicException("The 
given topic ID '" +

Review comment:
       Should this be UnknownTopicIdException?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to