DL1231 commented on code in PR #19577: URL: https://github.com/apache/kafka/pull/19577#discussion_r2117008511
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ########## @@ -698,15 +699,22 @@ class OffsetCommitRequestState extends RetriableRequestState { } public NetworkClientDelegate.UnsentRequest toUnsentRequest() { + Map<String, Uuid> topicIds = metadata.topicIds(); + boolean canUseTopicIds = true; Map<String, OffsetCommitRequestData.OffsetCommitRequestTopic> requestTopicDataMap = new HashMap<>(); for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) { TopicPartition topicPartition = entry.getKey(); OffsetAndMetadata offsetAndMetadata = entry.getValue(); + Uuid topicId = topicIds.getOrDefault(topicPartition.topic(), Uuid.ZERO_UUID); + if (topicId.equals(Uuid.ZERO_UUID)) { + canUseTopicIds = false; + } OffsetCommitRequestData.OffsetCommitRequestTopic topic = requestTopicDataMap .getOrDefault(topicPartition.topic(), new OffsetCommitRequestData.OffsetCommitRequestTopic() .setName(topicPartition.topic()) + .setTopicId(topicId) Review Comment: The current server handling logic is: when using topic ID, it looks up the topic name based on the topic ID. If no match is found, it returns an exception. Therefore, when using topic ID, have the client pass only the topic ID (without topic name). When the server fails to find a topic name for an obsolete topic ID, it will return an exception. What do you think? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org