chickenchickenlove commented on code in PR #21126:
URL: https://github.com/apache/kafka/pull/21126#discussion_r2652475533


##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java:
##########
@@ -70,6 +84,24 @@ public OffsetsForLeaderEpochRequest build(short version) {
             if (version < oldestAllowedVersion() || version > latestAllowedVersion())
                 throw new UnsupportedVersionException("Cannot build " + this + " with version " + version);
 
+            if (version <= 4) {
+                for (OffsetForLeaderEpochRequestData.OffsetForLeaderTopic topic : data.topics()) {
+                    if (topic.topic() == null || topic.topic().isEmpty()) {
+                        throw new UnsupportedVersionException("The broker offsets for leader api version " +
+                                version + " does require usage of topic names.");
+                    }
+                }
+            }
+
+            if (version >= 5) {
+                for (OffsetForLeaderEpochRequestData.OffsetForLeaderTopic topic : data.topics()) {
+                    if (topic.topicId() == null || topic.topicId() == Uuid.ZERO_UUID) {
Review Comment:
   The `Uuid` parsed off the wire might not share the same reference as `Uuid.ZERO_UUID`, so a `==` check can be false even when the two values are equal. I wonder if comparing them using `equals()` instead of `==` was intended here. What do you think?
   For example,
   ```java
   if (topic.topicId() == null || topic.topicId().equals(Uuid.ZERO_UUID)) { ... }
   ```


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2117,13 +2117,29 @@ class KafkaApis(val requestChannel: RequestChannel,
     val offsetForLeaderEpoch = request.body[OffsetsForLeaderEpochRequest]
     val topics = offsetForLeaderEpoch.data.topics.asScala.toSeq
 
+    // Separate topics with unknown topic IDs when using version 5+
+    val (knownTopics, unknownTopicIdTopics) = if (OffsetsForLeaderEpochRequest.useTopicIds(request.header.apiVersion)) {
+      topics.partition { offsetForLeaderTopic =>
+        metadataCache.getTopicName(offsetForLeaderTopic.topicId).isPresent
+      }
+    } else {
+      (topics, Seq.empty[OffsetForLeaderTopic])
+    }
+
     // The OffsetsForLeaderEpoch API was initially only used for inter-broker communication and required
     // cluster permission. With KIP-320, the consumer now also uses this API to check for log truncation
     // following a leader change, so we also allow topic describe permission.
     val (authorizedTopics, unauthorizedTopics) =
       if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME, logIfDenied = false))
-        (topics, Seq.empty[OffsetForLeaderTopic])
-      else authHelper.partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, topics)(_.topic)
+        (knownTopics, Seq.empty[OffsetForLeaderTopic])
+      else authHelper.partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, knownTopics) { offsetForLeaderTopic =>
+        // Resolve topic name from topicId if needed for authorization
+        if (OffsetsForLeaderEpochRequest.useTopicIds(request.header.apiVersion)) {
+          metadataCache.getTopicName(offsetForLeaderTopic.topicId).get()

Review Comment:
   Should we consider a race with metadata view updates here? We check for existence first and retrieve the value later (check-then-act), so the two lookups may observe different metadata snapshots.
   
   Consider the following sequence:
   1. Thread A: Executes the partitioning logic. `metadataCache.getTopicName(id).isPresent()` returns true for UUID-123, so it is added to `knownTopics`.
   2. Thread B: Handles a metadata update (e.g., a topic deletion via the Admin API) and removes UUID-123 from `metadataCache`.
   3. Thread A: Proceeds to the authorization block and calls `metadataCache.getTopicName(id).get()`.
   
   At that point `get()` throws `NoSuchElementException` because the topic no longer exists in the cache, and the exception is not handled.
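   One way to avoid the second cache lookup could be to resolve the topic name once and keep it next to the topic. This is only a rough, untested sketch reusing the names already in this handler; `resolved` and the tuple shape are placeholders I made up, the CLUSTER_ACTION fast path is left out, and the rest of the handler would then read the name from the tuple instead of calling `getTopicName` again:
   ```scala
   import scala.jdk.OptionConverters.RichOptional
   
   // Look up each topic ID in the metadata cache exactly once, so the partitioning
   // and the authorization below are based on the same lookup result.
   val resolved: Seq[(OffsetForLeaderTopic, Option[String])] =
     if (OffsetsForLeaderEpochRequest.useTopicIds(request.header.apiVersion))
       topics.map(t => (t, metadataCache.getTopicName(t.topicId).toScala))
     else
       topics.map(t => (t, Some(t.topic)))
   
   // Topics whose ID could not be resolved go down the UNKNOWN_TOPIC_ID path.
   val (knownTopics, unknownTopicIdTopics) = resolved.partition(_._2.isDefined)
   
   // Authorize using the name captured above instead of a second getTopicName call.
   val (authorizedTopics, unauthorizedTopics) =
     authHelper.partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, knownTopics)(_._2.get)
   ```
   The exact shape doesn't matter much; the main point is that the existence check and the retrieval should not hit the cache twice.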



##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java:
##########
@@ -51,10 +52,23 @@ public static Builder forConsumer(OffsetForLeaderTopicCollection epochsByPartiti
             // to clients. Beginning with version 3, the broker requires only TOPIC Describe
             // permission for the topic of each requested partition. In order to ensure client
             // compatibility, we only send this request when we can guarantee the relaxed permissions.
+
+            // Check if all topics have topic IDs. If so, we can use version 5 which requires topic IDs.
+            // Otherwise, use version 4 which uses topic names.
+            boolean canUseTopicId = true;
+            for (OffsetForLeaderEpochRequestData.OffsetForLeaderTopic topic : epochsByPartition) {
+                if (topic.topicId() == null || topic.topicId().equals(Uuid.ZERO_UUID)) {
+                    canUseTopicId = false;
+                    break;
+                }
+            }
+
             OffsetForLeaderEpochRequestData data = new OffsetForLeaderEpochRequestData();
             data.setReplicaId(CONSUMER_REPLICA_ID);
             data.setTopics(epochsByPartition);
-            return new Builder((short) 3, ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), data);
+
+            short latestVersion = canUseTopicId ? (short) 5 : (short) 4;
+            return new Builder((short) 3, latestVersion, data);

Review Comment:
   `ApiKeys.OFFSET_FOR_LEADER_EPOCH` could be updated to treat `v5` as the unstable latest version. That way, we can reference the version registered in the enum instead of hard-coding magic numbers like 4/5 here.
   For example, we could use
   ```java
   if (canUseTopicId) {
     return new Builder((short) 3, ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(true), data);
   } else {
     return new Builder((short) 3, ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(false), data);
   }
   ```
   
   If bumping `latestVersion` is too impactful, would it make sense to follow an existing pattern for retrieving the unstable latest version and use that instead?


