lianetm commented on code in PR #16963:
URL: https://github.com/apache/kafka/pull/16963#discussion_r1727765532
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java:
##########
@@ -467,12 +441,12 @@ static class HeartbeatRequestState extends RequestState {
private long heartbeatIntervalMs;
public HeartbeatRequestState(
Review Comment:
Could this just be protected now?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManager.java:
##########
@@ -16,123 +16,72 @@
*/
package org.apache.kafka.clients.consumer.internals;
-import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
-import
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
-import
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
-import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import
org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
-import org.apache.kafka.common.errors.GroupAuthorizationException;
-import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ShareGroupHeartbeatRequest;
import org.apache.kafka.common.requests.ShareGroupHeartbeatResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
-import org.slf4j.Logger;
-
import java.util.ArrayList;
-import java.util.Collections;
import java.util.TreeSet;
import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_SHARE_METRIC_GROUP_PREFIX;
/**
* <p>Manages the request creation and response handling for the heartbeat of
a share group. The module creates a
- * {@link ShareGroupHeartbeatRequest} using the state stored in the {@link
ShareMembershipManager} and enqueues it to
- * the network queue to be sent out. Once the response is received, the module
will update the state in the
- * {@link ShareMembershipManager} and handle any errors.</p>
+ * {@link ShareGroupHeartbeatRequest} using the state stored in the {@link
ShareMembershipManager} and enqueue it to
+ * the network queue to be sent out. Once the response is received, it updates
the state in the
+ * {@link ShareMembershipManager} and handles any errors.</p>
*
* <p>The manager will try to send a heartbeat when the member is in {@link
MemberState#STABLE},
- * {@link MemberState#JOINING}, or {@link MemberState#RECONCILING}. Which mean
the member is either in a stable
+ * {@link MemberState#JOINING}, or {@link MemberState#RECONCILING}, which
means the member is either in a stable
* group, is trying to join a group, or is in the process of reconciling the
assignment changes.</p>
*
- * <p>If the member got kick out of a group, it will give up the current
assignment and join again with a zero epoch.</p>
+ * <p>If the member got kicked out of a group, it will attempt to join again
with a zero epoch.</p>
*
- * <p>If the member does not have groupId configured or encounters fatal
exceptions, a heartbeat will not be sent.</p>
+ * <p>If the member does not have groupId configured or encountering fatal
exceptions, a heartbeat will not be sent.</p>
*
- * <p>If the coordinator is not found, we will skip sending the heartbeat and
try to find a coordinator first.</p>
+ * <p>If the coordinator not is not found, we will skip sending the heartbeat
and try to find a coordinator first.</p>
*
- * <p>If the heartbeat failed due to retriable errors, such as,
TimeoutException, the subsequent attempt will be
- * backoff exponentially.</p>
+ * <p>If the heartbeat failed due to retriable errors, such as
TimeoutException, the subsequent attempt will be
+ * backed off exponentially.</p>
*
* <p>When the member completes the assignment reconciliation, the {@link
HeartbeatRequestState} will be reset so
* that a heartbeat will be sent in the next event loop.</p>
*
- * <p>See {@link HeartbeatRequestState} for more details.</p>
+ * <p>See {@link AbstractHeartbeatRequestManager.HeartbeatRequestState} for
more details.</p>
*/
-public class ShareHeartbeatRequestManager implements RequestManager {
-
- private final Logger logger;
-
- /**
- * Time that the group coordinator will wait on member to revoke its
partitions. This is provided by the group
- * coordinator in the heartbeat
- */
- private final int maxPollIntervalMs;
-
- /**
- * CoordinatorRequestManager manages the connection to the group
coordinator
- */
- private final CoordinatorRequestManager coordinatorRequestManager;
+public class ShareHeartbeatRequestManager extends
AbstractHeartbeatRequestManager<ShareGroupHeartbeatResponse,
ShareGroupHeartbeatResponseData> {
/**
- * HeartbeatRequestState manages heartbeat request timing and retries
+ * Membership manager for consumer groups
*/
- private final HeartbeatRequestState heartbeatRequestState;
+ private final ShareMembershipManager membershipManager;
/*
* HeartbeatState manages building the heartbeat requests correctly
*/
private final HeartbeatState heartbeatState;
- /**
- * ShareMembershipManager manages member's essential attributes like epoch
and id, and its rebalance state
- */
- private final ShareMembershipManager shareMembershipManager;
-
- /**
- * ErrorEventHandler allows the background thread to propagate errors back
to the user
- */
- private final BackgroundEventHandler backgroundEventHandler;
-
- /**
- * Timer for tracking the time since the last consumer poll. If the timer
expires, the consumer will stop
- * sending heartbeat until the next poll.
- */
- private final Timer pollTimer;
-
- /**
- * Holding the heartbeat sensor to measure heartbeat timing and response
latency
- */
- private final HeartbeatMetricsManager metricsManager;
-
public ShareHeartbeatRequestManager(
final LogContext logContext,
final Time time,
final ConsumerConfig config,
final CoordinatorRequestManager coordinatorRequestManager,
final SubscriptionState subscriptions,
- final ShareMembershipManager shareMembershipManager,
+ final ShareMembershipManager membershipManager,
final BackgroundEventHandler backgroundEventHandler,
final Metrics metrics) {
- this.coordinatorRequestManager = coordinatorRequestManager;
- this.logger = logContext.logger(getClass());
- this.shareMembershipManager = shareMembershipManager;
- this.backgroundEventHandler = backgroundEventHandler;
- this.maxPollIntervalMs =
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
- long retryBackoffMs =
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
- long retryBackoffMaxMs =
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
- this.heartbeatState = new HeartbeatState(subscriptions,
shareMembershipManager);
- this.heartbeatRequestState = new HeartbeatRequestState(logContext,
time, 0, retryBackoffMs,
- retryBackoffMaxMs, maxPollIntervalMs);
- this.pollTimer = time.timer(maxPollIntervalMs);
- this.metricsManager = new HeartbeatMetricsManager(metrics,
CONSUMER_SHARE_METRIC_GROUP_PREFIX);
+ super(logContext, time, config, coordinatorRequestManager,
backgroundEventHandler,
+ new HeartbeatMetricsManager(metrics,
CONSUMER_SHARE_METRIC_GROUP_PREFIX));
Review Comment:
indentation off?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java:
##########
@@ -375,81 +342,88 @@ private void onErrorResponse(final
ConsumerGroupHeartbeatResponse response,
case COORDINATOR_LOAD_IN_PROGRESS:
// the manager will backoff and retry
- message = String.format("GroupHeartbeatRequest failed because
the group coordinator %s is still loading." +
- "Will retry",
- coordinatorRequestManager.coordinator());
+ message = String.format("%s failed because the group
coordinator %s is still loading. Will retry",
+ heartbeatRequestName(),
coordinatorRequestManager.coordinator());
logInfo(message, response, currentTimeMs);
break;
case GROUP_AUTHORIZATION_FAILED:
GroupAuthorizationException exception =
-
GroupAuthorizationException.forGroupId(membershipManager.groupId());
- logger.error("GroupHeartbeatRequest failed due to group
authorization failure: {}", exception.getMessage());
+
GroupAuthorizationException.forGroupId(membershipManager().groupId());
+ logger.error("{} failed due to group authorization failure:
{}",
+ heartbeatRequestName(), exception.getMessage());
handleFatalFailure(error.exception(exception.getMessage()));
break;
- case UNRELEASED_INSTANCE_ID:
- logger.error("GroupHeartbeatRequest failed due to unreleased
instance id {}: {}",
- membershipManager.groupInstanceId().orElse("null"),
errorMessage);
- handleFatalFailure(error.exception(errorMessage));
- break;
-
- case FENCED_INSTANCE_ID:
- logger.error("GroupHeartbeatRequest failed due to fenced
instance id {}: {}. " +
- "This is expected in the case that the member was
removed from the group " +
- "by an admin client, and another member joined using
the same group instance id.",
- membershipManager.groupInstanceId().orElse("null"),
errorMessage);
- handleFatalFailure(error.exception(errorMessage));
- break;
-
case INVALID_REQUEST:
case GROUP_MAX_SIZE_REACHED:
case UNSUPPORTED_ASSIGNOR:
case UNSUPPORTED_VERSION:
- logger.error("GroupHeartbeatRequest failed due to {}: {}",
error, errorMessage);
+ logger.error("{} failed due to {}: {}",
heartbeatRequestName(), error, errorMessage);
handleFatalFailure(error.exception(errorMessage));
break;
case FENCED_MEMBER_EPOCH:
- message = String.format("GroupHeartbeatRequest failed for
member %s because epoch %s is fenced.",
- membershipManager.memberId(),
membershipManager.memberEpoch());
+ message = String.format("%s failed for member %s because epoch
%s is fenced.",
+ heartbeatRequestName(),
membershipManager().memberId(), membershipManager().memberEpoch());
logInfo(message, response, currentTimeMs);
- membershipManager.transitionToFenced();
+ membershipManager().transitionToFenced();
// Skip backoff so that a next HB to rejoin is sent as soon as
the fenced member releases its assignment
heartbeatRequestState.reset();
break;
case UNKNOWN_MEMBER_ID:
- message = String.format("GroupHeartbeatRequest failed because
member %s is unknown.",
- membershipManager.memberId());
+ message = String.format("%s failed because member %s is
unknown.",
+ heartbeatRequestName(),
membershipManager().memberId());
logInfo(message, response, currentTimeMs);
- membershipManager.transitionToFenced();
+ membershipManager().transitionToFenced();
// Skip backoff so that a next HB to rejoin is sent as soon as
the fenced member releases its assignment
heartbeatRequestState.reset();
break;
default:
// If the manager receives an unknown error - there could be a
bug in the code or a new error code
- logger.error("GroupHeartbeatRequest failed due to unexpected
error {}: {}", error, errorMessage);
- handleFatalFailure(error.exception(errorMessage));
+ if (!handleSpecificError(response, currentTimeMs)) {
+ logger.error("{} failed due to unexpected error {}: {}",
heartbeatRequestName(), error, errorMessage);
+ handleFatalFailure(error.exception(errorMessage));
+ }
break;
}
}
- private void logInfo(final String message,
- final ConsumerGroupHeartbeatResponse response,
+ protected void logInfo(final String message,
+ final HBR response,
final long currentTimeMs) {
logger.info("{} in {}ms: {}",
- message,
- heartbeatRequestState.remainingBackoffMs(currentTimeMs),
- response.data().errorMessage());
+ message,
+ heartbeatRequestState.remainingBackoffMs(currentTimeMs),
+ errorMessageForResponse(response));
Review Comment:
This uses an 8-space continuation indent instead of 4, which we had unintentionally in
the previous PR. Is it still unintentional here?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java:
##########
@@ -18,90 +18,70 @@
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
-import
org.apache.kafka.clients.consumer.internals.AbstractMembershipManager.LocalAssignment;
import
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
import
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import
org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
-import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.RetriableException;
-import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
-import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
-import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.slf4j.Logger;
-import java.util.ArrayList;
import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.stream.Collectors;
-
/**
* <p>Manages the request creation and response handling for the heartbeat.
The module creates a
- * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link
ConsumerMembershipManager} and enqueue it to
- * the network queue to be sent out. Once the response is received, the module
will update the state in the
- * {@link ConsumerMembershipManager} and handle any errors.</p>
+ * heartbeat request using the state stored in the membership manager and
enqueues it to
+ * the network queue to be sent out. Once the response is received, it updates
the state in the
+ * membership manager and handles any errors.
*
* <p>The manager will try to send a heartbeat when the member is in {@link
MemberState#STABLE},
* {@link MemberState#JOINING}, or {@link MemberState#RECONCILING}. Which mean
the member is either in a stable
- * group, is trying to join a group, or is in the process of reconciling the
assignment changes.</p>
+ * group, is trying to join a group, or is in the process of reconciling the
assignment changes.
*
- * <p>If the member got kick out of a group, it will try to give up the
current assignment by invoking {@code
- * OnPartitionsLost} because reattempting to join again with a zero epoch.</p>
+ * <p>If the member got kicked out of a group, it will try to give up the
current assignment by invoking {@code
+ * OnPartitionsLost} before attempting to join again with a zero epoch.
*
- * <p>If the member does not have groupId configured or encountering fatal
exceptions, a heartbeat will not be sent.</p>
+ * <p>If the member does not have groupId configured or encountering fatal
exceptions, a heartbeat will not be sent.
*
- * <p>If the coordinator not is not found, we will skip sending the heartbeat
and try to find a coordinator first.</p>
+ * <p>If the coordinator not is not found, we will skip sending the heartbeat
and try to find a coordinator first.
*
- * <p>If the heartbeat failed due to retriable errors, such as,
TimeoutException. The subsequent attempt will be
- * backoff exponentially.</p>
+ * <p>If the heartbeat failed due to retriable errors, such as
TimeoutException, the subsequent attempt will be
+ * backed off exponentially.
*
* <p>When the member completes the assignment reconciliation, the {@link
HeartbeatRequestState} will be reset so
Review Comment:
Hmm, this is not accurate: when reconciliation completes there is no reset
(that would lead to a full HB). It's just the next HB, which includes the reconciled
partitions. I would just remove it.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java:
##########
@@ -18,90 +18,70 @@
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
-import
org.apache.kafka.clients.consumer.internals.AbstractMembershipManager.LocalAssignment;
import
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
import
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import
org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
-import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.RetriableException;
-import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
-import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
-import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.slf4j.Logger;
-import java.util.ArrayList;
import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.stream.Collectors;
-
/**
* <p>Manages the request creation and response handling for the heartbeat.
The module creates a
- * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link
ConsumerMembershipManager} and enqueue it to
- * the network queue to be sent out. Once the response is received, the module
will update the state in the
- * {@link ConsumerMembershipManager} and handle any errors.</p>
+ * heartbeat request using the state stored in the membership manager and
enqueues it to
+ * the network queue to be sent out. Once the response is received, it updates
the state in the
+ * membership manager and handles any errors.
*
* <p>The manager will try to send a heartbeat when the member is in {@link
MemberState#STABLE},
* {@link MemberState#JOINING}, or {@link MemberState#RECONCILING}. Which mean
the member is either in a stable
- * group, is trying to join a group, or is in the process of reconciling the
assignment changes.</p>
+ * group, is trying to join a group, or is in the process of reconciling the
assignment changes.
*
- * <p>If the member got kick out of a group, it will try to give up the
current assignment by invoking {@code
- * OnPartitionsLost} because reattempting to join again with a zero epoch.</p>
+ * <p>If the member got kicked out of a group, it will try to give up the
current assignment by invoking {@code
+ * OnPartitionsLost} before attempting to join again with a zero epoch.
*
- * <p>If the member does not have groupId configured or encountering fatal
exceptions, a heartbeat will not be sent.</p>
+ * <p>If the member does not have groupId configured or encountering fatal
exceptions, a heartbeat will not be sent.
*
- * <p>If the coordinator not is not found, we will skip sending the heartbeat
and try to find a coordinator first.</p>
+ * <p>If the coordinator not is not found, we will skip sending the heartbeat
and try to find a coordinator first.
*
- * <p>If the heartbeat failed due to retriable errors, such as,
TimeoutException. The subsequent attempt will be
- * backoff exponentially.</p>
+ * <p>If the heartbeat failed due to retriable errors, such as
TimeoutException, the subsequent attempt will be
+ * backed off exponentially.
*
* <p>When the member completes the assignment reconciliation, the {@link
HeartbeatRequestState} will be reset so
- * that a heartbeat will be sent in the next event loop.</p>
+ * that a heartbeat will be sent in the next event loop.
*
- * <p>See {@link HeartbeatRequestState} for more details.</p>
+ * <p>The class variable HBR is the response for the specific group's
heartbeat RPC.
+ * <p>The class variable HRBD is the response data for the specific group's
heartbeat RPC.
*/
-public class HeartbeatRequestManager implements RequestManager {
+public abstract class AbstractHeartbeatRequestManager<HBR extends
AbstractResponse, HBRD extends ApiMessage> implements RequestManager {
Review Comment:
nit: What about simplifying the generic names to R (response) and
D (response data)? We already have the HB flavour at the class-name level.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManager.java:
##########
@@ -16,123 +16,72 @@
*/
package org.apache.kafka.clients.consumer.internals;
-import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
-import
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
-import
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
-import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import
org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
-import org.apache.kafka.common.errors.GroupAuthorizationException;
-import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ShareGroupHeartbeatRequest;
import org.apache.kafka.common.requests.ShareGroupHeartbeatResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
-import org.slf4j.Logger;
-
import java.util.ArrayList;
-import java.util.Collections;
import java.util.TreeSet;
import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_SHARE_METRIC_GROUP_PREFIX;
/**
* <p>Manages the request creation and response handling for the heartbeat of
a share group. The module creates a
- * {@link ShareGroupHeartbeatRequest} using the state stored in the {@link
ShareMembershipManager} and enqueues it to
- * the network queue to be sent out. Once the response is received, the module
will update the state in the
- * {@link ShareMembershipManager} and handle any errors.</p>
+ * {@link ShareGroupHeartbeatRequest} using the state stored in the {@link
ShareMembershipManager} and enqueue it to
+ * the network queue to be sent out. Once the response is received, it updates
the state in the
+ * {@link ShareMembershipManager} and handles any errors.</p>
*
* <p>The manager will try to send a heartbeat when the member is in {@link
MemberState#STABLE},
- * {@link MemberState#JOINING}, or {@link MemberState#RECONCILING}. Which mean
the member is either in a stable
+ * {@link MemberState#JOINING}, or {@link MemberState#RECONCILING}, which
means the member is either in a stable
* group, is trying to join a group, or is in the process of reconciling the
assignment changes.</p>
*
- * <p>If the member got kick out of a group, it will give up the current
assignment and join again with a zero epoch.</p>
+ * <p>If the member got kicked out of a group, it will attempt to join again
with a zero epoch.</p>
*
- * <p>If the member does not have groupId configured or encounters fatal
exceptions, a heartbeat will not be sent.</p>
+ * <p>If the member does not have groupId configured or encountering fatal
exceptions, a heartbeat will not be sent.</p>
*
- * <p>If the coordinator is not found, we will skip sending the heartbeat and
try to find a coordinator first.</p>
+ * <p>If the coordinator not is not found, we will skip sending the heartbeat
and try to find a coordinator first.</p>
*
- * <p>If the heartbeat failed due to retriable errors, such as,
TimeoutException, the subsequent attempt will be
- * backoff exponentially.</p>
+ * <p>If the heartbeat failed due to retriable errors, such as
TimeoutException, the subsequent attempt will be
+ * backed off exponentially.</p>
*
* <p>When the member completes the assignment reconciliation, the {@link
HeartbeatRequestState} will be reset so
* that a heartbeat will be sent in the next event loop.</p>
*
- * <p>See {@link HeartbeatRequestState} for more details.</p>
+ * <p>See {@link AbstractHeartbeatRequestManager.HeartbeatRequestState} for
more details.</p>
Review Comment:
Similar to my comment on the ConsumerHeartbeatRequestManager Javadoc, I
would suggest we keep at this level only the info that is specific to this
manager (basically that it's based on ShareGroupHeartbeatRequest and
ShareGroupHeartbeatResponse), and then refer to the base class Javadoc for the
common info that applies to all heartbeat managers.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java:
##########
@@ -194,6 +194,7 @@ public ConsumerMembershipManager(String groupId,
* @return Instance ID used by the member when joining the group. If
non-empty, it will indicate that
* this is a static member.
*/
Review Comment:
Would {@inheritDoc} be better here, to avoid maintaining both copies?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManager.java:
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
+import
org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+/**
+ * <p>Manages the request creation and response handling for the heartbeat of
a consumer group. The module creates a
+ * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link
ConsumerMembershipManager} and enqueue it to
+ * the network queue to be sent out. Once the response is received, it updates
the state in the
+ * {@link ConsumerMembershipManager} and handles any errors.</p>
+ *
+ * <p>The manager will try to send a heartbeat when the member is in {@link
MemberState#STABLE},
+ * {@link MemberState#JOINING}, or {@link MemberState#RECONCILING}, which
means the member is either in a stable
+ * group, is trying to join a group, or is in the process of reconciling the
assignment changes.</p>
+ *
+ * <p>If the member got kicked out of a group, it will try to give up the
current assignment by invoking {@code
+ * OnPartitionsLost} before attempting to join again with a zero epoch.</p>
+ *
+ * <p>If the member does not have groupId configured or encountering fatal
exceptions, a heartbeat will not be sent.</p>
+ *
+ * <p>If the coordinator not is not found, we will skip sending the heartbeat
and try to find a coordinator first.</p>
+ *
+ * <p>If the heartbeat failed due to retriable errors, such as
TimeoutException, the subsequent attempt will be
+ * backed off exponentially.</p>
+ *
+ * <p>When the member completes the assignment reconciliation, the {@link
HeartbeatRequestState} will be reset so
+ * that a heartbeat will be sent in the next event loop.</p>
+ *
+ * <p>See {@link AbstractHeartbeatRequestManager.HeartbeatRequestState} for
more details.</p>
Review Comment:
This largely duplicates the Javadoc on the base abstract manager and
will be a nightmare to maintain (I already left comments on the base class).
What if we leave here only what's specific to this manager: "Manages the
request creation and response handling for the heartbeat of a consumer group,
generating ConsumerGroupHeartbeatRequest and processing
ConsumerGroupHeartbeatResponse" (along those lines), and then maybe just add a
@see AbstractHeartbeatRequestManager tag? Better?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java:
##########
@@ -18,90 +18,70 @@
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
-import
org.apache.kafka.clients.consumer.internals.AbstractMembershipManager.LocalAssignment;
import
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
import
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import
org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
-import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.RetriableException;
-import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
-import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
-import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.slf4j.Logger;
-import java.util.ArrayList;
import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.stream.Collectors;
-
/**
* <p>Manages the request creation and response handling for the heartbeat.
The module creates a
- * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link
ConsumerMembershipManager} and enqueue it to
- * the network queue to be sent out. Once the response is received, the module
will update the state in the
- * {@link ConsumerMembershipManager} and handle any errors.</p>
+ * heartbeat request using the state stored in the membership manager and
enqueues it to
+ * the network queue to be sent out. Once the response is received, it updates
the state in the
+ * membership manager and handles any errors.
*
* <p>The manager will try to send a heartbeat when the member is in {@link
MemberState#STABLE},
* {@link MemberState#JOINING}, or {@link MemberState#RECONCILING}. Which mean
the member is either in a stable
- * group, is trying to join a group, or is in the process of reconciling the
assignment changes.</p>
+ * group, is trying to join a group, or is in the process of reconciling the
assignment changes.
*
- * <p>If the member got kick out of a group, it will try to give up the
current assignment by invoking {@code
- * OnPartitionsLost} because reattempting to join again with a zero epoch.</p>
+ * <p>If the member got kicked out of a group, it will try to give up the
current assignment by invoking {@code
+ * OnPartitionsLost} before attempting to join again with a zero epoch.
Review Comment:
this is not really the responsibility of the HB manager (it's all handled in the
membership manager). I would say we just remove these 2 lines?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java:
##########
@@ -18,90 +18,70 @@
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
-import
org.apache.kafka.clients.consumer.internals.AbstractMembershipManager.LocalAssignment;
import
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
import
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import
org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
-import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.RetriableException;
-import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
-import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
-import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.slf4j.Logger;
-import java.util.ArrayList;
import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.stream.Collectors;
-
/**
* <p>Manages the request creation and response handling for the heartbeat.
The module creates a
- * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link
ConsumerMembershipManager} and enqueue it to
- * the network queue to be sent out. Once the response is received, the module
will update the state in the
- * {@link ConsumerMembershipManager} and handle any errors.</p>
+ * heartbeat request using the state stored in the membership manager and
enqueues it to
+ * the network queue to be sent out. Once the response is received, it updates
the state in the
+ * membership manager and handles any errors.
*
* <p>The manager will try to send a heartbeat when the member is in {@link
MemberState#STABLE},
* {@link MemberState#JOINING}, or {@link MemberState#RECONCILING}. Which mean
the member is either in a stable
- * group, is trying to join a group, or is in the process of reconciling the
assignment changes.</p>
+ * group, is trying to join a group, or is in the process of reconciling the
assignment changes.
Review Comment:
this is not really accurate; it's probably just outdated (other states that
send HB were probably added later). Instead of referring to the specific
states, how about we just describe the basics, something like: the HB manager
generates HB requests based on the member state, which indicates whether an HB should be
sent. It's also responsible for timing the requests, to ensure they are sent on
the interval (e.g. while the member is stable in a group), or on demand (e.g. to
acknowledge a reconciled assignment or leave the group).
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java:
##########
@@ -275,98 +242,98 @@ private NetworkClientDelegate.UnsentRequest
makeHeartbeatRequest(final long curr
final
boolean ignoreResponse) {
NetworkClientDelegate.UnsentRequest request =
makeHeartbeatRequest(ignoreResponse);
heartbeatRequestState.onSendAttempt(currentTimeMs);
- membershipManager.onHeartbeatRequestGenerated();
+ membershipManager().onHeartbeatRequestGenerated();
metricsManager.recordHeartbeatSentMs(currentTimeMs);
heartbeatRequestState.resetTimer();
return request;
}
+ @SuppressWarnings("unchecked")
private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final
boolean ignoreResponse) {
- NetworkClientDelegate.UnsentRequest request = new
NetworkClientDelegate.UnsentRequest(
- new
ConsumerGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()),
- coordinatorRequestManager.coordinator());
+ NetworkClientDelegate.UnsentRequest request = buildHeartbeatRequest();
if (ignoreResponse)
return logResponse(request);
else
return request.whenComplete((response, exception) -> {
long completionTimeMs = request.handler().completionTimeMs();
if (response != null) {
metricsManager.recordRequestLatency(response.requestLatencyMs());
- onResponse((ConsumerGroupHeartbeatResponse)
response.responseBody(), completionTimeMs);
+ onResponse((HBR) response.responseBody(),
completionTimeMs);
} else {
onFailure(exception, completionTimeMs);
}
});
}
+ @SuppressWarnings("unchecked")
private NetworkClientDelegate.UnsentRequest logResponse(final
NetworkClientDelegate.UnsentRequest request) {
return request.whenComplete((response, exception) -> {
if (response != null) {
metricsManager.recordRequestLatency(response.requestLatencyMs());
Errors error =
- Errors.forCode(((ConsumerGroupHeartbeatResponse)
response.responseBody()).data().errorCode());
+ Errors.forCode(errorCodeForResponse((HBR)
response.responseBody()));
if (error == Errors.NONE)
- logger.debug("GroupHeartbeat responded successfully: {}",
response);
+ logger.debug("{} responded successfully: {}",
heartbeatRequestName(), response);
else
- logger.error("GroupHeartbeat failed because of {}: {}",
error, response);
+ logger.error("{} failed because of {}: {}",
heartbeatRequestName(), error, response);
} else {
- logger.error("GroupHeartbeat failed because of unexpected
exception.", exception);
+ logger.error("{} failed because of unexpected exception.",
heartbeatRequestName(), exception);
}
});
}
private void onFailure(final Throwable exception, final long
responseTimeMs) {
this.heartbeatRequestState.onFailedAttempt(responseTimeMs);
- this.heartbeatState.reset();
- membershipManager.onHeartbeatFailure(exception instanceof
RetriableException);
+ resetHeartbeatState();
+ membershipManager().onHeartbeatFailure(exception instanceof
RetriableException);
if (exception instanceof RetriableException) {
- String message = String.format("GroupHeartbeatRequest failed
because of the retriable exception. " +
- "Will retry in %s ms: %s",
- heartbeatRequestState.remainingBackoffMs(responseTimeMs),
- exception.getMessage());
+ String message = String.format("%s failed because of the retriable
exception. Will retry in %s ms: %s",
+ heartbeatRequestName(),
+ heartbeatRequestState.remainingBackoffMs(responseTimeMs),
+ exception.getMessage());
logger.debug(message);
} else {
- logger.error("GroupHeartbeatRequest failed due to fatal error: " +
exception.getMessage());
+ logger.error("{} failed due to fatal error: {}",
heartbeatRequestName(), exception.getMessage());
handleFatalFailure(exception);
}
}
- private void onResponse(final ConsumerGroupHeartbeatResponse response,
long currentTimeMs) {
- if (Errors.forCode(response.data().errorCode()) == Errors.NONE) {
-
heartbeatRequestState.updateHeartbeatIntervalMs(response.data().heartbeatIntervalMs());
+ private void onResponse(final HBR response, long currentTimeMs) {
+ if (Errors.forCode(errorCodeForResponse(response)) == Errors.NONE) {
+
heartbeatRequestState.updateHeartbeatIntervalMs(heartbeatIntervalForResponse(response));
heartbeatRequestState.onSuccessfulAttempt(currentTimeMs);
- membershipManager.onHeartbeatSuccess(response.data());
+ membershipManager().onHeartbeatSuccess(responseData(response));
return;
}
onErrorResponse(response, currentTimeMs);
}
- private void onErrorResponse(final ConsumerGroupHeartbeatResponse response,
+ private void onErrorResponse(final HBR response,
final long currentTimeMs) {
- Errors error = Errors.forCode(response.data().errorCode());
- String errorMessage = response.data().errorMessage();
+ Errors error = Errors.forCode(errorCodeForResponse(response));
Review Comment:
it seems we always end up wrapping the result of `errorCodeForResponse` with
`Errors.forCode`? If so, we could maybe simplify by having errorCodeForResponse
return an `Errors` value directly, and then just write `Errors error =
errorCodeForResponse(response);`
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManager.java:
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
+import
org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+/**
+ * <p>Manages the request creation and response handling for the heartbeat of
a consumer group. The module creates a
+ * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link
ConsumerMembershipManager} and enqueue it to
+ * the network queue to be sent out. Once the response is received, it updates
the state in the
+ * {@link ConsumerMembershipManager} and handles any errors.</p>
+ *
+ * <p>The manager will try to send a heartbeat when the member is in {@link
MemberState#STABLE},
+ * {@link MemberState#JOINING}, or {@link MemberState#RECONCILING}, which
means the member is either in a stable
+ * group, is trying to join a group, or is in the process of reconciling the
assignment changes.</p>
+ *
+ * <p>If the member got kicked out of a group, it will try to give up the
current assignment by invoking {@code
+ * OnPartitionsLost} before attempting to join again with a zero epoch.</p>
+ *
+ * <p>If the member does not have groupId configured or encountering fatal
exceptions, a heartbeat will not be sent.</p>
+ *
+ * <p>If the coordinator not is not found, we will skip sending the heartbeat
and try to find a coordinator first.</p>
+ *
+ * <p>If the heartbeat failed due to retriable errors, such as
TimeoutException, the subsequent attempt will be
+ * backed off exponentially.</p>
+ *
+ * <p>When the member completes the assignment reconciliation, the {@link
HeartbeatRequestState} will be reset so
+ * that a heartbeat will be sent in the next event loop.</p>
+ *
+ * <p>See {@link AbstractHeartbeatRequestManager.HeartbeatRequestState} for
more details.</p>
+ */
+public class ConsumerHeartbeatRequestManager extends
AbstractHeartbeatRequestManager<ConsumerGroupHeartbeatResponse,
ConsumerGroupHeartbeatResponseData> {
+
+ /**
+ * Membership manager for consumer groups
+ */
+ private final ConsumerMembershipManager membershipManager;
+
+ /**
+ * HeartbeatState manages building the heartbeat requests correctly
+ */
+ private final HeartbeatState heartbeatState;
+
+ public ConsumerHeartbeatRequestManager(
+ final LogContext logContext,
+ final Time time,
+ final ConsumerConfig config,
+ final CoordinatorRequestManager coordinatorRequestManager,
+ final SubscriptionState subscriptions,
+ final ConsumerMembershipManager membershipManager,
+ final BackgroundEventHandler backgroundEventHandler,
+ final Metrics metrics) {
+ super(logContext, time, config, coordinatorRequestManager,
backgroundEventHandler,
+ new HeartbeatMetricsManager(metrics));
+ this.membershipManager = membershipManager;
+ this.heartbeatState = new HeartbeatState(subscriptions,
membershipManager, maxPollIntervalMs);
+ }
+
+ // Visible for testing
+ ConsumerHeartbeatRequestManager(
+ final LogContext logContext,
+ final Timer timer,
+ final ConsumerConfig config,
+ final CoordinatorRequestManager coordinatorRequestManager,
+ final ConsumerMembershipManager membershipManager,
+ final HeartbeatState heartbeatState,
+ final AbstractHeartbeatRequestManager.HeartbeatRequestState
heartbeatRequestState,
+ final BackgroundEventHandler backgroundEventHandler,
+ final Metrics metrics) {
+ super(logContext, timer, config, coordinatorRequestManager,
heartbeatRequestState, backgroundEventHandler,
+ new HeartbeatMetricsManager(metrics));
+ this.membershipManager = membershipManager;
+ this.heartbeatState = heartbeatState;
+ }
+
+ public boolean handleSpecificError(final ConsumerGroupHeartbeatResponse
response,
+ final long currentTimeMs) {
+ Errors error = Errors.forCode(errorCodeForResponse(response));
+ String errorMessage = errorMessageForResponse(response);
+ boolean errorHandled;
+
+ switch (error) {
+ case UNRELEASED_INSTANCE_ID:
+ logger.error("{} failed due to unreleased instance id {}: {}",
+ heartbeatRequestName(),
membershipManager.groupInstanceId().orElse("null"), errorMessage);
+ handleFatalFailure(error.exception(errorMessage));
+ errorHandled = true;
+ break;
+
+ case FENCED_INSTANCE_ID:
+ logger.error("{} failed due to fenced instance id {}: {}. " +
+ "This is expected in the case that the member
was removed from the group " +
+ "by an admin client, and another member joined
using the same group instance id.",
Review Comment:
indentation off
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java:
##########
@@ -264,9 +231,9 @@ public void resetPollTimer(final long pollMs) {
pollTimer.update(pollMs);
if (pollTimer.isExpired()) {
logger.warn("Time between subsequent calls to poll() was longer
than the configured " +
- "max.poll.interval.ms, exceeded approximately by {} ms. Member
{} will rejoin the group now.",
- pollTimer.isExpiredBy(), membershipManager.memberId());
- membershipManager.maybeRejoinStaleMember();
+ "max.poll.interval.ms, exceeded approximately by
{} ms. Member {} will rejoin the group now.",
Review Comment:
indentation off?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManager.java:
##########
@@ -16,123 +16,72 @@
*/
package org.apache.kafka.clients.consumer.internals;
-import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
-import
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
-import
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
-import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import
org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
-import org.apache.kafka.common.errors.GroupAuthorizationException;
-import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ShareGroupHeartbeatRequest;
import org.apache.kafka.common.requests.ShareGroupHeartbeatResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
-import org.slf4j.Logger;
-
import java.util.ArrayList;
-import java.util.Collections;
import java.util.TreeSet;
import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_SHARE_METRIC_GROUP_PREFIX;
/**
* <p>Manages the request creation and response handling for the heartbeat of
a share group. The module creates a
- * {@link ShareGroupHeartbeatRequest} using the state stored in the {@link
ShareMembershipManager} and enqueues it to
- * the network queue to be sent out. Once the response is received, the module
will update the state in the
- * {@link ShareMembershipManager} and handle any errors.</p>
+ * {@link ShareGroupHeartbeatRequest} using the state stored in the {@link
ShareMembershipManager} and enqueue it to
+ * the network queue to be sent out. Once the response is received, it updates
the state in the
+ * {@link ShareMembershipManager} and handles any errors.</p>
*
* <p>The manager will try to send a heartbeat when the member is in {@link
MemberState#STABLE},
- * {@link MemberState#JOINING}, or {@link MemberState#RECONCILING}. Which mean
the member is either in a stable
+ * {@link MemberState#JOINING}, or {@link MemberState#RECONCILING}, which
means the member is either in a stable
* group, is trying to join a group, or is in the process of reconciling the
assignment changes.</p>
*
- * <p>If the member got kick out of a group, it will give up the current
assignment and join again with a zero epoch.</p>
+ * <p>If the member got kicked out of a group, it will attempt to join again
with a zero epoch.</p>
*
- * <p>If the member does not have groupId configured or encounters fatal
exceptions, a heartbeat will not be sent.</p>
+ * <p>If the member does not have groupId configured or encountering fatal
exceptions, a heartbeat will not be sent.</p>
*
- * <p>If the coordinator is not found, we will skip sending the heartbeat and
try to find a coordinator first.</p>
+ * <p>If the coordinator not is not found, we will skip sending the heartbeat
and try to find a coordinator first.</p>
*
- * <p>If the heartbeat failed due to retriable errors, such as,
TimeoutException, the subsequent attempt will be
- * backoff exponentially.</p>
+ * <p>If the heartbeat failed due to retriable errors, such as
TimeoutException, the subsequent attempt will be
+ * backed off exponentially.</p>
*
* <p>When the member completes the assignment reconciliation, the {@link
HeartbeatRequestState} will be reset so
* that a heartbeat will be sent in the next event loop.</p>
*
- * <p>See {@link HeartbeatRequestState} for more details.</p>
+ * <p>See {@link AbstractHeartbeatRequestManager.HeartbeatRequestState} for
more details.</p>
*/
-public class ShareHeartbeatRequestManager implements RequestManager {
-
- private final Logger logger;
-
- /**
- * Time that the group coordinator will wait on member to revoke its
partitions. This is provided by the group
- * coordinator in the heartbeat
- */
- private final int maxPollIntervalMs;
-
- /**
- * CoordinatorRequestManager manages the connection to the group
coordinator
- */
- private final CoordinatorRequestManager coordinatorRequestManager;
+public class ShareHeartbeatRequestManager extends
AbstractHeartbeatRequestManager<ShareGroupHeartbeatResponse,
ShareGroupHeartbeatResponseData> {
/**
- * HeartbeatRequestState manages heartbeat request timing and retries
+ * Membership manager for consumer groups
Review Comment:
for share groups
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java:
##########
@@ -18,90 +18,70 @@
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
-import
org.apache.kafka.clients.consumer.internals.AbstractMembershipManager.LocalAssignment;
import
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
import
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import
org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
-import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.RetriableException;
-import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
-import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
-import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.slf4j.Logger;
-import java.util.ArrayList;
import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.stream.Collectors;
-
/**
* <p>Manages the request creation and response handling for the heartbeat.
The module creates a
- * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link
ConsumerMembershipManager} and enqueue it to
- * the network queue to be sent out. Once the response is received, the module
will update the state in the
- * {@link ConsumerMembershipManager} and handle any errors.</p>
+ * heartbeat request using the state stored in the membership manager and
enqueues it to
+ * the network queue to be sent out. Once the response is received, it updates
the state in the
+ * membership manager and handles any errors.
*
* <p>The manager will try to send a heartbeat when the member is in {@link
MemberState#STABLE},
* {@link MemberState#JOINING}, or {@link MemberState#RECONCILING}. Which mean
the member is either in a stable
- * group, is trying to join a group, or is in the process of reconciling the
assignment changes.</p>
+ * group, is trying to join a group, or is in the process of reconciling the
assignment changes.
*
- * <p>If the member got kick out of a group, it will try to give up the
current assignment by invoking {@code
- * OnPartitionsLost} because reattempting to join again with a zero epoch.</p>
+ * <p>If the member got kicked out of a group, it will try to give up the
current assignment by invoking {@code
+ * OnPartitionsLost} before attempting to join again with a zero epoch.
*
- * <p>If the member does not have groupId configured or encountering fatal
exceptions, a heartbeat will not be sent.</p>
+ * <p>If the member does not have groupId configured or encountering fatal
exceptions, a heartbeat will not be sent.
*
- * <p>If the coordinator not is not found, we will skip sending the heartbeat
and try to find a coordinator first.</p>
+ * <p>If the coordinator not is not found, we will skip sending the heartbeat
and try to find a coordinator first.
*
- * <p>If the heartbeat failed due to retriable errors, such as,
TimeoutException. The subsequent attempt will be
- * backoff exponentially.</p>
+ * <p>If the heartbeat failed due to retriable errors, such as
TimeoutException, the subsequent attempt will be
+ * backed off exponentially.
*
* <p>When the member completes the assignment reconciliation, the {@link
HeartbeatRequestState} will be reset so
- * that a heartbeat will be sent in the next event loop.</p>
+ * that a heartbeat will be sent in the next event loop.
*
- * <p>See {@link HeartbeatRequestState} for more details.</p>
+ * <p>The class variable HBR is the response for the specific group's
heartbeat RPC.
+ * <p>The class variable HRBD is the response data for the specific group's
heartbeat RPC.
*/
-public class HeartbeatRequestManager implements RequestManager {
+public abstract class AbstractHeartbeatRequestManager<HBR extends
AbstractResponse, HBRD extends ApiMessage> implements RequestManager {
Review Comment:
It feels a bit redundant to have both the response and responseData (when the
data is already part of the response). Tracking that down, I see that it's
needed only because `membershipMgr.onHeartbeatSuccess` requires the data.
That decision made sense at the beginning, but with this new structure I
find that it adds more complexity than value. What do you think
about changing `membershipMgr.onHeartbeatSuccess` to take the response as a
parameter? Then we could simplify this class to
`AbstractHeartbeatRequestManager<R extends AbstractResponse>`
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java:
##########
@@ -375,81 +342,88 @@ private void onErrorResponse(final
ConsumerGroupHeartbeatResponse response,
case COORDINATOR_LOAD_IN_PROGRESS:
// the manager will backoff and retry
- message = String.format("GroupHeartbeatRequest failed because
the group coordinator %s is still loading." +
- "Will retry",
- coordinatorRequestManager.coordinator());
+ message = String.format("%s failed because the group
coordinator %s is still loading. Will retry",
+ heartbeatRequestName(),
coordinatorRequestManager.coordinator());
logInfo(message, response, currentTimeMs);
break;
case GROUP_AUTHORIZATION_FAILED:
GroupAuthorizationException exception =
-
GroupAuthorizationException.forGroupId(membershipManager.groupId());
- logger.error("GroupHeartbeatRequest failed due to group
authorization failure: {}", exception.getMessage());
+
GroupAuthorizationException.forGroupId(membershipManager().groupId());
+ logger.error("{} failed due to group authorization failure:
{}",
+ heartbeatRequestName(), exception.getMessage());
handleFatalFailure(error.exception(exception.getMessage()));
break;
- case UNRELEASED_INSTANCE_ID:
- logger.error("GroupHeartbeatRequest failed due to unreleased
instance id {}: {}",
- membershipManager.groupInstanceId().orElse("null"),
errorMessage);
- handleFatalFailure(error.exception(errorMessage));
- break;
-
- case FENCED_INSTANCE_ID:
- logger.error("GroupHeartbeatRequest failed due to fenced
instance id {}: {}. " +
- "This is expected in the case that the member was
removed from the group " +
- "by an admin client, and another member joined using
the same group instance id.",
- membershipManager.groupInstanceId().orElse("null"),
errorMessage);
- handleFatalFailure(error.exception(errorMessage));
- break;
-
case INVALID_REQUEST:
case GROUP_MAX_SIZE_REACHED:
case UNSUPPORTED_ASSIGNOR:
case UNSUPPORTED_VERSION:
- logger.error("GroupHeartbeatRequest failed due to {}: {}",
error, errorMessage);
+ logger.error("{} failed due to {}: {}",
heartbeatRequestName(), error, errorMessage);
handleFatalFailure(error.exception(errorMessage));
break;
case FENCED_MEMBER_EPOCH:
- message = String.format("GroupHeartbeatRequest failed for
member %s because epoch %s is fenced.",
- membershipManager.memberId(),
membershipManager.memberEpoch());
+ message = String.format("%s failed for member %s because epoch
%s is fenced.",
+ heartbeatRequestName(),
membershipManager().memberId(), membershipManager().memberEpoch());
logInfo(message, response, currentTimeMs);
- membershipManager.transitionToFenced();
+ membershipManager().transitionToFenced();
// Skip backoff so that a next HB to rejoin is sent as soon as
the fenced member releases its assignment
heartbeatRequestState.reset();
break;
case UNKNOWN_MEMBER_ID:
- message = String.format("GroupHeartbeatRequest failed because
member %s is unknown.",
- membershipManager.memberId());
+ message = String.format("%s failed because member %s is
unknown.",
+ heartbeatRequestName(),
membershipManager().memberId());
logInfo(message, response, currentTimeMs);
- membershipManager.transitionToFenced();
+ membershipManager().transitionToFenced();
// Skip backoff so that a next HB to rejoin is sent as soon as
the fenced member releases its assignment
heartbeatRequestState.reset();
break;
default:
// If the manager receives an unknown error - there could be a
bug in the code or a new error code
Review Comment:
should we move this comment inside the `if`? At this point we're not yet in
"unknown" error territory.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java:
##########
@@ -467,12 +441,12 @@ static class HeartbeatRequestState extends RequestState {
private long heartbeatIntervalMs;
public HeartbeatRequestState(
- final LogContext logContext,
- final Time time,
- final long heartbeatIntervalMs,
- final long retryBackoffMs,
- final long retryBackoffMaxMs,
- final double jitter) {
+ final LogContext logContext,
+ final Time time,
+ final long heartbeatIntervalMs,
+ final long retryBackoffMs,
+ final long retryBackoffMaxMs,
+ final double jitter) {
Review Comment:
indentation off?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java:
##########
@@ -118,49 +98,41 @@ public class HeartbeatRequestManager implements
RequestManager {
*/
private final HeartbeatMetricsManager metricsManager;
- public HeartbeatRequestManager(
- final LogContext logContext,
- final Time time,
- final ConsumerConfig config,
- final CoordinatorRequestManager coordinatorRequestManager,
- final SubscriptionState subscriptions,
- final ConsumerMembershipManager membershipManager,
- final BackgroundEventHandler backgroundEventHandler,
- final Metrics metrics) {
+ public AbstractHeartbeatRequestManager(
+ final LogContext logContext,
+ final Time time,
+ final ConsumerConfig config,
+ final CoordinatorRequestManager coordinatorRequestManager,
+ final BackgroundEventHandler backgroundEventHandler,
+ final HeartbeatMetricsManager metricsManager) {
this.coordinatorRequestManager = coordinatorRequestManager;
this.logger = logContext.logger(getClass());
- this.membershipManager = membershipManager;
this.backgroundEventHandler = backgroundEventHandler;
this.maxPollIntervalMs =
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
long retryBackoffMs =
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
long retryBackoffMaxMs =
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
- this.heartbeatState = new HeartbeatState(subscriptions,
membershipManager, maxPollIntervalMs);
this.heartbeatRequestState = new HeartbeatRequestState(logContext,
time, 0, retryBackoffMs,
- retryBackoffMaxMs, maxPollIntervalMs);
+ retryBackoffMaxMs, maxPollIntervalMs);
this.pollTimer = time.timer(maxPollIntervalMs);
- this.metricsManager = new HeartbeatMetricsManager(metrics);
+ this.metricsManager = metricsManager;
}
// Visible for testing
Review Comment:
this is probably no longer "Visible for testing" on this abstract class,
and should just be a "protected" constructor (same for the other one above)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]