lianetm commented on code in PR #21464:
URL: https://github.com/apache/kafka/pull/21464#discussion_r2800656267


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -552,24 +557,99 @@ private void fetchOffsetsWithRetries(final 
OffsetFetchRequestState fetchRequest,
                 log.warn("A duplicated, inflight, request was identified, but 
unable to find it in the " +
                     "outbound buffer: {}", fetchRequest);
             }
-            if (error == null) {
-                maybeUpdateLastSeenEpochIfNewer(res);
-                result.complete(res);
-            } else {
-                if (error instanceof RetriableException || 
isStaleEpochErrorAndValidEpochAvailable(error)) {
-                    if (fetchRequest.isExpired()) {
-                        log.debug("OffsetFetch request for {} timed out and 
won't be retried anymore", fetchRequest.requestedPartitions);
-                        
result.completeExceptionally(maybeWrapAsTimeoutException(error));
-                    } else {
-                        fetchRequest.resetFuture();
-                        fetchOffsetsWithRetries(fetchRequest, result);
-                    }
-                } else
-                    result.completeExceptionally(error);
+
+            // Success: no group-level error and no partition-level errors
+            if (error == null && !res.hasRetriablePartitionErrors()) {
+                handleSuccessfulOffsetFetch(result, res);
+                return;
+            }
+
+            // Group-level error
+            if (error != null) {
+                handleGroupLevelError(fetchRequest, result, error);
+                return;
             }
+
+            // Partition-level errors
+            handleRetriablePartitionErrors(fetchRequest, result, res);
         });
     }
 
+    /**
+     * Handles a successful offset fetch response with no errors.
+     */
+    private void handleSuccessfulOffsetFetch(final 
CompletableFuture<OffsetFetchResult> result,
+                                             final OffsetFetchResult res) {
+        maybeUpdateLastSeenEpochIfNewer(res.offsets());
+        result.complete(res);
+    }
+
+    /**
+     * Handles group-level errors from an offset fetch request.
+     * Group-level errors indicate the entire request failed (e.g., 
coordinator unavailable).
+     */
+    private void handleGroupLevelError(final OffsetFetchRequestState 
fetchRequest,
+                                       final 
CompletableFuture<OffsetFetchResult> result,
+                                       final Throwable error) {
+        boolean isRetriable = (error instanceof RetriableException) ||
+            isStaleEpochErrorAndValidEpochAvailable(error);
+
+        if (!isRetriable) {
+            result.completeExceptionally(error);
+            return;
+        }
+
+        if (fetchRequest.isExpired()) {
+            log.debug("OffsetFetch request for {} timed out and won't be 
retried anymore",
+                fetchRequest.requestedPartitions);
+            result.completeExceptionally(maybeWrapAsTimeoutException(error));
+            return;
+        }
+
+        retryOffsetFetchOnError(fetchRequest, result, "retriable error: " + 
error.getMessage());
+    }
+
+    /**
+     * Handles retriable partition-level errors from an offset fetch response.
+     *
+     * <p>The only retriable partition errors are UNKNOWN_TOPIC_ID and 
UNKNOWN_TOPIC_OR_PARTITION.
+     * When expired or not enough time for another retry, we return partial 
results with null for
+     * the errored partitions. We check against {@code remainingBackoffMs} to 
ensure we complete
+     * before the {@link 
org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper}
+     * expires the event with TimeoutException.
+     */
+    private void handleRetriablePartitionErrors(final OffsetFetchRequestState 
fetchRequest,

Review Comment:
   This is the functional difference introduced by this PR: retry on 
retriable partition-level errors (UNKNOWN_TOPIC_ID and 
UNKNOWN_TOPIC_OR_PARTITION for now), and return whatever results we have if 
there is no more time to retry.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to