hachikuji commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1393479827


##########
clients/src/main/java/org/apache/kafka/common/requests/DescribeTopicsRequest.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.DescribeTopicsRequestData;
+import org.apache.kafka.common.message.DescribeTopicsResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+
+public class DescribeTopicsRequest extends AbstractRequest {
+    public static class Builder extends 
AbstractRequest.Builder<DescribeTopicsRequest> {
+        private final DescribeTopicsRequestData data;
+
+        public Builder(DescribeTopicsRequestData data) {
+            super(ApiKeys.DESCRIBE_TOPICS);
+            this.data = data;
+        }
+
+        public Builder(List<String> topics) {
+            super(ApiKeys.DESCRIBE_TOPICS, 
ApiKeys.DESCRIBE_TOPICS.oldestVersion(), 
ApiKeys.DESCRIBE_TOPICS.latestVersion());
+            DescribeTopicsRequestData data = new DescribeTopicsRequestData();
+            topics.forEach(topicName -> data.topics().add(new 
DescribeTopicsRequestData.TopicRequest().setName(topicName)));
+            this.data = data;
+        }
+
+        @Override
+        public DescribeTopicsRequest build(short version) {
+            return new DescribeTopicsRequest(data, version);
+        }
+
+        @Override
+        public String toString() {
+            return data.toString();
+        }
+
+    }
+
+    private final DescribeTopicsRequestData data;
+
+    public DescribeTopicsRequest(DescribeTopicsRequestData data) {
+        super(ApiKeys.DESCRIBE_TOPICS, (short) 0);
+        this.data = data;
+    }
+
+    public DescribeTopicsRequest(DescribeTopicsRequestData data, short 
version) {
+        super(ApiKeys.DESCRIBE_TOPICS, version);
+        this.data = data;
+    }
+
+    @Override
+    public DescribeTopicsRequestData data() {
+        return data;
+    }
+
+    @Override
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+        Errors error = Errors.forException(e);
+        DescribeTopicsResponseData responseData = new 
DescribeTopicsResponseData();
+        for (DescribeTopicsRequestData.TopicRequest topic : data.topics()) {

Review Comment:
   Perhaps this is telling us that we need a top-level error in the response.



##########
clients/src/main/java/org/apache/kafka/common/protocol/Errors.java:
##########
@@ -386,7 +387,8 @@ public enum Errors {
     STALE_MEMBER_EPOCH(113, "The member epoch is stale. The member must retry 
after receiving its updated member epoch via the ConsumerGroupHeartbeat API.", 
StaleMemberEpochException::new),
     MISMATCHED_ENDPOINT_TYPE(114, "The request was sent to an endpoint of the 
wrong type.", MismatchedEndpointTypeException::new),
     UNSUPPORTED_ENDPOINT_TYPE(115, "This endpoint type is not supported yet.", 
UnsupportedEndpointTypeException::new),
-    UNKNOWN_CONTROLLER_ID(116, "This controller ID is not known.", 
UnknownControllerIdException::new);
+    UNKNOWN_CONTROLLER_ID(116, "This controller ID is not known.", 
UnknownControllerIdException::new),
+    REQUEST_LIMIT_REACHED(117, "The request has reached the limit.", 
RequestLimitReachedException::new);

Review Comment:
   The phrasing is very vague. I guess it is a request _size_ limit?



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1355,6 +1402,60 @@ class KafkaApis(val requestChannel: RequestChannel,
       ))
   }
 
+  def handleDescribeTopicsRequest(request: RequestChannel.Request): Unit = {
+    val describeTopicsRequest = request.body[DescribeTopicsRequest]
+
+    val topics = scala.collection.mutable.Map[String, Int]()
+    describeTopicsRequest.data.topics.forEach { topic =>
+      if (topic.name == null || topic.firstPartitionId() < 0) {
+        throw new InvalidRequestException(s"Topic name and first partition id 
must be set.")
+      }
+      topics.put(topic.name(), topic.firstPartitionId())
+    }
+
+    val fetchAllTopics = topics.isEmpty

Review Comment:
   How does `firstPartitionIndex` work when requesting all topics? Will we 
still return all topics in the initial response?



##########
clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java:
##########
@@ -114,6 +114,7 @@ public enum ApiKeys {
     CONSUMER_GROUP_HEARTBEAT(ApiMessageType.CONSUMER_GROUP_HEARTBEAT),
     CONSUMER_GROUP_DESCRIBE(ApiMessageType.CONSUMER_GROUP_DESCRIBE),
     CONTROLLER_REGISTRATION(ApiMessageType.CONTROLLER_REGISTRATION),
+    DESCRIBE_TOPICS(ApiMessageType.DESCRIBE_TOPICS),

Review Comment:
   nit: looks misaligned



##########
clients/src/main/java/org/apache/kafka/common/errors/RequestLimitReachedException.java:
##########
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class RequestLimitReachedException extends ApiException {

Review Comment:
   Would this be retriable?



##########
clients/src/main/resources/common/message/DescribeTopicsResponse.json:
##########
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 73,
+  "type": "response",
+  "name": "DescribeTopicsResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", 
"ignorable": true,
+      "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
+    { "name": "Topics", "type": "[]DescribeTopicsResponseTopic", "versions": 
"0+",
+      "about": "Each topic in the response.", "fields": [
+      { "name": "ErrorCode", "type": "int16", "versions": "0+",
+        "about": "The topic error, or 0 if there was no error." },
+      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, 
"entityType": "topicName", "nullableVersions": "0+",
+        "about": "The topic name." },
+      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": 
true, "about": "The topic id." },
+      { "name": "IsInternal", "type": "bool", "versions": "0+", "default": 
"false", "ignorable": true,
+        "about": "True if the topic is internal." },
+      { "name": "Partitions", "type": "[]DescribeTopicsResponsePartition", 
"versions": "0+",
+        "about": "Each partition in the topic.", "fields": [
+        { "name": "ErrorCode", "type": "int16", "versions": "0+",
+          "about": "The partition error, or 0 if there was no error." },
+        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
+          "about": "The partition index." },
+        { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": 
"brokerId",
+          "about": "The ID of the leader broker." },
+        { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": 
"-1", "ignorable": true,
+          "about": "The leader epoch of this partition." },
+        { "name": "ReplicaNodes", "type": "[]int32", "versions": "0+", 
"entityType": "brokerId",
+          "about": "The set of all nodes that host this partition." },
+        { "name": "IsrNodes", "type": "[]int32", "versions": "0+", 
"entityType": "brokerId",
+          "about": "The set of nodes that are in sync with the leader for this 
partition." },
+        { "name": "EligibleLeaderReplicas", "type": "[]int32", "default": 
"null", "entityType": "brokerId",
+          "versions": "0+", "nullableVersions": "0+",
+          "about": "The new eligible leader replicas otherwise." },
+        { "name": "LastKnownELR", "type": "[]int32", "default": "null", 
"entityType": "brokerId",
+          "versions": "0+", "nullableVersions": "0+",
+          "about": "The last known ELR." },
+        { "name": "OfflineReplicas", "type": "[]int32", "versions": "0+", 
"ignorable": true, "entityType": "brokerId",
+          "about": "The set of offline replicas of this partition." }]},
+      { "name": "TopicAuthorizedOperations", "type": "int32", "versions": 
"0+", "default": "-2147483648",
+        "about": "32-bit bitfield to represent authorized operations for this 
topic." },
+      { "name": "NextPartition", "type": "int32", "versions": "0+", "default": 
"-1",
+        "about": "The first partition that exceed the request limit. " }]}

Review Comment:
   I guess we leave this empty for most partitions? Maybe useful to clarify in 
the description. Also, I guess this could be tagged?



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -116,6 +117,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   val requestHelper = new RequestHandlerHelper(requestChannel, quotas, time)
   val aclApis = new AclApis(authHelper, authorizer, requestHelper, "broker", 
config)
   val configManager = new ConfigAdminManager(brokerId, config, 
configRepository)
+  val partitionRequestLimit = 2000

Review Comment:
   Usually we need to be able to configure limits. 



##########
clients/src/main/resources/common/message/DescribeTopicsRequest.json:
##########
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 73,
+  "type": "request",
+  "listeners": ["broker"],
+  "name": "DescribeTopicsRequest",

Review Comment:
   It's really more of a `DescribeTopicPartitions` API. Was that name 
considered?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to