lianetm commented on code in PR #15202:
URL: https://github.com/apache/kafka/pull/15202#discussion_r1463498713
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##########
@@ -440,28 +582,59 @@ public void
testOffsetCommitRequestErroredRequestsNotRetriedForAsyncCommit(final
// We expect that the request should not have been retried on this
async commit.
assertExceptionHandling(commitRequestManger, error, false);
- assertCoordinatorDisconnect(error);
}
+ @Test
+ public void
testCommitAsyncThrowsKafkaExceptionForUnexpectedRetriableError() {
+ testCommitAsyncThrowsKafkaException(Errors.CORRUPT_MESSAGE);
+ }
@Test
- public void
testAsyncOffsetCommitThrowsRetriableCommitExceptionForUnhandledRetriable() {
+ public void
testCommitAsyncThrowsKafkaExceptionForUnexpectedNonRetriableError() {
+ testCommitAsyncThrowsKafkaException(Errors.UNKNOWN_SERVER_ERROR);
+ }
+
+ private void testCommitAsyncThrowsKafkaException(Errors error) {
CommitRequestManager commitRequestManger = create(true, 100);
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
Map<TopicPartition, OffsetAndMetadata> offsets =
Collections.singletonMap(new TopicPartition("topic", 1),
new OffsetAndMetadata(0));
- // Send commit request without expiration (async commit) that fails
with retriable
- // network exception that has no specific handling. Should fail with
- // RetriableCommitException.
+ // Send async commit that fails with unexpected error. Should fail
with KafkaException.
CompletableFuture<Void> commitResult =
commitRequestManger.addOffsetCommitRequest(offsets, Optional.empty(), false);
- completeOffsetCommitRequestWithError(commitRequestManger,
Errors.NETWORK_EXCEPTION);
+ completeOffsetCommitRequestWithError(commitRequestManger, error);
NetworkClientDelegate.PollResult res =
commitRequestManger.poll(time.milliseconds());
assertEquals(0, res.unsentRequests.size());
assertTrue(commitResult.isDone());
assertTrue(commitResult.isCompletedExceptionally());
- assertFutureThrows(commitResult, RetriableCommitFailedException.class);
+ assertFutureThrows(commitResult, KafkaException.class);
+ }
+
+ @Test
+ public void
testCommitSyncThrowsKafkaExceptionForUnexpectedRetriableError() {
+ testCommitSyncThrowsKafkaException(Errors.CORRUPT_MESSAGE);
+ }
+
+ @Test
+ public void
testCommitSyncThrowsKafkaExceptionForUnexpectedNonRetriableError() {
+ testCommitSyncThrowsKafkaException(Errors.UNKNOWN_SERVER_ERROR);
+ }
+
+ private void testCommitSyncThrowsKafkaException(Errors error) {
Review Comment:
Agree, it does not exist anymore after the refactoring for the tests common
logic and params
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]