dajac commented on code in PR #15202:
URL: https://github.com/apache/kafka/pull/15202#discussion_r1463282309


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##########
@@ -414,8 +495,69 @@ public void testOffsetFetchRequestErroredRequests(final 
Errors error, final bool
             testNonRetriable(futures);
             assertEmptyPendingRequests(commitRequestManger);
         }
+    }
 
-        assertCoordinatorDisconnect(error);
+    @Test

Review Comment:
   nit: Should we use parameterized tests instead of specifying all cases like 
this?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##########
@@ -272,7 +270,65 @@ public void 
testSyncAutocommitRetriedAfterRetriableException(Errors error) {
 
         // We expect that request should have been retried on this sync commit.
         assertExceptionHandling(commitRequestManger, error, true);
-        assertCoordinatorDisconnect(error);
+    }
+
+    @Test
+    public void testCommitSyncThrowsCommitFailedExceptionOnFencedInstanceId() {
+        
testCommitSyncFailsWithCommitFailedExceptionOnError(Errors.FENCED_INSTANCE_ID);
+    }
+
+    @Test
+    public void testCommitSyncThrowsCommitFailedExceptionOnUnknownMemberId() {
+        
testCommitSyncFailsWithCommitFailedExceptionOnError(Errors.UNKNOWN_MEMBER_ID);
+    }
+
+    private void testCommitSyncFailsWithCommitFailedExceptionOnError(Errors 
commitError) {
+        CommitRequestManager commitRequestManger = create(false, 100);
+        
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+
+        Map<TopicPartition, OffsetAndMetadata> offsets = 
Collections.singletonMap(
+            new TopicPartition("topic", 1),
+            new OffsetAndMetadata(0));
+
+        // Send sync offset commit request that fails with an error that is 
expected to propagate
+        // a CommitFailedException
+        Long expirationTimeMs = time.milliseconds() + retryBackoffMs;
+        CompletableFuture<Void> commitResult = 
commitRequestManger.addOffsetCommitRequest(offsets, 
Optional.of(expirationTimeMs), false);
+        completeOffsetCommitRequestWithError(commitRequestManger, commitError);
+        assertFutureThrows(commitResult, CommitFailedException.class);
+    }
+
+    @Test
+    public void testCommitSyncThrowsOffsetMetadataTooLargeException() {
+        // Error with metadata provided by the user should propagate the 
exception, so they can handle it.
+        
testCommitSyncFailsWithErrorException(Errors.OFFSET_METADATA_TOO_LARGE);
+    }
+
+    @Test
+    public void testCommitSyncThrowsInvalidCommitOffsetSizeException() {
+        // Error with data provided by the user should propagate the 
exception, so they can handle it.
+        
testCommitSyncFailsWithErrorException(Errors.INVALID_COMMIT_OFFSET_SIZE);
+    }
+
+    @Test
+    public void testCommitSyncThrowsGroupAuthorizationException() {
+        
testCommitSyncFailsWithErrorException(Errors.GROUP_AUTHORIZATION_FAILED);
+    }
+
+    private void testCommitSyncFailsWithErrorException(Errors commitError) {

Review Comment:
   This method is very similar to 
testCommitSyncFailsWithCommitFailedExceptionOnError. Could we share part of the 
implementation for both cases? For instance, one idea would be to pass the 
Errors value and the expected exception class as params.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##########
@@ -440,28 +582,59 @@ public void 
testOffsetCommitRequestErroredRequestsNotRetriedForAsyncCommit(final
 
         // We expect that the request should not have been retried on this 
async commit.
         assertExceptionHandling(commitRequestManger, error, false);
-        assertCoordinatorDisconnect(error);
     }
 
+    @Test
+    public void 
testCommitAsyncThrowsKafkaExceptionForUnexpectedRetriableError() {
+        testCommitAsyncThrowsKafkaException(Errors.CORRUPT_MESSAGE);
+    }
 
     @Test
-    public void 
testAsyncOffsetCommitThrowsRetriableCommitExceptionForUnhandledRetriable() {
+    public void 
testCommitAsyncThrowsKafkaExceptionForUnexpectedNonRetriableError() {
+        testCommitAsyncThrowsKafkaException(Errors.UNKNOWN_SERVER_ERROR);
+    }
+
+    private void testCommitAsyncThrowsKafkaException(Errors error) {
         CommitRequestManager commitRequestManger = create(true, 100);
         
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
 
         Map<TopicPartition, OffsetAndMetadata> offsets = 
Collections.singletonMap(new TopicPartition("topic", 1),
             new OffsetAndMetadata(0));
 
-        // Send commit request without expiration (async commit) that fails 
with retriable
-        // network exception that has no specific handling. Should fail with
-        // RetriableCommitException.
+        // Send async commit that fails with unexpected error. Should fail 
with KafkaException.
         CompletableFuture<Void> commitResult = 
commitRequestManger.addOffsetCommitRequest(offsets, Optional.empty(), false);
-        completeOffsetCommitRequestWithError(commitRequestManger, 
Errors.NETWORK_EXCEPTION);
+        completeOffsetCommitRequestWithError(commitRequestManger, error);
         NetworkClientDelegate.PollResult res = 
commitRequestManger.poll(time.milliseconds());
         assertEquals(0, res.unsentRequests.size());
         assertTrue(commitResult.isDone());
         assertTrue(commitResult.isCompletedExceptionally());
-        assertFutureThrows(commitResult, RetriableCommitFailedException.class);
+        assertFutureThrows(commitResult, KafkaException.class);
+    }
+
+    @Test
+    public void 
testCommitSyncThrowsKafkaExceptionForUnexpectedRetriableError() {
+        testCommitSyncThrowsKafkaException(Errors.CORRUPT_MESSAGE);
+    }
+
+    @Test
+    public void 
testCommitSyncThrowsKafkaExceptionForUnexpectedNonRetriableError() {
+        testCommitSyncThrowsKafkaException(Errors.UNKNOWN_SERVER_ERROR);
+    }
+
+    private void testCommitSyncThrowsKafkaException(Errors error) {

Review Comment:
   This one is also very similar to 
testCommitSyncFailsWithCommitFailedExceptionOnError.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -820,6 +848,8 @@ private void onFailure(final long currentTimeMs,
             } else if (responseError == Errors.GROUP_AUTHORIZATION_FAILED) {

Review Comment:
   That seems to be fine.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -536,12 +543,23 @@ public void onResponse(final ClientResponse response) {
                         continue;
                     }
 
-                    if (error == Errors.COORDINATOR_NOT_AVAILABLE ||
+                    if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
+                        
future.completeExceptionally(GroupAuthorizationException.forGroupId(groupId));
+                        return;
+                    } else if (error == Errors.COORDINATOR_NOT_AVAILABLE ||
                         error == Errors.NOT_COORDINATOR ||
                         error == Errors.REQUEST_TIMED_OUT) {
                         
coordinatorRequestManager.markCoordinatorUnknown(error.message(), 
currentTimeMs);
                         maybeRetry(currentTimeMs, error.exception());
                         return;
+                    } else if (error == Errors.FENCED_INSTANCE_ID) {
+                        log.info("OffsetCommit failed due to group instance id 
{} fenced: {}", groupInstanceId, error.message());
+                        future.completeExceptionally(new 
CommitFailedException());

Review Comment:
   This one has not been addressed yet.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to