lianetm commented on code in PR #16833:
URL: https://github.com/apache/kafka/pull/16833#discussion_r1722456573


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -988,26 +989,29 @@ private void onFailure(final long currentTimeMs,
                                final Errors responseError) {
             log.debug("Offset fetch failed: {}", responseError.message());
             onFailedAttempt(currentTimeMs);
+            ApiException exception = responseError.exception();
             if (responseError == COORDINATOR_LOAD_IN_PROGRESS) {
-                future.completeExceptionally(responseError.exception());
+                future.completeExceptionally(exception);
             } else if (responseError == Errors.UNKNOWN_MEMBER_ID) {
                 log.error("OffsetFetch failed with {} because the member is not part of the group" +
                     " anymore.", responseError);
-                future.completeExceptionally(responseError.exception());
+                future.completeExceptionally(exception);
             } else if (responseError == Errors.STALE_MEMBER_EPOCH) {
                 log.error("OffsetFetch failed with {} and the consumer is not part " +
                     "of the group anymore (it probably left the group, got fenced" +
                     " or failed). The request cannot be retried and will fail.", responseError);
-                future.completeExceptionally(responseError.exception());
+                future.completeExceptionally(exception);
             } else if (responseError == Errors.NOT_COORDINATOR || responseError == Errors.COORDINATOR_NOT_AVAILABLE) {
                 // Re-discover the coordinator and retry
                 coordinatorRequestManager.markCoordinatorUnknown("error response " + responseError.name(), currentTimeMs);
-                future.completeExceptionally(responseError.exception());
+                future.completeExceptionally(exception);
+            } else if (exception instanceof RetriableException && !(exception instanceof TimeoutException)) {

Review Comment:
   Is there a reason why we don't want to retry the request if it fails with a 
REQUEST_TIMED_OUT error here? I would expect us to retry it. It is only in 
`fetchOffsetsWithRetries` that we make sure we don't keep retrying on retriable 
errors forever, 
[here](https://github.com/apache/kafka/blob/34475070e144192266e58c4a1e60939f4ba92b95/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java#L539).
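
   To make the expected split concrete, here is a minimal, self-contained sketch. It is not the actual `CommitRequestManager` code: `RetrySketch`, `RetriableError`, `RequestTimedOut`, `shouldRetry` and `callWithRetries` are hypothetical names, with the two exception classes standing in for Kafka's `RetriableException` and `TimeoutException`. The per-attempt check treats every retriable error (timeouts included) as retriable, and only the outer loop bounds the retries with a deadline.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

public class RetrySketch {
    // Hypothetical stand-in for Kafka's RetriableException.
    static class RetriableError extends RuntimeException {
        RetriableError(String message) { super(message); }
    }

    // Hypothetical stand-in for Kafka's TimeoutException, which is itself retriable.
    static class RequestTimedOut extends RetriableError {
        RequestTimedOut(String message) { super(message); }
    }

    // Per-attempt decision: any retriable error qualifies, with no carve-out for timeouts.
    static boolean shouldRetry(RuntimeException error) {
        return error instanceof RetriableError;
    }

    // Outer loop: the only place that stops retrying, once the deadline has passed.
    static <T> T callWithRetries(Supplier<T> attempt, long deadlineMs, LongSupplier clockMs) {
        while (true) {
            try {
                return attempt.get();
            } catch (RuntimeException error) {
                if (!shouldRetry(error) || clockMs.getAsLong() >= deadlineMs) {
                    throw error;
                }
            }
        }
    }

    public static void main(String[] args) {
        long[] now = {0};      // fake clock so the example is deterministic
        int[] attempts = {0};
        String result = callWithRetries(() -> {
            now[0] += 10;      // each attempt "takes" 10 ms
            if (++attempts[0] < 3) {
                throw new RequestTimedOut("request timed out");
            }
            return "offsets";
        }, 100, () -> now[0]);
        System.out.println(result + " after " + attempts[0] + " attempts");
    }
}
```

   With this shape a timed-out attempt is retried like any other retriable failure, and a request that keeps timing out still fails once the deadline expires.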
  


