lianetm commented on code in PR #19577:
URL: https://github.com/apache/kafka/pull/19577#discussion_r2445972000


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -732,7 +740,9 @@ public NetworkClientDelegate.UnsentRequest 
toUnsentRequest() {
                 lastEpochSentOnCommit = Optional.empty();
             }
 
-            OffsetCommitRequest.Builder builder = 
OffsetCommitRequest.Builder.forTopicNames(data);
+            OffsetCommitRequest.Builder builder = canUseTopicIds
+                    ? OffsetCommitRequest.Builder.forTopicIdsOrNames(data, 
true)
+                    : OffsetCommitRequest.Builder.forTopicNames(data);

Review Comment:
   If we don't have topic IDs in hand when committing offsets, I'm afraid we 
cannot simply move on and generate a request `forTopicNames` using the last 
stable version. As of v10 (not stable yet but will be eventually), the 
`OffsetCommit` API supports topics ID exclusively, so building with names like 
this would fail, right? 
   
   
https://github.com/apache/kafka/blob/92d9dc19b22529895c430bf2551b551cbfbcaad1/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java#L68-L74
    
   This relates to the comment above: if we don't have topic IDs, we should 
request metadata, but also we cannot continue to generate requests really. So, 
should we just request metadata and return? not generating any request on that 
round, but they will be generated on the next iteration of the background 
thread after the metadata is resolved. Thoughts?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -700,15 +701,22 @@ class OffsetCommitRequestState extends 
RetriableRequestState {
         }
 
         public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
+            Map<String, Uuid> topicIds = metadata.topicIds();
+            boolean canUseTopicIds = true;
             Map<String, OffsetCommitRequestData.OffsetCommitRequestTopic> 
requestTopicDataMap = new HashMap<>();
             for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : 
offsets.entrySet()) {
                 TopicPartition topicPartition = entry.getKey();
                 OffsetAndMetadata offsetAndMetadata = entry.getValue();
+                Uuid topicId = topicIds.getOrDefault(topicPartition.topic(), 
Uuid.ZERO_UUID);
+                if (topicId.equals(Uuid.ZERO_UUID)) {
+                    canUseTopicIds = false;

Review Comment:
   we should request a metadata update in this case I expect. 



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -752,11 +763,21 @@ public void onResponse(final ClientResponse response) {
             boolean failedRequestRegistered = false;
             for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : 
commitResponse.data().topics()) {
                 for (OffsetCommitResponseData.OffsetCommitResponsePartition 
partition : topic.partitions()) {
-                    TopicPartition tp = new TopicPartition(topic.name(), 
partition.partitionIndex());
+                    // Version 10 drop topic name and support to topic id.
+                    // We need to find offsetAndMetadata based on topic id and 
partition index only as
+                    // topic name in the response will be emtpy.
+                    // For older versions, topic id is zero, and we will find 
the offsetAndMetadata based on the topic name.
+                    TopicPartition tp = 
(!Uuid.ZERO_UUID.equals(topic.topicId()) && 
metadata.topicNames().containsKey(topic.topicId())) ?
+                        new 
TopicPartition(metadata.topicNames().get(topic.topicId()), 
partition.partitionIndex()) :
+                        new TopicPartition(topic.name(), 
partition.partitionIndex());
 
                     Errors error = Errors.forCode(partition.errorCode());
                     if (error == Errors.NONE) {
                         OffsetAndMetadata offsetAndMetadata = offsets.get(tp);
+                        if (offsetAndMetadata == null) {
+                            throw new IllegalStateException("Can't find 
metadata for topic id " + topic.topicId() +

Review Comment:
   I'm not convinced that this is what we should do here. At this point, the 
commit request succeeded on the broker. It's just that the response contains a 
topic ID that the client cannot resolve, but it only needs the topic name to 
include it in a `log.debug` really.  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to