jolshan commented on a change in pull request #11331: URL: https://github.com/apache/kafka/pull/11331#discussion_r743949497
########## File path: core/src/main/scala/kafka/server/FetchSession.scala ########## @@ -93,27 +93,42 @@ class CachedPartition(val topic: String, def this(topic: String, partition: Int, topicId: Uuid) = this(topic, topicId, partition, -1, -1, -1, Optional.empty(), -1, -1, Optional.empty[Integer]) - def this(part: TopicPartition, topicId: Uuid) = - this(part.topic, part.partition, topicId) + def this(part: TopicIdPartition) = { + this(part.topic, part.partition, part.topicId) + } - def this(part: TopicPartition, id: Uuid, reqData: FetchRequest.PartitionData) = - this(part.topic, id, part.partition, reqData.maxBytes, reqData.fetchOffset, -1, + def this(part: TopicIdPartition, reqData: FetchRequest.PartitionData) = + this(part.topic, part.topicId, part.partition, reqData.maxBytes, reqData.fetchOffset, -1, reqData.currentLeaderEpoch, reqData.logStartOffset, -1, reqData.lastFetchedEpoch) - def this(part: TopicPartition, id: Uuid, reqData: FetchRequest.PartitionData, + def this(part: TopicIdPartition, reqData: FetchRequest.PartitionData, respData: FetchResponseData.PartitionData) = - this(part.topic, id, part.partition, reqData.maxBytes, reqData.fetchOffset, respData.highWatermark, + this(part.topic, part.topicId, part.partition, reqData.maxBytes, reqData.fetchOffset, respData.highWatermark, reqData.currentLeaderEpoch, reqData.logStartOffset, respData.logStartOffset, reqData.lastFetchedEpoch) - def reqData = new FetchRequest.PartitionData(fetchOffset, fetcherLogStartOffset, maxBytes, leaderEpoch, lastFetchedEpoch) + def reqData = new FetchRequest.PartitionData(topicId, fetchOffset, fetcherLogStartOffset, maxBytes, leaderEpoch, lastFetchedEpoch) - def updateRequestParams(reqData: FetchRequest.PartitionData): Unit = { + def maybeUpdateRequestParamsOrName(reqData: FetchRequest.PartitionData, name: String): Unit = { // Update our cached request parameters. maxBytes = reqData.maxBytes fetchOffset = reqData.fetchOffset fetcherLogStartOffset = reqData.logStartOffset leaderEpoch = reqData.currentLeaderEpoch lastFetchedEpoch = reqData.lastFetchedEpoch + // Update name if needed + maybeSetUnknownName(name) + } + + def maybeSetUnknownName(name: String): Unit = { + if (this.topic == null) { + this.topic = name + } + } + + def maybeResolveUnknownName(topicNames: FetchSession.TOPIC_NAME_MAP): Unit = { Review comment: Ok, so we'll pass a name and the reqData in that method. ########## File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ########## @@ -794,22 +793,23 @@ class ReplicaManagerTest { // We receive one valid request from the follower and replica state is updated var successfulFetch: Option[FetchPartitionData] = None - def callback(response: Seq[(TopicPartition, FetchPartitionData)]): Unit = { - successfulFetch = response.headOption.filter { case (topicPartition, _) => topicPartition == tp }.map { case (_, data) => data } + def callback(response: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = { + // Check the topic partition only since we are reusing this callback on different TopicIdPartitions. + successfulFetch = response.headOption.filter { case (topicIdPartition, _) => topicIdPartition.topicPartition == tidp.topicPartition }.map { case (_, data) => data } Review comment: So I can write a separate callback for each one that checks the ID. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org