jolshan commented on a change in pull request #11331: URL: https://github.com/apache/kafka/pull/11331#discussion_r719649068
########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -870,12 +868,14 @@ class KafkaApis(val requestChannel: RequestChannel, // Prepare fetch response from converted data val response = - FetchResponse.of(unconvertedFetchResponse.error, throttleTimeMs, unconvertedFetchResponse.sessionId, convertedData, sessionTopicIds.asJava) + FetchResponse.of(unconvertedFetchResponse.error, throttleTimeMs, unconvertedFetchResponse.sessionId, convertedData) // record the bytes out metrics only when the response is being sent response.data().responses().forEach { topicResponse => topicResponse.partitions().forEach { data => - val tp = new TopicPartition(topicResponse.topic(), data.partitionIndex()) - brokerTopicStats.updateBytesOut(tp.topic, fetchRequest.isFromFollower, reassigningPartitions.contains(tp), FetchResponse.recordsSize(data)) + val tp = new TopicIdPartition(topicResponse.topicId, new TopicPartition(topicResponse.topic(), data.partitionIndex())) + // If the topic name was not known, we will have no bytes out. + if (tp.topicPartition.topic != null) Review comment: Reassigning partitions takes a topic ID partition unfortunately. But I suppose we can change that. Not sure if we want to distinguish between reassigning partitions if we had two with the same name in the session. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org