lianetm commented on code in PR #14557:
URL: https://github.com/apache/kafka/pull/14557#discussion_r1426857957


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -457,120 +539,224 @@ public void onResponse(final ClientResponse response) {
             }
         }
 
-        private void handleRetriableError(Errors error, ClientResponse 
response) {
-            if (error == COORDINATOR_NOT_AVAILABLE ||
-                error == NOT_COORDINATOR ||
-                error == REQUEST_TIMED_OUT) {
-                
coordinatorRequestManager.markCoordinatorUnknown(error.message(), 
response.receivedTimeMs());
+        /**
+         * Enqueue the request to be retried with exponential backoff. This 
will fail the request
+         * without retrying if the request timer expired.
+         */
+        @Override
+        void retry(long currentTimeMs, Throwable throwable) {
+            if (!expirationTimeMs.isPresent() || isExpired(currentTimeMs)) {
+                // Fail requests that had no expiration time (async requests), 
or that had it, and
+                // it expired (sync requests).
+                future.completeExceptionally(throwable);
+                return;
             }
-        }
 
-        private void retry(final long currentTimeMs) {
+            // Enqueue request to be retried with backoff. Note that this 
maintains the same
+            // timer of the initial request, so all the retries are 
time-bounded.
             onFailedAttempt(currentTimeMs);
             pendingRequests.addOffsetCommitRequest(this);
         }
 
-        private void handleFatalError(final Errors error) {
-            switch (error) {
-                case GROUP_AUTHORIZATION_FAILED:
-                    
future.completeExceptionally(GroupAuthorizationException.forGroupId(groupId));
-                    break;
-                case OFFSET_METADATA_TOO_LARGE:
-                case INVALID_COMMIT_OFFSET_SIZE:
-                    future.completeExceptionally(error.exception());
-                    break;
-                case FENCED_INSTANCE_ID:
-                    log.info("OffsetCommit failed due to group instance id {} 
fenced: {}", groupInstanceId, error.message());
-                    future.completeExceptionally(new CommitFailedException());
-                    break;
-                case UNKNOWN_MEMBER_ID:
-                    log.info("OffsetCommit failed due to unknown member id 
memberId {}: {}", null, error.message());
-                    future.completeExceptionally(error.exception());
-                    break;
-                default:
-                    future.completeExceptionally(new 
KafkaException("Unexpected error in commit: " + error.message()));
-                    break;
+        private boolean isExpired(final long currentTimeMs) {
+            return expirationTimeMs.isPresent() && expirationTimeMs.get() <= 
currentTimeMs;
+        }
+
+        private void expire() {
+            future.completeExceptionally(new TimeoutException("OffsetCommit 
could not complete " +
+                "before timeout expired."));
+        }
+    }
+
+    /**
+     * Represents a request that can be retried or aborted, based on member ID 
and epoch
+     * information.
+     */
+    abstract static class RetriableRequestState extends RequestState {
+
+        /**
+         * Member info (ID and epoch) to be included in the request if present.
+         */
+        final MemberInfo memberInfo;
+
+        RetriableRequestState(LogContext logContext, String owner, long 
retryBackoffMs,
+                              long retryBackoffMaxMs, MemberInfo memberInfo) {
+            super(logContext, owner, retryBackoffMs, retryBackoffMaxMs);
+            this.memberInfo = memberInfo;
+        }
+
+        // Visible for testing
+        RetriableRequestState(LogContext logContext, String owner, long 
retryBackoffMs, int retryBackoffExpBase,
+                              long retryBackoffMaxMs, double jitter, 
MemberInfo memberInfo) {
+            super(logContext, owner, retryBackoffMs, retryBackoffExpBase, 
retryBackoffMaxMs, jitter);
+            this.memberInfo = memberInfo;
+        }
+
+        /**
+         * Retry with backoff if the request failed with {@link 
Errors#UNKNOWN_MEMBER_ID} or
+         * {@link Errors#STALE_MEMBER_EPOCH} and the member has valid member 
ID and epoch.
+         *
+         * @return True if the request has been enqueued to be retried with 
the latest member ID
+         * and epoch.
+         */
+        boolean maybeRetryOnGroupError(long currentTimeMs, Errors 
responseError) {
+            if (responseError == Errors.STALE_MEMBER_EPOCH || responseError == 
Errors.UNKNOWN_MEMBER_ID) {
+                if (memberInfo.memberEpoch.isPresent()) {
+                    // Request failed with invalid ID/epoch, but the member 
has a valid one, so
+                    // retry the request with the latest ID/epoch.
+                    retry(currentTimeMs, responseError.exception());
+                    return true;
+                }
             }
+            return false;
         }
+
+        abstract void retry(long currentTimeMs, Throwable throwable);
     }
 
-    class OffsetFetchRequestState extends RequestState {
+    class OffsetFetchRequestState extends RetriableRequestState {
+
+        /**
+         * Partitions to get committed offsets for.
+         */
         public final Set<TopicPartition> requestedPartitions;
-        public final GroupState.Generation requestedGeneration;
+
         private final CompletableFuture<Map<TopicPartition, 
OffsetAndMetadata>> future;
+
+        /**
+         * Time until which the request should be retried if it fails with 
retriable errors.
+         */
+        private final long expirationTimeMs;
+
         public OffsetFetchRequestState(final Set<TopicPartition> partitions,
-                                       final GroupState.Generation generation,
                                        final long retryBackoffMs,
-                                       final long retryBackoffMaxMs) {
-            super(logContext, CommitRequestManager.class.getSimpleName(), 
retryBackoffMs, retryBackoffMaxMs);
+                                       final long retryBackoffMaxMs,
+                                       final long expirationTimeMs,
+                                       final MemberInfo memberInfo) {
+            super(logContext, CommitRequestManager.class.getSimpleName(), 
retryBackoffMs,
+                retryBackoffMaxMs, memberInfo);
             this.requestedPartitions = partitions;
-            this.requestedGeneration = generation;
             this.future = new CompletableFuture<>();
+            this.expirationTimeMs = expirationTimeMs;
         }
 
         public OffsetFetchRequestState(final Set<TopicPartition> partitions,
-                                       final GroupState.Generation generation,
                                        final long retryBackoffMs,
                                        final long retryBackoffMaxMs,
-                                       final double jitter) {
-            super(logContext, CommitRequestManager.class.getSimpleName(), 
retryBackoffMs, 2, retryBackoffMaxMs, jitter);
+                                       final long expirationTimeMs,
+                                       final double jitter,
+                                       final MemberInfo memberInfo) {
+            super(logContext, CommitRequestManager.class.getSimpleName(), 
retryBackoffMs, 2,
+                retryBackoffMaxMs, jitter, memberInfo);
             this.requestedPartitions = partitions;
-            this.requestedGeneration = generation;
             this.future = new CompletableFuture<>();
+            this.expirationTimeMs = expirationTimeMs;
         }
 
         public boolean sameRequest(final OffsetFetchRequestState request) {
-            return Objects.equals(requestedGeneration, 
request.requestedGeneration) && 
requestedPartitions.equals(request.requestedPartitions);
+            return requestedPartitions.equals(request.requestedPartitions);
         }
 
         public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
-            OffsetFetchRequest.Builder builder = new 
OffsetFetchRequest.Builder(
-                    groupState.groupId,
-                    true,
-                    new ArrayList<>(this.requestedPartitions),
-                    throwOnFetchStableOffsetUnsupported);
+
+            OffsetFetchRequest.Builder builder;
+            if (memberInfo.memberId.isPresent() && 
memberInfo.memberEpoch.isPresent()) {
+                builder = new OffsetFetchRequest.Builder(
+                        groupId,
+                        memberInfo.memberId.get(),
+                        memberInfo.memberEpoch.get(),
+                        true,
+                        new ArrayList<>(this.requestedPartitions),
+                        throwOnFetchStableOffsetUnsupported);
+            } else {
+                // Building request without passing member ID/epoch to leave 
the logic to choose
+                // default values when not present on the request builder.
+                builder = new OffsetFetchRequest.Builder(
+                        groupId,
+                        true,
+                        new ArrayList<>(this.requestedPartitions),
+                        throwOnFetchStableOffsetUnsupported);
+            }
             return new NetworkClientDelegate.UnsentRequest(builder, 
coordinatorRequestManager.coordinator())
-                .whenComplete((r, t) -> onResponse(r.receivedTimeMs(), 
(OffsetFetchResponse) r.responseBody()));
+                    .whenComplete((r, t) -> onResponse(r.receivedTimeMs(), 
(OffsetFetchResponse) r.responseBody()));
         }
 
+        /**
+         * Handle request responses, including successful and failed.
+         */
         public void onResponse(
-            final long currentTimeMs,
-            final OffsetFetchResponse response) {
-            Errors responseError = 
response.groupLevelError(groupState.groupId);
+                final long currentTimeMs,
+                final OffsetFetchResponse response) {
+            Errors responseError = response.groupLevelError(groupId);
             if (responseError != Errors.NONE) {
                 onFailure(currentTimeMs, responseError);
                 return;
             }
             onSuccess(currentTimeMs, response);
         }
 
+        /**
+         * Handle failed responses. This will retry if the error is retriable, 
or complete the
+         * result future exceptionally in the case of non-recoverable or 
unexpected errors.
+         */
         private void onFailure(final long currentTimeMs,
                                final Errors responseError) {
             handleCoordinatorDisconnect(responseError.exception(), 
currentTimeMs);
             log.debug("Offset fetch failed: {}", responseError.message());
             if (responseError == COORDINATOR_LOAD_IN_PROGRESS) {
-                retry(currentTimeMs);
+                retry(currentTimeMs, responseError.exception());
+            } else if (responseError == Errors.STALE_MEMBER_EPOCH || 
responseError == Errors.UNKNOWN_MEMBER_ID) {
+                boolean retried = maybeRetryOnGroupError(currentTimeMs, 
responseError);
+                if (!retried) {
+                    log.error("OffsetFetch failed with {} and the consumer is 
not part " +
+                        "of the group anymore (it probably left the group, got 
fenced" +
+                        " or failed). The request cannot be retried and will 
fail.", responseError);
+                    future.completeExceptionally(responseError.exception());
+                } else {
+                    log.debug("OffsetFetch failed with {} but the consumer is 
still part" +
+                        " of the group, so the request will be retried with 
the latest " +
+                        "member ID and epoch.", responseError);
+                }
             } else if (responseError == Errors.NOT_COORDINATOR) {
                 // re-discover the coordinator and retry
                 coordinatorRequestManager.markCoordinatorUnknown("error 
response " + responseError.name(), currentTimeMs);

Review Comment:
   Totally agree that it's inconsistent, and I truly don't know why it's being 
handled differently in the legacy coordinator, or the implications that 
changing it would have. So should we keep it like this here just to ensure we 
handle it in the same way as the legacy coordinator does, and review it 
separately to better understand and then improve it? If agree I can file a jira 
for it. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to