lianetm commented on code in PR #16963:
URL: https://github.com/apache/kafka/pull/16963#discussion_r1727765532


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java:
##########
@@ -467,12 +441,12 @@ static class HeartbeatRequestState extends RequestState {
         private long heartbeatIntervalMs;
 
         public HeartbeatRequestState(

Review Comment:
   could this be just protected now?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManager.java:
##########
@@ -16,123 +16,72 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
-import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
-import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
 import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
-import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
 import 
org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
-import org.apache.kafka.common.errors.GroupAuthorizationException;
-import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
 import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ShareGroupHeartbeatRequest;
 import org.apache.kafka.common.requests.ShareGroupHeartbeatResponse;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Timer;
 
-import org.slf4j.Logger;
-
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.TreeSet;
 
 import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_SHARE_METRIC_GROUP_PREFIX;
 
 /**
  * <p>Manages the request creation and response handling for the heartbeat of 
a share group. The module creates a
- * {@link ShareGroupHeartbeatRequest} using the state stored in the {@link 
ShareMembershipManager} and enqueues it to
- * the network queue to be sent out. Once the response is received, the module 
will update the state in the
- * {@link ShareMembershipManager} and handle any errors.</p>
+ * {@link ShareGroupHeartbeatRequest} using the state stored in the {@link 
ShareMembershipManager} and enqueue it to
+ * the network queue to be sent out. Once the response is received, it updates 
the state in the
+ * {@link ShareMembershipManager} and handles any errors.</p>
  *
  * <p>The manager will try to send a heartbeat when the member is in {@link 
MemberState#STABLE},
- * {@link MemberState#JOINING}, or {@link MemberState#RECONCILING}. Which mean 
the member is either in a stable
+ * {@link MemberState#JOINING}, or {@link MemberState#RECONCILING}, which 
means the member is either in a stable
  * group, is trying to join a group, or is in the process of reconciling the 
assignment changes.</p>
  *
- * <p>If the member got kick out of a group, it will give up the current 
assignment and join again with a zero epoch.</p>
+ * <p>If the member got kicked out of a group, it will attempt to join again 
with a zero epoch.</p>
  *
- * <p>If the member does not have groupId configured or encounters fatal 
exceptions, a heartbeat will not be sent.</p>
+ * <p>If the member does not have groupId configured or encountering fatal 
exceptions, a heartbeat will not be sent.</p>
  *
- * <p>If the coordinator is not found, we will skip sending the heartbeat and 
try to find a coordinator first.</p>
+ * <p>If the coordinator not is not found, we will skip sending the heartbeat 
and try to find a coordinator first.</p>
  *
- * <p>If the heartbeat failed due to retriable errors, such as, 
TimeoutException, the subsequent attempt will be
- * backoff exponentially.</p>
+ * <p>If the heartbeat failed due to retriable errors, such as 
TimeoutException, the subsequent attempt will be
+ * backed off exponentially.</p>
  *
  * <p>When the member completes the assignment reconciliation, the {@link 
HeartbeatRequestState} will be reset so
  * that a heartbeat will be sent in the next event loop.</p>
  *
- * <p>See {@link HeartbeatRequestState} for more details.</p>
+ * <p>See {@link AbstractHeartbeatRequestManager.HeartbeatRequestState} for 
more details.</p>
  */
-public class ShareHeartbeatRequestManager implements RequestManager {
-
-    private final Logger logger;
-
-    /**
-     * Time that the group coordinator will wait on member to revoke its 
partitions. This is provided by the group
-     * coordinator in the heartbeat
-     */
-    private final int maxPollIntervalMs;
-
-    /**
-     * CoordinatorRequestManager manages the connection to the group 
coordinator
-     */
-    private final CoordinatorRequestManager coordinatorRequestManager;
+public class ShareHeartbeatRequestManager extends 
AbstractHeartbeatRequestManager<ShareGroupHeartbeatResponse, 
ShareGroupHeartbeatResponseData> {
 
     /**
-     * HeartbeatRequestState manages heartbeat request timing and retries
+     * Membership manager for consumer groups
      */
-    private final HeartbeatRequestState heartbeatRequestState;
+    private final ShareMembershipManager membershipManager;
 
     /*
      * HeartbeatState manages building the heartbeat requests correctly
      */
     private final HeartbeatState heartbeatState;
 
-    /**
-     * ShareMembershipManager manages member's essential attributes like epoch 
and id, and its rebalance state
-     */
-    private final ShareMembershipManager shareMembershipManager;
-
-    /**
-     * ErrorEventHandler allows the background thread to propagate errors back 
to the user
-     */
-    private final BackgroundEventHandler backgroundEventHandler;
-
-    /**
-     * Timer for tracking the time since the last consumer poll.  If the timer 
expires, the consumer will stop
-     * sending heartbeat until the next poll.
-     */
-    private final Timer pollTimer;
-
-    /**
-     * Holding the heartbeat sensor to measure heartbeat timing and response 
latency
-     */
-    private final HeartbeatMetricsManager metricsManager;
-
     public ShareHeartbeatRequestManager(
             final LogContext logContext,
             final Time time,
             final ConsumerConfig config,
             final CoordinatorRequestManager coordinatorRequestManager,
             final SubscriptionState subscriptions,
-            final ShareMembershipManager shareMembershipManager,
+            final ShareMembershipManager membershipManager,
             final BackgroundEventHandler backgroundEventHandler,
             final Metrics metrics) {
-        this.coordinatorRequestManager = coordinatorRequestManager;
-        this.logger = logContext.logger(getClass());
-        this.shareMembershipManager = shareMembershipManager;
-        this.backgroundEventHandler = backgroundEventHandler;
-        this.maxPollIntervalMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
-        long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
-        long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
-        this.heartbeatState = new HeartbeatState(subscriptions, 
shareMembershipManager);
-        this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, 0, retryBackoffMs,
-                retryBackoffMaxMs, maxPollIntervalMs);
-        this.pollTimer = time.timer(maxPollIntervalMs);
-        this.metricsManager = new HeartbeatMetricsManager(metrics, 
CONSUMER_SHARE_METRIC_GROUP_PREFIX);
+        super(logContext, time, config, coordinatorRequestManager, 
backgroundEventHandler,
+                new HeartbeatMetricsManager(metrics, 
CONSUMER_SHARE_METRIC_GROUP_PREFIX));

Review Comment:
   indentation off?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java:
##########
@@ -375,81 +342,88 @@ private void onErrorResponse(final 
ConsumerGroupHeartbeatResponse response,
 
             case COORDINATOR_LOAD_IN_PROGRESS:
                 // the manager will backoff and retry
-                message = String.format("GroupHeartbeatRequest failed because 
the group coordinator %s is still loading." +
-                                "Will retry",
-                        coordinatorRequestManager.coordinator());
+                message = String.format("%s failed because the group 
coordinator %s is still loading. Will retry",
+                        heartbeatRequestName(), 
coordinatorRequestManager.coordinator());
                 logInfo(message, response, currentTimeMs);
                 break;
 
             case GROUP_AUTHORIZATION_FAILED:
                 GroupAuthorizationException exception =
-                        
GroupAuthorizationException.forGroupId(membershipManager.groupId());
-                logger.error("GroupHeartbeatRequest failed due to group 
authorization failure: {}", exception.getMessage());
+                        
GroupAuthorizationException.forGroupId(membershipManager().groupId());
+                logger.error("{} failed due to group authorization failure: 
{}",
+                        heartbeatRequestName(), exception.getMessage());
                 handleFatalFailure(error.exception(exception.getMessage()));
                 break;
 
-            case UNRELEASED_INSTANCE_ID:
-                logger.error("GroupHeartbeatRequest failed due to unreleased 
instance id {}: {}",
-                        membershipManager.groupInstanceId().orElse("null"), 
errorMessage);
-                handleFatalFailure(error.exception(errorMessage));
-                break;
-
-            case FENCED_INSTANCE_ID:
-                logger.error("GroupHeartbeatRequest failed due to fenced 
instance id {}: {}. " +
-                        "This is expected in the case that the member was 
removed from the group " +
-                        "by an admin client, and another member joined using 
the same group instance id.",
-                    membershipManager.groupInstanceId().orElse("null"), 
errorMessage);
-                handleFatalFailure(error.exception(errorMessage));
-                break;
-
             case INVALID_REQUEST:
             case GROUP_MAX_SIZE_REACHED:
             case UNSUPPORTED_ASSIGNOR:
             case UNSUPPORTED_VERSION:
-                logger.error("GroupHeartbeatRequest failed due to {}: {}", 
error, errorMessage);
+                logger.error("{} failed due to {}: {}", 
heartbeatRequestName(), error, errorMessage);
                 handleFatalFailure(error.exception(errorMessage));
                 break;
 
             case FENCED_MEMBER_EPOCH:
-                message = String.format("GroupHeartbeatRequest failed for 
member %s because epoch %s is fenced.",
-                        membershipManager.memberId(), 
membershipManager.memberEpoch());
+                message = String.format("%s failed for member %s because epoch 
%s is fenced.",
+                        heartbeatRequestName(), 
membershipManager().memberId(), membershipManager().memberEpoch());
                 logInfo(message, response, currentTimeMs);
-                membershipManager.transitionToFenced();
+                membershipManager().transitionToFenced();
                 // Skip backoff so that a next HB to rejoin is sent as soon as 
the fenced member releases its assignment
                 heartbeatRequestState.reset();
                 break;
 
             case UNKNOWN_MEMBER_ID:
-                message = String.format("GroupHeartbeatRequest failed because 
member %s is unknown.",
-                        membershipManager.memberId());
+                message = String.format("%s failed because member %s is 
unknown.",
+                        heartbeatRequestName(), 
membershipManager().memberId());
                 logInfo(message, response, currentTimeMs);
-                membershipManager.transitionToFenced();
+                membershipManager().transitionToFenced();
                 // Skip backoff so that a next HB to rejoin is sent as soon as 
the fenced member releases its assignment
                 heartbeatRequestState.reset();
                 break;
 
             default:
                 // If the manager receives an unknown error - there could be a 
bug in the code or a new error code
-                logger.error("GroupHeartbeatRequest failed due to unexpected 
error {}: {}", error, errorMessage);
-                handleFatalFailure(error.exception(errorMessage));
+                if (!handleSpecificError(response, currentTimeMs)) {
+                    logger.error("{} failed due to unexpected error {}: {}", 
heartbeatRequestName(), error, errorMessage);
+                    handleFatalFailure(error.exception(errorMessage));
+                }
                 break;
         }
     }
 
-    private void logInfo(final String message,
-                         final ConsumerGroupHeartbeatResponse response,
+    protected void logInfo(final String message,
+                         final HBR response,
                          final long currentTimeMs) {
         logger.info("{} in {}ms: {}",
-            message,
-            heartbeatRequestState.remainingBackoffMs(currentTimeMs),
-            response.data().errorMessage());
+                message,
+                heartbeatRequestState.remainingBackoffMs(currentTimeMs),
+                errorMessageForResponse(response));

Review Comment:
   This uses an 8-space continuation indent instead of 4, which we introduced 
unintentionally in the previous PR. Is it still unintentional here?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java:
##########
@@ -18,90 +18,70 @@
 
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import 
org.apache.kafka.clients.consumer.internals.AbstractMembershipManager.LocalAssignment;
 import 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
 import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
 import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
 import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
 import 
org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
-import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.RetriableException;
-import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
-import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
-import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Timer;
 
 import org.slf4j.Logger;
 
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.stream.Collectors;
-
 
 /**
  * <p>Manages the request creation and response handling for the heartbeat. 
The module creates a
- * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link 
ConsumerMembershipManager} and enqueue it to
- * the network queue to be sent out. Once the response is received, the module 
will update the state in the
- * {@link ConsumerMembershipManager} and handle any errors.</p>
+ * heartbeat request using the state stored in the membership manager and 
enqueues it to
+ * the network queue to be sent out. Once the response is received, it updates 
the state in the
+ * membership manager and handles any errors.
  *
  * <p>The manager will try to send a heartbeat when the member is in {@link 
MemberState#STABLE},
  * {@link MemberState#JOINING}, or {@link MemberState#RECONCILING}. Which mean 
the member is either in a stable
- * group, is trying to join a group, or is in the process of reconciling the 
assignment changes.</p>
+ * group, is trying to join a group, or is in the process of reconciling the 
assignment changes.
  *
- * <p>If the member got kick out of a group, it will try to give up the 
current assignment by invoking {@code
- * OnPartitionsLost} because reattempting to join again with a zero epoch.</p>
+ * <p>If the member got kicked out of a group, it will try to give up the 
current assignment by invoking {@code
+ * OnPartitionsLost} before attempting to join again with a zero epoch.
  *
- * <p>If the member does not have groupId configured or encountering fatal 
exceptions, a heartbeat will not be sent.</p>
+ * <p>If the member does not have groupId configured or encountering fatal 
exceptions, a heartbeat will not be sent.
  *
- * <p>If the coordinator not is not found, we will skip sending the heartbeat 
and try to find a coordinator first.</p>
+ * <p>If the coordinator not is not found, we will skip sending the heartbeat 
and try to find a coordinator first.
  *
- * <p>If the heartbeat failed due to retriable errors, such as, 
TimeoutException. The subsequent attempt will be
- * backoff exponentially.</p>
+ * <p>If the heartbeat failed due to retriable errors, such as 
TimeoutException, the subsequent attempt will be
+ * backed off exponentially.
  *
  * <p>When the member completes the assignment reconciliation, the {@link 
HeartbeatRequestState} will be reset so

Review Comment:
   This is not accurate: when reconciliation completes there is no reset 
(that would lead to a full HB). The next HB simply includes the reconciled 
partitions. I would just remove this sentence.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java:
##########
@@ -18,90 +18,70 @@
 
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import 
org.apache.kafka.clients.consumer.internals.AbstractMembershipManager.LocalAssignment;
 import 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
 import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
 import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
 import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
 import 
org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
-import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.RetriableException;
-import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
-import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
-import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Timer;
 
 import org.slf4j.Logger;
 
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.stream.Collectors;
-
 
 /**
  * <p>Manages the request creation and response handling for the heartbeat. 
The module creates a
- * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link 
ConsumerMembershipManager} and enqueue it to
- * the network queue to be sent out. Once the response is received, the module 
will update the state in the
- * {@link ConsumerMembershipManager} and handle any errors.</p>
+ * heartbeat request using the state stored in the membership manager and 
enqueues it to
+ * the network queue to be sent out. Once the response is received, it updates 
the state in the
+ * membership manager and handles any errors.
  *
  * <p>The manager will try to send a heartbeat when the member is in {@link 
MemberState#STABLE},
  * {@link MemberState#JOINING}, or {@link MemberState#RECONCILING}. Which mean 
the member is either in a stable
- * group, is trying to join a group, or is in the process of reconciling the 
assignment changes.</p>
+ * group, is trying to join a group, or is in the process of reconciling the 
assignment changes.
  *
- * <p>If the member got kick out of a group, it will try to give up the 
current assignment by invoking {@code
- * OnPartitionsLost} because reattempting to join again with a zero epoch.</p>
+ * <p>If the member got kicked out of a group, it will try to give up the 
current assignment by invoking {@code
+ * OnPartitionsLost} before attempting to join again with a zero epoch.
  *
- * <p>If the member does not have groupId configured or encountering fatal 
exceptions, a heartbeat will not be sent.</p>
+ * <p>If the member does not have groupId configured or encountering fatal 
exceptions, a heartbeat will not be sent.
  *
- * <p>If the coordinator not is not found, we will skip sending the heartbeat 
and try to find a coordinator first.</p>
+ * <p>If the coordinator not is not found, we will skip sending the heartbeat 
and try to find a coordinator first.
  *
- * <p>If the heartbeat failed due to retriable errors, such as, 
TimeoutException. The subsequent attempt will be
- * backoff exponentially.</p>
+ * <p>If the heartbeat failed due to retriable errors, such as 
TimeoutException, the subsequent attempt will be
+ * backed off exponentially.
  *
  * <p>When the member completes the assignment reconciliation, the {@link 
HeartbeatRequestState} will be reset so
- * that a heartbeat will be sent in the next event loop.</p>
+ * that a heartbeat will be sent in the next event loop.
  *
- * <p>See {@link HeartbeatRequestState} for more details.</p>
+ * <p>The class variable HBR is the response for the specific group's 
heartbeat RPC.
+ * <p>The class variable HRBD is the response data for the specific group's 
heartbeat RPC.
  */
-public class HeartbeatRequestManager implements RequestManager {
+public abstract class AbstractHeartbeatRequestManager<HBR extends 
AbstractResponse, HBRD extends ApiMessage> implements RequestManager {

Review Comment:
   nit: What about simplifying the generic names to R (response) and 
D (response data)? We already have the HB flavour at the class name level.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManager.java:
##########
@@ -16,123 +16,72 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
-import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
-import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
 import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
-import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
 import 
org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
-import org.apache.kafka.common.errors.GroupAuthorizationException;
-import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
 import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ShareGroupHeartbeatRequest;
 import org.apache.kafka.common.requests.ShareGroupHeartbeatResponse;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Timer;
 
-import org.slf4j.Logger;
-
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.TreeSet;
 
 import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_SHARE_METRIC_GROUP_PREFIX;
 
 /**
  * <p>Manages the request creation and response handling for the heartbeat of 
a share group. The module creates a
- * {@link ShareGroupHeartbeatRequest} using the state stored in the {@link 
ShareMembershipManager} and enqueues it to
- * the network queue to be sent out. Once the response is received, the module 
will update the state in the
- * {@link ShareMembershipManager} and handle any errors.</p>
+ * {@link ShareGroupHeartbeatRequest} using the state stored in the {@link 
ShareMembershipManager} and enqueue it to
+ * the network queue to be sent out. Once the response is received, it updates 
the state in the
+ * {@link ShareMembershipManager} and handles any errors.</p>
  *
  * <p>The manager will try to send a heartbeat when the member is in {@link 
MemberState#STABLE},
- * {@link MemberState#JOINING}, or {@link MemberState#RECONCILING}. Which mean 
the member is either in a stable
+ * {@link MemberState#JOINING}, or {@link MemberState#RECONCILING}, which 
means the member is either in a stable
  * group, is trying to join a group, or is in the process of reconciling the 
assignment changes.</p>
  *
- * <p>If the member got kick out of a group, it will give up the current 
assignment and join again with a zero epoch.</p>
+ * <p>If the member got kicked out of a group, it will attempt to join again 
with a zero epoch.</p>
  *
- * <p>If the member does not have groupId configured or encounters fatal 
exceptions, a heartbeat will not be sent.</p>
+ * <p>If the member does not have groupId configured or encountering fatal 
exceptions, a heartbeat will not be sent.</p>
  *
- * <p>If the coordinator is not found, we will skip sending the heartbeat and 
try to find a coordinator first.</p>
+ * <p>If the coordinator not is not found, we will skip sending the heartbeat 
and try to find a coordinator first.</p>
  *
- * <p>If the heartbeat failed due to retriable errors, such as, 
TimeoutException, the subsequent attempt will be
- * backoff exponentially.</p>
+ * <p>If the heartbeat failed due to retriable errors, such as 
TimeoutException, the subsequent attempt will be
+ * backed off exponentially.</p>
  *
  * <p>When the member completes the assignment reconciliation, the {@link 
HeartbeatRequestState} will be reset so
  * that a heartbeat will be sent in the next event loop.</p>
  *
- * <p>See {@link HeartbeatRequestState} for more details.</p>
+ * <p>See {@link AbstractHeartbeatRequestManager.HeartbeatRequestState} for 
more details.</p>

Review Comment:
   similar to my comment on the ConsumerHeartbeatRequestManager java doc, I 
would suggest we keep at this level only the info that is specific to this 
manager (basically that it's based on ShareGroupHeartbeatRequest and 
ShareGroupHeartbeatResponse), and then refer to the base class java doc for the 
common info that applies to all. 



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java:
##########
@@ -194,6 +194,7 @@ public ConsumerMembershipManager(String groupId,
      * @return Instance ID used by the member when joining the group. If 
non-empty, it will indicate that
      * this is a static member.
      */

Review Comment:
   Would {@inheritDoc} be better here, to avoid maintaining both copies?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManager.java:
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
+import 
org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+/**
+ * <p>Manages the request creation and response handling for the heartbeat of 
a consumer group. The module creates a
+ * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link 
ConsumerMembershipManager} and enqueue it to
+ * the network queue to be sent out. Once the response is received, it updates 
the state in the
+ * {@link ConsumerMembershipManager} and handles any errors.</p>
+ *
+ * <p>The manager will try to send a heartbeat when the member is in {@link 
MemberState#STABLE},
+ * {@link MemberState#JOINING}, or {@link MemberState#RECONCILING}, which 
means the member is either in a stable
+ * group, is trying to join a group, or is in the process of reconciling the 
assignment changes.</p>
+ *
+ * <p>If the member got kicked out of a group, it will try to give up the 
current assignment by invoking {@code
+ * OnPartitionsLost} before attempting to join again with a zero epoch.</p>
+ *
+ * <p>If the member does not have groupId configured or encountering fatal 
exceptions, a heartbeat will not be sent.</p>
+ *
+ * <p>If the coordinator not is not found, we will skip sending the heartbeat 
and try to find a coordinator first.</p>
+ *
+ * <p>If the heartbeat failed due to retriable errors, such as 
TimeoutException, the subsequent attempt will be
+ * backed off exponentially.</p>
+ *
+ * <p>When the member completes the assignment reconciliation, the {@link 
HeartbeatRequestState} will be reset so
+ * that a heartbeat will be sent in the next event loop.</p>
+ *
+ * <p>See {@link AbstractHeartbeatRequestManager.HeartbeatRequestState} for 
more details.</p>

Review Comment:
   this is kind of duplicating the java doc on the base abstract manager and 
will be a nightmare to maintain (I left comments on the base class already). 
   
   What about we just leave here what's specific to this manager: "Manages the 
request creation and response handling for the heartbeat of a consumer group, 
generating ConsumerGroupHeartbeatRequest and processing 
ConsumerGroupHeartbeatResponse" (along those lines). And then maybe just add 
`@see AbstractHeartbeatRequestManager`. Better?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java:
##########
@@ -18,90 +18,70 @@
 
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import 
org.apache.kafka.clients.consumer.internals.AbstractMembershipManager.LocalAssignment;
 import 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
 import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
 import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
 import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
 import 
org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
-import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.RetriableException;
-import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
-import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
-import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Timer;
 
 import org.slf4j.Logger;
 
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.stream.Collectors;
-
 
 /**
  * <p>Manages the request creation and response handling for the heartbeat. 
The module creates a
- * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link 
ConsumerMembershipManager} and enqueue it to
- * the network queue to be sent out. Once the response is received, the module 
will update the state in the
- * {@link ConsumerMembershipManager} and handle any errors.</p>
+ * heartbeat request using the state stored in the membership manager and 
enqueues it to
+ * the network queue to be sent out. Once the response is received, it updates 
the state in the
+ * membership manager and handles any errors.
  *
  * <p>The manager will try to send a heartbeat when the member is in {@link 
MemberState#STABLE},
  * {@link MemberState#JOINING}, or {@link MemberState#RECONCILING}. Which mean 
the member is either in a stable
- * group, is trying to join a group, or is in the process of reconciling the 
assignment changes.</p>
+ * group, is trying to join a group, or is in the process of reconciling the 
assignment changes.
  *
- * <p>If the member got kick out of a group, it will try to give up the 
current assignment by invoking {@code
- * OnPartitionsLost} because reattempting to join again with a zero epoch.</p>
+ * <p>If the member got kicked out of a group, it will try to give up the 
current assignment by invoking {@code
+ * OnPartitionsLost} before attempting to join again with a zero epoch.

Review Comment:
   this is really not the responsibility of the HB manager (it's all in the 
membership mgr). I would say we just remove these 2 lines? 



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java:
##########
@@ -18,90 +18,70 @@
 
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import 
org.apache.kafka.clients.consumer.internals.AbstractMembershipManager.LocalAssignment;
 import 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
 import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
 import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
 import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
 import 
org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
-import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.RetriableException;
-import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
-import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
-import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Timer;
 
 import org.slf4j.Logger;
 
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.stream.Collectors;
-
 
 /**
  * <p>Manages the request creation and response handling for the heartbeat. 
The module creates a
- * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link 
ConsumerMembershipManager} and enqueue it to
- * the network queue to be sent out. Once the response is received, the module 
will update the state in the
- * {@link ConsumerMembershipManager} and handle any errors.</p>
+ * heartbeat request using the state stored in the membership manager and 
enqueues it to
+ * the network queue to be sent out. Once the response is received, it updates 
the state in the
+ * membership manager and handles any errors.
  *
  * <p>The manager will try to send a heartbeat when the member is in {@link 
MemberState#STABLE},
  * {@link MemberState#JOINING}, or {@link MemberState#RECONCILING}. Which mean 
the member is either in a stable
- * group, is trying to join a group, or is in the process of reconciling the 
assignment changes.</p>
+ * group, is trying to join a group, or is in the process of reconciling the 
assignment changes.

Review Comment:
   this is not really accurate, it's probably just outdated (other states that 
send HB were probably added later). Instead of referring to the specific 
states, what if we just describe the basics, something like: "the HB manager 
generates HB requests based on the member state, which indicates whether a HB 
should be sent. It's also responsible for timing the requests, to ensure they 
are sent on the interval (e.g. while the member is stable in a group), or on 
demand (e.g. to acknowledge a reconciled assignment or leave the group)".



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java:
##########
@@ -275,98 +242,98 @@ private NetworkClientDelegate.UnsentRequest 
makeHeartbeatRequest(final long curr
                                                                      final 
boolean ignoreResponse) {
         NetworkClientDelegate.UnsentRequest request = 
makeHeartbeatRequest(ignoreResponse);
         heartbeatRequestState.onSendAttempt(currentTimeMs);
-        membershipManager.onHeartbeatRequestGenerated();
+        membershipManager().onHeartbeatRequestGenerated();
         metricsManager.recordHeartbeatSentMs(currentTimeMs);
         heartbeatRequestState.resetTimer();
         return request;
     }
 
+    @SuppressWarnings("unchecked")
     private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final 
boolean ignoreResponse) {
-        NetworkClientDelegate.UnsentRequest request = new 
NetworkClientDelegate.UnsentRequest(
-            new 
ConsumerGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()),
-            coordinatorRequestManager.coordinator());
+        NetworkClientDelegate.UnsentRequest request = buildHeartbeatRequest();
         if (ignoreResponse)
             return logResponse(request);
         else
             return request.whenComplete((response, exception) -> {
                 long completionTimeMs = request.handler().completionTimeMs();
                 if (response != null) {
                     
metricsManager.recordRequestLatency(response.requestLatencyMs());
-                    onResponse((ConsumerGroupHeartbeatResponse) 
response.responseBody(), completionTimeMs);
+                    onResponse((HBR) response.responseBody(), 
completionTimeMs);
                 } else {
                     onFailure(exception, completionTimeMs);
                 }
             });
     }
 
+    @SuppressWarnings("unchecked")
     private NetworkClientDelegate.UnsentRequest logResponse(final 
NetworkClientDelegate.UnsentRequest request) {
         return request.whenComplete((response, exception) -> {
             if (response != null) {
                 
metricsManager.recordRequestLatency(response.requestLatencyMs());
                 Errors error =
-                    Errors.forCode(((ConsumerGroupHeartbeatResponse) 
response.responseBody()).data().errorCode());
+                    Errors.forCode(errorCodeForResponse((HBR) 
response.responseBody()));
                 if (error == Errors.NONE)
-                    logger.debug("GroupHeartbeat responded successfully: {}", 
response);
+                    logger.debug("{} responded successfully: {}", 
heartbeatRequestName(), response);
                 else
-                    logger.error("GroupHeartbeat failed because of {}: {}", 
error, response);
+                    logger.error("{} failed because of {}: {}", 
heartbeatRequestName(), error, response);
             } else {
-                logger.error("GroupHeartbeat failed because of unexpected 
exception.", exception);
+                logger.error("{} failed because of unexpected exception.", 
heartbeatRequestName(), exception);
             }
         });
     }
 
     private void onFailure(final Throwable exception, final long 
responseTimeMs) {
         this.heartbeatRequestState.onFailedAttempt(responseTimeMs);
-        this.heartbeatState.reset();
-        membershipManager.onHeartbeatFailure(exception instanceof 
RetriableException);
+        resetHeartbeatState();
+        membershipManager().onHeartbeatFailure(exception instanceof 
RetriableException);
         if (exception instanceof RetriableException) {
-            String message = String.format("GroupHeartbeatRequest failed 
because of the retriable exception. " +
-                    "Will retry in %s ms: %s",
-                heartbeatRequestState.remainingBackoffMs(responseTimeMs),
-                exception.getMessage());
+            String message = String.format("%s failed because of the retriable 
exception. Will retry in %s ms: %s",
+                    heartbeatRequestName(),
+                    heartbeatRequestState.remainingBackoffMs(responseTimeMs),
+                    exception.getMessage());
             logger.debug(message);
         } else {
-            logger.error("GroupHeartbeatRequest failed due to fatal error: " + 
exception.getMessage());
+            logger.error("{} failed due to fatal error: {}", 
heartbeatRequestName(), exception.getMessage());
             handleFatalFailure(exception);
         }
     }
 
-    private void onResponse(final ConsumerGroupHeartbeatResponse response, 
long currentTimeMs) {
-        if (Errors.forCode(response.data().errorCode()) == Errors.NONE) {
-            
heartbeatRequestState.updateHeartbeatIntervalMs(response.data().heartbeatIntervalMs());
+    private void onResponse(final HBR response, long currentTimeMs) {
+        if (Errors.forCode(errorCodeForResponse(response)) == Errors.NONE) {
+            
heartbeatRequestState.updateHeartbeatIntervalMs(heartbeatIntervalForResponse(response));
             heartbeatRequestState.onSuccessfulAttempt(currentTimeMs);
-            membershipManager.onHeartbeatSuccess(response.data());
+            membershipManager().onHeartbeatSuccess(responseData(response));
             return;
         }
         onErrorResponse(response, currentTimeMs);
     }
 
-    private void onErrorResponse(final ConsumerGroupHeartbeatResponse response,
+    private void onErrorResponse(final HBR response,
                                  final long currentTimeMs) {
-        Errors error = Errors.forCode(response.data().errorCode());
-        String errorMessage = response.data().errorMessage();
+        Errors error = Errors.forCode(errorCodeForResponse(response));

Review Comment:
   seems we always end up wrapping the result of `errorCodeForResponse` with 
`Errors.forCode`? If so, maybe we could simplify by having 
`errorCodeForResponse` return an `Errors` already, and then just 
`Errors error = errorCodeForResponse(response);`



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManager.java:
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
+import 
org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+/**
+ * <p>Manages the request creation and response handling for the heartbeat of 
a consumer group. The module creates a
+ * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link 
ConsumerMembershipManager} and enqueue it to
+ * the network queue to be sent out. Once the response is received, it updates 
the state in the
+ * {@link ConsumerMembershipManager} and handles any errors.</p>
+ *
+ * <p>The manager will try to send a heartbeat when the member is in {@link 
MemberState#STABLE},
+ * {@link MemberState#JOINING}, or {@link MemberState#RECONCILING}, which 
means the member is either in a stable
+ * group, is trying to join a group, or is in the process of reconciling the 
assignment changes.</p>
+ *
+ * <p>If the member got kicked out of a group, it will try to give up the 
current assignment by invoking {@code
+ * OnPartitionsLost} before attempting to join again with a zero epoch.</p>
+ *
+ * <p>If the member does not have groupId configured or encountering fatal 
exceptions, a heartbeat will not be sent.</p>
+ *
+ * <p>If the coordinator not is not found, we will skip sending the heartbeat 
and try to find a coordinator first.</p>
+ *
+ * <p>If the heartbeat failed due to retriable errors, such as 
TimeoutException, the subsequent attempt will be
+ * backed off exponentially.</p>
+ *
+ * <p>When the member completes the assignment reconciliation, the {@link 
HeartbeatRequestState} will be reset so
+ * that a heartbeat will be sent in the next event loop.</p>
+ *
+ * <p>See {@link AbstractHeartbeatRequestManager.HeartbeatRequestState} for 
more details.</p>
+ */
+public class ConsumerHeartbeatRequestManager extends 
AbstractHeartbeatRequestManager<ConsumerGroupHeartbeatResponse, 
ConsumerGroupHeartbeatResponseData> {
+
+    /**
+     * Membership manager for consumer groups
+     */
+    private final ConsumerMembershipManager membershipManager;
+
+    /**
+     * HeartbeatState manages building the heartbeat requests correctly
+     */
+    private final HeartbeatState heartbeatState;
+
+    public ConsumerHeartbeatRequestManager(
+            final LogContext logContext,
+            final Time time,
+            final ConsumerConfig config,
+            final CoordinatorRequestManager coordinatorRequestManager,
+            final SubscriptionState subscriptions,
+            final ConsumerMembershipManager membershipManager,
+            final BackgroundEventHandler backgroundEventHandler,
+            final Metrics metrics) {
+        super(logContext, time, config, coordinatorRequestManager, 
backgroundEventHandler,
+                new HeartbeatMetricsManager(metrics));
+        this.membershipManager = membershipManager;
+        this.heartbeatState = new HeartbeatState(subscriptions, 
membershipManager, maxPollIntervalMs);
+    }
+
+    // Visible for testing
+    ConsumerHeartbeatRequestManager(
+            final LogContext logContext,
+            final Timer timer,
+            final ConsumerConfig config,
+            final CoordinatorRequestManager coordinatorRequestManager,
+            final ConsumerMembershipManager membershipManager,
+            final HeartbeatState heartbeatState,
+            final AbstractHeartbeatRequestManager.HeartbeatRequestState 
heartbeatRequestState,
+            final BackgroundEventHandler backgroundEventHandler,
+            final Metrics metrics) {
+        super(logContext, timer, config, coordinatorRequestManager, 
heartbeatRequestState, backgroundEventHandler,
+                new HeartbeatMetricsManager(metrics));
+        this.membershipManager = membershipManager;
+        this.heartbeatState = heartbeatState;
+    }
+
+    public boolean handleSpecificError(final ConsumerGroupHeartbeatResponse 
response,
+                                       final long currentTimeMs) {
+        Errors error = Errors.forCode(errorCodeForResponse(response));
+        String errorMessage = errorMessageForResponse(response);
+        boolean errorHandled;
+
+        switch (error) {
+            case UNRELEASED_INSTANCE_ID:
+                logger.error("{} failed due to unreleased instance id {}: {}",
+                        heartbeatRequestName(), 
membershipManager.groupInstanceId().orElse("null"), errorMessage);
+                handleFatalFailure(error.exception(errorMessage));
+                errorHandled = true;
+                break;
+
+            case FENCED_INSTANCE_ID:
+                logger.error("{} failed due to fenced instance id {}: {}. " +
+                                "This is expected in the case that the member 
was removed from the group " +
+                                "by an admin client, and another member joined 
using the same group instance id.",

Review Comment:
   indentation off



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java:
##########
@@ -264,9 +231,9 @@ public void resetPollTimer(final long pollMs) {
         pollTimer.update(pollMs);
         if (pollTimer.isExpired()) {
             logger.warn("Time between subsequent calls to poll() was longer 
than the configured " +
-                "max.poll.interval.ms, exceeded approximately by {} ms. Member 
{} will rejoin the group now.",
-                pollTimer.isExpiredBy(), membershipManager.memberId());
-            membershipManager.maybeRejoinStaleMember();
+                            "max.poll.interval.ms, exceeded approximately by 
{} ms. Member {} will rejoin the group now.",

Review Comment:
   indentation off?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManager.java:
##########
@@ -16,123 +16,72 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
-import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
-import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
 import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
-import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
 import 
org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
-import org.apache.kafka.common.errors.GroupAuthorizationException;
-import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
 import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ShareGroupHeartbeatRequest;
 import org.apache.kafka.common.requests.ShareGroupHeartbeatResponse;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Timer;
 
-import org.slf4j.Logger;
-
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.TreeSet;
 
 import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_SHARE_METRIC_GROUP_PREFIX;
 
 /**
  * <p>Manages the request creation and response handling for the heartbeat of 
a share group. The module creates a
- * {@link ShareGroupHeartbeatRequest} using the state stored in the {@link 
ShareMembershipManager} and enqueues it to
- * the network queue to be sent out. Once the response is received, the module 
will update the state in the
- * {@link ShareMembershipManager} and handle any errors.</p>
+ * {@link ShareGroupHeartbeatRequest} using the state stored in the {@link 
ShareMembershipManager} and enqueue it to
+ * the network queue to be sent out. Once the response is received, it updates 
the state in the
+ * {@link ShareMembershipManager} and handles any errors.</p>
  *
  * <p>The manager will try to send a heartbeat when the member is in {@link 
MemberState#STABLE},
- * {@link MemberState#JOINING}, or {@link MemberState#RECONCILING}. Which mean 
the member is either in a stable
+ * {@link MemberState#JOINING}, or {@link MemberState#RECONCILING}, which 
means the member is either in a stable
  * group, is trying to join a group, or is in the process of reconciling the 
assignment changes.</p>
  *
- * <p>If the member got kick out of a group, it will give up the current 
assignment and join again with a zero epoch.</p>
+ * <p>If the member got kicked out of a group, it will attempt to join again 
with a zero epoch.</p>
  *
- * <p>If the member does not have groupId configured or encounters fatal 
exceptions, a heartbeat will not be sent.</p>
+ * <p>If the member does not have groupId configured or encountering fatal 
exceptions, a heartbeat will not be sent.</p>
  *
- * <p>If the coordinator is not found, we will skip sending the heartbeat and 
try to find a coordinator first.</p>
+ * <p>If the coordinator not is not found, we will skip sending the heartbeat 
and try to find a coordinator first.</p>
  *
- * <p>If the heartbeat failed due to retriable errors, such as, 
TimeoutException, the subsequent attempt will be
- * backoff exponentially.</p>
+ * <p>If the heartbeat failed due to retriable errors, such as 
TimeoutException, the subsequent attempt will be
+ * backed off exponentially.</p>
  *
  * <p>When the member completes the assignment reconciliation, the {@link 
HeartbeatRequestState} will be reset so
  * that a heartbeat will be sent in the next event loop.</p>
  *
- * <p>See {@link HeartbeatRequestState} for more details.</p>
+ * <p>See {@link AbstractHeartbeatRequestManager.HeartbeatRequestState} for 
more details.</p>
  */
-public class ShareHeartbeatRequestManager implements RequestManager {
-
-    private final Logger logger;
-
-    /**
-     * Time that the group coordinator will wait on member to revoke its 
partitions. This is provided by the group
-     * coordinator in the heartbeat
-     */
-    private final int maxPollIntervalMs;
-
-    /**
-     * CoordinatorRequestManager manages the connection to the group 
coordinator
-     */
-    private final CoordinatorRequestManager coordinatorRequestManager;
+public class ShareHeartbeatRequestManager extends 
AbstractHeartbeatRequestManager<ShareGroupHeartbeatResponse, 
ShareGroupHeartbeatResponseData> {
 
     /**
-     * HeartbeatRequestState manages heartbeat request timing and retries
+     * Membership manager for consumer groups

Review Comment:
   for share groups 



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java:
##########
@@ -18,90 +18,70 @@
 
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import 
org.apache.kafka.clients.consumer.internals.AbstractMembershipManager.LocalAssignment;
 import 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
 import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
 import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
 import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
 import 
org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
-import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.RetriableException;
-import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
-import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
-import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Timer;
 
 import org.slf4j.Logger;
 
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.stream.Collectors;
-
 
 /**
  * <p>Manages the request creation and response handling for the heartbeat. 
The module creates a
- * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link 
ConsumerMembershipManager} and enqueue it to
- * the network queue to be sent out. Once the response is received, the module 
will update the state in the
- * {@link ConsumerMembershipManager} and handle any errors.</p>
+ * heartbeat request using the state stored in the membership manager and 
enqueues it to
+ * the network queue to be sent out. Once the response is received, it updates 
the state in the
+ * membership manager and handles any errors.
  *
  * <p>The manager will try to send a heartbeat when the member is in {@link 
MemberState#STABLE},
  * {@link MemberState#JOINING}, or {@link MemberState#RECONCILING}. Which mean 
the member is either in a stable
- * group, is trying to join a group, or is in the process of reconciling the 
assignment changes.</p>
+ * group, is trying to join a group, or is in the process of reconciling the 
assignment changes.
  *
- * <p>If the member got kick out of a group, it will try to give up the 
current assignment by invoking {@code
- * OnPartitionsLost} because reattempting to join again with a zero epoch.</p>
+ * <p>If the member got kicked out of a group, it will try to give up the 
current assignment by invoking {@code
+ * OnPartitionsLost} before attempting to join again with a zero epoch.
  *
- * <p>If the member does not have groupId configured or encountering fatal 
exceptions, a heartbeat will not be sent.</p>
+ * <p>If the member does not have groupId configured or encountering fatal 
exceptions, a heartbeat will not be sent.
  *
- * <p>If the coordinator not is not found, we will skip sending the heartbeat 
and try to find a coordinator first.</p>
+ * <p>If the coordinator not is not found, we will skip sending the heartbeat 
and try to find a coordinator first.
  *
- * <p>If the heartbeat failed due to retriable errors, such as, 
TimeoutException. The subsequent attempt will be
- * backoff exponentially.</p>
+ * <p>If the heartbeat failed due to retriable errors, such as 
TimeoutException, the subsequent attempt will be
+ * backed off exponentially.
  *
  * <p>When the member completes the assignment reconciliation, the {@link 
HeartbeatRequestState} will be reset so
- * that a heartbeat will be sent in the next event loop.</p>
+ * that a heartbeat will be sent in the next event loop.
  *
- * <p>See {@link HeartbeatRequestState} for more details.</p>
+ * <p>The class variable HBR is the response for the specific group's 
heartbeat RPC.
+ * <p>The class variable HRBD is the response data for the specific group's 
heartbeat RPC.
  */
-public class HeartbeatRequestManager implements RequestManager {
+public abstract class AbstractHeartbeatRequestManager<HBR extends 
AbstractResponse, HBRD extends ApiMessage> implements RequestManager {

Review Comment:
   It feels a bit redundant to have the response and responseData (when the 
data is already part of the response). Tracking that down, I see that it's 
needed just because `membershipMgr.onHeartbeatSuccess` requires the data. 
   
   That decision made sense at the beginning, but with this new structure I 
find that it brings more complexity than the value it has. What do you think 
about changing the `membershipMgr.onHeartbeatSuccess` to take the response as 
parameter? Then we could simplify this class to 
`AbstractHeartbeatRequestManager<R extends AbstractResponse>`



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java:
##########
@@ -375,81 +342,88 @@ private void onErrorResponse(final 
ConsumerGroupHeartbeatResponse response,
 
             case COORDINATOR_LOAD_IN_PROGRESS:
                 // the manager will backoff and retry
-                message = String.format("GroupHeartbeatRequest failed because 
the group coordinator %s is still loading." +
-                                "Will retry",
-                        coordinatorRequestManager.coordinator());
+                message = String.format("%s failed because the group 
coordinator %s is still loading. Will retry",
+                        heartbeatRequestName(), 
coordinatorRequestManager.coordinator());
                 logInfo(message, response, currentTimeMs);
                 break;
 
             case GROUP_AUTHORIZATION_FAILED:
                 GroupAuthorizationException exception =
-                        
GroupAuthorizationException.forGroupId(membershipManager.groupId());
-                logger.error("GroupHeartbeatRequest failed due to group 
authorization failure: {}", exception.getMessage());
+                        
GroupAuthorizationException.forGroupId(membershipManager().groupId());
+                logger.error("{} failed due to group authorization failure: 
{}",
+                        heartbeatRequestName(), exception.getMessage());
                 handleFatalFailure(error.exception(exception.getMessage()));
                 break;
 
-            case UNRELEASED_INSTANCE_ID:
-                logger.error("GroupHeartbeatRequest failed due to unreleased 
instance id {}: {}",
-                        membershipManager.groupInstanceId().orElse("null"), 
errorMessage);
-                handleFatalFailure(error.exception(errorMessage));
-                break;
-
-            case FENCED_INSTANCE_ID:
-                logger.error("GroupHeartbeatRequest failed due to fenced 
instance id {}: {}. " +
-                        "This is expected in the case that the member was 
removed from the group " +
-                        "by an admin client, and another member joined using 
the same group instance id.",
-                    membershipManager.groupInstanceId().orElse("null"), 
errorMessage);
-                handleFatalFailure(error.exception(errorMessage));
-                break;
-
             case INVALID_REQUEST:
             case GROUP_MAX_SIZE_REACHED:
             case UNSUPPORTED_ASSIGNOR:
             case UNSUPPORTED_VERSION:
-                logger.error("GroupHeartbeatRequest failed due to {}: {}", 
error, errorMessage);
+                logger.error("{} failed due to {}: {}", 
heartbeatRequestName(), error, errorMessage);
                 handleFatalFailure(error.exception(errorMessage));
                 break;
 
             case FENCED_MEMBER_EPOCH:
-                message = String.format("GroupHeartbeatRequest failed for 
member %s because epoch %s is fenced.",
-                        membershipManager.memberId(), 
membershipManager.memberEpoch());
+                message = String.format("%s failed for member %s because epoch 
%s is fenced.",
+                        heartbeatRequestName(), 
membershipManager().memberId(), membershipManager().memberEpoch());
                 logInfo(message, response, currentTimeMs);
-                membershipManager.transitionToFenced();
+                membershipManager().transitionToFenced();
                 // Skip backoff so that a next HB to rejoin is sent as soon as 
the fenced member releases its assignment
                 heartbeatRequestState.reset();
                 break;
 
             case UNKNOWN_MEMBER_ID:
-                message = String.format("GroupHeartbeatRequest failed because 
member %s is unknown.",
-                        membershipManager.memberId());
+                message = String.format("%s failed because member %s is 
unknown.",
+                        heartbeatRequestName(), 
membershipManager().memberId());
                 logInfo(message, response, currentTimeMs);
-                membershipManager.transitionToFenced();
+                membershipManager().transitionToFenced();
                 // Skip backoff so that a next HB to rejoin is sent as soon as 
the fenced member releases its assignment
                 heartbeatRequestState.reset();
                 break;
 
             default:
                 // If the manager receives an unknown error - there could be a 
bug in the code or a new error code

Review Comment:
   Should we move this comment inside the `if`? At this point we're not yet in 
"unknown error" land.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java:
##########
@@ -467,12 +441,12 @@ static class HeartbeatRequestState extends RequestState {
         private long heartbeatIntervalMs;
 
         public HeartbeatRequestState(
-            final LogContext logContext,
-            final Time time,
-            final long heartbeatIntervalMs,
-            final long retryBackoffMs,
-            final long retryBackoffMaxMs,
-            final double jitter) {
+                final LogContext logContext,
+                final Time time,
+                final long heartbeatIntervalMs,
+                final long retryBackoffMs,
+                final long retryBackoffMaxMs,
+                final double jitter) {

Review Comment:
   indentation off?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java:
##########
@@ -118,49 +98,41 @@ public class HeartbeatRequestManager implements 
RequestManager {
      */
     private final HeartbeatMetricsManager metricsManager;
 
-    public HeartbeatRequestManager(
-        final LogContext logContext,
-        final Time time,
-        final ConsumerConfig config,
-        final CoordinatorRequestManager coordinatorRequestManager,
-        final SubscriptionState subscriptions,
-        final ConsumerMembershipManager membershipManager,
-        final BackgroundEventHandler backgroundEventHandler,
-        final Metrics metrics) {
+    public AbstractHeartbeatRequestManager(
+            final LogContext logContext,
+            final Time time,
+            final ConsumerConfig config,
+            final CoordinatorRequestManager coordinatorRequestManager,
+            final BackgroundEventHandler backgroundEventHandler,
+            final HeartbeatMetricsManager metricsManager) {
         this.coordinatorRequestManager = coordinatorRequestManager;
         this.logger = logContext.logger(getClass());
-        this.membershipManager = membershipManager;
         this.backgroundEventHandler = backgroundEventHandler;
         this.maxPollIntervalMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
         long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
         long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
-        this.heartbeatState = new HeartbeatState(subscriptions, 
membershipManager, maxPollIntervalMs);
         this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, 0, retryBackoffMs,
-            retryBackoffMaxMs, maxPollIntervalMs);
+                retryBackoffMaxMs, maxPollIntervalMs);
         this.pollTimer = time.timer(maxPollIntervalMs);
-        this.metricsManager = new HeartbeatMetricsManager(metrics);
+        this.metricsManager = metricsManager;
     }
 
     // Visible for testing

Review Comment:
   this is probably not "Visible for testing" anymore on this abstract class, 
and should be just a "protected" constructor (same for the other one above)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to