lianetm commented on code in PR #19577:
URL: https://github.com/apache/kafka/pull/19577#discussion_r2093194584


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -730,7 +739,10 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
                 lastEpochSentOnCommit = Optional.empty();
             }
 
-            OffsetCommitRequest.Builder builder = OffsetCommitRequest.Builder.forTopicNames(data);
+            boolean canUseTopicIds = partitionsWithoutTopicIds == 0;

Review Comment:
   Is `partitionsWithoutTopicIds` really needed? I wonder if we can simplify by
removing it along with this line, and keeping only `canUseTopicIds` from the start
(setting it to false whenever we find a topic without an ID).
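
A rough sketch of the flag-only approach, under stated assumptions: `java.util.UUID` stands in for Kafka's `Uuid`, and the class and method names are hypothetical rather than taken from the PR. The loop does not break early, on the assumption that the real loop also builds the request data for every partition.

```java
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Sketch only: java.util.UUID stands in for Kafka's Uuid, and the class and
// method names are hypothetical, not taken from the PR.
final class TopicIdFlagSketch {

    // Returns true only if every topic being committed has a known topic ID.
    static boolean canUseTopicIds(List<String> topics, Map<String, UUID> topicIds) {
        boolean canUseTopicIds = true;
        for (String topic : topics) {
            if (!topicIds.containsKey(topic)) {
                // A single missing ID is enough to fall back to topic names,
                // so no counter of missing partitions is required.
                canUseTopicIds = false;
            }
        }
        return canUseTopicIds;
    }

    public static void main(String[] args) {
        Map<String, UUID> known = Map.of("orders", UUID.randomUUID());
        System.out.println(canUseTopicIds(List.of("orders"), known));
        System.out.println(canUseTopicIds(List.of("orders", "payments"), known));
    }
}
```

The boolean carries the same information the `partitionsWithoutTopicIds == 0` check derives from the counter, since only the zero/non-zero distinction is used.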



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -809,6 +826,9 @@ public void onResponse(final ClientResponse response) {
             if (!unauthorizedTopics.isEmpty()) {
                 log.error("OffsetCommit failed due to not authorized to commit to topics {}", unauthorizedTopics);
                 future.completeExceptionally(new TopicAuthorizationException(unauthorizedTopics));
+            } else if (!unknownTopicIds.isEmpty()) {
+                log.error("OffsetCommit failed due to unknown topic id to commit to topic ids {}", unknownTopicIds);

Review Comment:
   The message reads a bit off; maybe simply `OffsetCommit failed due to
unknown topic IDs {}` would do?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -695,15 +696,22 @@ class OffsetCommitRequestState extends RetriableRequestState {
         }
 
         public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
+            Map<String, Uuid> topicIds = metadata.topicIds();

Review Comment:
   yes, it is



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -809,6 +826,9 @@ public void onResponse(final ClientResponse response) {
             if (!unauthorizedTopics.isEmpty()) {
                 log.error("OffsetCommit failed due to not authorized to commit to topics {}", unauthorizedTopics);
                 future.completeExceptionally(new TopicAuthorizationException(unauthorizedTopics));
+            } else if (!unknownTopicIds.isEmpty()) {
+                log.error("OffsetCommit failed due to unknown topic id to commit to topic ids {}", unknownTopicIds);
+                future.completeExceptionally(new UnknownTopicIdException(Errors.UNKNOWN_TOPIC_ID.message()));

Review Comment:
   Should we reuse `Errors.UNKNOWN_TOPIC_ID.exception()` instead of creating a
new exception here? (We don't want a custom message in this case.)
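
To illustrate why the two forms should be interchangeable, here is a simplified stand-in for the pattern. Everything in it is an assumption for illustration: `SketchErrors` is not Kafka's `Errors` enum, the nested exception class is a local stand-in, and the message text is a placeholder. It only assumes that `exception()` returns an exception already carrying the default message.

```java
import java.util.function.Function;

// Simplified stand-in for the pattern; this is NOT Kafka's Errors enum.
final class ErrorExceptionSketch {

    // Local stand-in for the real exception type.
    static final class UnknownTopicIdException extends RuntimeException {
        UnknownTopicIdException(String message) {
            super(message);
        }
    }

    enum SketchErrors {
        // Placeholder message text, not the real one.
        UNKNOWN_TOPIC_ID("unknown topic id (placeholder message)", UnknownTopicIdException::new);

        private final String defaultMessage;
        private final Function<String, RuntimeException> builder;

        SketchErrors(String defaultMessage, Function<String, RuntimeException> builder) {
            this.defaultMessage = defaultMessage;
            this.builder = builder;
        }

        // Default message associated with this error code.
        String message() {
            return defaultMessage;
        }

        // Exception of the matching type, already carrying the default message.
        RuntimeException exception() {
            return builder.apply(defaultMessage);
        }
    }

    public static void main(String[] args) {
        RuntimeException viaHelper = SketchErrors.UNKNOWN_TOPIC_ID.exception();
        RuntimeException viaNew = new UnknownTopicIdException(SketchErrors.UNKNOWN_TOPIC_ID.message());
        // Both carry the same type and message, so the explicit construction adds nothing.
        System.out.println(viaHelper.getClass() == viaNew.getClass());
        System.out.println(viaHelper.getMessage().equals(viaNew.getMessage()));
    }
}
```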



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##########
@@ -469,6 +469,35 @@ public void testCommitSyncFailsWithCommitFailedExceptionOnStaleMemberEpoch() {
         assertFutureThrows(CommitFailedException.class, commitResult);
     }
 
+    @Test
+    public void testCommitSyncShouldSuccessWithTopicHasId() {

Review Comment:
   ```suggestion
       public void testCommitSyncShouldSucceedWithTopicId() {
   ```



