lianetm commented on code in PR #14557:
URL: https://github.com/apache/kafka/pull/14557#discussion_r1429362412


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1119,8 +1125,11 @@ public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration
         acquireAndEnsureOpen();
         long commitStart = time.nanoseconds();
         try {
-            CompletableFuture<Void> commitFuture = commit(offsets, true);
-            ConsumerUtils.getResult(commitFuture, time.timer(timeout));
+            Timer requestTimer = time.timer(timeout.toMillis());
+            // Commit with a timer to control how long the request should be retried until it
+            // gets a successful response or non-retriable error.
+            CompletableFuture<Void> commitFuture = commit(offsets, true, Optional.of(timeout.toMillis()));
+            ConsumerUtils.getResult(commitFuture, requestTimer);

Review Comment:
   You got it right regarding the timers. I also agree that if the request
   fails, is retried, and the retry response does not arrive in time, the
   request currently fails with a `TimeoutException`. There is even a
   [test](https://github.com/apache/kafka/blob/97eecdbac186a7b65228f7de665ebe552bb4a377/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java#L471)
   that checks that a timeout is thrown in that case, viewing it from the angle
   that the response did not arrive in time. Still, I agree that it would make
   more sense to propagate the original retriable exception.
   
   That could definitely be improved. I already ensure
   [here](https://github.com/apache/kafka/blob/97eecdbac186a7b65228f7de665ebe552bb4a377/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java#L564)
   that the original exception is propagated when a request expires before it
   is retried (that is, when a failed response has been received), but this
   improvement would ensure the same happens when no response is received.
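
A minimal sketch of the behaviour discussed above, not the actual `CommitRequestManager` code. It assumes a hypothetical `RequestState` that remembers the last retriable error and, when it expires, completes its future with that error if one was seen, falling back to a timeout only when no response ever arrived. `RequestState`, `RetriableError`, `onRetriableFailure`, `maybeExpire` and `causeOf` are illustrative names, and `java.util.concurrent.TimeoutException` stands in for Kafka's own `TimeoutException` so that the example is self-contained.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch; names and structure are assumptions, not Kafka's implementation. */
public class ExpiringRequestSketch {

    /** Stand-in for a retriable error returned in a failed response. */
    static class RetriableError extends RuntimeException {
        RetriableError(String message) {
            super(message);
        }
    }

    static class RequestState {
        final CompletableFuture<Void> future = new CompletableFuture<>();
        final long expirationTimeMs;
        // Last retriable error received for this request, if any.
        Optional<Throwable> lastRetriableError = Optional.empty();

        RequestState(long nowMs, long timeoutMs) {
            this.expirationTimeMs = nowMs + timeoutMs;
        }

        /** Record a failed response that would normally trigger a retry. */
        void onRetriableFailure(RetriableError error) {
            lastRetriableError = Optional.of(error);
        }

        /**
         * Expire the request once its deadline has passed, regardless of whether
         * a response to the latest attempt was received. Returns true if it expired.
         */
        boolean maybeExpire(long nowMs) {
            if (future.isDone() || nowMs < expirationTimeMs) {
                return false;
            }
            // Prefer the original retriable error; time out only if none was seen.
            Throwable cause = lastRetriableError.orElseGet(
                () -> new TimeoutException("No response received before the request expired"));
            future.completeExceptionally(cause);
            return true;
        }
    }

    /** Extract the exception a request's future was completed with (null if none). */
    static Throwable causeOf(RequestState state) {
        return state.future.handle((ignored, error) -> error).join();
    }

    public static void main(String[] args) {
        // Case 1: a retriable failure was seen, then the retry got no response in time.
        RequestState retried = new RequestState(0L, 100L);
        retried.onRetriableFailure(new RetriableError("coordinator not available"));
        retried.maybeExpire(150L);
        System.out.println(causeOf(retried).getClass().getSimpleName()); // RetriableError

        // Case 2: no response was ever received.
        RequestState silent = new RequestState(0L, 100L);
        silent.maybeExpire(150L);
        System.out.println(causeOf(silent).getClass().getSimpleName()); // TimeoutException
    }
}
```

With this shape the caller blocked in `commitSync` would see the error that caused the retry rather than a generic timeout, which is the improvement described above. Whether the real change keeps the error on the request state or elsewhere is left open here.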



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
