dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r741789472



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -285,52 +268,57 @@ public FetchRequestData build() {
             if (nextMetadata.isFull()) {
                 if (log.isDebugEnabled()) {
                     log.debug("Built full fetch {} for node {} with {}.",
-                              nextMetadata, node, 
partitionsToLogString(next.keySet()));
+                            nextMetadata, node, 
topicPartitionsToLogString(next.keySet()));
                 }
                 sessionPartitions = next;
                 next = null;
+                Map<TopicPartition, PartitionData> toSend =
+                        Collections.unmodifiableMap(new 
LinkedHashMap<>(sessionPartitions));
                 // Only add topic IDs to the session if we are using topic IDs.
                 if (canUseTopicIds) {
-                    sessionTopicIds = topicIds;
-                    sessionTopicNames = new HashMap<>(topicIds.size());
-                    topicIds.forEach((name, id) -> sessionTopicNames.put(id, 
name));
+                    Map<Uuid, Set<String>> newTopicNames = 
sessionPartitions.entrySet().stream().collect(Collectors.groupingByConcurrent(entry
 -> entry.getValue().topicId,
+                            Collectors.mapping(entry -> 
entry.getKey().topic(), Collectors.toSet())));

Review comment:
       Could we iterate over `sessionPartitions` and directly populate 
`sessionTopicNames` by using `putIfAbsent` or even `put`? The grouping seems 
unnecessary to me here unless I am missing something.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -285,52 +268,57 @@ public FetchRequestData build() {
             if (nextMetadata.isFull()) {
                 if (log.isDebugEnabled()) {
                     log.debug("Built full fetch {} for node {} with {}.",
-                              nextMetadata, node, 
partitionsToLogString(next.keySet()));
+                            nextMetadata, node, 
topicPartitionsToLogString(next.keySet()));
                 }
                 sessionPartitions = next;
                 next = null;
+                Map<TopicPartition, PartitionData> toSend =
+                        Collections.unmodifiableMap(new 
LinkedHashMap<>(sessionPartitions));

Review comment:
       As `toSend` is not used before L288, how about putting this line over 
there?

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -346,38 +334,36 @@ public FetchRequestData build() {
                     break;
                 }
                 sessionPartitions.put(topicPartition, nextData);
-                added.add(topicPartition);
+                added.add(new TopicIdPartition(nextData.topicId, 
topicPartition));
             }
 
             // Add topic IDs to session if we can use them. If an ID is 
inconsistent, we will handle in the receiving broker.
             // If we switched from using topic IDs to not using them (or vice 
versa), that error will also be handled in the receiving broker.
             if (canUseTopicIds) {
-                for (Map.Entry<String, Uuid> topic : topicIds.entrySet()) {
-                    String topicName = topic.getKey();
-                    Uuid addedId = topic.getValue();
-                    sessionTopicIds.put(topicName, addedId);
-                    sessionTopicNames.put(addedId, topicName);
-                }
+                Map<Uuid, Set<String>> newTopicNames = 
added.stream().collect(Collectors.groupingByConcurrent(TopicIdPartition::topicId,
+                        Collectors.mapping(topicIdPartition -> 
topicIdPartition.topicPartition().topic(), Collectors.toSet())));
+
+                // There should only be one topic name per topic ID.
+                newTopicNames.forEach((topicId, topicNamesSet) -> 
topicNamesSet.forEach(topicName -> sessionTopicNames.put(topicId, topicName)));
             }
 
             if (log.isDebugEnabled()) {
-                log.debug("Built incremental fetch {} for node {}. Added {}, 
altered {}, removed {} " +
-                          "out of {}", nextMetadata, node, 
partitionsToLogString(added),
-                          partitionsToLogString(altered), 
partitionsToLogString(removed),
-                          partitionsToLogString(sessionPartitions.keySet()));
+                log.debug("Built incremental fetch {} for node {}. Added {}, 
altered {}, removed {}, " +
+                                "replaced {} out of {}", nextMetadata, node, 
topicIdPartitionsToLogString(added),

Review comment:
       nit: Could we align like it was before?

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -346,38 +334,36 @@ public FetchRequestData build() {
                     break;
                 }
                 sessionPartitions.put(topicPartition, nextData);
-                added.add(topicPartition);
+                added.add(new TopicIdPartition(nextData.topicId, 
topicPartition));
             }
 
             // Add topic IDs to session if we can use them. If an ID is 
inconsistent, we will handle in the receiving broker.
             // If we switched from using topic IDs to not using them (or vice 
versa), that error will also be handled in the receiving broker.
             if (canUseTopicIds) {
-                for (Map.Entry<String, Uuid> topic : topicIds.entrySet()) {
-                    String topicName = topic.getKey();
-                    Uuid addedId = topic.getValue();
-                    sessionTopicIds.put(topicName, addedId);
-                    sessionTopicNames.put(addedId, topicName);
-                }
+                Map<Uuid, Set<String>> newTopicNames = 
added.stream().collect(Collectors.groupingByConcurrent(TopicIdPartition::topicId,

Review comment:
       Same comment as before.

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchMetadata.java
##########
@@ -120,6 +120,13 @@ public FetchMetadata nextCloseExisting() {
         return new FetchMetadata(sessionId, INITIAL_EPOCH);
     }
 
+    /**
+     * Return the metadata for the next closed session response.
+     */
+    public FetchMetadata closeExisting() {

Review comment:
       It seems that this method is not used anymore. Could we remove it?

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -163,18 +173,35 @@ class CachedPartition(val topic: String,
     mustRespond
   }
 
-  override def hashCode: Int = Objects.hash(new TopicPartition(topic, 
partition), topicId)
+  /**
+   * We have different equality checks depending on whether topic IDs are used.
+   * This means we need a different hash function as well. We use name to 
calculate the hash if the ID is zero and unused.
+   * Otherwise, we use the topic ID in the hash calculation.
+   *
+   * @return the hash code for the CachedPartition depending on what request 
version we are using.
+   */
+  override def hashCode: Int = if (topicId != Uuid.ZERO_UUID) (31 * partition) 
+ topicId.hashCode else
+    (31 * partition) + topic.hashCode
 
   def canEqual(that: Any): Boolean = that.isInstanceOf[CachedPartition]
 
+  /**
+   * We have different equality checks depending on whether topic IDs are used.
+   *
+   * This is because when we use topic IDs, a partition with a given ID and an 
unknown name is the same as a partition with that
+   * ID and a known name. This means we can only use topic ID and partition 
when determining equality.
+   *
+   * On the other hand, if we are using topic names, all IDs are zero. This 
means we can only use topic name and partition
+   * when determining equality.
+   */
   override def equals(that: Any): Boolean =
     that match {
       case that: CachedPartition =>
         this.eq(that) ||
           (that.canEqual(this) &&

Review comment:
       `that.canEqual(this)` seems weird to me. It seems that we could just 
remove it.

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -238,47 +263,40 @@ class FetchSession(val id: Int,
 
   def metadata: JFetchMetadata = synchronized { new JFetchMetadata(id, epoch) }
 
-  def getFetchOffset(topicPartition: TopicPartition): Option[Long] = 
synchronized {
-    Option(partitionMap.find(new CachedPartition(topicPartition,
-      sessionTopicIds.getOrDefault(topicPartition.topic(), 
Uuid.ZERO_UUID)))).map(_.fetchOffset)
+  def getFetchOffset(topicIdPartition: TopicIdPartition): Option[Long] = 
synchronized {
+    Option(partitionMap.find(new 
CachedPartition(topicIdPartition.topicPartition, 
topicIdPartition.topicId))).map(_.fetchOffset)

Review comment:
       nit: We could add another constructor which takes a `TopicIdPartition`.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -708,40 +701,41 @@ class KafkaApis(val requestChannel: RequestChannel,
       None
     }
 
-    val erroneous = mutable.ArrayBuffer[(TopicPartition, 
FetchResponseData.PartitionData)]()
-    val interesting = mutable.ArrayBuffer[(TopicPartition, 
FetchRequest.PartitionData)]()
-    val sessionTopicIds = mutable.Map[String, Uuid]()
+    val erroneous = mutable.ArrayBuffer[(TopicIdPartition, 
FetchResponseData.PartitionData)]()
+    val interesting = mutable.ArrayBuffer[(TopicIdPartition, 
FetchRequest.PartitionData)]()
     if (fetchRequest.isFromFollower) {
       // The follower must have ClusterAction on ClusterResource in order to 
fetch partition data.
       if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, 
CLUSTER_NAME)) {
-        fetchContext.foreachPartition { (topicPartition, topicId, data) =>
-          sessionTopicIds.put(topicPartition.topic(), topicId)
-          if (!metadataCache.contains(topicPartition))
-            erroneous += topicPartition -> 
FetchResponse.partitionResponse(topicPartition.partition, 
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+        fetchContext.foreachPartition { (topicIdPartition, data) =>
+          if (topicIdPartition.topicPartition.topic == null )
+            erroneous += topicIdPartition -> 
FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID)

Review comment:
       nit: There is an extra space after `== null`

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -870,12 +864,15 @@ class KafkaApis(val requestChannel: RequestChannel,
 
         // Prepare fetch response from converted data
         val response =
-          FetchResponse.of(unconvertedFetchResponse.error, throttleTimeMs, 
unconvertedFetchResponse.sessionId, convertedData, sessionTopicIds.asJava)
+          FetchResponse.of(unconvertedFetchResponse.error, throttleTimeMs, 
unconvertedFetchResponse.sessionId, convertedData)
         // record the bytes out metrics only when the response is being sent
-        response.data().responses().forEach { topicResponse =>
-          topicResponse.partitions().forEach { data =>
-            val tp = new TopicPartition(topicResponse.topic(), 
data.partitionIndex())
-            brokerTopicStats.updateBytesOut(tp.topic, 
fetchRequest.isFromFollower, reassigningPartitions.contains(tp), 
FetchResponse.recordsSize(data))
+        response.data.responses.forEach { topicResponse =>
+          topicResponse.partitions.forEach { data =>
+            // If the topic name was not known, we will have no bytes out.
+            if (topicResponse.topic != null) {
+              val tp = new TopicIdPartition(topicResponse.topicId, new 
TopicPartition(topicResponse.topic, data.partitionIndex()))

Review comment:
       nit: Parenthesis after `partitionIndex` could be omitted.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -3497,14 +3493,13 @@ object KafkaApis {
   // TODO: remove resolvedResponseData method when sizeOf can take a data 
object.
   private[server] def sizeOfThrottledPartitions(versionId: Short,
                                                 unconvertedResponse: 
FetchResponse,
-                                                quota: ReplicationQuotaManager,
-                                                topicIds: util.Map[String, 
Uuid]): Int = {
-    val responseData = new util.LinkedHashMap[TopicPartition, 
FetchResponseData.PartitionData]
+                                                quota: 
ReplicationQuotaManager): Int = {
+    val responseData = new util.LinkedHashMap[TopicIdPartition, 
FetchResponseData.PartitionData]
     unconvertedResponse.data.responses().forEach(topicResponse =>
       topicResponse.partitions().forEach(partition =>
-        responseData.put(new TopicPartition(topicResponse.topic(), 
partition.partitionIndex()), partition)))
+        responseData.put(new TopicIdPartition(topicResponse.topicId, new 
TopicPartition(topicResponse.topic(), partition.partitionIndex())), partition)))

Review comment:
       nit: Parenthesis after partitionIndex could be omitted.

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1186,7 +1174,7 @@ class ReplicaManager(val config: KafkaConfig,
             lastStableOffset = None,
             exception = Some(e))
         case e: Throwable =>
-          brokerTopicStats.topicStats(tp.topic).failedFetchRequestRate.mark()
+          
brokerTopicStats.topicStats(tp.topicPartition.topic).failedFetchRequestRate.mark()

Review comment:
       nit: `tp.topic`

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -708,40 +701,41 @@ class KafkaApis(val requestChannel: RequestChannel,
       None
     }
 
-    val erroneous = mutable.ArrayBuffer[(TopicPartition, 
FetchResponseData.PartitionData)]()
-    val interesting = mutable.ArrayBuffer[(TopicPartition, 
FetchRequest.PartitionData)]()
-    val sessionTopicIds = mutable.Map[String, Uuid]()
+    val erroneous = mutable.ArrayBuffer[(TopicIdPartition, 
FetchResponseData.PartitionData)]()
+    val interesting = mutable.ArrayBuffer[(TopicIdPartition, 
FetchRequest.PartitionData)]()
     if (fetchRequest.isFromFollower) {
       // The follower must have ClusterAction on ClusterResource in order to 
fetch partition data.
       if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, 
CLUSTER_NAME)) {
-        fetchContext.foreachPartition { (topicPartition, topicId, data) =>
-          sessionTopicIds.put(topicPartition.topic(), topicId)
-          if (!metadataCache.contains(topicPartition))
-            erroneous += topicPartition -> 
FetchResponse.partitionResponse(topicPartition.partition, 
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+        fetchContext.foreachPartition { (topicIdPartition, data) =>
+          if (topicIdPartition.topicPartition.topic == null )
+            erroneous += topicIdPartition -> 
FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID)
+          else if (!metadataCache.contains(topicIdPartition.topicPartition))
+            erroneous += topicIdPartition -> 
FetchResponse.partitionResponse(topicIdPartition, 
Errors.UNKNOWN_TOPIC_OR_PARTITION)
           else
-            interesting += (topicPartition -> data)
+            interesting += (topicIdPartition -> data)
         }
       } else {
-        fetchContext.foreachPartition { (part, topicId, _) =>
-          sessionTopicIds.put(part.topic(), topicId)
-          erroneous += part -> FetchResponse.partitionResponse(part.partition, 
Errors.TOPIC_AUTHORIZATION_FAILED)
+        fetchContext.foreachPartition { (topicIdPartition, _) =>
+          erroneous += topicIdPartition -> 
FetchResponse.partitionResponse(topicIdPartition, 
Errors.TOPIC_AUTHORIZATION_FAILED)
         }
       }
     } else {
       // Regular Kafka consumers need READ permission on each partition they 
are fetching.
-      val partitionDatas = new mutable.ArrayBuffer[(TopicPartition, 
FetchRequest.PartitionData)]
-      fetchContext.foreachPartition { (topicPartition, topicId, partitionData) 
=>
-        partitionDatas += topicPartition -> partitionData
-        sessionTopicIds.put(topicPartition.topic(), topicId)
-      }
-      val authorizedTopics = authHelper.filterByAuthorized(request.context, 
READ, TOPIC, partitionDatas)(_._1.topic)
-      partitionDatas.foreach { case (topicPartition, data) =>
-        if (!authorizedTopics.contains(topicPartition.topic))
-          erroneous += topicPartition -> 
FetchResponse.partitionResponse(topicPartition.partition, 
Errors.TOPIC_AUTHORIZATION_FAILED)
-        else if (!metadataCache.contains(topicPartition))
-          erroneous += topicPartition -> 
FetchResponse.partitionResponse(topicPartition.partition, 
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+      val partitionDatas = new mutable.ArrayBuffer[(TopicIdPartition, 
FetchRequest.PartitionData)]
+      fetchContext.foreachPartition { (topicIdPartition, partitionData) =>
+        if (topicIdPartition.topicPartition.topic == null)
+          erroneous += topicIdPartition -> 
FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID)
+        else
+          partitionDatas += topicIdPartition -> partitionData
+      }
+      val authorizedTopics = authHelper.filterByAuthorized(request.context, 
READ, TOPIC, partitionDatas)(_._1.topicPartition.topic)
+      partitionDatas.foreach { case (topicIdPartition, data) =>
+        if (!authorizedTopics.contains(topicIdPartition.topicPartition.topic))
+          erroneous += topicIdPartition -> 
FetchResponse.partitionResponse(topicIdPartition, 
Errors.TOPIC_AUTHORIZATION_FAILED)
+        else if (!metadataCache.contains(topicIdPartition.topicPartition))
+          erroneous += topicIdPartition -> 
FetchResponse.partitionResponse(topicIdPartition, 
Errors.UNKNOWN_TOPIC_OR_PARTITION)
         else
-          interesting += (topicPartition -> data)
+          interesting += (topicIdPartition -> data)

Review comment:
       nit: We can remove the parenthesis here.

##########
File path: core/src/main/scala/kafka/server/DelayedFetch.scala
##########
@@ -92,7 +92,7 @@ class DelayedFetch(delayMs: Long,
         val fetchLeaderEpoch = fetchStatus.fetchInfo.currentLeaderEpoch
         try {
           if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) {
-            val partition = 
replicaManager.getPartitionOrException(topicPartition)
+            val partition = 
replicaManager.getPartitionOrException(topicPartition.topicPartition)

Review comment:
       Yeah, that would be great. `topicPartition.topicPartition` looks really 
weird while reading.

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -163,18 +173,35 @@ class CachedPartition(val topic: String,
     mustRespond
   }
 
-  override def hashCode: Int = Objects.hash(new TopicPartition(topic, 
partition), topicId)
+  /**
+   * We have different equality checks depending on whether topic IDs are used.
+   * This means we need a different hash function as well. We use name to 
calculate the hash if the ID is zero and unused.
+   * Otherwise, we use the topic ID in the hash calculation.
+   *
+   * @return the hash code for the CachedPartition depending on what request 
version we are using.
+   */
+  override def hashCode: Int = if (topicId != Uuid.ZERO_UUID) (31 * partition) 
+ topicId.hashCode else
+    (31 * partition) + topic.hashCode

Review comment:
       nit: Should we format the code as follow?
   
   ```
   override def hashCode: Int = {
     if (topicId != Uuid.ZERO_UUID)
       (31 * partition) + topicId.hashCode
     else
       (31 * partition) + topic.hashCode
   }
   ```

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -824,6 +823,14 @@ static boolean 
hasUsableOffsetForLeaderEpochVersion(NodeApiVersions nodeApiVersi
         return 
OffsetsForLeaderEpochRequest.supportsTopicPermission(apiVersion.maxVersion());
     }
 
+    static boolean hasUsableTopicIdFetchRequestVersion(NodeApiVersions 
nodeApiVersions) {

Review comment:
       Is this method still used? I can't find any usages of it.

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -364,10 +405,7 @@ public int maxBytes() {
                         } else {
                             name = topicNames.get(forgottenTopic.topicId());
                         }
-                        if (name == null) {
-                            throw new 
UnknownTopicIdException(String.format("Topic Id %s in FetchRequest was unknown 
to the server", forgottenTopic.topicId()));
-                        }
-                        forgottenTopic.partitions().forEach(partitionId -> 
toForget.add(new TopicPartition(name, partitionId)));
+                        forgottenTopic.partitions().forEach(partitionId -> 
toForget.add(new TopicIdPartition(forgottenTopic.topicId(), new 
TopicPartition(name, partitionId))));

Review comment:
       I would also add a small comment here.

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -328,31 +374,26 @@ public int maxBytes() {
                         } else {
                             name = topicNames.get(fetchTopic.topicId());
                         }
-                        if (name != null) {
-                            // If topic name is resolved, simply add to 
fetchData map
-                            fetchTopic.partitions().forEach(fetchPartition ->
-                                    fetchData.put(new TopicPartition(name, 
fetchPartition.partition()),
-                                            new PartitionData(
-                                                    
fetchPartition.fetchOffset(),
-                                                    
fetchPartition.logStartOffset(),
-                                                    
fetchPartition.partitionMaxBytes(),
-                                                    
optionalEpoch(fetchPartition.currentLeaderEpoch()),
-                                                    
optionalEpoch(fetchPartition.lastFetchedEpoch())
-                                            )
-                                    )
-                            );
-                        } else {
-                            throw new 
UnknownTopicIdException(String.format("Topic Id %s in FetchRequest was unknown 
to the server", fetchTopic.topicId()));
-                        }
+                        fetchTopic.partitions().forEach(fetchPartition ->
+                                fetchData.put(new 
TopicIdPartition(fetchTopic.topicId(), new TopicPartition(name, 
fetchPartition.partition())),

Review comment:
       Should we add a comment here which explains that the topic name might be 
null in `TopicIdPartition` if we were unable to resolve it?

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -432,9 +425,9 @@ private String 
partitionsToLogString(Collection<TopicPartition> partitions) {
     String verifyFullFetchResponsePartitions(Set<TopicPartition> 
topicPartitions, Set<Uuid> ids, short version) {
         StringBuilder bld = new StringBuilder();
         Set<TopicPartition> extra =
-            findMissing(topicPartitions, sessionPartitions.keySet());
+                findMissing(topicPartitions, sessionPartitions.keySet());

Review comment:
       nit: This change and the following ones do not seem necessary. I would 
revert them back.

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -374,7 +374,7 @@ abstract class AbstractFetcherThread(name: String,
                       }
                     }
                   } catch {
-                    case ime@( _: CorruptRecordException | _: 
InvalidRecordException) =>
+                    case ime@(_: CorruptRecordException | _: 
InvalidRecordException) =>

Review comment:
       Putting this here but it is not related to this line.
   
   It seems that we have an opportunity in `processFetchRequest` to better 
handle the `FETCH_SESSION_TOPIC_ID_ERROR` error. At the moment, it delays all 
the partitions. It seems to me that we could retry directly, no? If you agree, 
we could file a Jira and address this in a subsequent PR.

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -199,26 +222,49 @@ public FetchRequest build(short version) {
             fetchRequestData.setMaxBytes(maxBytes);
             fetchRequestData.setIsolationLevel(isolationLevel.id());
             fetchRequestData.setForgottenTopicsData(new ArrayList<>());
-            toForget.stream()
-                .collect(Collectors.groupingBy(TopicPartition::topic, 
LinkedHashMap::new, Collectors.toList()))
-                .forEach((topic, partitions) ->
-                    fetchRequestData.forgottenTopicsData().add(new 
FetchRequestData.ForgottenTopic()
-                        .setTopic(topic)
-                        .setTopicId(topicIds.getOrDefault(topic, 
Uuid.ZERO_UUID))
-                        
.setPartitions(partitions.stream().map(TopicPartition::partition).collect(Collectors.toList())))
-                );
-            fetchRequestData.setTopics(new ArrayList<>());
+
+            Map<String, FetchRequestData.ForgottenTopic> forgottenTopicMap = 
new LinkedHashMap<>();
+            removed.forEach(topicIdPartition -> {
+                FetchRequestData.ForgottenTopic forgottenTopic = 
forgottenTopicMap.get(topicIdPartition.topic());
+                if (forgottenTopic == null) {
+                    forgottenTopic = new ForgottenTopic()
+                        .setTopic(topicIdPartition.topic())
+                        .setTopicId(topicIdPartition.topicId());
+                    forgottenTopicMap.put(topicIdPartition.topic(), 
forgottenTopic);
+                }
+                forgottenTopic.partitions().add(topicIdPartition.partition());
+            });
+
+            // If a version older than v13 is used, topic-partition which were 
replaced
+            // by a topic-partition with the same name but a different topic 
ID are not
+            // sent out in the "forget" set in order to not remove the newly 
added
+            // partition in the "fetch" set.
+            if (version >= 13) {
+                replaced.forEach(topicIdPartition -> {
+                    FetchRequestData.ForgottenTopic forgottenTopic = 
forgottenTopicMap.get(topicIdPartition.topic());
+                    if (forgottenTopic == null) {
+                        forgottenTopic = new ForgottenTopic()
+                            .setTopic(topicIdPartition.topic())
+                            .setTopicId(topicIdPartition.topicId());
+                        forgottenTopicMap.put(topicIdPartition.topic(), 
forgottenTopic);
+                    }
+                    
forgottenTopic.partitions().add(topicIdPartition.partition());
+                });

Review comment:
       This block is identical to the previous one. Should we pull it into a 
helper method? (yeah, I know, I wrote this...)

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -285,52 +268,57 @@ public FetchRequestData build() {
             if (nextMetadata.isFull()) {
                 if (log.isDebugEnabled()) {
                     log.debug("Built full fetch {} for node {} with {}.",
-                              nextMetadata, node, 
partitionsToLogString(next.keySet()));
+                            nextMetadata, node, 
topicPartitionsToLogString(next.keySet()));
                 }
                 sessionPartitions = next;
                 next = null;
+                Map<TopicPartition, PartitionData> toSend =
+                        Collections.unmodifiableMap(new 
LinkedHashMap<>(sessionPartitions));
                 // Only add topic IDs to the session if we are using topic IDs.
                 if (canUseTopicIds) {
-                    sessionTopicIds = topicIds;
-                    sessionTopicNames = new HashMap<>(topicIds.size());
-                    topicIds.forEach((name, id) -> sessionTopicNames.put(id, 
name));
+                    Map<Uuid, Set<String>> newTopicNames = 
sessionPartitions.entrySet().stream().collect(Collectors.groupingByConcurrent(entry
 -> entry.getValue().topicId,
+                            Collectors.mapping(entry -> 
entry.getKey().topic(), Collectors.toSet())));
+
+                    sessionTopicNames = new HashMap<>(newTopicNames.size());
+                    // There should only be one topic name per topic ID.
+                    newTopicNames.forEach((topicId, topicNamesSet) -> 
topicNamesSet.forEach(topicName -> sessionTopicNames.put(topicId, topicName)));
                 } else {
-                    sessionTopicIds = new HashMap<>();
                     sessionTopicNames = new HashMap<>();

Review comment:
       Not related to this PR but could we use `Collections.emtpyMap` here? 
That would avoid allocating a `HashMap` all the times.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -708,40 +701,41 @@ class KafkaApis(val requestChannel: RequestChannel,
       None
     }
 
-    val erroneous = mutable.ArrayBuffer[(TopicPartition, 
FetchResponseData.PartitionData)]()
-    val interesting = mutable.ArrayBuffer[(TopicPartition, 
FetchRequest.PartitionData)]()
-    val sessionTopicIds = mutable.Map[String, Uuid]()
+    val erroneous = mutable.ArrayBuffer[(TopicIdPartition, 
FetchResponseData.PartitionData)]()
+    val interesting = mutable.ArrayBuffer[(TopicIdPartition, 
FetchRequest.PartitionData)]()
     if (fetchRequest.isFromFollower) {
       // The follower must have ClusterAction on ClusterResource in order to 
fetch partition data.
       if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, 
CLUSTER_NAME)) {
-        fetchContext.foreachPartition { (topicPartition, topicId, data) =>
-          sessionTopicIds.put(topicPartition.topic(), topicId)
-          if (!metadataCache.contains(topicPartition))
-            erroneous += topicPartition -> 
FetchResponse.partitionResponse(topicPartition.partition, 
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+        fetchContext.foreachPartition { (topicIdPartition, data) =>
+          if (topicIdPartition.topicPartition.topic == null )
+            erroneous += topicIdPartition -> 
FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID)
+          else if (!metadataCache.contains(topicIdPartition.topicPartition))
+            erroneous += topicIdPartition -> 
FetchResponse.partitionResponse(topicIdPartition, 
Errors.UNKNOWN_TOPIC_OR_PARTITION)
           else
-            interesting += (topicPartition -> data)
+            interesting += (topicIdPartition -> data)

Review comment:
       nit: We can remove the parenthesis here.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -708,40 +701,41 @@ class KafkaApis(val requestChannel: RequestChannel,
       None
     }
 
-    val erroneous = mutable.ArrayBuffer[(TopicPartition, 
FetchResponseData.PartitionData)]()
-    val interesting = mutable.ArrayBuffer[(TopicPartition, 
FetchRequest.PartitionData)]()
-    val sessionTopicIds = mutable.Map[String, Uuid]()
+    val erroneous = mutable.ArrayBuffer[(TopicIdPartition, 
FetchResponseData.PartitionData)]()
+    val interesting = mutable.ArrayBuffer[(TopicIdPartition, 
FetchRequest.PartitionData)]()
     if (fetchRequest.isFromFollower) {
       // The follower must have ClusterAction on ClusterResource in order to 
fetch partition data.
       if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, 
CLUSTER_NAME)) {
-        fetchContext.foreachPartition { (topicPartition, topicId, data) =>
-          sessionTopicIds.put(topicPartition.topic(), topicId)
-          if (!metadataCache.contains(topicPartition))
-            erroneous += topicPartition -> 
FetchResponse.partitionResponse(topicPartition.partition, 
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+        fetchContext.foreachPartition { (topicIdPartition, data) =>
+          if (topicIdPartition.topicPartition.topic == null )
+            erroneous += topicIdPartition -> 
FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID)
+          else if (!metadataCache.contains(topicIdPartition.topicPartition))
+            erroneous += topicIdPartition -> 
FetchResponse.partitionResponse(topicIdPartition, 
Errors.UNKNOWN_TOPIC_OR_PARTITION)
           else
-            interesting += (topicPartition -> data)
+            interesting += (topicIdPartition -> data)
         }
       } else {
-        fetchContext.foreachPartition { (part, topicId, _) =>
-          sessionTopicIds.put(part.topic(), topicId)
-          erroneous += part -> FetchResponse.partitionResponse(part.partition, 
Errors.TOPIC_AUTHORIZATION_FAILED)
+        fetchContext.foreachPartition { (topicIdPartition, _) =>
+          erroneous += topicIdPartition -> 
FetchResponse.partitionResponse(topicIdPartition, 
Errors.TOPIC_AUTHORIZATION_FAILED)

Review comment:
       I wonder if we should reply with `UNKNOWN_TOPIC_ID` for the topics whose 
are not resolved.

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -163,18 +173,35 @@ class CachedPartition(val topic: String,
     mustRespond
   }
 
-  override def hashCode: Int = Objects.hash(new TopicPartition(topic, 
partition), topicId)
+  /**
+   * We have different equality checks depending on whether topic IDs are used.
+   * This means we need a different hash function as well. We use name to 
calculate the hash if the ID is zero and unused.
+   * Otherwise, we use the topic ID in the hash calculation.
+   *
+   * @return the hash code for the CachedPartition depending on what request 
version we are using.
+   */
+  override def hashCode: Int = if (topicId != Uuid.ZERO_UUID) (31 * partition) 
+ topicId.hashCode else
+    (31 * partition) + topic.hashCode
 
   def canEqual(that: Any): Boolean = that.isInstanceOf[CachedPartition]
 
+  /**
+   * We have different equality checks depending on whether topic IDs are used.
+   *
+   * This is because when we use topic IDs, a partition with a given ID and an 
unknown name is the same as a partition with that
+   * ID and a known name. This means we can only use topic ID and partition 
when determining equality.
+   *
+   * On the other hand, if we are using topic names, all IDs are zero. This 
means we can only use topic name and partition
+   * when determining equality.
+   */
   override def equals(that: Any): Boolean =
     that match {
       case that: CachedPartition =>
         this.eq(that) ||
           (that.canEqual(this) &&
             this.partition.equals(that.partition) &&
-            this.topic.equals(that.topic) &&
-            this.topicId.equals(that.topicId))
+            (if (this.topicId != Uuid.ZERO_UUID) 
this.topicId.equals(that.topicId)

Review comment:
       nit: The if/else inline reads a bit weird. Should we extract the if/else?
   
   ```
   this.eq(that) || if (this.topicId != Uuid.ZERO_UUID)
     this.partition.equals(that.partition) && this.topicId.equals(that.topicId)
   else
     this.partition.equals(that.partition) && this.topic.equals(that.topic)
   ```

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -238,47 +263,40 @@ class FetchSession(val id: Int,
 
   def metadata: JFetchMetadata = synchronized { new JFetchMetadata(id, epoch) }
 
-  def getFetchOffset(topicPartition: TopicPartition): Option[Long] = 
synchronized {
-    Option(partitionMap.find(new CachedPartition(topicPartition,
-      sessionTopicIds.getOrDefault(topicPartition.topic(), 
Uuid.ZERO_UUID)))).map(_.fetchOffset)
+  def getFetchOffset(topicIdPartition: TopicIdPartition): Option[Long] = 
synchronized {
+    Option(partitionMap.find(new 
CachedPartition(topicIdPartition.topicPartition, 
topicIdPartition.topicId))).map(_.fetchOffset)
   }
 
-  type TL = util.ArrayList[TopicPartition]
+  type TL = util.ArrayList[TopicIdPartition]
 
   // Update the cached partition data based on the request.
   def update(fetchData: FetchSession.REQ_MAP,
-             toForget: util.List[TopicPartition],
+             toForget: util.List[TopicIdPartition],
              reqMetadata: JFetchMetadata,
-             topicIds: util.Map[String, Uuid]): (TL, TL, TL, TL) = 
synchronized {
+             usesTopicIds: Boolean): (TL, TL, TL) = synchronized {
     val added = new TL
     val updated = new TL
     val removed = new TL
-    val inconsistentTopicIds = new TL
     fetchData.forEach { (topicPart, reqData) =>
-      // Get the topic ID on the broker, if it is valid and the topic is new 
to the session, add its ID.
-      // If the topic already existed, check that its ID is consistent.
-      val id = topicIds.getOrDefault(topicPart.topic, Uuid.ZERO_UUID)
-      val newCachedPart = new CachedPartition(topicPart, id, reqData)
-      if (id != Uuid.ZERO_UUID) {
-        val prevSessionTopicId = sessionTopicIds.putIfAbsent(topicPart.topic, 
id)
-        if (prevSessionTopicId != null && prevSessionTopicId != id)
-          inconsistentTopicIds.add(topicPart)
-      }
+      val newCachedPart = new CachedPartition(topicPart.topicPartition, 
topicPart.topicId, reqData)
       val cachedPart = partitionMap.find(newCachedPart)
       if (cachedPart == null) {
         partitionMap.mustAdd(newCachedPart)
         added.add(topicPart)
       } else {
         cachedPart.updateRequestParams(reqData)
+        if (cachedPart.topic == null)

Review comment:
       nit: It might be better to encapsulate this in `CachedPartition`. We 
could add a method called `maybeSetTopicName` or piggy back on 
`updateRequestParams`. 

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -801,23 +795,23 @@ class KafkaApis(val requestChannel: RequestChannel,
                 // down-conversion always guarantees that at least one batch 
of messages is down-converted and sent out to the
                 // client.
                 new FetchResponseData.PartitionData()
-                  .setPartitionIndex(tp.partition)
+                  .setPartitionIndex(tp.topicPartition.partition)

Review comment:
       nit: We can use `tp.partition` here and a few other places.

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -238,47 +263,40 @@ class FetchSession(val id: Int,
 
   def metadata: JFetchMetadata = synchronized { new JFetchMetadata(id, epoch) }
 
-  def getFetchOffset(topicPartition: TopicPartition): Option[Long] = 
synchronized {
-    Option(partitionMap.find(new CachedPartition(topicPartition,
-      sessionTopicIds.getOrDefault(topicPartition.topic(), 
Uuid.ZERO_UUID)))).map(_.fetchOffset)
+  def getFetchOffset(topicIdPartition: TopicIdPartition): Option[Long] = 
synchronized {
+    Option(partitionMap.find(new 
CachedPartition(topicIdPartition.topicPartition, 
topicIdPartition.topicId))).map(_.fetchOffset)
   }
 
-  type TL = util.ArrayList[TopicPartition]
+  type TL = util.ArrayList[TopicIdPartition]
 
   // Update the cached partition data based on the request.
   def update(fetchData: FetchSession.REQ_MAP,
-             toForget: util.List[TopicPartition],
+             toForget: util.List[TopicIdPartition],
              reqMetadata: JFetchMetadata,
-             topicIds: util.Map[String, Uuid]): (TL, TL, TL, TL) = 
synchronized {
+             usesTopicIds: Boolean): (TL, TL, TL) = synchronized {
     val added = new TL
     val updated = new TL
     val removed = new TL
-    val inconsistentTopicIds = new TL
     fetchData.forEach { (topicPart, reqData) =>
-      // Get the topic ID on the broker, if it is valid and the topic is new 
to the session, add its ID.
-      // If the topic already existed, check that its ID is consistent.
-      val id = topicIds.getOrDefault(topicPart.topic, Uuid.ZERO_UUID)
-      val newCachedPart = new CachedPartition(topicPart, id, reqData)
-      if (id != Uuid.ZERO_UUID) {
-        val prevSessionTopicId = sessionTopicIds.putIfAbsent(topicPart.topic, 
id)
-        if (prevSessionTopicId != null && prevSessionTopicId != id)
-          inconsistentTopicIds.add(topicPart)
-      }
+      val newCachedPart = new CachedPartition(topicPart.topicPartition, 
topicPart.topicId, reqData)

Review comment:
       nit: How about naming it `cachedPartitionKey`? We could also benefits 
from passing `TopicIdPartition` to the constructor directly. 

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -238,47 +263,40 @@ class FetchSession(val id: Int,
 
   def metadata: JFetchMetadata = synchronized { new JFetchMetadata(id, epoch) }
 
-  def getFetchOffset(topicPartition: TopicPartition): Option[Long] = 
synchronized {
-    Option(partitionMap.find(new CachedPartition(topicPartition,
-      sessionTopicIds.getOrDefault(topicPartition.topic(), 
Uuid.ZERO_UUID)))).map(_.fetchOffset)
+  def getFetchOffset(topicIdPartition: TopicIdPartition): Option[Long] = 
synchronized {
+    Option(partitionMap.find(new 
CachedPartition(topicIdPartition.topicPartition, 
topicIdPartition.topicId))).map(_.fetchOffset)
   }
 
-  type TL = util.ArrayList[TopicPartition]
+  type TL = util.ArrayList[TopicIdPartition]
 
   // Update the cached partition data based on the request.
   def update(fetchData: FetchSession.REQ_MAP,
-             toForget: util.List[TopicPartition],
+             toForget: util.List[TopicIdPartition],
              reqMetadata: JFetchMetadata,
-             topicIds: util.Map[String, Uuid]): (TL, TL, TL, TL) = 
synchronized {
+             usesTopicIds: Boolean): (TL, TL, TL) = synchronized {

Review comment:
       Is `usesTopicIds` used anywhere in this method?

##########
File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
##########
@@ -224,10 +224,8 @@ class ReplicaFetcherThread(name: String,
     }
     val fetchResponse = clientResponse.responseBody.asInstanceOf[FetchResponse]
     if (!fetchSessionHandler.handleResponse(fetchResponse, 
clientResponse.requestHeader().apiVersion())) {
-      // If we had a topic ID related error, throw it, otherwise return an 
empty fetch data map.
-      if (fetchResponse.error == Errors.UNKNOWN_TOPIC_ID ||
-          fetchResponse.error == Errors.FETCH_SESSION_TOPIC_ID_ERROR ||
-          fetchResponse.error == Errors.INCONSISTENT_TOPIC_ID) {
+      // If we had a session topic ID related error, throw it, otherwise 
return an empty fetch data map.
+      if (fetchResponse.error == Errors.FETCH_SESSION_TOPIC_ID_ERROR) {

Review comment:
       I already mentioned this before but it seems that we could retry 
immediately in this case when the session was upgraded/downgraded. That would 
avoid having to wait for the backoff.

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1021,17 +1018,17 @@ class ReplicaManager(val config: KafkaConfig,
     var bytesReadable: Long = 0
     var errorReadingData = false
     var hasDivergingEpoch = false
-    val logReadResultMap = new mutable.HashMap[TopicPartition, LogReadResult]
-    logReadResults.foreach { case (topicPartition, logReadResult) =>
-      
brokerTopicStats.topicStats(topicPartition.topic).totalFetchRequestRate.mark()
+    val logReadResultMap = new mutable.HashMap[TopicIdPartition, LogReadResult]
+    logReadResults.foreach { case (topicIdPartition, logReadResult) =>
+      
brokerTopicStats.topicStats(topicIdPartition.topicPartition.topic).totalFetchRequestRate.mark()

Review comment:
       nit: `topicIdPartition.topic` should work.

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -66,16 +69,28 @@ public PartitionData(
             int maxBytes,
             Optional<Integer> currentLeaderEpoch
         ) {
-            this(fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch, 
Optional.empty());
+            this(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, 
currentLeaderEpoch, Optional.empty());

Review comment:
       Do we still use this constructor?

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1041,26 +1038,26 @@ class ReplicaManager(val config: KafkaConfig,
     //                        5) we found a diverging epoch
     if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes 
|| errorReadingData || hasDivergingEpoch) {
       val fetchPartitionData = logReadResults.map { case (tp, result) =>
-        val isReassignmentFetch = isFromFollower && isAddingReplica(tp, 
replicaId)
+        val isReassignmentFetch = isFromFollower && 
isAddingReplica(tp.topicPartition, replicaId)
         tp -> result.toFetchPartitionData(isReassignmentFetch)
       }
       responseCallback(fetchPartitionData)
     } else {
       // construct the fetch results from the read results
-      val fetchPartitionStatus = new mutable.ArrayBuffer[(TopicPartition, 
FetchPartitionStatus)]
-      fetchInfos.foreach { case (topicPartition, partitionData) =>
-        logReadResultMap.get(topicPartition).foreach(logReadResult => {
+      val fetchPartitionStatus = new mutable.ArrayBuffer[(TopicIdPartition, 
FetchPartitionStatus)]
+      fetchInfos.foreach { case (topicIdPartition, partitionData) =>
+        logReadResultMap.get(topicIdPartition).foreach(logReadResult => {
           val logOffsetMetadata = logReadResult.info.fetchOffsetMetadata
-          fetchPartitionStatus += (topicPartition -> 
FetchPartitionStatus(logOffsetMetadata, partitionData))
+          fetchPartitionStatus += (topicIdPartition -> 
FetchPartitionStatus(logOffsetMetadata, partitionData))
         })
       }
       val fetchMetadata: SFetchMetadata = SFetchMetadata(fetchMinBytes, 
fetchMaxBytes, hardMaxBytesLimit,
-        fetchOnlyFromLeader, fetchIsolation, isFromFollower, replicaId, 
topicIds, fetchPartitionStatus)
+        fetchOnlyFromLeader, fetchIsolation, isFromFollower, replicaId, 
fetchPartitionStatus)
       val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, 
clientMetadata,
         responseCallback)
 
       // create a list of (topic, partition) pairs to use as keys for this 
delayed fetch operation
-      val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => 
TopicPartitionOperationKey(tp) }
+      val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => 
TopicPartitionOperationKey(tp.topicPartition) }

Review comment:
       nit: We could add another `apply` method to `TopicPartitionOperationKey` 
which accepts a `TopicIdPartition`. That will be convenient.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to