dajac commented on code in PR #15202:
URL: https://github.com/apache/kafka/pull/15202#discussion_r1463282309
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##########
@@ -414,8 +495,69 @@ public void testOffsetFetchRequestErroredRequests(final
Errors error, final bool
testNonRetriable(futures);
assertEmptyPendingRequests(commitRequestManger);
}
+ }
- assertCoordinatorDisconnect(error);
+ @Test
Review Comment:
nit: Should we use parameterized tests instead of specifying all cases like
this?
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##########
@@ -272,7 +270,65 @@ public void
testSyncAutocommitRetriedAfterRetriableException(Errors error) {
// We expect that request should have been retried on this sync commit.
assertExceptionHandling(commitRequestManger, error, true);
- assertCoordinatorDisconnect(error);
+ }
+
+ @Test
+ public void testCommitSyncThrowsCommitFailedExceptionOnFencedInstanceId() {
+
testCommitSyncFailsWithCommitFailedExceptionOnError(Errors.FENCED_INSTANCE_ID);
+ }
+
+ @Test
+ public void testCommitSyncThrowsCommitFailedExceptionOnUnknownMemberId() {
+
testCommitSyncFailsWithCommitFailedExceptionOnError(Errors.UNKNOWN_MEMBER_ID);
+ }
+
+ private void testCommitSyncFailsWithCommitFailedExceptionOnError(Errors
commitError) {
+ CommitRequestManager commitRequestManger = create(false, 100);
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+
+ Map<TopicPartition, OffsetAndMetadata> offsets =
Collections.singletonMap(
+ new TopicPartition("topic", 1),
+ new OffsetAndMetadata(0));
+
+ // Send sync offset commit request that fails with an error that is
expected to propagate
+ // a CommitFailedException
+ Long expirationTimeMs = time.milliseconds() + retryBackoffMs;
+ CompletableFuture<Void> commitResult =
commitRequestManger.addOffsetCommitRequest(offsets,
Optional.of(expirationTimeMs), false);
+ completeOffsetCommitRequestWithError(commitRequestManger, commitError);
+ assertFutureThrows(commitResult, CommitFailedException.class);
+ }
+
+ @Test
+ public void testCommitSyncThrowsOffsetMetadataTooLargeException() {
+ // Error with metadata provided by the user should propagate the
exception, so they can handle it.
+
testCommitSyncFailsWithErrorException(Errors.OFFSET_METADATA_TOO_LARGE);
+ }
+
+ @Test
+ public void testCommitSyncThrowsInvalidCommitOffsetSizeException() {
+ // Error with data provided by the user should propagate the
exception, so they can handle it.
+
testCommitSyncFailsWithErrorException(Errors.INVALID_COMMIT_OFFSET_SIZE);
+ }
+
+ @Test
+ public void testCommitSyncThrowsGroupAuthorizationException() {
+
testCommitSyncFailsWithErrorException(Errors.GROUP_AUTHORIZATION_FAILED);
+ }
+
+ private void testCommitSyncFailsWithErrorException(Errors commitError) {
Review Comment:
This method is very similar to
testCommitSyncFailsWithCommitFailedExceptionOnError. Could we share part of the
implementation for both cases? For instance, one idea would be to pass the
Errors and the exception Exception as params.
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##########
@@ -440,28 +582,59 @@ public void
testOffsetCommitRequestErroredRequestsNotRetriedForAsyncCommit(final
// We expect that the request should not have been retried on this
async commit.
assertExceptionHandling(commitRequestManger, error, false);
- assertCoordinatorDisconnect(error);
}
+ @Test
+ public void
testCommitAsyncThrowsKafkaExceptionForUnexpectedRetriableError() {
+ testCommitAsyncThrowsKafkaException(Errors.CORRUPT_MESSAGE);
+ }
@Test
- public void
testAsyncOffsetCommitThrowsRetriableCommitExceptionForUnhandledRetriable() {
+ public void
testCommitAsyncThrowsKafkaExceptionForUnexpectedNonRetriableError() {
+ testCommitAsyncThrowsKafkaException(Errors.UNKNOWN_SERVER_ERROR);
+ }
+
+ private void testCommitAsyncThrowsKafkaException(Errors error) {
CommitRequestManager commitRequestManger = create(true, 100);
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
Map<TopicPartition, OffsetAndMetadata> offsets =
Collections.singletonMap(new TopicPartition("topic", 1),
new OffsetAndMetadata(0));
- // Send commit request without expiration (async commit) that fails
with retriable
- // network exception that has no specific handling. Should fail with
- // RetriableCommitException.
+ // Send async commit that fails with unexpected error. Should fail
with KafkaException.
CompletableFuture<Void> commitResult =
commitRequestManger.addOffsetCommitRequest(offsets, Optional.empty(), false);
- completeOffsetCommitRequestWithError(commitRequestManger,
Errors.NETWORK_EXCEPTION);
+ completeOffsetCommitRequestWithError(commitRequestManger, error);
NetworkClientDelegate.PollResult res =
commitRequestManger.poll(time.milliseconds());
assertEquals(0, res.unsentRequests.size());
assertTrue(commitResult.isDone());
assertTrue(commitResult.isCompletedExceptionally());
- assertFutureThrows(commitResult, RetriableCommitFailedException.class);
+ assertFutureThrows(commitResult, KafkaException.class);
+ }
+
+ @Test
+ public void
testCommitSyncThrowsKafkaExceptionForUnexpectedRetriableError() {
+ testCommitSyncThrowsKafkaException(Errors.CORRUPT_MESSAGE);
+ }
+
+ @Test
+ public void
testCommitSyncThrowsKafkaExceptionForUnexpectedNonRetriableError() {
+ testCommitSyncThrowsKafkaException(Errors.UNKNOWN_SERVER_ERROR);
+ }
+
+ private void testCommitSyncThrowsKafkaException(Errors error) {
Review Comment:
This one is also very similar to
testCommitSyncFailsWithCommitFailedExceptionOnError.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -820,6 +848,8 @@ private void onFailure(final long currentTimeMs,
} else if (responseError == Errors.GROUP_AUTHORIZATION_FAILED) {
Review Comment:
That seems to be fine.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -536,12 +543,23 @@ public void onResponse(final ClientResponse response) {
continue;
}
- if (error == Errors.COORDINATOR_NOT_AVAILABLE ||
+ if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
+
future.completeExceptionally(GroupAuthorizationException.forGroupId(groupId));
+ return;
+ } else if (error == Errors.COORDINATOR_NOT_AVAILABLE ||
error == Errors.NOT_COORDINATOR ||
error == Errors.REQUEST_TIMED_OUT) {
coordinatorRequestManager.markCoordinatorUnknown(error.message(),
currentTimeMs);
maybeRetry(currentTimeMs, error.exception());
return;
+ } else if (error == Errors.FENCED_INSTANCE_ID) {
+ log.info("OffsetCommit failed due to group instance id
{} fenced: {}", groupInstanceId, error.message());
+ future.completeExceptionally(new
CommitFailedException());
Review Comment:
This one has not been addressed yet.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]