kirktrue commented on code in PR #14557:
URL: https://github.com/apache/kafka/pull/14557#discussion_r1406678397


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -161,16 +172,16 @@ public NetworkClientDelegate.PollResult poll(final long 
currentTimeMs) {
         List<NetworkClientDelegate.UnsentRequest> requests = 
pendingRequests.drain(currentTimeMs);
         // min of the remainingBackoffMs of all the request that are still 
backing off
         final long timeUntilNextPoll = Math.min(
-            findMinTime(unsentOffsetCommitRequests(), currentTimeMs),
-            findMinTime(unsentOffsetFetchRequests(), currentTimeMs));
+                findMinTime(unsentOffsetCommitRequests(), currentTimeMs),
+                findMinTime(unsentOffsetFetchRequests(), currentTimeMs));

Review Comment:
   Nit: we can leave the indentation as is.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberStateListener.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+/**
+ * Listener to register for getting notified when the member state changes, or 
new member ID or
+ * epoch are received.
+ */
+public interface MemberStateListener {
+
+    /**
+     * Called when the member transitions to a new state.
+     *
+     * @param state New state.
+     */
+    void onStateChange(MemberState state);
+
+    /**
+     * Called when the member receives a new member ID.
+     *
+     * @param memberId New member ID.
+     * @param epoch    Latest member epoch received.
+     */
+    void onMemberIdUpdated(String memberId, int epoch);
+
+    /**
+     * Called when a member receives a new member epoch.
+     *
+     * @param epoch    New member epoch.
+     * @param memberId Current member ID.
+     */
+    void onMemberEpochUpdated(int epoch, String memberId);

Review Comment:
   I'm wondering why this interface can't be a single `onUpdate(MemberState 
state)` and leave it up to the callbacks to determine what's changed? 🤔



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -467,66 +594,153 @@ private void handleFatalError(final Errors error) {
                     future.completeExceptionally(new CommitFailedException());
                     break;
                 case UNKNOWN_MEMBER_ID:
-                    log.info("OffsetCommit failed due to unknown member id 
memberId {}: {}", null, error.message());
-                    future.completeExceptionally(error.exception());
+                    log.info("OffsetCommit failed due to unknown member id: 
{}", error.message());
+                    handleUnknownMemberIdError(this);
+                    break;
+                case STALE_MEMBER_EPOCH:
+                    log.info("OffsetCommit failed due to stale member epoch: 
{}", error.message());
+                    handleStaleMemberEpochError(this);
                     break;
                 default:
-                    future.completeExceptionally(new 
KafkaException("Unexpected error in commit: " + error.message()));
+                    future.completeExceptionally(new 
KafkaException("Unexpected error in commit:" +
+                            " " + error.message()));
                     break;
             }
         }
+
+        @Override
+        void abortRetry(String cause) {
+            future.completeExceptionally(new KafkaException("Offset commit 
waiting for new member" +
+                    " ID or epoch cannot be retried. " + cause));
+        }
+
+        /**
+         * Reset timers and add request to the list of pending requests, to 
make sure it is sent
+         * out on the next poll iteration, without applying any backoff.
+         */
+        @Override
+        public void retryOnMemberIdOrEpochUpdate(Optional<String> memberId,
+                                                 Optional<Integer> 
memberEpoch) {
+            this.memberId = memberId;
+            this.memberEpoch = memberEpoch;
+            reset();
+            pendingRequests.addOffsetCommitRequest(this);
+        }
+
+        @Override
+        public String requestName() {
+            return ApiKeys.OFFSET_COMMIT.name();
+        }
+    }
+
+    /**
+     * Represents a request that can be retried or aborted, based on member ID 
and epoch
+     * information.
+     */
+    abstract static class RetriableRequestState extends RequestState {
+
+        /**
+         * Member ID to be included in the request if present.
+         */
+        Optional<String> memberId;
+
+        /**
+         * Member epoch to be included in the request if present.
+         */
+        Optional<Integer> memberEpoch;
+
+        RetriableRequestState(LogContext logContext, String owner, long 
retryBackoffMs, long retryBackoffMaxMs) {
+            super(logContext, owner, retryBackoffMs, retryBackoffMaxMs);
+            this.memberId = Optional.empty();
+            this.memberEpoch = Optional.empty();
+        }
+
+        // Visible for testing
+        RetriableRequestState(LogContext logContext, String owner, long 
retryBackoffMs, int retryBackoffExpBase,
+                              long retryBackoffMaxMs, double jitter) {
+            super(logContext, owner, retryBackoffMs, retryBackoffExpBase, 
retryBackoffMaxMs, jitter);
+            this.memberId = Optional.empty();
+            this.memberEpoch = Optional.empty();
+        }
+
+        abstract void abortRetry(String cause);
+
+        abstract void retryOnMemberIdOrEpochUpdate(Optional<String> memberId, 
Optional<Integer> memberEpoch);
+
+        abstract String requestName();

Review Comment:
   It appears as though the request "name" is just the string-ified version of 
the `ApiKeys` value. Any reason to obscure it as a generic string?
   
   ```suggestion
           abstract ApiKeys requestedApi();
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberStateListener.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+/**
+ * Listener to register for getting notified when the member state changes, or 
new member ID or
+ * epoch are received.
+ */
+public interface MemberStateListener {
+
+    /**
+     * Called when the member transitions to a new state.
+     *
+     * @param state New state.
+     */
+    void onStateChange(MemberState state);
+
+    /**
+     * Called when the member receives a new member ID.
+     *
+     * @param memberId New member ID.
+     * @param epoch    Latest member epoch received.
+     */
+    void onMemberIdUpdated(String memberId, int epoch);
+
+    /**
+     * Called when a member receives a new member epoch.
+     *
+     * @param epoch    New member epoch.
+     * @param memberId Current member ID.
+     */
+    void onMemberEpochUpdated(int epoch, String memberId);

Review Comment:
   I'm wondering why this interface can't be a single `onUpdate(MemberState 
state)` and leave it up to the callbacks to determine what's changed? 🤔



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -1077,4 +1131,18 @@ public void onUpdate(ClusterResource clusterResource) {
             reconcile();
         }
     }
+
+    /**
+     * Register a new listener that will be invoked whenever the member state 
changes, or a new
+     * member ID or epoch is received.
+     *
+     * @param listener Listener to invoke.
+     */
+    @Override
+    public void registerStateListener(MemberStateListener listener) {
+        if (listener == null) {
+            throw new IllegalArgumentException("State updates listener cannot 
be null");
+        }
+        this.stateUpdatesListeners.add(listener);
+    }

Review Comment:
   Do we need the ability to _remove_ a `MemberStateListener` from the set of 
listeners?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -161,16 +172,16 @@ public NetworkClientDelegate.PollResult poll(final long 
currentTimeMs) {
         List<NetworkClientDelegate.UnsentRequest> requests = 
pendingRequests.drain(currentTimeMs);
         // min of the remainingBackoffMs of all the request that are still 
backing off
         final long timeUntilNextPoll = Math.min(
-            findMinTime(unsentOffsetCommitRequests(), currentTimeMs),
-            findMinTime(unsentOffsetFetchRequests(), currentTimeMs));
+                findMinTime(unsentOffsetCommitRequests(), currentTimeMs),
+                findMinTime(unsentOffsetFetchRequests(), currentTimeMs));
         return new NetworkClientDelegate.PollResult(timeUntilNextPoll, 
requests);
     }
 
     private static long findMinTime(final Collection<? extends RequestState> 
requests, final long currentTimeMs) {
         return requests.stream()
-            .mapToLong(request -> request.remainingBackoffMs(currentTimeMs))
-            .min()
-            .orElse(Long.MAX_VALUE);
+                .mapToLong(request -> 
request.remainingBackoffMs(currentTimeMs))
+                .min()
+                .orElse(Long.MAX_VALUE);

Review Comment:
   Nit: we can leave the indentation as is.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -364,13 +489,19 @@ public NetworkClientDelegate.UnsentRequest 
toUnsentRequest() {
                 requestTopicDataMap.put(topicPartition.topic(), topic);
             }
 
-            OffsetCommitRequest.Builder builder = new 
OffsetCommitRequest.Builder(
-                new OffsetCommitRequestData()
+            OffsetCommitRequestData data = new OffsetCommitRequestData()
                     .setGroupId(this.groupId)
-                    .setGenerationIdOrMemberEpoch(generation.generationId)
-                    .setMemberId(generation.memberId)
-                    .setGroupInstanceId(groupInstanceId)
-                    .setTopics(new ArrayList<>(requestTopicDataMap.values())));
+                    .setGroupInstanceId(groupInstanceId.orElse(null))

Review Comment:
   Any reason group instance ID doesn't also use the approach `if 
(groupInstanceId.isPresent()) {` like the other `Optional` values?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -119,29 +126,33 @@ public CommitRequestManager(
         final ConsumerConfig config,
         final CoordinatorRequestManager coordinatorRequestManager,
         final BackgroundEventHandler backgroundEventHandler,
-        final GroupState groupState,
+        final String groupId,
+        final Optional<String> groupInstanceId,
         final long retryBackoffMs,
         final long retryBackoffMaxMs,
-        final double jitter) {
+        final OptionalDouble jitter) {
         Objects.requireNonNull(coordinatorRequestManager, "Coordinator is 
needed upon committing offsets");
         this.logContext = logContext;
         this.log = logContext.logger(getClass());
         this.pendingRequests = new PendingRequests();
         if (config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
             final long autoCommitInterval =
-                
Integer.toUnsignedLong(config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
+                    
Integer.toUnsignedLong(config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));

Review Comment:
   Nit: we can leave the indentation as is so that it corresponds more closely 
with the existing 'four spaces per tab' style.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberStateListener.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+/**
+ * Listener to register for getting notified when the member state changes, or 
new member ID or
+ * epoch are received.

Review Comment:
   Perhaps we could add clarification for these two questions:
   
   1. Should the listener "cache" the `MemberState` for other uses?
   2. Is this usable by code that is owned by the application thread?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -467,66 +594,153 @@ private void handleFatalError(final Errors error) {
                     future.completeExceptionally(new CommitFailedException());
                     break;
                 case UNKNOWN_MEMBER_ID:
-                    log.info("OffsetCommit failed due to unknown member id 
memberId {}: {}", null, error.message());
-                    future.completeExceptionally(error.exception());
+                    log.info("OffsetCommit failed due to unknown member id: 
{}", error.message());
+                    handleUnknownMemberIdError(this);
+                    break;
+                case STALE_MEMBER_EPOCH:
+                    log.info("OffsetCommit failed due to stale member epoch: 
{}", error.message());
+                    handleStaleMemberEpochError(this);
                     break;
                 default:
-                    future.completeExceptionally(new 
KafkaException("Unexpected error in commit: " + error.message()));
+                    future.completeExceptionally(new 
KafkaException("Unexpected error in commit:" +
+                            " " + error.message()));

Review Comment:
   IMO I would just leave it on one line 🤷‍♂️ 



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -467,66 +594,153 @@ private void handleFatalError(final Errors error) {
                     future.completeExceptionally(new CommitFailedException());
                     break;
                 case UNKNOWN_MEMBER_ID:
-                    log.info("OffsetCommit failed due to unknown member id 
memberId {}: {}", null, error.message());
-                    future.completeExceptionally(error.exception());
+                    log.info("OffsetCommit failed due to unknown member id: 
{}", error.message());
+                    handleUnknownMemberIdError(this);
+                    break;
+                case STALE_MEMBER_EPOCH:
+                    log.info("OffsetCommit failed due to stale member epoch: 
{}", error.message());
+                    handleStaleMemberEpochError(this);
                     break;
                 default:
-                    future.completeExceptionally(new 
KafkaException("Unexpected error in commit: " + error.message()));
+                    future.completeExceptionally(new 
KafkaException("Unexpected error in commit:" +
+                            " " + error.message()));
                     break;
             }
         }
+
+        @Override
+        void abortRetry(String cause) {
+            future.completeExceptionally(new KafkaException("Offset commit 
waiting for new member" +
+                    " ID or epoch cannot be retried. " + cause));
+        }

Review Comment:
   It seems like the value of `cause` is always `"The member is in an 
unrecoverable state."`?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -467,66 +594,153 @@ private void handleFatalError(final Errors error) {
                     future.completeExceptionally(new CommitFailedException());
                     break;
                 case UNKNOWN_MEMBER_ID:
-                    log.info("OffsetCommit failed due to unknown member id 
memberId {}: {}", null, error.message());
-                    future.completeExceptionally(error.exception());
+                    log.info("OffsetCommit failed due to unknown member id: 
{}", error.message());
+                    handleUnknownMemberIdError(this);
+                    break;
+                case STALE_MEMBER_EPOCH:
+                    log.info("OffsetCommit failed due to stale member epoch: 
{}", error.message());
+                    handleStaleMemberEpochError(this);
                     break;
                 default:
-                    future.completeExceptionally(new 
KafkaException("Unexpected error in commit: " + error.message()));
+                    future.completeExceptionally(new 
KafkaException("Unexpected error in commit:" +
+                            " " + error.message()));
                     break;
             }
         }
+
+        @Override
+        void abortRetry(String cause) {
+            future.completeExceptionally(new KafkaException("Offset commit 
waiting for new member" +
+                    " ID or epoch cannot be retried. " + cause));
+        }
+
+        /**
+         * Reset timers and add request to the list of pending requests, to 
make sure it is sent
+         * out on the next poll iteration, without applying any backoff.
+         */
+        @Override
+        public void retryOnMemberIdOrEpochUpdate(Optional<String> memberId,
+                                                 Optional<Integer> 
memberEpoch) {
+            this.memberId = memberId;
+            this.memberEpoch = memberEpoch;
+            reset();
+            pendingRequests.addOffsetCommitRequest(this);
+        }
+
+        @Override
+        public String requestName() {
+            return ApiKeys.OFFSET_COMMIT.name();
+        }
+    }
+
+    /**
+     * Represents a request that can be retried or aborted, based on member ID 
and epoch
+     * information.
+     */
+    abstract static class RetriableRequestState extends RequestState {
+
+        /**
+         * Member ID to be included in the request if present.
+         */
+        Optional<String> memberId;
+
+        /**
+         * Member epoch to be included in the request if present.
+         */
+        Optional<Integer> memberEpoch;
+
+        RetriableRequestState(LogContext logContext, String owner, long 
retryBackoffMs, long retryBackoffMaxMs) {
+            super(logContext, owner, retryBackoffMs, retryBackoffMaxMs);
+            this.memberId = Optional.empty();
+            this.memberEpoch = Optional.empty();
+        }
+
+        // Visible for testing
+        RetriableRequestState(LogContext logContext, String owner, long 
retryBackoffMs, int retryBackoffExpBase,
+                              long retryBackoffMaxMs, double jitter) {
+            super(logContext, owner, retryBackoffMs, retryBackoffExpBase, 
retryBackoffMaxMs, jitter);
+            this.memberId = Optional.empty();
+            this.memberEpoch = Optional.empty();
+        }
+
+        abstract void abortRetry(String cause);
+
+        abstract void retryOnMemberIdOrEpochUpdate(Optional<String> memberId, 
Optional<Integer> memberEpoch);
+
+        abstract String requestName();
     }
 
-    class OffsetFetchRequestState extends RequestState {
+    class OffsetFetchRequestState extends RetriableRequestState {
+
+        /**
+         * Partitions to get committed offsets for.
+         */
         public final Set<TopicPartition> requestedPartitions;
-        public final GroupState.Generation requestedGeneration;
+
         private final CompletableFuture<Map<TopicPartition, 
OffsetAndMetadata>> future;
+
         public OffsetFetchRequestState(final Set<TopicPartition> partitions,
-                                       final GroupState.Generation generation,
                                        final long retryBackoffMs,
                                        final long retryBackoffMaxMs) {
             super(logContext, CommitRequestManager.class.getSimpleName(), 
retryBackoffMs, retryBackoffMaxMs);
             this.requestedPartitions = partitions;
-            this.requestedGeneration = generation;
             this.future = new CompletableFuture<>();
         }
 
         public OffsetFetchRequestState(final Set<TopicPartition> partitions,
-                                       final GroupState.Generation generation,
                                        final long retryBackoffMs,
                                        final long retryBackoffMaxMs,
                                        final double jitter) {
             super(logContext, CommitRequestManager.class.getSimpleName(), 
retryBackoffMs, 2, retryBackoffMaxMs, jitter);
             this.requestedPartitions = partitions;
-            this.requestedGeneration = generation;
             this.future = new CompletableFuture<>();
         }
 
         public boolean sameRequest(final OffsetFetchRequestState request) {
-            return Objects.equals(requestedGeneration, 
request.requestedGeneration) && 
requestedPartitions.equals(request.requestedPartitions);
+            return requestedPartitions.equals(request.requestedPartitions);
         }
 
         public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
-            OffsetFetchRequest.Builder builder = new 
OffsetFetchRequest.Builder(
-                    groupState.groupId,
-                    true,
-                    new ArrayList<>(this.requestedPartitions),
-                    throwOnFetchStableOffsetUnsupported);
+            OffsetFetchRequest.Builder builder;
+            if (memberId.isPresent() && memberEpoch.isPresent()) {
+                builder = new OffsetFetchRequest.Builder(
+                        groupId,
+                        memberId.get(),
+                        memberEpoch.get(),
+                        true,
+                        new ArrayList<>(this.requestedPartitions),
+                        throwOnFetchStableOffsetUnsupported);
+            } else {
+                // Building request without passing member ID/epoch to leave 
the logic to choose
+                // default values when not present on the request builder.
+                builder = new OffsetFetchRequest.Builder(
+                        groupId,
+                        true,
+                        new ArrayList<>(this.requestedPartitions),
+                        throwOnFetchStableOffsetUnsupported);
+            }
             return new NetworkClientDelegate.UnsentRequest(builder, 
coordinatorRequestManager.coordinator())
-                .whenComplete((r, t) -> onResponse(r.receivedTimeMs(), 
(OffsetFetchResponse) r.responseBody()));
+                    .whenComplete((r, t) -> onResponse(r.receivedTimeMs(), 
(OffsetFetchResponse) r.responseBody()));
         }
 
+        /**
+         * Handle request responses, including successful and failed.
+         */
         public void onResponse(
-            final long currentTimeMs,
-            final OffsetFetchResponse response) {
-            Errors responseError = 
response.groupLevelError(groupState.groupId);
+                final long currentTimeMs,
+                final OffsetFetchResponse response) {

Review Comment:
   Nit: indentation.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -619,11 +838,36 @@ private CompletableFuture<Map<TopicPartition, 
OffsetAndMetadata>> chainFuture(fi
         public String toString() {
             return "OffsetFetchRequestState{" +
                     "requestedPartitions=" + requestedPartitions +
-                    ", requestedGeneration=" + requestedGeneration +
+                    ", memberId=" + memberId.orElse("undefined") +
+                    ", memberEpoch=" + (memberEpoch.isPresent() ? 
memberEpoch.get() : "undefined") +

Review Comment:
   What about this:
   
   ```suggestion
                       ", memberEpoch=" + memberEpoch.orElse("undefined") +
   ```
   
   Actually, it's similar to what is done above that line.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -661,17 +905,15 @@ OffsetCommitRequestState createOffsetCommitRequest(final 
Map<TopicPartition, Off
             return jitter.isPresent() ?
                     new OffsetCommitRequestState(
                         offsets,
-                        groupState.groupId,
-                        groupState.groupInstanceId.orElse(null),
-                        groupState.generation,
+                            groupId,
+                            groupInstanceId,
                         retryBackoffMs,
                         retryBackoffMaxMs,
                         jitter.getAsDouble()) :
                     new OffsetCommitRequestState(
                         offsets,
-                        groupState.groupId,
-                        groupState.groupInstanceId.orElse(null),
-                        groupState.generation,
+                            groupId,
+                            groupInstanceId,

Review Comment:
   Nit: indentation.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -661,17 +905,15 @@ OffsetCommitRequestState createOffsetCommitRequest(final 
Map<TopicPartition, Off
             return jitter.isPresent() ?
                     new OffsetCommitRequestState(
                         offsets,
-                        groupState.groupId,
-                        groupState.groupInstanceId.orElse(null),
-                        groupState.generation,
+                            groupId,
+                            groupInstanceId,

Review Comment:
   Nit: indentation.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -467,66 +594,153 @@ private void handleFatalError(final Errors error) {
                     future.completeExceptionally(new CommitFailedException());
                     break;
                 case UNKNOWN_MEMBER_ID:
-                    log.info("OffsetCommit failed due to unknown member id 
memberId {}: {}", null, error.message());
-                    future.completeExceptionally(error.exception());
+                    log.info("OffsetCommit failed due to unknown member id: 
{}", error.message());
+                    handleUnknownMemberIdError(this);
+                    break;
+                case STALE_MEMBER_EPOCH:
+                    log.info("OffsetCommit failed due to stale member epoch: 
{}", error.message());
+                    handleStaleMemberEpochError(this);
                     break;
                 default:
-                    future.completeExceptionally(new 
KafkaException("Unexpected error in commit: " + error.message()));
+                    future.completeExceptionally(new 
KafkaException("Unexpected error in commit:" +
+                            " " + error.message()));

Review Comment:
   IMO I would just leave it on one line 🤷‍♂️ 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to