junrao commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r651977071
########## File path: core/src/main/scala/kafka/server/FetchSession.scala ########## @@ -239,12 +245,22 @@ class FetchSession(val id: Int, // Update the cached partition data based on the request. def update(fetchData: FetchSession.REQ_MAP, toForget: util.List[TopicPartition], - reqMetadata: JFetchMetadata): (TL, TL, TL) = synchronized { + reqMetadata: JFetchMetadata, + topicIds: util.Map[String, Uuid]): (TL, TL, TL, TL) = synchronized { val added = new TL val updated = new TL val removed = new TL + val inconsistentTopicIds = new TL fetchData.forEach { (topicPart, reqData) => - val newCachedPart = new CachedPartition(topicPart, reqData) + // Get the topic ID on the broker, if it is valid and the topic is new to the session, add its ID. + // If the topic already existed, check that its ID is consistent. + val id = topicIds.getOrDefault(topicPart.topic, Uuid.ZERO_UUID) + val newCachedPart = new CachedPartition(topicPart, id, reqData) + if (id != Uuid.ZERO_UUID) { + val prevSessionTopicId = sessionTopicIds.put(topicPart.topic, id) + if (prevSessionTopicId != null && prevSessionTopicId != id) + inconsistentTopicIds.add(topicPart) Review comment: Got it. Make sense. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org