junrao commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r617932245



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -662,11 +662,21 @@ class KafkaApis(val requestChannel: RequestChannel,
     val versionId = request.header.apiVersion
     val clientId = request.header.clientId
     val fetchRequest = request.body[FetchRequest]
+    val (topicIds, topicNames) =
+      if (fetchRequest.version() >= 13)
+        metadataCache.topicIdInfo()
+      else
+        (Collections.emptyMap[String, Uuid](), Collections.emptyMap[Uuid, 
String]())
+
     val fetchContext = fetchManager.newContext(
+      fetchRequest.version,
       fetchRequest.metadata,
-      fetchRequest.fetchData,
-      fetchRequest.toForget,
-      fetchRequest.isFromFollower)
+      fetchRequest.isFromFollower,
+      fetchRequest.fetchDataAndError(topicNames),
+      fetchRequest.forgottenTopics(topicNames),
+      topicNames,
+      topicIds)
+

Review comment:
       Extra newline here; please remove it.

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -110,67 +111,164 @@ public String toString() {
         }
     }
 
-    private Optional<Integer> optionalEpoch(int rawEpochValue) {
+    private static Optional<Integer> optionalEpoch(int rawEpochValue) {
         if (rawEpochValue < 0) {
             return Optional.empty();
         } else {
             return Optional.of(rawEpochValue);
         }
     }
 
+    // Only used when version is lower than 13.
     private Map<TopicPartition, PartitionData> 
toPartitionDataMap(List<FetchRequestData.FetchTopic> fetchableTopics) {
         Map<TopicPartition, PartitionData> result = new LinkedHashMap<>();
         fetchableTopics.forEach(fetchTopic -> 
fetchTopic.partitions().forEach(fetchPartition -> {
             result.put(new TopicPartition(fetchTopic.topic(), 
fetchPartition.partition()),
-                new PartitionData(
-                    fetchPartition.fetchOffset(),
-                    fetchPartition.logStartOffset(),
-                    fetchPartition.partitionMaxBytes(),
-                    optionalEpoch(fetchPartition.currentLeaderEpoch()),
-                    optionalEpoch(fetchPartition.lastFetchedEpoch())
-                ));
+                    new PartitionData(
+                            fetchPartition.fetchOffset(),
+                            fetchPartition.logStartOffset(),
+                            fetchPartition.partitionMaxBytes(),
+                            optionalEpoch(fetchPartition.currentLeaderEpoch()),
+                            optionalEpoch(fetchPartition.lastFetchedEpoch())
+                    ));
         }));
         return Collections.unmodifiableMap(result);
     }
 
-    private List<TopicPartition> 
toForgottenTopicList(List<FetchRequestData.ForgottenTopic> forgottenTopics) {
-        List<TopicPartition> result = new ArrayList<>();
-        forgottenTopics.forEach(forgottenTopic ->
-            forgottenTopic.partitions().forEach(partitionId ->
-                result.add(new TopicPartition(forgottenTopic.topic(), 
partitionId))
-            )
-        );
-        return result;
+    /**
+     *  The following methods are new to version 13. They support sending 
Fetch requests using topic ID rather
+     *  than topic name. Since the sender and receiver of the fetch request 
may have different topic IDs in
+     *  their caches, there is a possibility for some topic IDs to be 
unresolved on the receiving end. These
+     *  methods and classes try to resolve the topic IDs and keep track of 
unresolved partitions and their errors.
+     */
+
+    // Holds information on partitions whose topic IDs were unable to be 
resolved when the Fetch request
+    // was received.
+    public static final class UnresolvedPartitions {
+        private final Uuid id;
+        private final Map<Integer, PartitionData> partitionData;
+
+        public UnresolvedPartitions(Uuid id, Map<Integer, PartitionData> 
partitionData) {
+            this.id = id;
+            this.partitionData = partitionData;
+        }
+
+        public Uuid id() {

Review comment:
       Should the accessor `id()` be renamed to `topicId()`?

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -110,67 +111,164 @@ public String toString() {
         }
     }
 
-    private Optional<Integer> optionalEpoch(int rawEpochValue) {
+    private static Optional<Integer> optionalEpoch(int rawEpochValue) {
         if (rawEpochValue < 0) {
             return Optional.empty();
         } else {
             return Optional.of(rawEpochValue);
         }
     }
 
+    // Only used when version is lower than 13.
     private Map<TopicPartition, PartitionData> 
toPartitionDataMap(List<FetchRequestData.FetchTopic> fetchableTopics) {
         Map<TopicPartition, PartitionData> result = new LinkedHashMap<>();
         fetchableTopics.forEach(fetchTopic -> 
fetchTopic.partitions().forEach(fetchPartition -> {
             result.put(new TopicPartition(fetchTopic.topic(), 
fetchPartition.partition()),
-                new PartitionData(
-                    fetchPartition.fetchOffset(),
-                    fetchPartition.logStartOffset(),
-                    fetchPartition.partitionMaxBytes(),
-                    optionalEpoch(fetchPartition.currentLeaderEpoch()),
-                    optionalEpoch(fetchPartition.lastFetchedEpoch())
-                ));
+                    new PartitionData(
+                            fetchPartition.fetchOffset(),
+                            fetchPartition.logStartOffset(),
+                            fetchPartition.partitionMaxBytes(),
+                            optionalEpoch(fetchPartition.currentLeaderEpoch()),
+                            optionalEpoch(fetchPartition.lastFetchedEpoch())
+                    ));
         }));
         return Collections.unmodifiableMap(result);
     }
 
-    private List<TopicPartition> 
toForgottenTopicList(List<FetchRequestData.ForgottenTopic> forgottenTopics) {
-        List<TopicPartition> result = new ArrayList<>();
-        forgottenTopics.forEach(forgottenTopic ->
-            forgottenTopic.partitions().forEach(partitionId ->
-                result.add(new TopicPartition(forgottenTopic.topic(), 
partitionId))
-            )
-        );
-        return result;
+    /**
+     *  The following methods are new to version 13. They support sending 
Fetch requests using topic ID rather
+     *  than topic name. Since the sender and receiver of the fetch request 
may have different topic IDs in
+     *  their caches, there is a possibility for some topic IDs to be 
unresolved on the receiving end. These
+     *  methods and classes try to resolve the topic IDs and keep track of 
unresolved partitions and their errors.
+     */
+
+    // Holds information on partitions whose topic IDs were unable to be 
resolved when the Fetch request
+    // was received.
+    public static final class UnresolvedPartitions {
+        private final Uuid id;

Review comment:
       Should the field `id` be renamed to `topicId`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to