lianetm commented on code in PR #21464:
URL: https://github.com/apache/kafka/pull/21464#discussion_r2800656267
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -552,24 +557,99 @@ private void fetchOffsetsWithRetries(final
OffsetFetchRequestState fetchRequest,
log.warn("A duplicated, inflight, request was identified, but
unable to find it in the " +
"outbound buffer: {}", fetchRequest);
}
- if (error == null) {
- maybeUpdateLastSeenEpochIfNewer(res);
- result.complete(res);
- } else {
- if (error instanceof RetriableException ||
isStaleEpochErrorAndValidEpochAvailable(error)) {
- if (fetchRequest.isExpired()) {
- log.debug("OffsetFetch request for {} timed out and
won't be retried anymore", fetchRequest.requestedPartitions);
-
result.completeExceptionally(maybeWrapAsTimeoutException(error));
- } else {
- fetchRequest.resetFuture();
- fetchOffsetsWithRetries(fetchRequest, result);
- }
- } else
- result.completeExceptionally(error);
+
+ // Success: no group-level error and no partition-level errors
+ if (error == null && !res.hasRetriablePartitionErrors()) {
+ handleSuccessfulOffsetFetch(result, res);
+ return;
+ }
+
+ // Group-level error
+ if (error != null) {
+ handleGroupLevelError(fetchRequest, result, error);
+ return;
}
+
+ // Partition-level errors
+ handleRetriablePartitionErrors(fetchRequest, result, res);
});
}
+ /**
+ * Handles a successful offset fetch response with no errors.
+ */
+ private void handleSuccessfulOffsetFetch(final
CompletableFuture<OffsetFetchResult> result,
+ final OffsetFetchResult res) {
+ maybeUpdateLastSeenEpochIfNewer(res.offsets());
+ result.complete(res);
+ }
+
+ /**
+ * Handles group-level errors from an offset fetch request.
+ * Group-level errors indicate the entire request failed (e.g.,
coordinator unavailable).
+ */
+ private void handleGroupLevelError(final OffsetFetchRequestState
fetchRequest,
+ final
CompletableFuture<OffsetFetchResult> result,
+ final Throwable error) {
+ boolean isRetriable = (error instanceof RetriableException) ||
+ isStaleEpochErrorAndValidEpochAvailable(error);
+
+ if (!isRetriable) {
+ result.completeExceptionally(error);
+ return;
+ }
+
+ if (fetchRequest.isExpired()) {
+ log.debug("OffsetFetch request for {} timed out and won't be
retried anymore",
+ fetchRequest.requestedPartitions);
+ result.completeExceptionally(maybeWrapAsTimeoutException(error));
+ return;
+ }
+
+ retryOffsetFetchOnError(fetchRequest, result, "retriable error: " +
error.getMessage());
+ }
+
+ /**
+ * Handles retriable partition-level errors from an offset fetch response.
+ *
+ * <p>The only retriable partition errors are UNKNOWN_TOPIC_ID and
UNKNOWN_TOPIC_OR_PARTITION.
+ * When expired or not enough time for another retry, we return partial
results with null for
+ * the errored partitions. We check against {@code remainingBackoffMs} to
ensure we complete
+ * before the {@link
org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper}
+ * expires the event with TimeoutException.
+ */
+ private void handleRetriablePartitionErrors(final OffsetFetchRequestState
fetchRequest,
Review Comment:
This is the functional change introduced by this PR: retry on
retriable partition-level errors (currently UNKNOWN_TOPIC_ID and
UNKNOWN_TOPIC_OR_PARTITION), and return whatever results we have once there
is no time left for another retry.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]