jolshan commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r719649068



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -870,12 +868,14 @@ class KafkaApis(val requestChannel: RequestChannel,
 
         // Prepare fetch response from converted data
         val response =
-          FetchResponse.of(unconvertedFetchResponse.error, throttleTimeMs, 
unconvertedFetchResponse.sessionId, convertedData, sessionTopicIds.asJava)
+          FetchResponse.of(unconvertedFetchResponse.error, throttleTimeMs, 
unconvertedFetchResponse.sessionId, convertedData)
         // record the bytes out metrics only when the response is being sent
         response.data().responses().forEach { topicResponse =>
           topicResponse.partitions().forEach { data =>
-            val tp = new TopicPartition(topicResponse.topic(), 
data.partitionIndex())
-            brokerTopicStats.updateBytesOut(tp.topic, 
fetchRequest.isFromFollower, reassigningPartitions.contains(tp), 
FetchResponse.recordsSize(data))
+            val tp = new TopicIdPartition(topicResponse.topicId, new 
TopicPartition(topicResponse.topic(), data.partitionIndex()))
+            // If the topic name was not known, we will have no bytes out.
+            if (tp.topicPartition.topic != null)

Review comment:
       Reassigning partitions takes a topic ID partition unfortunately. But I 
suppose we can change that. Not sure if we want to distinguish between 
reassigning partitions if we had two with the same name in the session.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to