rajinisivaram commented on a change in pull request #9769:
URL: https://github.com/apache/kafka/pull/9769#discussion_r562511974

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1190,8 +1192,31 @@ class KafkaApis(val requestChannel: RequestChannel,
     val metadataRequest = request.body[MetadataRequest]
     val requestVersion = request.header.apiVersion
 
+    // Check if topicId is presented firstly.
+    val topicIds = metadataRequest.topicIds.asScala.toSet.filterNot(_ == Uuid.ZERO_UUID)
+    val supportedVersionTopicIds = if (config.interBrokerProtocolVersion >= KAFKA_2_8_IV1) topicIds else Set.empty[Uuid]

Review comment:
Had a chat offline with Justine. Since `KafkaConfig.usesTopicId` is used only to decide whether to generate topic ids, 2_8_IV0 makes sense for that, and the check for KAFKA_2_8_IV1 here is the right one. Justine also mentioned:

> if we are in the process of bumping to 2_8_IV1, we could have a scenario where the controller is not yet bumped (and not sending topic IDs) but the broker serving the metadata is. In that case I think we would just get unknown topic ID errors and maybe that is ok. If the reverse is occurring, we could also have unsupported version exception when there are actually IDs.

Can we add tests for these two cases?
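
To make the two mixed-version cases concrete, here is a rough, self-contained sketch of the behaviour described in the quote. It is not the `KafkaApis` code. `Ibp`, `Outcome`, `ZeroId` and `resolve` are hypothetical names made up for illustration, and the outcomes only mirror what the comment above says should happen in each case.

```scala
import java.util.UUID

object TopicIdGating {
  // Hypothetical stand-in for the inter-broker protocol version ordering;
  // only the relative order of the two versions matters in this sketch.
  sealed abstract class Ibp(val rank: Int)
  case object V2_8_IV0 extends Ibp(0)
  case object V2_8_IV1 extends Ibp(1)

  // Simplified per-topic results, modelled on the errors named in the comment.
  sealed trait Outcome
  final case class Resolved(topicName: String) extends Outcome
  case object UnknownTopicId extends Outcome
  case object UnsupportedVersion extends Outcome

  // Hypothetical placeholder playing the role of Uuid.ZERO_UUID.
  val ZeroId: UUID = new UUID(0L, 0L)

  // brokerIbp: IBP of the broker serving the metadata request.
  // cachedIds: topic IDs this broker has learned from the controller
  //            (empty if the controller is not yet sending IDs).
  // requested: topic IDs carried in the request; zero IDs are ignored.
  def resolve(brokerIbp: Ibp,
              cachedIds: Map[UUID, String],
              requested: Set[UUID]): Map[UUID, Outcome] = {
    val nonZero = requested.filterNot(_ == ZeroId)
    nonZero.map { id =>
      val outcome: Outcome =
        if (brokerIbp.rank < V2_8_IV1.rank) UnsupportedVersion
        else cachedIds.get(id) match {
          case Some(name) => Resolved(name)
          case None       => UnknownTopicId
        }
      id -> outcome
    }.toMap
  }
}
```

Under these assumptions, the first case (broker bumped, controller not) is `resolve(V2_8_IV1, Map.empty, Set(id))`, which reports an unknown topic ID. The reverse case is `resolve(V2_8_IV0, Map(id -> "foo"), Set(id))`, which reports an unsupported version even though the ID exists. The requested tests would assert the real equivalents of these two outcomes.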