jolshan commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r742113471



##########
File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -285,52 +268,57 @@ public FetchRequestData build() {
             if (nextMetadata.isFull()) {
                 if (log.isDebugEnabled()) {
                    log.debug("Built full fetch {} for node {} with {}.",
-                              nextMetadata, node, partitionsToLogString(next.keySet()));
+                            nextMetadata, node, topicPartitionsToLogString(next.keySet()));
                 }
                 sessionPartitions = next;
                 next = null;
+                Map<TopicPartition, PartitionData> toSend =
+                        Collections.unmodifiableMap(new LinkedHashMap<>(sessionPartitions));
                 // Only add topic IDs to the session if we are using topic IDs.
                 if (canUseTopicIds) {
-                    sessionTopicIds = topicIds;
-                    sessionTopicNames = new HashMap<>(topicIds.size());
-                    topicIds.forEach((name, id) -> sessionTopicNames.put(id, name));
+                    Map<Uuid, Set<String>> newTopicNames = sessionPartitions.entrySet().stream().collect(Collectors.groupingByConcurrent(entry -> entry.getValue().topicId,
+                            Collectors.mapping(entry -> entry.getKey().topic(), Collectors.toSet())));

Review comment:
      The idea was to avoid doing a put operation for every partition and instead do one per topic. Maybe grouping is slower, though.
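
      For reference, here is a standalone sketch of that per-topic grouping (plain `groupingBy` shown instead of `groupingByConcurrent`, since the build is single-threaded; the class and method names are illustrative, not from the PR):

```java
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.requests.FetchRequest.PartitionData;

public final class TopicNameGrouping {
    // One grouping pass builds the ID-to-names map: one bucket insert per
    // topic rather than one put operation per partition.
    static Map<Uuid, Set<String>> topicNamesById(Map<TopicPartition, PartitionData> sessionPartitions) {
        return sessionPartitions.entrySet().stream()
                .collect(Collectors.groupingBy(entry -> entry.getValue().topicId,
                        Collectors.mapping(entry -> entry.getKey().topic(), Collectors.toSet())));
    }
}
```

      Whether one bucket insert per topic actually beats one put per partition probably depends on the partition-to-topic ratio; a quick benchmark would settle it.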

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -824,6 +823,14 @@ static boolean hasUsableOffsetForLeaderEpochVersion(NodeApiVersions nodeApiVersi
         return OffsetsForLeaderEpochRequest.supportsTopicPermission(apiVersion.maxVersion());
     }
 
+    static boolean hasUsableTopicIdFetchRequestVersion(NodeApiVersions nodeApiVersions) {

Review comment:
       Good catch
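
      For context, a plausible shape for this helper, mirroring `hasUsableOffsetForLeaderEpochVersion` above (just a sketch, assuming Fetch v13 is the first topic-ID version per KIP-516; the wrapper class is illustrative):

```java
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
import org.apache.kafka.common.protocol.ApiKeys;

final class FetchVersionCheck {
    // Sketch: true if the node advertises a Fetch version that addresses
    // partitions by topic ID (v13+, assuming that is the KIP-516 cutover).
    static boolean hasUsableTopicIdFetchRequestVersion(NodeApiVersions nodeApiVersions) {
        ApiVersion apiVersion = nodeApiVersions.apiVersion(ApiKeys.FETCH);
        if (apiVersion == null)
            return false;
        return apiVersion.maxVersion() >= 13;
    }
}
```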

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -374,7 +374,7 @@ abstract class AbstractFetcherThread(name: String,
                       }
                     }
                   } catch {
-                    case ime@( _: CorruptRecordException | _: InvalidRecordException) =>
+                    case ime@(_: CorruptRecordException | _: InvalidRecordException) =>

Review comment:
      FETCH_SESSION_TOPIC_ID_ERROR occurs when we switch from not using topic IDs in the request to using them (or vice versa). I think we might want to delay the partitions so we pick up the latest metadata, but I'm not sure.
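
      A rough sketch of that idea, heavily hypothetical (the `PartitionBackoff` callback and `onFetchError` helper are stand-ins for illustration, not the fetcher thread's actual API):

```java
import java.util.Set;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.Errors;

final class TopicIdErrorHandlingSketch {
    // Hypothetical callback standing in for the fetcher's delay mechanism.
    interface PartitionBackoff {
        void delay(Set<TopicPartition> partitions, long backoffMs);
    }

    // Treat the error as retriable: park the partitions so the next metadata
    // refresh can decide whether the session should use topic IDs.
    static void onFetchError(Errors error, Set<TopicPartition> partitions,
                             PartitionBackoff backoff, long retryBackoffMs) {
        if (error == Errors.FETCH_SESSION_TOPIC_ID_ERROR) {
            backoff.delay(partitions, retryBackoffMs);
        }
    }
}
```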

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -708,40 +701,41 @@ class KafkaApis(val requestChannel: RequestChannel,
       None
     }
 
-    val erroneous = mutable.ArrayBuffer[(TopicPartition, FetchResponseData.PartitionData)]()
-    val interesting = mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]()
-    val sessionTopicIds = mutable.Map[String, Uuid]()
+    val erroneous = mutable.ArrayBuffer[(TopicIdPartition, FetchResponseData.PartitionData)]()
+    val interesting = mutable.ArrayBuffer[(TopicIdPartition, FetchRequest.PartitionData)]()
     if (fetchRequest.isFromFollower) {
       // The follower must have ClusterAction on ClusterResource in order to fetch partition data.
       if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
-        fetchContext.foreachPartition { (topicPartition, topicId, data) =>
-          sessionTopicIds.put(topicPartition.topic(), topicId)
-          if (!metadataCache.contains(topicPartition))
-            erroneous += topicPartition -> FetchResponse.partitionResponse(topicPartition.partition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+        fetchContext.foreachPartition { (topicIdPartition, data) =>
+          if (topicIdPartition.topicPartition.topic == null)
+            erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID)
+          else if (!metadataCache.contains(topicIdPartition.topicPartition))
+            erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
           else
-            interesting += (topicPartition -> data)
+            interesting += (topicIdPartition -> data)
         }
       } else {
-        fetchContext.foreachPartition { (part, topicId, _) =>
-          sessionTopicIds.put(part.topic(), topicId)
-          erroneous += part -> FetchResponse.partitionResponse(part.partition, Errors.TOPIC_AUTHORIZATION_FAILED)
+        fetchContext.foreachPartition { (topicIdPartition, _) =>
+          erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.TOPIC_AUTHORIZATION_FAILED)

Review comment:
      Hmmm. So we'd filter out the ones with null names? What benefit are we expecting to get from this?

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1021,17 +1018,17 @@ class ReplicaManager(val config: KafkaConfig,
     var bytesReadable: Long = 0
     var errorReadingData = false
     var hasDivergingEpoch = false
-    val logReadResultMap = new mutable.HashMap[TopicPartition, LogReadResult]
-    logReadResults.foreach { case (topicPartition, logReadResult) =>
-      brokerTopicStats.topicStats(topicPartition.topic).totalFetchRequestRate.mark()
+    val logReadResultMap = new mutable.HashMap[TopicIdPartition, LogReadResult]
+    logReadResults.foreach { case (topicIdPartition, logReadResult) =>
+      brokerTopicStats.topicStats(topicIdPartition.topicPartition.topic).totalFetchRequestRate.mark()

Review comment:
      I think I wrote all of these before the class was updated, but I will change them. :)
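
      For illustration, the difference the updated class makes, assuming `TopicIdPartition` exposes `topic()` directly (a small Java example; the file above is Scala, and the class name below is illustrative):

```java
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;

public final class TopicIdPartitionAccess {
    public static void main(String[] args) {
        TopicIdPartition tidp =
                new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("events", 0));
        // After the class update, the topic name should be reachable directly...
        String direct = tidp.topic();
        // ...instead of going through the nested TopicPartition as in the hunk above.
        String nested = tidp.topicPartition().topic();
        System.out.println(direct.equals(nested)); // prints true
    }
}
```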

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -66,16 +69,28 @@ public PartitionData(
             int maxBytes,
             Optional<Integer> currentLeaderEpoch
         ) {
-            this(fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch, Optional.empty());
+            this(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch, Optional.empty());

Review comment:
      Yeah, it's used in 49 places, and some of them I intentionally left with zero UUIDs. I can convert all of them to pass Uuid.ZERO_UUID explicitly if we think this may be bug-prone.
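
      For reference, the two spellings side by side, using the constructor shapes visible in the hunk above (the wrapper class and values are illustrative):

```java
import java.util.Optional;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.requests.FetchRequest;

public final class PartitionDataSpelling {
    public static void main(String[] args) {
        // Implicit: the four-arg constructor fills in Uuid.ZERO_UUID itself.
        FetchRequest.PartitionData implicitId =
                new FetchRequest.PartitionData(0L, 0L, 1024, Optional.empty());
        // Explicit: spelling out Uuid.ZERO_UUID makes the "no topic ID" intent
        // visible at the call site, which is the bug-proneness trade-off here.
        FetchRequest.PartitionData explicitId =
                new FetchRequest.PartitionData(
                        Uuid.ZERO_UUID, 0L, 0L, 1024, Optional.empty(), Optional.empty());
        System.out.println(implicitId.equals(explicitId)); // true, given the delegation above
    }
}
```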

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -163,18 +173,35 @@ class CachedPartition(val topic: String,
     mustRespond
   }
 
-  override def hashCode: Int = Objects.hash(new TopicPartition(topic, partition), topicId)
+  /**
+   * We have different equality checks depending on whether topic IDs are used.
+   * This means we need a different hash function as well. We use the name to calculate the hash if the ID is zero and unused.
+   * Otherwise, we use the topic ID in the hash calculation.
+   *
+   * @return the hash code for the CachedPartition depending on which request version we are using.
+   */
+  override def hashCode: Int = if (topicId != Uuid.ZERO_UUID) (31 * partition) + topicId.hashCode else
+    (31 * partition) + topic.hashCode
 
   def canEqual(that: Any): Boolean = that.isInstanceOf[CachedPartition]
 
+  /**
+   * We have different equality checks depending on whether topic IDs are used.
+   *
+   * This is because when we use topic IDs, a partition with a given ID and an unknown name is the same as a partition with
+   * that ID and a known name. This means we can only use the topic ID and partition when determining equality.
+   *
+   * On the other hand, if we are using topic names, all IDs are zero. This means we can only use the topic name and partition
+   * when determining equality.
+   */
   override def equals(that: Any): Boolean =
     that match {
       case that: CachedPartition =>
         this.eq(that) ||
           (that.canEqual(this) &&

Review comment:
       This was here before my change, but I can remove it.
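
      To make the ID-vs-name contract concrete, a minimal standalone sketch of the rule those doc comments describe (written in Java for illustration; the real `CachedPartition` is Scala and this is not its exact code):

```java
import java.util.Objects;

import org.apache.kafka.common.Uuid;

// Sketch of the equality rule: compare by (topicId, partition) when a real
// topic ID is present, otherwise by (topic, partition).
class CachedPartitionSketch {
    final String topic;
    final Uuid topicId;
    final int partition;

    CachedPartitionSketch(String topic, Uuid topicId, int partition) {
        this.topic = topic;
        this.topicId = topicId;
        this.partition = partition;
    }

    @Override
    public int hashCode() {
        // Must agree with equals: hash on the ID when it is used, else on the name.
        return !topicId.equals(Uuid.ZERO_UUID)
                ? 31 * partition + topicId.hashCode()
                : 31 * partition + topic.hashCode();
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof CachedPartitionSketch)) return false;
        CachedPartitionSketch that = (CachedPartitionSketch) o;
        if (partition != that.partition) return false;
        return !topicId.equals(Uuid.ZERO_UUID)
                ? topicId.equals(that.topicId)        // ID-based sessions ignore the name
                : Objects.equals(topic, that.topic);  // name-based sessions have zero IDs
    }
}
```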



