dajac commented on a change in pull request #11331: URL: https://github.com/apache/kafka/pull/11331#discussion_r741789472
########## File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java ########## @@ -285,52 +268,57 @@ public FetchRequestData build() { if (nextMetadata.isFull()) { if (log.isDebugEnabled()) { log.debug("Built full fetch {} for node {} with {}.", - nextMetadata, node, partitionsToLogString(next.keySet())); + nextMetadata, node, topicPartitionsToLogString(next.keySet())); } sessionPartitions = next; next = null; + Map<TopicPartition, PartitionData> toSend = + Collections.unmodifiableMap(new LinkedHashMap<>(sessionPartitions)); // Only add topic IDs to the session if we are using topic IDs. if (canUseTopicIds) { - sessionTopicIds = topicIds; - sessionTopicNames = new HashMap<>(topicIds.size()); - topicIds.forEach((name, id) -> sessionTopicNames.put(id, name)); + Map<Uuid, Set<String>> newTopicNames = sessionPartitions.entrySet().stream().collect(Collectors.groupingByConcurrent(entry -> entry.getValue().topicId, + Collectors.mapping(entry -> entry.getKey().topic(), Collectors.toSet()))); Review comment: Could we iterate over `sessionPartitions` and directly populate `sessionTopicNames` by using `putIfAbsent` or even `put`? The grouping seems unnecessary to me here unless I am missing something. ########## File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java ########## @@ -285,52 +268,57 @@ public FetchRequestData build() { if (nextMetadata.isFull()) { if (log.isDebugEnabled()) { log.debug("Built full fetch {} for node {} with {}.", - nextMetadata, node, partitionsToLogString(next.keySet())); + nextMetadata, node, topicPartitionsToLogString(next.keySet())); } sessionPartitions = next; next = null; + Map<TopicPartition, PartitionData> toSend = + Collections.unmodifiableMap(new LinkedHashMap<>(sessionPartitions)); Review comment: As `toSend` is not used before L288, how about putting this line over there? 
########## File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java ########## @@ -346,38 +334,36 @@ public FetchRequestData build() { break; } sessionPartitions.put(topicPartition, nextData); - added.add(topicPartition); + added.add(new TopicIdPartition(nextData.topicId, topicPartition)); } // Add topic IDs to session if we can use them. If an ID is inconsistent, we will handle in the receiving broker. // If we switched from using topic IDs to not using them (or vice versa), that error will also be handled in the receiving broker. if (canUseTopicIds) { - for (Map.Entry<String, Uuid> topic : topicIds.entrySet()) { - String topicName = topic.getKey(); - Uuid addedId = topic.getValue(); - sessionTopicIds.put(topicName, addedId); - sessionTopicNames.put(addedId, topicName); - } + Map<Uuid, Set<String>> newTopicNames = added.stream().collect(Collectors.groupingByConcurrent(TopicIdPartition::topicId, + Collectors.mapping(topicIdPartition -> topicIdPartition.topicPartition().topic(), Collectors.toSet()))); + + // There should only be one topic name per topic ID. + newTopicNames.forEach((topicId, topicNamesSet) -> topicNamesSet.forEach(topicName -> sessionTopicNames.put(topicId, topicName))); } if (log.isDebugEnabled()) { - log.debug("Built incremental fetch {} for node {}. Added {}, altered {}, removed {} " + - "out of {}", nextMetadata, node, partitionsToLogString(added), - partitionsToLogString(altered), partitionsToLogString(removed), - partitionsToLogString(sessionPartitions.keySet())); + log.debug("Built incremental fetch {} for node {}. Added {}, altered {}, removed {}, " + + "replaced {} out of {}", nextMetadata, node, topicIdPartitionsToLogString(added), Review comment: nit: Could we align like it was before? 
########## File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java ########## @@ -346,38 +334,36 @@ public FetchRequestData build() { break; } sessionPartitions.put(topicPartition, nextData); - added.add(topicPartition); + added.add(new TopicIdPartition(nextData.topicId, topicPartition)); } // Add topic IDs to session if we can use them. If an ID is inconsistent, we will handle in the receiving broker. // If we switched from using topic IDs to not using them (or vice versa), that error will also be handled in the receiving broker. if (canUseTopicIds) { - for (Map.Entry<String, Uuid> topic : topicIds.entrySet()) { - String topicName = topic.getKey(); - Uuid addedId = topic.getValue(); - sessionTopicIds.put(topicName, addedId); - sessionTopicNames.put(addedId, topicName); - } + Map<Uuid, Set<String>> newTopicNames = added.stream().collect(Collectors.groupingByConcurrent(TopicIdPartition::topicId, Review comment: Same comment as before. ########## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchMetadata.java ########## @@ -120,6 +120,13 @@ public FetchMetadata nextCloseExisting() { return new FetchMetadata(sessionId, INITIAL_EPOCH); } + /** + * Return the metadata for the next closed session response. + */ + public FetchMetadata closeExisting() { Review comment: It seems that this method is not used anymore. Could we remove it? ########## File path: core/src/main/scala/kafka/server/FetchSession.scala ########## @@ -163,18 +173,35 @@ class CachedPartition(val topic: String, mustRespond } - override def hashCode: Int = Objects.hash(new TopicPartition(topic, partition), topicId) + /** + * We have different equality checks depending on whether topic IDs are used. + * This means we need a different hash function as well. We use name to calculate the hash if the ID is zero and unused. + * Otherwise, we use the topic ID in the hash calculation. 
+ * + * @return the hash code for the CachedPartition depending on what request version we are using. + */ + override def hashCode: Int = if (topicId != Uuid.ZERO_UUID) (31 * partition) + topicId.hashCode else + (31 * partition) + topic.hashCode def canEqual(that: Any): Boolean = that.isInstanceOf[CachedPartition] + /** + * We have different equality checks depending on whether topic IDs are used. + * + * This is because when we use topic IDs, a partition with a given ID and an unknown name is the same as a partition with that + * ID and a known name. This means we can only use topic ID and partition when determining equality. + * + * On the other hand, if we are using topic names, all IDs are zero. This means we can only use topic name and partition + * when determining equality. + */ override def equals(that: Any): Boolean = that match { case that: CachedPartition => this.eq(that) || (that.canEqual(this) && Review comment: `that.canEqual(this)` seems weird to me. It seems that we could just remove it. ########## File path: core/src/main/scala/kafka/server/FetchSession.scala ########## @@ -238,47 +263,40 @@ class FetchSession(val id: Int, def metadata: JFetchMetadata = synchronized { new JFetchMetadata(id, epoch) } - def getFetchOffset(topicPartition: TopicPartition): Option[Long] = synchronized { - Option(partitionMap.find(new CachedPartition(topicPartition, - sessionTopicIds.getOrDefault(topicPartition.topic(), Uuid.ZERO_UUID)))).map(_.fetchOffset) + def getFetchOffset(topicIdPartition: TopicIdPartition): Option[Long] = synchronized { + Option(partitionMap.find(new CachedPartition(topicIdPartition.topicPartition, topicIdPartition.topicId))).map(_.fetchOffset) Review comment: nit: We could add another constructor which takes a `TopicIdPartition`. 
########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -708,40 +701,41 @@ class KafkaApis(val requestChannel: RequestChannel, None } - val erroneous = mutable.ArrayBuffer[(TopicPartition, FetchResponseData.PartitionData)]() - val interesting = mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]() - val sessionTopicIds = mutable.Map[String, Uuid]() + val erroneous = mutable.ArrayBuffer[(TopicIdPartition, FetchResponseData.PartitionData)]() + val interesting = mutable.ArrayBuffer[(TopicIdPartition, FetchRequest.PartitionData)]() if (fetchRequest.isFromFollower) { // The follower must have ClusterAction on ClusterResource in order to fetch partition data. if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) { - fetchContext.foreachPartition { (topicPartition, topicId, data) => - sessionTopicIds.put(topicPartition.topic(), topicId) - if (!metadataCache.contains(topicPartition)) - erroneous += topicPartition -> FetchResponse.partitionResponse(topicPartition.partition, Errors.UNKNOWN_TOPIC_OR_PARTITION) + fetchContext.foreachPartition { (topicIdPartition, data) => + if (topicIdPartition.topicPartition.topic == null ) + erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID) Review comment: nit: There is an extra space after `== null` ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -870,12 +864,15 @@ class KafkaApis(val requestChannel: RequestChannel, // Prepare fetch response from converted data val response = - FetchResponse.of(unconvertedFetchResponse.error, throttleTimeMs, unconvertedFetchResponse.sessionId, convertedData, sessionTopicIds.asJava) + FetchResponse.of(unconvertedFetchResponse.error, throttleTimeMs, unconvertedFetchResponse.sessionId, convertedData) // record the bytes out metrics only when the response is being sent - response.data().responses().forEach { topicResponse => - 
topicResponse.partitions().forEach { data => - val tp = new TopicPartition(topicResponse.topic(), data.partitionIndex()) - brokerTopicStats.updateBytesOut(tp.topic, fetchRequest.isFromFollower, reassigningPartitions.contains(tp), FetchResponse.recordsSize(data)) + response.data.responses.forEach { topicResponse => + topicResponse.partitions.forEach { data => + // If the topic name was not known, we will have no bytes out. + if (topicResponse.topic != null) { + val tp = new TopicIdPartition(topicResponse.topicId, new TopicPartition(topicResponse.topic, data.partitionIndex())) Review comment: nit: Parenthesis after `partitionIndex` could be omitted. ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -3497,14 +3493,13 @@ object KafkaApis { // TODO: remove resolvedResponseData method when sizeOf can take a data object. private[server] def sizeOfThrottledPartitions(versionId: Short, unconvertedResponse: FetchResponse, - quota: ReplicationQuotaManager, - topicIds: util.Map[String, Uuid]): Int = { - val responseData = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData] + quota: ReplicationQuotaManager): Int = { + val responseData = new util.LinkedHashMap[TopicIdPartition, FetchResponseData.PartitionData] unconvertedResponse.data.responses().forEach(topicResponse => topicResponse.partitions().forEach(partition => - responseData.put(new TopicPartition(topicResponse.topic(), partition.partitionIndex()), partition))) + responseData.put(new TopicIdPartition(topicResponse.topicId, new TopicPartition(topicResponse.topic(), partition.partitionIndex())), partition))) Review comment: nit: Parenthesis after partitionIndex could be omitted. 
########## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ########## @@ -1186,7 +1174,7 @@ class ReplicaManager(val config: KafkaConfig, lastStableOffset = None, exception = Some(e)) case e: Throwable => - brokerTopicStats.topicStats(tp.topic).failedFetchRequestRate.mark() + brokerTopicStats.topicStats(tp.topicPartition.topic).failedFetchRequestRate.mark() Review comment: nit: `tp.topic` ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -708,40 +701,41 @@ class KafkaApis(val requestChannel: RequestChannel, None } - val erroneous = mutable.ArrayBuffer[(TopicPartition, FetchResponseData.PartitionData)]() - val interesting = mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]() - val sessionTopicIds = mutable.Map[String, Uuid]() + val erroneous = mutable.ArrayBuffer[(TopicIdPartition, FetchResponseData.PartitionData)]() + val interesting = mutable.ArrayBuffer[(TopicIdPartition, FetchRequest.PartitionData)]() if (fetchRequest.isFromFollower) { // The follower must have ClusterAction on ClusterResource in order to fetch partition data. 
if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) { - fetchContext.foreachPartition { (topicPartition, topicId, data) => - sessionTopicIds.put(topicPartition.topic(), topicId) - if (!metadataCache.contains(topicPartition)) - erroneous += topicPartition -> FetchResponse.partitionResponse(topicPartition.partition, Errors.UNKNOWN_TOPIC_OR_PARTITION) + fetchContext.foreachPartition { (topicIdPartition, data) => + if (topicIdPartition.topicPartition.topic == null ) + erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID) + else if (!metadataCache.contains(topicIdPartition.topicPartition)) + erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION) else - interesting += (topicPartition -> data) + interesting += (topicIdPartition -> data) } } else { - fetchContext.foreachPartition { (part, topicId, _) => - sessionTopicIds.put(part.topic(), topicId) - erroneous += part -> FetchResponse.partitionResponse(part.partition, Errors.TOPIC_AUTHORIZATION_FAILED) + fetchContext.foreachPartition { (topicIdPartition, _) => + erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.TOPIC_AUTHORIZATION_FAILED) } } } else { // Regular Kafka consumers need READ permission on each partition they are fetching. 
- val partitionDatas = new mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)] - fetchContext.foreachPartition { (topicPartition, topicId, partitionData) => - partitionDatas += topicPartition -> partitionData - sessionTopicIds.put(topicPartition.topic(), topicId) - } - val authorizedTopics = authHelper.filterByAuthorized(request.context, READ, TOPIC, partitionDatas)(_._1.topic) - partitionDatas.foreach { case (topicPartition, data) => - if (!authorizedTopics.contains(topicPartition.topic)) - erroneous += topicPartition -> FetchResponse.partitionResponse(topicPartition.partition, Errors.TOPIC_AUTHORIZATION_FAILED) - else if (!metadataCache.contains(topicPartition)) - erroneous += topicPartition -> FetchResponse.partitionResponse(topicPartition.partition, Errors.UNKNOWN_TOPIC_OR_PARTITION) + val partitionDatas = new mutable.ArrayBuffer[(TopicIdPartition, FetchRequest.PartitionData)] + fetchContext.foreachPartition { (topicIdPartition, partitionData) => + if (topicIdPartition.topicPartition.topic == null) + erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID) + else + partitionDatas += topicIdPartition -> partitionData + } + val authorizedTopics = authHelper.filterByAuthorized(request.context, READ, TOPIC, partitionDatas)(_._1.topicPartition.topic) + partitionDatas.foreach { case (topicIdPartition, data) => + if (!authorizedTopics.contains(topicIdPartition.topicPartition.topic)) + erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.TOPIC_AUTHORIZATION_FAILED) + else if (!metadataCache.contains(topicIdPartition.topicPartition)) + erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION) else - interesting += (topicPartition -> data) + interesting += (topicIdPartition -> data) Review comment: nit: We can remove the parenthesis here. 
########## File path: core/src/main/scala/kafka/server/DelayedFetch.scala ########## @@ -92,7 +92,7 @@ class DelayedFetch(delayMs: Long, val fetchLeaderEpoch = fetchStatus.fetchInfo.currentLeaderEpoch try { if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) { - val partition = replicaManager.getPartitionOrException(topicPartition) + val partition = replicaManager.getPartitionOrException(topicPartition.topicPartition) Review comment: Yeah, that would be great. `topicPartition.topicPartition` looks really weird while reading. ########## File path: core/src/main/scala/kafka/server/FetchSession.scala ########## @@ -163,18 +173,35 @@ class CachedPartition(val topic: String, mustRespond } - override def hashCode: Int = Objects.hash(new TopicPartition(topic, partition), topicId) + /** + * We have different equality checks depending on whether topic IDs are used. + * This means we need a different hash function as well. We use name to calculate the hash if the ID is zero and unused. + * Otherwise, we use the topic ID in the hash calculation. + * + * @return the hash code for the CachedPartition depending on what request version we are using. + */ + override def hashCode: Int = if (topicId != Uuid.ZERO_UUID) (31 * partition) + topicId.hashCode else + (31 * partition) + topic.hashCode Review comment: nit: Should we format the code as follow? ``` override def hashCode: Int = { if (topicId != Uuid.ZERO_UUID) (31 * partition) + topicId.hashCode else (31 * partition) + topic.hashCode } ``` ########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ########## @@ -824,6 +823,14 @@ static boolean hasUsableOffsetForLeaderEpochVersion(NodeApiVersions nodeApiVersi return OffsetsForLeaderEpochRequest.supportsTopicPermission(apiVersion.maxVersion()); } + static boolean hasUsableTopicIdFetchRequestVersion(NodeApiVersions nodeApiVersions) { Review comment: Is this method still used? I can't find any usages of it. 
########## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java ########## @@ -364,10 +405,7 @@ public int maxBytes() { } else { name = topicNames.get(forgottenTopic.topicId()); } - if (name == null) { - throw new UnknownTopicIdException(String.format("Topic Id %s in FetchRequest was unknown to the server", forgottenTopic.topicId())); - } - forgottenTopic.partitions().forEach(partitionId -> toForget.add(new TopicPartition(name, partitionId))); + forgottenTopic.partitions().forEach(partitionId -> toForget.add(new TopicIdPartition(forgottenTopic.topicId(), new TopicPartition(name, partitionId)))); Review comment: I would also add a small comment here. ########## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java ########## @@ -328,31 +374,26 @@ public int maxBytes() { } else { name = topicNames.get(fetchTopic.topicId()); } - if (name != null) { - // If topic name is resolved, simply add to fetchData map - fetchTopic.partitions().forEach(fetchPartition -> - fetchData.put(new TopicPartition(name, fetchPartition.partition()), - new PartitionData( - fetchPartition.fetchOffset(), - fetchPartition.logStartOffset(), - fetchPartition.partitionMaxBytes(), - optionalEpoch(fetchPartition.currentLeaderEpoch()), - optionalEpoch(fetchPartition.lastFetchedEpoch()) - ) - ) - ); - } else { - throw new UnknownTopicIdException(String.format("Topic Id %s in FetchRequest was unknown to the server", fetchTopic.topicId())); - } + fetchTopic.partitions().forEach(fetchPartition -> + fetchData.put(new TopicIdPartition(fetchTopic.topicId(), new TopicPartition(name, fetchPartition.partition())), Review comment: Should we add a comment here which explains that the topic name might be null in `TopicIdPartition` if we were unable to resolve it? 
########## File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java ########## @@ -432,9 +425,9 @@ private String partitionsToLogString(Collection<TopicPartition> partitions) { String verifyFullFetchResponsePartitions(Set<TopicPartition> topicPartitions, Set<Uuid> ids, short version) { StringBuilder bld = new StringBuilder(); Set<TopicPartition> extra = - findMissing(topicPartitions, sessionPartitions.keySet()); + findMissing(topicPartitions, sessionPartitions.keySet()); Review comment: nit: This change and the following ones do not seem necessary. I would revert them back. ########## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ########## @@ -374,7 +374,7 @@ abstract class AbstractFetcherThread(name: String, } } } catch { - case ime@( _: CorruptRecordException | _: InvalidRecordException) => + case ime@(_: CorruptRecordException | _: InvalidRecordException) => Review comment: Putting this here but it is not related to this line. It seems that we have an opportunity in `processFetchRequest` to better handle the `FETCH_SESSION_TOPIC_ID_ERROR` error. At the moment, it delays all the partitions. It seems to me that we could retry directly, no? If you agree, we could file a Jira and address this in a subsequent PR. 
########## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java ########## @@ -199,26 +222,49 @@ public FetchRequest build(short version) { fetchRequestData.setMaxBytes(maxBytes); fetchRequestData.setIsolationLevel(isolationLevel.id()); fetchRequestData.setForgottenTopicsData(new ArrayList<>()); - toForget.stream() - .collect(Collectors.groupingBy(TopicPartition::topic, LinkedHashMap::new, Collectors.toList())) - .forEach((topic, partitions) -> - fetchRequestData.forgottenTopicsData().add(new FetchRequestData.ForgottenTopic() - .setTopic(topic) - .setTopicId(topicIds.getOrDefault(topic, Uuid.ZERO_UUID)) - .setPartitions(partitions.stream().map(TopicPartition::partition).collect(Collectors.toList()))) - ); - fetchRequestData.setTopics(new ArrayList<>()); + + Map<String, FetchRequestData.ForgottenTopic> forgottenTopicMap = new LinkedHashMap<>(); + removed.forEach(topicIdPartition -> { + FetchRequestData.ForgottenTopic forgottenTopic = forgottenTopicMap.get(topicIdPartition.topic()); + if (forgottenTopic == null) { + forgottenTopic = new ForgottenTopic() + .setTopic(topicIdPartition.topic()) + .setTopicId(topicIdPartition.topicId()); + forgottenTopicMap.put(topicIdPartition.topic(), forgottenTopic); + } + forgottenTopic.partitions().add(topicIdPartition.partition()); + }); + + // If a version older than v13 is used, topic-partition which were replaced + // by a topic-partition with the same name but a different topic ID are not + // sent out in the "forget" set in order to not remove the newly added + // partition in the "fetch" set. 
+ if (version >= 13) { + replaced.forEach(topicIdPartition -> { + FetchRequestData.ForgottenTopic forgottenTopic = forgottenTopicMap.get(topicIdPartition.topic()); + if (forgottenTopic == null) { + forgottenTopic = new ForgottenTopic() + .setTopic(topicIdPartition.topic()) + .setTopicId(topicIdPartition.topicId()); + forgottenTopicMap.put(topicIdPartition.topic(), forgottenTopic); + } + forgottenTopic.partitions().add(topicIdPartition.partition()); + }); Review comment: This block is identical to the previous one. Should we pull it into a helper method? (yeah, I know, I wrote this...) ########## File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java ########## @@ -285,52 +268,57 @@ public FetchRequestData build() { if (nextMetadata.isFull()) { if (log.isDebugEnabled()) { log.debug("Built full fetch {} for node {} with {}.", - nextMetadata, node, partitionsToLogString(next.keySet())); + nextMetadata, node, topicPartitionsToLogString(next.keySet())); } sessionPartitions = next; next = null; + Map<TopicPartition, PartitionData> toSend = + Collections.unmodifiableMap(new LinkedHashMap<>(sessionPartitions)); // Only add topic IDs to the session if we are using topic IDs. if (canUseTopicIds) { - sessionTopicIds = topicIds; - sessionTopicNames = new HashMap<>(topicIds.size()); - topicIds.forEach((name, id) -> sessionTopicNames.put(id, name)); + Map<Uuid, Set<String>> newTopicNames = sessionPartitions.entrySet().stream().collect(Collectors.groupingByConcurrent(entry -> entry.getValue().topicId, + Collectors.mapping(entry -> entry.getKey().topic(), Collectors.toSet()))); + + sessionTopicNames = new HashMap<>(newTopicNames.size()); + // There should only be one topic name per topic ID. 
+ newTopicNames.forEach((topicId, topicNamesSet) -> topicNamesSet.forEach(topicName -> sessionTopicNames.put(topicId, topicName))); } else { - sessionTopicIds = new HashMap<>(); sessionTopicNames = new HashMap<>(); Review comment: Not related to this PR but could we use `Collections.emptyMap` here? That would avoid allocating a `HashMap` every time. ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -708,40 +701,41 @@ class KafkaApis(val requestChannel: RequestChannel, None } - val erroneous = mutable.ArrayBuffer[(TopicPartition, FetchResponseData.PartitionData)]() - val interesting = mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]() - val sessionTopicIds = mutable.Map[String, Uuid]() + val erroneous = mutable.ArrayBuffer[(TopicIdPartition, FetchResponseData.PartitionData)]() + val interesting = mutable.ArrayBuffer[(TopicIdPartition, FetchRequest.PartitionData)]() if (fetchRequest.isFromFollower) { // The follower must have ClusterAction on ClusterResource in order to fetch partition data.
if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) { - fetchContext.foreachPartition { (topicPartition, topicId, data) => - sessionTopicIds.put(topicPartition.topic(), topicId) - if (!metadataCache.contains(topicPartition)) - erroneous += topicPartition -> FetchResponse.partitionResponse(topicPartition.partition, Errors.UNKNOWN_TOPIC_OR_PARTITION) + fetchContext.foreachPartition { (topicIdPartition, data) => + if (topicIdPartition.topicPartition.topic == null ) + erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID) + else if (!metadataCache.contains(topicIdPartition.topicPartition)) + erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION) else - interesting += (topicPartition -> data) + interesting += (topicIdPartition -> data) Review comment: nit: We can remove the parenthesis here. ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -708,40 +701,41 @@ class KafkaApis(val requestChannel: RequestChannel, None } - val erroneous = mutable.ArrayBuffer[(TopicPartition, FetchResponseData.PartitionData)]() - val interesting = mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]() - val sessionTopicIds = mutable.Map[String, Uuid]() + val erroneous = mutable.ArrayBuffer[(TopicIdPartition, FetchResponseData.PartitionData)]() + val interesting = mutable.ArrayBuffer[(TopicIdPartition, FetchRequest.PartitionData)]() if (fetchRequest.isFromFollower) { // The follower must have ClusterAction on ClusterResource in order to fetch partition data. 
if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) { - fetchContext.foreachPartition { (topicPartition, topicId, data) => - sessionTopicIds.put(topicPartition.topic(), topicId) - if (!metadataCache.contains(topicPartition)) - erroneous += topicPartition -> FetchResponse.partitionResponse(topicPartition.partition, Errors.UNKNOWN_TOPIC_OR_PARTITION) + fetchContext.foreachPartition { (topicIdPartition, data) => + if (topicIdPartition.topicPartition.topic == null ) + erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID) + else if (!metadataCache.contains(topicIdPartition.topicPartition)) + erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION) else - interesting += (topicPartition -> data) + interesting += (topicIdPartition -> data) } } else { - fetchContext.foreachPartition { (part, topicId, _) => - sessionTopicIds.put(part.topic(), topicId) - erroneous += part -> FetchResponse.partitionResponse(part.partition, Errors.TOPIC_AUTHORIZATION_FAILED) + fetchContext.foreachPartition { (topicIdPartition, _) => + erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.TOPIC_AUTHORIZATION_FAILED) Review comment: I wonder if we should reply with `UNKNOWN_TOPIC_ID` for the topics that are not resolved. ########## File path: core/src/main/scala/kafka/server/FetchSession.scala ########## @@ -163,18 +173,35 @@ class CachedPartition(val topic: String, mustRespond } - override def hashCode: Int = Objects.hash(new TopicPartition(topic, partition), topicId) + /** + * We have different equality checks depending on whether topic IDs are used. + * This means we need a different hash function as well. We use name to calculate the hash if the ID is zero and unused. + * Otherwise, we use the topic ID in the hash calculation.
+ * + * @return the hash code for the CachedPartition depending on what request version we are using. + */ + override def hashCode: Int = if (topicId != Uuid.ZERO_UUID) (31 * partition) + topicId.hashCode else + (31 * partition) + topic.hashCode def canEqual(that: Any): Boolean = that.isInstanceOf[CachedPartition] + /** + * We have different equality checks depending on whether topic IDs are used. + * + * This is because when we use topic IDs, a partition with a given ID and an unknown name is the same as a partition with that + * ID and a known name. This means we can only use topic ID and partition when determining equality. + * + * On the other hand, if we are using topic names, all IDs are zero. This means we can only use topic name and partition + * when determining equality. + */ override def equals(that: Any): Boolean = that match { case that: CachedPartition => this.eq(that) || (that.canEqual(this) && this.partition.equals(that.partition) && - this.topic.equals(that.topic) && - this.topicId.equals(that.topicId)) + (if (this.topicId != Uuid.ZERO_UUID) this.topicId.equals(that.topicId) Review comment: nit: The if/else inline reads a bit weird. Should we extract the if/else? 
``` this.eq(that) || if (this.topicId != Uuid.ZERO_UUID) this.partition.equals(that.partition) && this.topicId.equals(that.topicId) else this.partition.equals(that.partition) && this.topic.equals(that.topic) ``` ########## File path: core/src/main/scala/kafka/server/FetchSession.scala ########## @@ -238,47 +263,40 @@ class FetchSession(val id: Int, def metadata: JFetchMetadata = synchronized { new JFetchMetadata(id, epoch) } - def getFetchOffset(topicPartition: TopicPartition): Option[Long] = synchronized { - Option(partitionMap.find(new CachedPartition(topicPartition, - sessionTopicIds.getOrDefault(topicPartition.topic(), Uuid.ZERO_UUID)))).map(_.fetchOffset) + def getFetchOffset(topicIdPartition: TopicIdPartition): Option[Long] = synchronized { + Option(partitionMap.find(new CachedPartition(topicIdPartition.topicPartition, topicIdPartition.topicId))).map(_.fetchOffset) } - type TL = util.ArrayList[TopicPartition] + type TL = util.ArrayList[TopicIdPartition] // Update the cached partition data based on the request. def update(fetchData: FetchSession.REQ_MAP, - toForget: util.List[TopicPartition], + toForget: util.List[TopicIdPartition], reqMetadata: JFetchMetadata, - topicIds: util.Map[String, Uuid]): (TL, TL, TL, TL) = synchronized { + usesTopicIds: Boolean): (TL, TL, TL) = synchronized { val added = new TL val updated = new TL val removed = new TL - val inconsistentTopicIds = new TL fetchData.forEach { (topicPart, reqData) => - // Get the topic ID on the broker, if it is valid and the topic is new to the session, add its ID. - // If the topic already existed, check that its ID is consistent. 
- val id = topicIds.getOrDefault(topicPart.topic, Uuid.ZERO_UUID) - val newCachedPart = new CachedPartition(topicPart, id, reqData) - if (id != Uuid.ZERO_UUID) { - val prevSessionTopicId = sessionTopicIds.putIfAbsent(topicPart.topic, id) - if (prevSessionTopicId != null && prevSessionTopicId != id) - inconsistentTopicIds.add(topicPart) - } + val newCachedPart = new CachedPartition(topicPart.topicPartition, topicPart.topicId, reqData) val cachedPart = partitionMap.find(newCachedPart) if (cachedPart == null) { partitionMap.mustAdd(newCachedPart) added.add(topicPart) } else { cachedPart.updateRequestParams(reqData) + if (cachedPart.topic == null) Review comment: nit: It might be better to encapsulate this in `CachedPartition`. We could add a method called `maybeSetTopicName` or piggy back on `updateRequestParams`. ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -801,23 +795,23 @@ class KafkaApis(val requestChannel: RequestChannel, // down-conversion always guarantees that at least one batch of messages is down-converted and sent out to the // client. new FetchResponseData.PartitionData() - .setPartitionIndex(tp.partition) + .setPartitionIndex(tp.topicPartition.partition) Review comment: nit: We can use `tp.partition` here and a few other places. 
########## File path: core/src/main/scala/kafka/server/FetchSession.scala ########## @@ -238,47 +263,40 @@ class FetchSession(val id: Int, def metadata: JFetchMetadata = synchronized { new JFetchMetadata(id, epoch) } - def getFetchOffset(topicPartition: TopicPartition): Option[Long] = synchronized { - Option(partitionMap.find(new CachedPartition(topicPartition, - sessionTopicIds.getOrDefault(topicPartition.topic(), Uuid.ZERO_UUID)))).map(_.fetchOffset) + def getFetchOffset(topicIdPartition: TopicIdPartition): Option[Long] = synchronized { + Option(partitionMap.find(new CachedPartition(topicIdPartition.topicPartition, topicIdPartition.topicId))).map(_.fetchOffset) } - type TL = util.ArrayList[TopicPartition] + type TL = util.ArrayList[TopicIdPartition] // Update the cached partition data based on the request. def update(fetchData: FetchSession.REQ_MAP, - toForget: util.List[TopicPartition], + toForget: util.List[TopicIdPartition], reqMetadata: JFetchMetadata, - topicIds: util.Map[String, Uuid]): (TL, TL, TL, TL) = synchronized { + usesTopicIds: Boolean): (TL, TL, TL) = synchronized { val added = new TL val updated = new TL val removed = new TL - val inconsistentTopicIds = new TL fetchData.forEach { (topicPart, reqData) => - // Get the topic ID on the broker, if it is valid and the topic is new to the session, add its ID. - // If the topic already existed, check that its ID is consistent. - val id = topicIds.getOrDefault(topicPart.topic, Uuid.ZERO_UUID) - val newCachedPart = new CachedPartition(topicPart, id, reqData) - if (id != Uuid.ZERO_UUID) { - val prevSessionTopicId = sessionTopicIds.putIfAbsent(topicPart.topic, id) - if (prevSessionTopicId != null && prevSessionTopicId != id) - inconsistentTopicIds.add(topicPart) - } + val newCachedPart = new CachedPartition(topicPart.topicPartition, topicPart.topicId, reqData) Review comment: nit: How about naming it `cachedPartitionKey`? We could also benefit from passing `TopicIdPartition` to the constructor directly.
########## File path: core/src/main/scala/kafka/server/FetchSession.scala ########## @@ -238,47 +263,40 @@ class FetchSession(val id: Int, def metadata: JFetchMetadata = synchronized { new JFetchMetadata(id, epoch) } - def getFetchOffset(topicPartition: TopicPartition): Option[Long] = synchronized { - Option(partitionMap.find(new CachedPartition(topicPartition, - sessionTopicIds.getOrDefault(topicPartition.topic(), Uuid.ZERO_UUID)))).map(_.fetchOffset) + def getFetchOffset(topicIdPartition: TopicIdPartition): Option[Long] = synchronized { + Option(partitionMap.find(new CachedPartition(topicIdPartition.topicPartition, topicIdPartition.topicId))).map(_.fetchOffset) } - type TL = util.ArrayList[TopicPartition] + type TL = util.ArrayList[TopicIdPartition] // Update the cached partition data based on the request. def update(fetchData: FetchSession.REQ_MAP, - toForget: util.List[TopicPartition], + toForget: util.List[TopicIdPartition], reqMetadata: JFetchMetadata, - topicIds: util.Map[String, Uuid]): (TL, TL, TL, TL) = synchronized { + usesTopicIds: Boolean): (TL, TL, TL) = synchronized { Review comment: Is `usesTopicIds` used anywhere in this method? ########## File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala ########## @@ -224,10 +224,8 @@ class ReplicaFetcherThread(name: String, } val fetchResponse = clientResponse.responseBody.asInstanceOf[FetchResponse] if (!fetchSessionHandler.handleResponse(fetchResponse, clientResponse.requestHeader().apiVersion())) { - // If we had a topic ID related error, throw it, otherwise return an empty fetch data map. - if (fetchResponse.error == Errors.UNKNOWN_TOPIC_ID || - fetchResponse.error == Errors.FETCH_SESSION_TOPIC_ID_ERROR || - fetchResponse.error == Errors.INCONSISTENT_TOPIC_ID) { + // If we had a session topic ID related error, throw it, otherwise return an empty fetch data map. 
+ if (fetchResponse.error == Errors.FETCH_SESSION_TOPIC_ID_ERROR) { Review comment: I already mentioned this before but it seems that we could retry immediately in this case when the session was upgraded/downgraded. That would avoid having to wait for the backoff. ########## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ########## @@ -1021,17 +1018,17 @@ class ReplicaManager(val config: KafkaConfig, var bytesReadable: Long = 0 var errorReadingData = false var hasDivergingEpoch = false - val logReadResultMap = new mutable.HashMap[TopicPartition, LogReadResult] - logReadResults.foreach { case (topicPartition, logReadResult) => - brokerTopicStats.topicStats(topicPartition.topic).totalFetchRequestRate.mark() + val logReadResultMap = new mutable.HashMap[TopicIdPartition, LogReadResult] + logReadResults.foreach { case (topicIdPartition, logReadResult) => + brokerTopicStats.topicStats(topicIdPartition.topicPartition.topic).totalFetchRequestRate.mark() Review comment: nit: `topicIdPartition.topic` should work. ########## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java ########## @@ -66,16 +69,28 @@ public PartitionData( int maxBytes, Optional<Integer> currentLeaderEpoch ) { - this(fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch, Optional.empty()); + this(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch, Optional.empty()); Review comment: Do we still use this constructor? 
########## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ########## @@ -1041,26 +1038,26 @@ class ReplicaManager(val config: KafkaConfig, // 5) we found a diverging epoch if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData || hasDivergingEpoch) { val fetchPartitionData = logReadResults.map { case (tp, result) => - val isReassignmentFetch = isFromFollower && isAddingReplica(tp, replicaId) + val isReassignmentFetch = isFromFollower && isAddingReplica(tp.topicPartition, replicaId) tp -> result.toFetchPartitionData(isReassignmentFetch) } responseCallback(fetchPartitionData) } else { // construct the fetch results from the read results - val fetchPartitionStatus = new mutable.ArrayBuffer[(TopicPartition, FetchPartitionStatus)] - fetchInfos.foreach { case (topicPartition, partitionData) => - logReadResultMap.get(topicPartition).foreach(logReadResult => { + val fetchPartitionStatus = new mutable.ArrayBuffer[(TopicIdPartition, FetchPartitionStatus)] + fetchInfos.foreach { case (topicIdPartition, partitionData) => + logReadResultMap.get(topicIdPartition).foreach(logReadResult => { val logOffsetMetadata = logReadResult.info.fetchOffsetMetadata - fetchPartitionStatus += (topicPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData)) + fetchPartitionStatus += (topicIdPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData)) }) } val fetchMetadata: SFetchMetadata = SFetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, - fetchOnlyFromLeader, fetchIsolation, isFromFollower, replicaId, topicIds, fetchPartitionStatus) + fetchOnlyFromLeader, fetchIsolation, isFromFollower, replicaId, fetchPartitionStatus) val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, clientMetadata, responseCallback) // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation - val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => 
TopicPartitionOperationKey(tp) } + val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp.topicPartition) } Review comment: nit: We could add another `apply` method to `TopicPartitionOperationKey` which accepts a `TopicIdPartition`. That will be convenient. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org