TaiJuWu commented on code in PR #21126:
URL: https://github.com/apache/kafka/pull/21126#discussion_r2654578618
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2117,13 +2117,29 @@ class KafkaApis(val requestChannel: RequestChannel,
val offsetForLeaderEpoch = request.body[OffsetsForLeaderEpochRequest]
val topics = offsetForLeaderEpoch.data.topics.asScala.toSeq
+ // Separate topics with unknown topic IDs when using version 5+
+ val (knownTopics, unknownTopicIdTopics) = if
(OffsetsForLeaderEpochRequest.useTopicIds(request.header.apiVersion)) {
+ topics.partition { offsetForLeaderTopic =>
+ metadataCache.getTopicName(offsetForLeaderTopic.topicId).isPresent
+ }
+ } else {
+ (topics, Seq.empty[OffsetForLeaderTopic])
+ }
+
// The OffsetsForLeaderEpoch API was initially only used for inter-broker
communication and required
// cluster permission. With KIP-320, the consumer now also uses this API
to check for log truncation
// following a leader change, so we also allow topic describe permission.
val (authorizedTopics, unauthorizedTopics) =
if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER,
CLUSTER_NAME, logIfDenied = false))
- (topics, Seq.empty[OffsetForLeaderTopic])
- else authHelper.partitionSeqByAuthorized(request.context, DESCRIBE,
TOPIC, topics)(_.topic)
+ (knownTopics, Seq.empty[OffsetForLeaderTopic])
+ else authHelper.partitionSeqByAuthorized(request.context, DESCRIBE,
TOPIC, knownTopics) { offsetForLeaderTopic =>
+ // Resolve topic name from topicId if needed for authorization
+ if
(OffsetsForLeaderEpochRequest.useTopicIds(request.header.apiVersion)) {
+ metadataCache.getTopicName(offsetForLeaderTopic.topicId).get()
Review Comment:
You are right so I use a addition variable to store such information, and
other comments are addressed, PTAL.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]