jolshan commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r742113471
##########
File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -285,52 +268,57 @@ public FetchRequestData build() {
             if (nextMetadata.isFull()) {
                 if (log.isDebugEnabled()) {
                     log.debug("Built full fetch {} for node {} with {}.",
-                        nextMetadata, node, partitionsToLogString(next.keySet()));
+                        nextMetadata, node, topicPartitionsToLogString(next.keySet()));
                 }
                 sessionPartitions = next;
                 next = null;
+                Map<TopicPartition, PartitionData> toSend =
+                    Collections.unmodifiableMap(new LinkedHashMap<>(sessionPartitions));
                 // Only add topic IDs to the session if we are using topic IDs.
                 if (canUseTopicIds) {
-                    sessionTopicIds = topicIds;
-                    sessionTopicNames = new HashMap<>(topicIds.size());
-                    topicIds.forEach((name, id) -> sessionTopicNames.put(id, name));
+                    Map<Uuid, Set<String>> newTopicNames = sessionPartitions.entrySet().stream().collect(Collectors.groupingByConcurrent(entry -> entry.getValue().topicId,
+                        Collectors.mapping(entry -> entry.getKey().topic(), Collectors.toSet())));

Review comment:
       The idea was to not do a put operation for every partition but instead every topic. Maybe grouping is slower though.
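To make the comparison concrete, here is a minimal, self-contained sketch (not code from the PR) of the two ways of building the ID-to-name mapping: one put per partition versus grouping the session partitions by topic ID. The `Map<TopicPartition, Uuid>` stand-in for the session's partition data and the sample topics are hypothetical; only the collector shape mirrors the snippet above (which uses `groupingByConcurrent`).

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;

public class TopicNameMapSketch {
    public static void main(String[] args) {
        // Hypothetical session contents: each partition carries the ID of its topic.
        Uuid fooId = Uuid.randomUuid();
        Uuid barId = Uuid.randomUuid();
        Map<TopicPartition, Uuid> sessionPartitions = new LinkedHashMap<>();
        sessionPartitions.put(new TopicPartition("foo", 0), fooId);
        sessionPartitions.put(new TopicPartition("foo", 1), fooId);
        sessionPartitions.put(new TopicPartition("bar", 0), barId);

        // Approach 1: one put per partition; a topic with many partitions is written many times.
        Map<Uuid, String> namesByPut = new HashMap<>();
        sessionPartitions.forEach((tp, id) -> namesByPut.put(id, tp.topic()));

        // Approach 2: group partitions by topic ID, as in the snippet above,
        // so the result map gets one entry per topic rather than one write per partition.
        Map<Uuid, Set<String>> namesByGrouping = sessionPartitions.entrySet().stream()
            .collect(Collectors.groupingBy(Map.Entry::getValue,
                Collectors.mapping(e -> e.getKey().topic(), Collectors.toSet())));

        System.out.println(namesByPut);      // one String name per topic ID
        System.out.println(namesByGrouping); // one Set of names per topic ID
    }
}
```

Either way the work is linear in the number of partitions; the grouping variant mainly avoids redundant writes when a topic has many partitions, at the cost of allocating a `Set` per topic.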
##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -824,6 +823,14 @@ static boolean hasUsableOffsetForLeaderEpochVersion(NodeApiVersions nodeApiVersi
         return OffsetsForLeaderEpochRequest.supportsTopicPermission(apiVersion.maxVersion());
     }

+    static boolean hasUsableTopicIdFetchRequestVersion(NodeApiVersions nodeApiVersions) {

Review comment:
       Good catch

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -374,7 +374,7 @@
           }
         }
       } catch {
-        case ime@( _: CorruptRecordException | _: InvalidRecordException) =>
+        case ime@(_: CorruptRecordException | _: InvalidRecordException) =>

Review comment:
       FETCH_SESSION_TOPIC_ID_ERROR occurs when we switch from not using topic IDs in the request to using them (or vice versa). I think maybe we'd want to delay partitions to get the latest metadata, but I'm not sure.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -708,40 +701,41 @@ class KafkaApis(val requestChannel: RequestChannel,
       None
     }

-    val erroneous = mutable.ArrayBuffer[(TopicPartition, FetchResponseData.PartitionData)]()
-    val interesting = mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]()
-    val sessionTopicIds = mutable.Map[String, Uuid]()
+    val erroneous = mutable.ArrayBuffer[(TopicIdPartition, FetchResponseData.PartitionData)]()
+    val interesting = mutable.ArrayBuffer[(TopicIdPartition, FetchRequest.PartitionData)]()

    if (fetchRequest.isFromFollower) {
      // The follower must have ClusterAction on ClusterResource in order to fetch partition data.
      if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
-        fetchContext.foreachPartition { (topicPartition, topicId, data) =>
-          sessionTopicIds.put(topicPartition.topic(), topicId)
-          if (!metadataCache.contains(topicPartition))
-            erroneous += topicPartition -> FetchResponse.partitionResponse(topicPartition.partition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+        fetchContext.foreachPartition { (topicIdPartition, data) =>
+          if (topicIdPartition.topicPartition.topic == null )
+            erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID)
+          else if (!metadataCache.contains(topicIdPartition.topicPartition))
+            erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
           else
-            interesting += (topicPartition -> data)
+            interesting += (topicIdPartition -> data)
         }
      } else {
-        fetchContext.foreachPartition { (part, topicId, _) =>
-          sessionTopicIds.put(part.topic(), topicId)
-          erroneous += part -> FetchResponse.partitionResponse(part.partition, Errors.TOPIC_AUTHORIZATION_FAILED)
+        fetchContext.foreachPartition { (topicIdPartition, _) =>
+          erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.TOPIC_AUTHORIZATION_FAILED)

Review comment:
       Hmmm. So we'd sort out the ones with null names? What benefit are we thinking we'll get from this?

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1021,17 +1018,17 @@ class ReplicaManager(val config: KafkaConfig,
     var bytesReadable: Long = 0
     var errorReadingData = false
     var hasDivergingEpoch = false
-    val logReadResultMap = new mutable.HashMap[TopicPartition, LogReadResult]
-    logReadResults.foreach { case (topicPartition, logReadResult) =>
-      brokerTopicStats.topicStats(topicPartition.topic).totalFetchRequestRate.mark()
+    val logReadResultMap = new mutable.HashMap[TopicIdPartition, LogReadResult]
+    logReadResults.foreach { case (topicIdPartition, logReadResult) =>
+      brokerTopicStats.topicStats(topicIdPartition.topicPartition.topic).totalFetchRequestRate.mark()

Review comment:
       I think I wrote all of these before the class was updated, but I will change them. :)

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -66,16 +69,28 @@ public PartitionData(
             int maxBytes,
             Optional<Integer> currentLeaderEpoch
         ) {
-            this(fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch, Optional.empty());
+            this(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch, Optional.empty());

Review comment:
       Yeah. It's used in 49 places. Some of the places I intentionally left as zero UUIDs. I can convert all of them to Uuid.ZERO_UUID if we think this may be bug-prone.
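For context on the constructor change above, here is a trimmed, hypothetical analogue (not `FetchRequest.PartitionData` itself) of the overload pattern: the ID-less constructor delegates to the ID-aware one with the `Uuid.ZERO_UUID` sentinel, which is also the constant the comment suggests using at call sites instead of hand-built zero UUIDs.

```java
import java.util.Optional;

import org.apache.kafka.common.Uuid;

// Hypothetical, trimmed-down stand-in for illustration only.
public class PartitionDataSketch {
    public final Uuid topicId;
    public final long fetchOffset;
    public final Optional<Integer> currentLeaderEpoch;

    // Old-style call sites keep working: they implicitly use the zero-UUID sentinel.
    public PartitionDataSketch(long fetchOffset, Optional<Integer> currentLeaderEpoch) {
        this(Uuid.ZERO_UUID, fetchOffset, currentLeaderEpoch);
    }

    // ID-aware constructor used when the topic ID is known.
    public PartitionDataSketch(Uuid topicId, long fetchOffset, Optional<Integer> currentLeaderEpoch) {
        this.topicId = topicId;
        this.fetchOffset = fetchOffset;
        this.currentLeaderEpoch = currentLeaderEpoch;
    }

    // A named sentinel keeps the "no topic ID" convention in one place and greppable.
    public boolean hasTopicId() {
        return !Uuid.ZERO_UUID.equals(topicId);
    }
}
```

Using the named constant rather than constructing zero UUIDs by hand at the many call sites keeps the sentinel consistent and makes accidental non-zero literals easier to spot.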
##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -163,18 +173,35 @@ class CachedPartition(val topic: String,
     mustRespond
   }

-  override def hashCode: Int = Objects.hash(new TopicPartition(topic, partition), topicId)
+  /**
+   * We have different equality checks depending on whether topic IDs are used.
+   * This means we need a different hash function as well. We use name to calculate the hash if the ID is zero and unused.
+   * Otherwise, we use the topic ID in the hash calculation.
+   *
+   * @return the hash code for the CachedPartition depending on what request version we are using.
+   */
+  override def hashCode: Int = if (topicId != Uuid.ZERO_UUID) (31 * partition) + topicId.hashCode else
+    (31 * partition) + topic.hashCode

   def canEqual(that: Any): Boolean = that.isInstanceOf[CachedPartition]

+  /**
+   * We have different equality checks depending on whether topic IDs are used.
+   *
+   * This is because when we use topic IDs, a partition with a given ID and an unknown name is the same as a partition with that
+   * ID and a known name. This means we can only use topic ID and partition when determining equality.
+   *
+   * On the other hand, if we are using topic names, all IDs are zero. This means we can only use topic name and partition
+   * when determining equality.
+   */
   override def equals(that: Any): Boolean =
     that match {
       case that: CachedPartition =>
         this.eq(that) ||
           (that.canEqual(this) &&

Review comment:
       This was here before my change, but I can remove it.
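To illustrate the dual equality scheme documented above, here is a small, hypothetical Java analogue (the real `CachedPartition` is the Scala class in the diff): the key hashes and compares by topic ID when one is present and falls back to the topic name when the ID is `Uuid.ZERO_UUID`.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.Uuid;

// Hypothetical analogue of the CachedPartition equality rules, for illustration only.
public class PartitionKeySketch {
    final String topic;   // may be unknown (null) when only the ID is available
    final Uuid topicId;   // Uuid.ZERO_UUID when the session does not use topic IDs
    final int partition;

    PartitionKeySketch(String topic, Uuid topicId, int partition) {
        this.topic = topic;
        this.topicId = topicId;
        this.partition = partition;
    }

    @Override
    public int hashCode() {
        // Hash by ID when we have one, otherwise by name, mirroring the Scala code above.
        return Uuid.ZERO_UUID.equals(topicId)
            ? 31 * partition + topic.hashCode()
            : 31 * partition + topicId.hashCode();
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof PartitionKeySketch)) return false;
        PartitionKeySketch that = (PartitionKeySketch) o;
        if (partition != that.partition) return false;
        // With topic IDs: the name is ignored, so an unresolved name still matches.
        if (!Uuid.ZERO_UUID.equals(topicId) || !Uuid.ZERO_UUID.equals(that.topicId))
            return topicId.equals(that.topicId);
        // Without topic IDs: fall back to comparing names.
        return topic.equals(that.topic);
    }

    public static void main(String[] args) {
        Uuid id = Uuid.randomUuid();
        Map<PartitionKeySketch, String> cache = new HashMap<>();
        cache.put(new PartitionKeySketch("foo", id, 0), "cached");
        // Same ID and partition but unknown name: still found, because equality uses the ID.
        System.out.println(cache.get(new PartitionKeySketch(null, id, 0)));
    }
}
```

The behavior the doc comment describes falls out of this: once IDs are in use, a lookup with a known ID but an unresolved name still hits the cached entry, while name-based sessions keep comparing by name because every ID is the zero sentinel.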