rajinisivaram commented on a change in pull request #9769:
URL: https://github.com/apache/kafka/pull/9769#discussion_r562511974



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1190,8 +1192,31 @@ class KafkaApis(val requestChannel: RequestChannel,
     val metadataRequest = request.body[MetadataRequest]
     val requestVersion = request.header.apiVersion
 
+    // Check if topicId is presented firstly.
+    val topicIds = metadataRequest.topicIds.asScala.toSet.filterNot(_ == 
Uuid.ZERO_UUID)
+    val supportedVersionTopicIds = if (config.interBrokerProtocolVersion >= 
KAFKA_2_8_IV1) topicIds else Set.empty[Uuid]

Review comment:
       Had a chat offline with Justine. Since `KafkaConfig.usesTopicId` is used 
only to decide whether to generate topic ids, 2_8_IV0 makes sense for that, and 
the check for KAFKA_2_8_IV1 here is the right one. Justine also mentioned:
   
   > if we are in the process of bumping to 2_8_IV1, we could have a scenario 
where the controller is not yet bumped (and not sending topic IDs) but the 
broker serving the metadata is. In that case I think we would just get unknown 
topic ID errors and maybe that is ok. If the reverse is occurring, we could 
also have unsupported version exception when there are actually IDs.
   
   
   Can we add tests for these two cases?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to