lianetm commented on code in PR #19577: URL: https://github.com/apache/kafka/pull/19577#discussion_r2093194584
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ########## @@ -730,7 +739,10 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() { lastEpochSentOnCommit = Optional.empty(); } - OffsetCommitRequest.Builder builder = OffsetCommitRequest.Builder.forTopicNames(data); + boolean canUseTopicIds = partitionsWithoutTopicIds == 0; Review Comment: is the `partitionsWithoutTopicIds` really needed? wonder if we can simplify, remove it and this, and only keep the `canUseTopicIds` since the beginning (setting it to false whenever we find a missing topic) ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ########## @@ -809,6 +826,9 @@ public void onResponse(final ClientResponse response) { if (!unauthorizedTopics.isEmpty()) { log.error("OffsetCommit failed due to not authorized to commit to topics {}", unauthorizedTopics); future.completeExceptionally(new TopicAuthorizationException(unauthorizedTopics)); + } else if (!unknownTopicIds.isEmpty()) { + log.error("OffsetCommit failed due to unknown topic id to commit to topic ids {}", unknownTopicIds); Review Comment: the message reads a bit off, maybe simply `OffsetCommit failed due to unknown topic IDs {}` would do? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ########## @@ -695,15 +696,22 @@ class OffsetCommitRequestState extends RetriableRequestState { } public NetworkClientDelegate.UnsentRequest toUnsentRequest() { + Map<String, Uuid> topicIds = metadata.topicIds(); Review Comment: yes, it is ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ########## @@ -809,6 +826,9 @@ public void onResponse(final ClientResponse response) { if (!unauthorizedTopics.isEmpty()) { log.error("OffsetCommit failed due to not authorized to commit to topics {}", unauthorizedTopics); future.completeExceptionally(new TopicAuthorizationException(unauthorizedTopics)); + } else if (!unknownTopicIds.isEmpty()) { + log.error("OffsetCommit failed due to unknown topic id to commit to topic ids {}", unknownTopicIds); + future.completeExceptionally(new UnknownTopicIdException(Errors.UNKNOWN_TOPIC_ID.message())); Review Comment: should we reuse `Errors.UNKNOWN_TOPIC_ID.exception()` instead of creating a new exception here? (we don't want a custom message in this case) ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ########## @@ -469,6 +469,35 @@ public void testCommitSyncFailsWithCommitFailedExceptionOnStaleMemberEpoch() { assertFutureThrows(CommitFailedException.class, commitResult); } + @Test + public void testCommitSyncShouldSuccessWithTopicHasId() { Review Comment: ```suggestion public void testCommitSyncShouldSucceedWithTopicId() { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org