junrao commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r617932245
########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -662,11 +662,21 @@ class KafkaApis(val requestChannel: RequestChannel, val versionId = request.header.apiVersion val clientId = request.header.clientId val fetchRequest = request.body[FetchRequest] + val (topicIds, topicNames) = + if (fetchRequest.version() >= 13) + metadataCache.topicIdInfo() + else + (Collections.emptyMap[String, Uuid](), Collections.emptyMap[Uuid, String]()) + val fetchContext = fetchManager.newContext( + fetchRequest.version, fetchRequest.metadata, - fetchRequest.fetchData, - fetchRequest.toForget, - fetchRequest.isFromFollower) + fetchRequest.isFromFollower, + fetchRequest.fetchDataAndError(topicNames), + fetchRequest.forgottenTopics(topicNames), + topicNames, + topicIds) + Review comment: extra newline. ########## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java ########## @@ -110,67 +111,164 @@ public String toString() { } } - private Optional<Integer> optionalEpoch(int rawEpochValue) { + private static Optional<Integer> optionalEpoch(int rawEpochValue) { if (rawEpochValue < 0) { return Optional.empty(); } else { return Optional.of(rawEpochValue); } } + // Only used when version is lower than 13. 
private Map<TopicPartition, PartitionData> toPartitionDataMap(List<FetchRequestData.FetchTopic> fetchableTopics) { Map<TopicPartition, PartitionData> result = new LinkedHashMap<>(); fetchableTopics.forEach(fetchTopic -> fetchTopic.partitions().forEach(fetchPartition -> { result.put(new TopicPartition(fetchTopic.topic(), fetchPartition.partition()), - new PartitionData( - fetchPartition.fetchOffset(), - fetchPartition.logStartOffset(), - fetchPartition.partitionMaxBytes(), - optionalEpoch(fetchPartition.currentLeaderEpoch()), - optionalEpoch(fetchPartition.lastFetchedEpoch()) - )); + new PartitionData( + fetchPartition.fetchOffset(), + fetchPartition.logStartOffset(), + fetchPartition.partitionMaxBytes(), + optionalEpoch(fetchPartition.currentLeaderEpoch()), + optionalEpoch(fetchPartition.lastFetchedEpoch()) + )); })); return Collections.unmodifiableMap(result); } - private List<TopicPartition> toForgottenTopicList(List<FetchRequestData.ForgottenTopic> forgottenTopics) { - List<TopicPartition> result = new ArrayList<>(); - forgottenTopics.forEach(forgottenTopic -> - forgottenTopic.partitions().forEach(partitionId -> - result.add(new TopicPartition(forgottenTopic.topic(), partitionId)) - ) - ); - return result; + /** + * The following methods are new to version 13. They support sending Fetch requests using topic ID rather + * than topic name. Since the sender and receiver of the fetch request may have different topic IDs in + * their caches, there is a possibility for some topic IDs to be unresolved on the receiving end. These + * methods and classes try to resolve the topic IDs and keep track of unresolved partitions and their errors. + */ + + // Holds information on partitions whose topic IDs were unable to be resolved when the Fetch request + // was received. 
+ public static final class UnresolvedPartitions { + private final Uuid id; + private final Map<Integer, PartitionData> partitionData; + + public UnresolvedPartitions(Uuid id, Map<Integer, PartitionData> partitionData) { + this.id = id; + this.partitionData = partitionData; + } + + public Uuid id() { Review comment: id => topicId ? ########## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java ########## @@ -110,67 +111,164 @@ public String toString() { } } - private Optional<Integer> optionalEpoch(int rawEpochValue) { + private static Optional<Integer> optionalEpoch(int rawEpochValue) { if (rawEpochValue < 0) { return Optional.empty(); } else { return Optional.of(rawEpochValue); } } + // Only used when version is lower than 13. private Map<TopicPartition, PartitionData> toPartitionDataMap(List<FetchRequestData.FetchTopic> fetchableTopics) { Map<TopicPartition, PartitionData> result = new LinkedHashMap<>(); fetchableTopics.forEach(fetchTopic -> fetchTopic.partitions().forEach(fetchPartition -> { result.put(new TopicPartition(fetchTopic.topic(), fetchPartition.partition()), - new PartitionData( - fetchPartition.fetchOffset(), - fetchPartition.logStartOffset(), - fetchPartition.partitionMaxBytes(), - optionalEpoch(fetchPartition.currentLeaderEpoch()), - optionalEpoch(fetchPartition.lastFetchedEpoch()) - )); + new PartitionData( + fetchPartition.fetchOffset(), + fetchPartition.logStartOffset(), + fetchPartition.partitionMaxBytes(), + optionalEpoch(fetchPartition.currentLeaderEpoch()), + optionalEpoch(fetchPartition.lastFetchedEpoch()) + )); })); return Collections.unmodifiableMap(result); } - private List<TopicPartition> toForgottenTopicList(List<FetchRequestData.ForgottenTopic> forgottenTopics) { - List<TopicPartition> result = new ArrayList<>(); - forgottenTopics.forEach(forgottenTopic -> - forgottenTopic.partitions().forEach(partitionId -> - result.add(new TopicPartition(forgottenTopic.topic(), partitionId)) - ) - ); - return result; 
+ /** + * The following methods are new to version 13. They support sending Fetch requests using topic ID rather + * than topic name. Since the sender and receiver of the fetch request may have different topic IDs in + * their caches, there is a possibility for some topic IDs to be unresolved on the receiving end. These + * methods and classes try to resolve the topic IDs and keep track of unresolved partitions and their errors. + */ + + // Holds information on partitions whose topic IDs were unable to be resolved when the Fetch request + // was received. + public static final class UnresolvedPartitions { + private final Uuid id; Review comment: id => topicId ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org