This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 1b9e107 KAFKA-7853: Refactor coordinator config (#6854) 1b9e107 is described below commit 1b9e1073885951697f34950a1ea706c93826e871 Author: Boyang Chen <boy...@confluent.io> AuthorDate: Mon Jun 17 10:58:43 2019 -0700 KAFKA-7853: Refactor coordinator config (#6854) An attempt to refactor current coordinator logic. Reviewers: Stanislav Kozlovski <stanislav_kozlov...@outlook.com>, Konstantine Karantasis <konstant...@confluent.io>, Guozhang Wang <wangg...@gmail.com> --- .../apache/kafka/clients/CommonClientConfigs.java | 38 +++++++- .../apache/kafka/clients/GroupRebalanceConfig.java | 100 +++++++++++++++++++ .../kafka/clients/consumer/ConsumerConfig.java | 37 ++----- .../kafka/clients/consumer/KafkaConsumer.java | 29 ++---- .../consumer/internals/AbstractCoordinator.java | 106 ++++++++------------- .../consumer/internals/ConsumerCoordinator.java | 51 ++++------ .../clients/consumer/internals/Heartbeat.java | 37 +++---- .../kafka/clients/CommonClientConfigsTest.java | 2 +- .../kafka/clients/consumer/KafkaConsumerTest.java | 42 ++++---- .../internals/AbstractCoordinatorTest.java | 34 +++++-- .../internals/ConsumerCoordinatorTest.java | 96 ++++++++++++------- .../clients/consumer/internals/HeartbeatTest.java | 21 +++- .../runtime/distributed/DistributedConfig.java | 13 +-- .../runtime/distributed/WorkerCoordinator.java | 22 ++--- .../runtime/distributed/WorkerGroupMember.java | 9 +- .../WorkerCoordinatorIncrementalTest.java | 37 +++---- .../runtime/distributed/WorkerCoordinatorTest.java | 38 ++++---- 17 files changed, 409 insertions(+), 303 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java index 49465dc..d8428a7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java +++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java @@ -26,7 +26,7 @@ import java.util.HashMap; import java.util.Map; /** - * Some configurations shared by both producer and consumer + * Configurations shared by Kafka client applications: producer, consumer, connect, etc. */ public class CommonClientConfigs { private static final Logger log = LoggerFactory.getLogger(CommonClientConfigs.class); @@ -101,6 +101,42 @@ public class CommonClientConfigs { + "elapses the client will resend the request if necessary or fail the request if " + "retries are exhausted."; + public static final String GROUP_ID_CONFIG = "group.id"; + public static final String GROUP_ID_DOC = "A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using <code>subscribe(topic)</code> or the Kafka-based offset management strategy."; + + public static final String GROUP_INSTANCE_ID_CONFIG = "group.instance.id"; + public static final String GROUP_INSTANCE_ID_DOC = "A unique identifier of the consumer instance provided by end user. " + + "Only non-empty strings are permitted. If set, the consumer is treated as a static member, " + + "which means that only one instance with this ID is allowed in the consumer group at any time. " + + "This can be used in combination with a larger session timeout to avoid group rebalances caused by transient unavailability " + + "(e.g. process restarts). If not set, the consumer will join the group as a dynamic member, which is the traditional behavior."; + + public static final String MAX_POLL_INTERVAL_MS_CONFIG = "max.poll.interval.ms"; + public static final String MAX_POLL_INTERVAL_MS_DOC = "The maximum delay between invocations of poll() when using " + + "consumer group management. This places an upper bound on the amount of time that the consumer can be idle " + + "before fetching more records. If poll() is not called before expiration of this timeout, then the consumer " + + "is considered failed and the group will rebalance in order to reassign the partitions to another member. "; + + public static final String REBALANCE_TIMEOUT_MS_CONFIG = "rebalance.timeout.ms"; + public static final String REBALANCE_TIMEOUT_MS_DOC = "The maximum allowed time for each worker to join the group " + + "once a rebalance has begun. This is basically a limit on the amount of time needed for all tasks to " + + "flush any pending data and commit offsets. If the timeout is exceeded, then the worker will be removed " + + "from the group, which will cause offset commit failures."; + + public static final String SESSION_TIMEOUT_MS_CONFIG = "session.timeout.ms"; + public static final String SESSION_TIMEOUT_MS_DOC = "The timeout used to detect client failures when using " + + "Kafka's group management facility. The client sends periodic heartbeats to indicate its liveness " + + "to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, " + + "then the broker will remove this client from the group and initiate a rebalance. Note that the value " + + "must be in the allowable range as configured in the broker configuration by <code>group.min.session.timeout.ms</code> " + + "and <code>group.max.session.timeout.ms</code>."; + + public static final String HEARTBEAT_INTERVAL_MS_CONFIG = "heartbeat.interval.ms"; + public static final String HEARTBEAT_INTERVAL_MS_DOC = "The expected time between heartbeats to the consumer " + + "coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the " + + "consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. " + + "The value must be set lower than <code>session.timeout.ms</code>, but typically should be set no higher " + + "than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances."; /** * Postprocess the configuration so that exponential backoff is disabled when reconnect backoff diff --git a/clients/src/main/java/org/apache/kafka/clients/GroupRebalanceConfig.java b/clients/src/main/java/org/apache/kafka/clients/GroupRebalanceConfig.java new file mode 100644 index 0000000..006800a --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/GroupRebalanceConfig.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.requests.JoinGroupRequest; + +import java.util.Locale; +import java.util.Optional; + +/** + * Class to extract group rebalance related configs. + */ +public class GroupRebalanceConfig { + + public enum ProtocolType { + CONSUMER, + CONNECT; + + @Override + public String toString() { + return super.toString().toLowerCase(Locale.ROOT); + } + } + + public final int sessionTimeoutMs; + public final int rebalanceTimeoutMs; + public final int heartbeatIntervalMs; + public final String groupId; + public final Optional<String> groupInstanceId; + public final long retryBackoffMs; + public final boolean leaveGroupOnClose; + + public GroupRebalanceConfig(AbstractConfig config, ProtocolType protocolType) { + this.sessionTimeoutMs = config.getInt(CommonClientConfigs.SESSION_TIMEOUT_MS_CONFIG); + + // Consumer and Connect use different config names for defining rebalance timeout + if (protocolType == ProtocolType.CONSUMER) { + this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); + } else { + this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.REBALANCE_TIMEOUT_MS_CONFIG); + } + + this.heartbeatIntervalMs = config.getInt(CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG); + this.groupId = config.getString(CommonClientConfigs.GROUP_ID_CONFIG); + + // Static membership is only introduced in consumer API. + if (protocolType == ProtocolType.CONSUMER) { + String groupInstanceId = config.getString(CommonClientConfigs.GROUP_INSTANCE_ID_CONFIG); + if (groupInstanceId != null) { + JoinGroupRequest.validateGroupInstanceId(groupInstanceId); + this.groupInstanceId = Optional.of(groupInstanceId); + } else { + this.groupInstanceId = Optional.empty(); + } + } else { + this.groupInstanceId = Optional.empty(); + } + + this.retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG); + + // Internal leave group config is only defined in Consumer. + if (protocolType == ProtocolType.CONSUMER) { + this.leaveGroupOnClose = config.getBoolean("internal.leave.group.on.close"); + } else { + this.leaveGroupOnClose = true; + } + } + + // For testing purpose. + public GroupRebalanceConfig(final int sessionTimeoutMs, + final int rebalanceTimeoutMs, + final int heartbeatIntervalMs, + String groupId, + Optional<String> groupInstanceId, + long retryBackoffMs, + boolean leaveGroupOnClose) { + this.sessionTimeoutMs = sessionTimeoutMs; + this.rebalanceTimeoutMs = rebalanceTimeoutMs; + this.heartbeatIntervalMs = heartbeatIntervalMs; + this.groupId = groupId; + this.groupInstanceId = groupInstanceId; + this.retryBackoffMs = retryBackoffMs; + this.leaveGroupOnClose = leaveGroupOnClose; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index 010fff8..7eb34d4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -50,50 +50,33 @@ public class ConsumerConfig extends AbstractConfig { /** * <code>group.id</code> */ - public static final String GROUP_ID_CONFIG = "group.id"; - private static final String GROUP_ID_DOC = "A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using <code>subscribe(topic)</code> or the Kafka-based offset management strategy."; + public static final String GROUP_ID_CONFIG = CommonClientConfigs.GROUP_ID_CONFIG; + private static final String GROUP_ID_DOC = CommonClientConfigs.GROUP_ID_DOC; /** * <code>group.instance.id</code> */ - public static final String GROUP_INSTANCE_ID_CONFIG = "group.instance.id"; - private static final String GROUP_INSTANCE_ID_DOC = "A unique identifier of the consumer instance provided by end user. " + - "Only non-empty strings are permitted. If set, the consumer is treated as a static member, " + - "which means that only one instance with this ID is allowed in the consumer group at any time. " + - "This can be used in combination with a larger session timeout to avoid group rebalances caused by transient unavailability " + - "(e.g. process restarts). If not set, the consumer will join the group as a dynamic member, which is the traditional behavior."; + public static final String GROUP_INSTANCE_ID_CONFIG = CommonClientConfigs.GROUP_INSTANCE_ID_CONFIG; + private static final String GROUP_INSTANCE_ID_DOC = CommonClientConfigs.GROUP_INSTANCE_ID_DOC; /** <code>max.poll.records</code> */ public static final String MAX_POLL_RECORDS_CONFIG = "max.poll.records"; private static final String MAX_POLL_RECORDS_DOC = "The maximum number of records returned in a single call to poll()."; /** <code>max.poll.interval.ms</code> */ - public static final String MAX_POLL_INTERVAL_MS_CONFIG = "max.poll.interval.ms"; - private static final String MAX_POLL_INTERVAL_MS_DOC = "The maximum delay between invocations of poll() when using " + - "consumer group management. This places an upper bound on the amount of time that the consumer can be idle " + - "before fetching more records. If poll() is not called before expiration of this timeout, then the consumer " + - "is considered failed and the group will rebalance in order to reassign the partitions to another member. "; - + public static final String MAX_POLL_INTERVAL_MS_CONFIG = CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG; + private static final String MAX_POLL_INTERVAL_MS_DOC = CommonClientConfigs.MAX_POLL_INTERVAL_MS_DOC; /** * <code>session.timeout.ms</code> */ - public static final String SESSION_TIMEOUT_MS_CONFIG = "session.timeout.ms"; - private static final String SESSION_TIMEOUT_MS_DOC = "The timeout used to detect consumer failures when using " + - "Kafka's group management facility. The consumer sends periodic heartbeats to indicate its liveness " + - "to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, " + - "then the broker will remove this consumer from the group and initiate a rebalance. Note that the value " + - "must be in the allowable range as configured in the broker configuration by <code>group.min.session.timeout.ms</code> " + - "and <code>group.max.session.timeout.ms</code>."; + public static final String SESSION_TIMEOUT_MS_CONFIG = CommonClientConfigs.SESSION_TIMEOUT_MS_CONFIG; + private static final String SESSION_TIMEOUT_MS_DOC = CommonClientConfigs.SESSION_TIMEOUT_MS_DOC; /** * <code>heartbeat.interval.ms</code> */ - public static final String HEARTBEAT_INTERVAL_MS_CONFIG = "heartbeat.interval.ms"; - private static final String HEARTBEAT_INTERVAL_MS_DOC = "The expected time between heartbeats to the consumer " + - "coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the " + - "consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. " + - "The value must be set lower than <code>session.timeout.ms</code>, but typically should be set no higher " + - "than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances."; + public static final String HEARTBEAT_INTERVAL_MS_CONFIG = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG; + private static final String HEARTBEAT_INTERVAL_MS_DOC = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC; /** * <code>bootstrap.servers</code> diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 5be065d..79afafa 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer; import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.ClientDnsLookup; import org.apache.kafka.clients.ClientUtils; +import org.apache.kafka.clients.GroupRebalanceConfig; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.NetworkClient; import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator; @@ -27,7 +28,6 @@ import org.apache.kafka.clients.consumer.internals.ConsumerMetadata; import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient; import org.apache.kafka.clients.consumer.internals.Fetcher; import org.apache.kafka.clients.consumer.internals.FetcherMetricsRegistry; -import org.apache.kafka.clients.consumer.internals.Heartbeat; import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener; import org.apache.kafka.clients.consumer.internals.PartitionAssignor; import org.apache.kafka.clients.consumer.internals.SubscriptionState; @@ -50,7 +50,6 @@ import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.network.ChannelBuilder; import org.apache.kafka.common.network.Selector; import org.apache.kafka.common.requests.IsolationLevel; -import org.apache.kafka.common.requests.JoinGroupRequest; import org.apache.kafka.common.requests.MetadataRequest; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.utils.AppInfoParser; @@ -568,7 +567,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { private final Logger log; private final String clientId; private String groupId; - private Optional<String> groupInstanceId; private final ConsumerCoordinator coordinator; private final Deserializer<K> keyDeserializer; private final Deserializer<V> valueDeserializer; @@ -674,15 +672,14 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { this.clientId = clientId; this.groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG); + GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(config, + GroupRebalanceConfig.ProtocolType.CONSUMER); LogContext logContext; // If group.instance.id is set, we will append it to the log context. - String groupInstanceId = config.getString(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG); - if (groupInstanceId != null) { - JoinGroupRequest.validateGroupInstanceId(groupInstanceId); - this.groupInstanceId = Optional.of(groupInstanceId); - logContext = new LogContext("[Consumer instanceId=" + groupInstanceId + ", clientId=" + clientId + ", groupId=" + groupId + "] "); + if (groupRebalanceConfig.groupInstanceId.isPresent()) { + logContext = new LogContext("[Consumer instanceId=" + groupRebalanceConfig.groupInstanceId.get() + + ", clientId=" + clientId + ", groupId=" + groupId + "] "); } else { - this.groupInstanceId = Optional.empty(); logContext = new LogContext("[Consumer clientId=" + clientId + ", groupId=" + groupId + "] "); } @@ -773,28 +770,20 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, PartitionAssignor.class); - int maxPollIntervalMs = config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG); - int sessionTimeoutMs = config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG); // no coordinator will be constructed for the default (null) group id this.coordinator = groupId == null ? null : - new ConsumerCoordinator(logContext, + new ConsumerCoordinator(groupRebalanceConfig, + logContext, this.client, - groupId, - this.groupInstanceId, - maxPollIntervalMs, - sessionTimeoutMs, - new Heartbeat(time, sessionTimeoutMs, heartbeatIntervalMs, maxPollIntervalMs, retryBackoffMs), assignors, this.metadata, this.subscriptions, metrics, metricGrpPrefix, this.time, - retryBackoffMs, enableAutoCommit, config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG), - this.interceptors, - config.getBoolean(ConsumerConfig.LEAVE_GROUP_ON_CLOSE_CONFIG)); + this.interceptors); this.fetcher = new Fetcher<>( logContext, this.client, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 30277b3..efe6cb7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -17,6 +17,7 @@ package org.apache.kafka.clients.consumer.internals; import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.GroupRebalanceConfig; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; import org.apache.kafka.common.errors.AuthenticationException; @@ -71,7 +72,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -114,15 +114,11 @@ public abstract class AbstractCoordinator implements Closeable { } private final Logger log; - private final int sessionTimeoutMs; private final GroupCoordinatorMetrics sensors; private final Heartbeat heartbeat; - protected final int rebalanceTimeoutMs; - protected final String groupId; - protected final Optional<String> groupInstanceId; + private final GroupRebalanceConfig rebalanceConfig; protected final ConsumerNetworkClient client; protected final Time time; - protected final long retryBackoffMs; private HeartbeatThread heartbeatThread = null; private boolean rejoinNeeded = true; @@ -133,52 +129,24 @@ public abstract class AbstractCoordinator implements Closeable { private Generation generation = Generation.NO_GENERATION; private RequestFuture<Void> findCoordinatorFuture = null; - private final boolean leaveGroupOnClose; /** * Initialize the coordination manager. */ - public AbstractCoordinator(LogContext logContext, + public AbstractCoordinator(GroupRebalanceConfig rebalanceConfig, + LogContext logContext, ConsumerNetworkClient client, - String groupId, - Optional<String> groupInstanceId, - int rebalanceTimeoutMs, - int sessionTimeoutMs, - Heartbeat heartbeat, Metrics metrics, String metricGrpPrefix, - Time time, - long retryBackoffMs, - boolean leaveGroupOnClose) { + Time time) { + Objects.requireNonNull(rebalanceConfig.groupId, + "Expected a non-null group id for coordinator construction"); + this.rebalanceConfig = rebalanceConfig; this.log = logContext.logger(AbstractCoordinator.class); this.client = client; this.time = time; - this.groupId = Objects.requireNonNull(groupId, - "Expected a non-null group id for coordinator construction"); - this.groupInstanceId = groupInstanceId; - this.rebalanceTimeoutMs = rebalanceTimeoutMs; - this.sessionTimeoutMs = sessionTimeoutMs; - this.heartbeat = heartbeat; + this.heartbeat = new Heartbeat(rebalanceConfig, time); this.sensors = new GroupCoordinatorMetrics(metrics, metricGrpPrefix); - this.retryBackoffMs = retryBackoffMs; - this.leaveGroupOnClose = leaveGroupOnClose; - } - - public AbstractCoordinator(LogContext logContext, - ConsumerNetworkClient client, - String groupId, - Optional<String> groupInstanceId, - int rebalanceTimeoutMs, - int sessionTimeoutMs, - int heartbeatIntervalMs, - Metrics metrics, - String metricGrpPrefix, - Time time, - long retryBackoffMs, - boolean leaveGroupOnClose) { - this(logContext, client, groupId, groupInstanceId, rebalanceTimeoutMs, sessionTimeoutMs, - new Heartbeat(time, sessionTimeoutMs, heartbeatIntervalMs, rebalanceTimeoutMs, retryBackoffMs), - metrics, metricGrpPrefix, time, retryBackoffMs, leaveGroupOnClose); } /** @@ -263,7 +231,7 @@ public abstract class AbstractCoordinator implements Closeable { // we found the coordinator, but the connection has failed, so mark // it dead and backoff before retrying discovery markCoordinatorUnknown(); - timer.sleep(retryBackoffMs); + timer.sleep(rebalanceConfig.retryBackoffMs); } } while (coordinatorUnknown() && timer.notExpired()); @@ -438,7 +406,7 @@ public abstract class AbstractCoordinator implements Closeable { else if (!future.isRetriable()) throw exception; - timer.sleep(retryBackoffMs); + timer.sleep(rebalanceConfig.retryBackoffMs); } } return true; @@ -505,13 +473,13 @@ public abstract class AbstractCoordinator implements Closeable { log.info("(Re-)joining group"); JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder( new JoinGroupRequestData() - .setGroupId(groupId) - .setSessionTimeoutMs(this.sessionTimeoutMs) + .setGroupId(rebalanceConfig.groupId) + .setSessionTimeoutMs(this.rebalanceConfig.sessionTimeoutMs) .setMemberId(this.generation.memberId) - .setGroupInstanceId(this.groupInstanceId.orElse(null)) + .setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null)) .setProtocolType(protocolType()) .setProtocols(metadata()) - .setRebalanceTimeoutMs(this.rebalanceTimeoutMs) + .setRebalanceTimeoutMs(this.rebalanceConfig.rebalanceTimeoutMs) ); log.debug("Sending JoinGroup ({}) to coordinator {}", requestBuilder, this.coordinator); @@ -519,7 +487,7 @@ public abstract class AbstractCoordinator implements Closeable { // Note that we override the request timeout using the rebalance timeout since that is the // maximum time that it may block on the coordinator. We add an extra 5 seconds for small delays. - int joinGroupTimeoutMs = Math.max(rebalanceTimeoutMs, rebalanceTimeoutMs + 5000); + int joinGroupTimeoutMs = Math.max(rebalanceConfig.rebalanceTimeoutMs, rebalanceConfig.rebalanceTimeoutMs + 5000); return client.send(coordinator, requestBuilder, joinGroupTimeoutMs) .compose(new JoinGroupResponseHandler()); } @@ -573,9 +541,9 @@ public abstract class AbstractCoordinator implements Closeable { // log the error and re-throw the exception log.error("Attempt to join group failed due to fatal error: {}", error.message()); if (error == Errors.GROUP_MAX_SIZE_REACHED) { - future.raise(new GroupMaxSizeReachedException(groupId)); + future.raise(new GroupMaxSizeReachedException(rebalanceConfig.groupId)); } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { - future.raise(new GroupAuthorizationException(groupId)); + future.raise(new GroupAuthorizationException(rebalanceConfig.groupId)); } else { future.raise(error); } @@ -606,9 +574,9 @@ public abstract class AbstractCoordinator implements Closeable { SyncGroupRequest.Builder requestBuilder = new SyncGroupRequest.Builder( new SyncGroupRequestData() - .setGroupId(groupId) + .setGroupId(rebalanceConfig.groupId) .setMemberId(generation.memberId) - .setGroupInstanceId(this.groupInstanceId.orElse(null)) + .setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null)) .setGenerationId(generation.generationId) .setAssignments(Collections.emptyList()) ); @@ -633,9 +601,9 @@ public abstract class AbstractCoordinator implements Closeable { SyncGroupRequest.Builder requestBuilder = new SyncGroupRequest.Builder( new SyncGroupRequestData() - .setGroupId(groupId) + .setGroupId(rebalanceConfig.groupId) .setMemberId(generation.memberId) - .setGroupInstanceId(this.groupInstanceId.orElse(null)) + .setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null)) .setGenerationId(generation.generationId) .setAssignments(groupAssignmentList) ); @@ -665,7 +633,7 @@ public abstract class AbstractCoordinator implements Closeable { requestRejoin(); if (error == Errors.GROUP_AUTHORIZATION_FAILED) { - future.raise(new GroupAuthorizationException(groupId)); + future.raise(new GroupAuthorizationException(rebalanceConfig.groupId)); } else if (error == Errors.REBALANCE_IN_PROGRESS) { log.debug("SyncGroup failed because the group began another rebalance"); future.raise(error); @@ -701,7 +669,7 @@ public abstract class AbstractCoordinator implements Closeable { new FindCoordinatorRequest.Builder( new FindCoordinatorRequestData() .setKeyType(CoordinatorType.GROUP.id()) - .setKey(this.groupId)); + .setKey(this.rebalanceConfig.groupId)); return client.send(node, requestBuilder) .compose(new FindCoordinatorResponseHandler()); } @@ -731,7 +699,7 @@ public abstract class AbstractCoordinator implements Closeable { } future.complete(null); } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { - future.raise(new GroupAuthorizationException(groupId)); + future.raise(new GroupAuthorizationException(rebalanceConfig.groupId)); } else { log.debug("Group coordinator lookup failed: {}", findCoordinatorResponse.data().errorMessage()); future.raise(error); @@ -854,7 +822,7 @@ public abstract class AbstractCoordinator implements Closeable { // Synchronize after closing the heartbeat thread since heartbeat thread // needs this lock to complete and terminate after close flag is set. synchronized (this) { - if (leaveGroupOnClose) { + if (rebalanceConfig.leaveGroupOnClose) { maybeLeaveGroup(); } @@ -883,7 +851,7 @@ public abstract class AbstractCoordinator implements Closeable { // attempt any resending if the request fails or times out. log.info("Member {} sending LeaveGroup request to coordinator {}", generation.memberId, coordinator); LeaveGroupRequest.Builder request = new LeaveGroupRequest.Builder(new LeaveGroupRequestData() - .setGroupId(groupId).setMemberId(generation.memberId)); + .setGroupId(rebalanceConfig.groupId).setMemberId(generation.memberId)); client.send(coordinator, request) .compose(new LeaveGroupResponseHandler()); client.pollNoWakeup(); @@ -893,7 +861,7 @@ public abstract class AbstractCoordinator implements Closeable { } protected boolean isDynamicMember() { - return !groupInstanceId.isPresent(); + return !rebalanceConfig.groupInstanceId.isPresent(); } private class LeaveGroupResponseHandler extends CoordinatorResponseHandler<LeaveGroupResponse, Void> { @@ -915,9 +883,9 @@ public abstract class AbstractCoordinator implements Closeable { log.debug("Sending Heartbeat request to coordinator {}", coordinator); HeartbeatRequest.Builder requestBuilder = new HeartbeatRequest.Builder(new HeartbeatRequestData() - .setGroupId(groupId) + .setGroupId(rebalanceConfig.groupId) .setMemberId(this.generation.memberId) - .setGroupInstanceId(this.groupInstanceId.orElse(null)) + .setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null)) .setGenerationId(this.generation.generationId)); return client.send(coordinator, requestBuilder) .compose(new HeartbeatResponseHandler()); @@ -953,7 +921,7 @@ public abstract class AbstractCoordinator implements Closeable { resetGeneration(); future.raise(error); } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { - future.raise(new GroupAuthorizationException(groupId)); + future.raise(new GroupAuthorizationException(rebalanceConfig.groupId)); } else { future.raise(new KafkaException("Unexpected error in heartbeat response: " + error.message())); } @@ -1051,7 +1019,7 @@ public abstract class AbstractCoordinator implements Closeable { private AtomicReference<RuntimeException> failed = new AtomicReference<>(null); private HeartbeatThread() { - super(HEARTBEAT_THREAD_PREFIX + (groupId.isEmpty() ? "" : " | " + groupId), true); + super(HEARTBEAT_THREAD_PREFIX + (rebalanceConfig.groupId.isEmpty() ? "" : " | " + rebalanceConfig.groupId), true); } public void enable() { @@ -1113,7 +1081,7 @@ public abstract class AbstractCoordinator implements Closeable { if (findCoordinatorFuture != null || lookupCoordinator().failed()) // the immediate future check ensures that we backoff properly in the case that no // brokers are available to connect to. - AbstractCoordinator.this.wait(retryBackoffMs); + AbstractCoordinator.this.wait(rebalanceConfig.retryBackoffMs); } else if (heartbeat.sessionTimeoutExpired(now)) { // the session timeout has expired without seeing a successful heartbeat, so we should // probably make sure the coordinator is still healthy. @@ -1131,7 +1099,7 @@ public abstract class AbstractCoordinator implements Closeable { } else if (!heartbeat.shouldHeartbeat(now)) { // poll again after waiting for the retry backoff in case the heartbeat failed or the // coordinator disconnected - AbstractCoordinator.this.wait(retryBackoffMs); + AbstractCoordinator.this.wait(rebalanceConfig.retryBackoffMs); } else { heartbeat.sentHeartbeat(now); @@ -1153,7 +1121,7 @@ public abstract class AbstractCoordinator implements Closeable { // however, then the session timeout may expire before we can rejoin. heartbeat.receiveHeartbeat(); } else if (e instanceof FencedInstanceIdException) { - log.error("Caught fenced group.instance.id {} error in heartbeat thread", groupInstanceId); + log.error("Caught fenced group.instance.id {} error in heartbeat thread", rebalanceConfig.groupInstanceId); heartbeatThread.failed.set(e); heartbeatThread.disable(); } else { @@ -1243,4 +1211,8 @@ public abstract class AbstractCoordinator implements Closeable { } + // For testing only + public Heartbeat heartbeat() { + return heartbeat; + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index fbf5db4..a590a1e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.clients.GroupRebalanceConfig; import org.apache.kafka.clients.consumer.CommitFailedException; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.OffsetAndMetadata; @@ -63,7 +64,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; @@ -74,6 +74,7 @@ import java.util.stream.Collectors; * This class manages the coordination process with the consumer coordinator. */ public final class ConsumerCoordinator extends AbstractCoordinator { + private final GroupRebalanceConfig rebalanceConfig; private final Logger log; private final List<PartitionAssignor> assignors; private final ConsumerMetadata metadata; @@ -120,36 +121,25 @@ public final class ConsumerCoordinator extends AbstractCoordinator { /** * Initialize the coordination manager. */ - public ConsumerCoordinator(LogContext logContext, + public ConsumerCoordinator(GroupRebalanceConfig rebalanceConfig, + LogContext logContext, ConsumerNetworkClient client, - String groupId, - Optional<String> groupInstanceId, - int rebalanceTimeoutMs, - int sessionTimeoutMs, - Heartbeat heartbeat, List<PartitionAssignor> assignors, ConsumerMetadata metadata, SubscriptionState subscriptions, Metrics metrics, String metricGrpPrefix, Time time, - long retryBackoffMs, boolean autoCommitEnabled, int autoCommitIntervalMs, - ConsumerInterceptors<?, ?> interceptors, - boolean leaveGroupOnClose) { - super(logContext, + ConsumerInterceptors<?, ?> interceptors) { + super(rebalanceConfig, + logContext, client, - groupId, - groupInstanceId, - rebalanceTimeoutMs, - sessionTimeoutMs, - heartbeat, metrics, metricGrpPrefix, - time, - retryBackoffMs, - leaveGroupOnClose); + time); + this.rebalanceConfig = rebalanceConfig; this.log = logContext.logger(ConsumerCoordinator.class); this.metadata = metadata; this.metadataSnapshot = new MetadataSnapshot(subscriptions, metadata.fetch(), metadata.updateVersion()); @@ -459,7 +449,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { @Override protected void onJoinPrepare(int generation, String memberId) { // commit offsets prior to rebalance if auto-commit enabled - maybeAutoCommitOffsetsSync(time.timer(rebalanceTimeoutMs)); + maybeAutoCommitOffsetsSync(time.timer(rebalanceConfig.rebalanceTimeoutMs)); // execute the user's callback before rebalance ConsumerRebalanceListener listener = subscriptions.rebalanceListener(); @@ -558,7 +548,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { } else if (!future.isRetriable()) { throw future.exception(); } else { - timer.sleep(retryBackoffMs); + timer.sleep(rebalanceConfig.retryBackoffMs); } } else { return null; @@ -585,8 +575,9 @@ public final class ConsumerCoordinator extends AbstractCoordinator { // visible for testing void invokeCompletedOffsetCommitCallbacks() { if (asyncCommitFenced.get()) { - throw new FencedInstanceIdException("Get fenced exception for group.instance.id: " + - groupInstanceId.orElse("unset_instance_id") + ", current member.id is " + memberId()); + throw new FencedInstanceIdException("Get fenced exception for group.instance.id " + + rebalanceConfig.groupInstanceId.orElse("unset_instance_id") + + ", current member.id is " + memberId()); } while (true) { OffsetCommitCompletion completion = completedOffsetCommits.poll(); @@ -698,7 +689,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { if (future.failed() && !future.isRetriable()) throw future.exception(); - timer.sleep(retryBackoffMs); + timer.sleep(rebalanceConfig.retryBackoffMs); } while (timer.notExpired()); return false; @@ -723,7 +714,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { if (exception instanceof RetriableException) { log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error: {}", offsets, exception); - nextAutoCommitTimer.updateAndReset(retryBackoffMs); + nextAutoCommitTimer.updateAndReset(rebalanceConfig.retryBackoffMs); } else { log.warn("Asynchronous auto-commit of offsets {} failed: {}", offsets, exception.getMessage()); } @@ -813,10 +804,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator { OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder( new OffsetCommitRequestData() - .setGroupId(this.groupId) + .setGroupId(this.rebalanceConfig.groupId) .setGenerationId(generation.generationId) .setMemberId(generation.memberId) - .setGroupInstanceId(groupInstanceId.orElse(null)) + .setGroupInstanceId(rebalanceConfig.groupInstanceId.orElse(null)) .setTopics(new ArrayList<>(requestTopicDataMap.values())) ); @@ -857,7 +848,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { } if (error == Errors.GROUP_AUTHORIZATION_FAILED) { - future.raise(new GroupAuthorizationException(groupId)); + future.raise(new GroupAuthorizationException(rebalanceConfig.groupId)); return; } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) { unauthorizedTopics.add(tp.topic()); @@ -929,7 +920,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { log.debug("Fetching committed offsets for partitions: {}", partitions); // construct the request - OffsetFetchRequest.Builder requestBuilder = new OffsetFetchRequest.Builder(this.groupId, + OffsetFetchRequest.Builder requestBuilder = new OffsetFetchRequest.Builder(this.rebalanceConfig.groupId, new ArrayList<>(partitions)); // send the request with a callback @@ -952,7 +943,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { markCoordinatorUnknown(); future.raise(error); } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { - future.raise(new GroupAuthorizationException(groupId)); + future.raise(new GroupAuthorizationException(rebalanceConfig.groupId)); } else { future.raise(new KafkaException("Unexpected error in fetch offset response: " + error.message())); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java index 8a67f31..3bf8c92 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.clients.GroupRebalanceConfig; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Timer; @@ -23,10 +24,8 @@ import org.apache.kafka.common.utils.Timer; * A helper class for managing the heartbeat to the coordinator */ public final class Heartbeat { - private final int sessionTimeoutMs; - private final int heartbeatIntervalMs; private final int maxPollIntervalMs; - private final long retryBackoffMs; + private final GroupRebalanceConfig rebalanceConfig; private final Time time; private final Timer heartbeatTimer; private final Timer sessionTimer; @@ -34,21 +33,15 @@ public final class Heartbeat { private volatile long lastHeartbeatSend; - public Heartbeat(Time time, - int sessionTimeoutMs, - int heartbeatIntervalMs, - int maxPollIntervalMs, - long retryBackoffMs) { - if (heartbeatIntervalMs >= sessionTimeoutMs) + public Heartbeat(GroupRebalanceConfig config, + Time time) { + if (config.heartbeatIntervalMs >= config.sessionTimeoutMs) throw new IllegalArgumentException("Heartbeat must be set lower than the session timeout"); - + this.rebalanceConfig = config; this.time = time; - this.sessionTimeoutMs = sessionTimeoutMs; - this.heartbeatIntervalMs = heartbeatIntervalMs; - this.maxPollIntervalMs = maxPollIntervalMs; - this.retryBackoffMs = retryBackoffMs; - this.heartbeatTimer = time.timer(heartbeatIntervalMs); - this.sessionTimer = time.timer(sessionTimeoutMs); + this.heartbeatTimer = time.timer(config.heartbeatIntervalMs); + this.sessionTimer = time.timer(config.sessionTimeoutMs); + this.maxPollIntervalMs = config.rebalanceTimeoutMs; this.pollTimer = time.timer(maxPollIntervalMs); } @@ -66,17 +59,17 @@ public final class Heartbeat { public void sentHeartbeat(long now) { this.lastHeartbeatSend = now; update(now); - heartbeatTimer.reset(heartbeatIntervalMs); + heartbeatTimer.reset(rebalanceConfig.heartbeatIntervalMs); } public void failHeartbeat() { update(time.milliseconds()); - heartbeatTimer.reset(retryBackoffMs); + heartbeatTimer.reset(rebalanceConfig.retryBackoffMs); } public void receiveHeartbeat() { update(time.milliseconds()); - sessionTimer.reset(sessionTimeoutMs); + sessionTimer.reset(rebalanceConfig.sessionTimeoutMs); } public boolean shouldHeartbeat(long now) { @@ -100,14 +93,14 @@ public final class Heartbeat { public void resetTimeouts() { update(time.milliseconds()); - sessionTimer.reset(sessionTimeoutMs); + sessionTimer.reset(rebalanceConfig.sessionTimeoutMs); pollTimer.reset(maxPollIntervalMs); - heartbeatTimer.reset(heartbeatIntervalMs); + heartbeatTimer.reset(rebalanceConfig.heartbeatIntervalMs); } public void resetSessionTimeout() { update(time.milliseconds()); - sessionTimer.reset(sessionTimeoutMs); + sessionTimer.reset(rebalanceConfig.sessionTimeoutMs); } public boolean pollTimeoutExpired(long now) { diff --git a/clients/src/test/java/org/apache/kafka/clients/CommonClientConfigsTest.java b/clients/src/test/java/org/apache/kafka/clients/CommonClientConfigsTest.java index 63a9312..d65e6d1 100644 --- a/clients/src/test/java/org/apache/kafka/clients/CommonClientConfigsTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/CommonClientConfigsTest.java @@ -58,7 +58,7 @@ public class CommonClientConfigsTest { } @Test - public void testExponentialBackoffDefaults() throws Exception { + public void testExponentialBackoffDefaults() { TestConfig defaultConf = new TestConfig(Collections.emptyMap()); assertEquals(Long.valueOf(50L), defaultConf.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG)); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 405ec68..c1adf19 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer; import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.GroupRebalanceConfig; import org.apache.kafka.clients.KafkaClient; import org.apache.kafka.clients.MockClient; import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator; @@ -27,7 +28,6 @@ import org.apache.kafka.clients.consumer.internals.ConsumerMetrics; import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient; import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; import org.apache.kafka.clients.consumer.internals.Fetcher; -import org.apache.kafka.clients.consumer.internals.Heartbeat; import org.apache.kafka.clients.consumer.internals.PartitionAssignor; import org.apache.kafka.clients.consumer.internals.SubscriptionState; import org.apache.kafka.common.Cluster; @@ -1895,27 +1895,25 @@ public class KafkaConsumerTest { ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(loggerFactory, client, metadata, time, retryBackoffMs, requestTimeoutMs, heartbeatIntervalMs); - Heartbeat heartbeat = new Heartbeat(time, sessionTimeoutMs, heartbeatIntervalMs, rebalanceTimeoutMs, retryBackoffMs); - ConsumerCoordinator consumerCoordinator = new ConsumerCoordinator( - loggerFactory, - consumerClient, - groupId, - groupInstanceId, - rebalanceTimeoutMs, - sessionTimeoutMs, - heartbeat, - assignors, - metadata, - subscription, - metrics, - metricGroupPrefix, - time, - retryBackoffMs, - autoCommitEnabled, - autoCommitIntervalMs, - interceptors, - true); - + GroupRebalanceConfig rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs, + rebalanceTimeoutMs, + heartbeatIntervalMs, + groupId, + groupInstanceId, + retryBackoffMs, + true); + ConsumerCoordinator consumerCoordinator = new ConsumerCoordinator(rebalanceConfig, + loggerFactory, + consumerClient, + assignors, + metadata, + subscription, + metrics, + metricGroupPrefix, + time, + autoCommitEnabled, + autoCommitIntervalMs, + interceptors); Fetcher<String, String> fetcher = new Fetcher<>( loggerFactory, consumerClient, diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java index 0fc5f62..659ef5f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.clients.GroupRebalanceConfig; import org.apache.kafka.clients.MockClient; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.Node; @@ -104,14 +105,30 @@ public class AbstractCoordinatorTest { logContext, new ClusterResourceListeners()); this.mockClient = new MockClient(mockTime, metadata); - this.consumerClient = new ConsumerNetworkClient(logContext, mockClient, metadata, mockTime, - retryBackoffMs, REQUEST_TIMEOUT_MS, HEARTBEAT_INTERVAL_MS); + this.consumerClient = new ConsumerNetworkClient(logContext, + mockClient, + metadata, + mockTime, + retryBackoffMs, + REQUEST_TIMEOUT_MS, + HEARTBEAT_INTERVAL_MS); Metrics metrics = new Metrics(); mockClient.updateMetadata(TestUtils.metadataUpdateWith(1, emptyMap())); this.node = metadata.fetch().nodes().get(0); this.coordinatorNode = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port()); - this.coordinator = new DummyCoordinator(consumerClient, metrics, mockTime, rebalanceTimeoutMs, retryBackoffMs, groupInstanceId); + + GroupRebalanceConfig rebalanceConfig = new GroupRebalanceConfig(SESSION_TIMEOUT_MS, + rebalanceTimeoutMs, + HEARTBEAT_INTERVAL_MS, + GROUP_ID, + groupInstanceId, + retryBackoffMs, + !groupInstanceId.isPresent()); + this.coordinator = new DummyCoordinator(rebalanceConfig, + consumerClient, + metrics, + mockTime); } @Test @@ -850,14 +867,11 @@ public class AbstractCoordinatorTest { private int onJoinCompleteInvokes = 0; private boolean wakeupOnJoinComplete = false; - public DummyCoordinator(ConsumerNetworkClient client, + public DummyCoordinator(GroupRebalanceConfig rebalanceConfig, + ConsumerNetworkClient client, Metrics metrics, - Time time, - int rebalanceTimeoutMs, - int retryBackoffMs, - Optional<String> groupInstanceId) { - super(new LogContext(), client, GROUP_ID, groupInstanceId, rebalanceTimeoutMs, - SESSION_TIMEOUT_MS, HEARTBEAT_INTERVAL_MS, metrics, METRIC_GROUP_PREFIX, time, retryBackoffMs, !groupInstanceId.isPresent()); + Time time) { + super(rebalanceConfig, new LogContext(), client, metrics, METRIC_GROUP_PREFIX, time); } @Override diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 4b377ea..c54540e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.clients.consumer.internals; import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.GroupRebalanceConfig; import org.apache.kafka.clients.MockClient; import org.apache.kafka.clients.consumer.CommitFailedException; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; @@ -118,8 +119,7 @@ public class ConsumerCoordinatorTest { private final int autoCommitIntervalMs = 2000; private final int requestTimeoutMs = 30000; private final MockTime time = new MockTime(); - private final Heartbeat heartbeat = new Heartbeat(time, sessionTimeoutMs, heartbeatIntervalMs, - rebalanceTimeoutMs, retryBackoffMs); + private GroupRebalanceConfig rebalanceConfig; private MockPartitionAssignor partitionAssignor = new MockPartitionAssignor(); private List<PartitionAssignor> assignors = Collections.singletonList(partitionAssignor); @@ -153,8 +153,21 @@ public class ConsumerCoordinatorTest { this.rebalanceListener = new MockRebalanceListener(); this.mockOffsetCommitCallback = new MockCommitCallback(); this.partitionAssignor.clear(); + this.rebalanceConfig = buildRebalanceConfig(Optional.empty()); + this.coordinator = buildCoordinator(rebalanceConfig, + metrics, + assignors, + false); + } - this.coordinator = buildCoordinator(metrics, assignors, false, Optional.empty()); + private GroupRebalanceConfig buildRebalanceConfig(Optional<String> groupInstanceId) { + return new GroupRebalanceConfig(sessionTimeoutMs, + rebalanceTimeoutMs, + heartbeatIntervalMs, + groupId, + groupInstanceId, + retryBackoffMs, + !groupInstanceId.isPresent()); } @After @@ -741,10 +754,10 @@ public class ConsumerCoordinatorTest { assertTrue(coordinator.coordinatorUnknown()); assertFalse(coordinator.poll(time.timer(0))); - assertEquals(time.milliseconds(), heartbeat.lastPollTime()); + assertEquals(time.milliseconds(), coordinator.heartbeat().lastPollTime()); time.sleep(rebalanceTimeoutMs - 1); - assertFalse(heartbeat.pollTimeoutExpired(time.milliseconds())); + assertFalse(coordinator.heartbeat().pollTimeoutExpired(time.milliseconds())); } @Test @@ -1044,9 +1057,6 @@ public class ConsumerCoordinatorTest { @Test public void testWakeupFromAssignmentCallback() { - ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors, - false, Optional.empty()); - final String topic = "topic1"; TopicPartition partition = new TopicPartition(topic, 0); final String consumerId = "follower"; @@ -1162,7 +1172,10 @@ public class ConsumerCoordinatorTest { metadata = new ConsumerMetadata(0, Long.MAX_VALUE, includeInternalTopics, false, subscriptions, new LogContext(), new ClusterResourceListeners()); client = new MockClient(time, metadata); - coordinator = buildCoordinator(new Metrics(), assignors, false, Optional.empty()); + coordinator = buildCoordinator(rebalanceConfig, + new Metrics(), + assignors, + false); subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener); @@ -1290,8 +1303,10 @@ public class ConsumerCoordinatorTest { public void testAutoCommitDynamicAssignment() { final String consumerId = "consumer"; - ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors, - true, groupInstanceId); + ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, + new Metrics(), + assignors, + true); subscriptions.subscribe(singleton(topic1), rebalanceListener); joinAsFollowerAndReceiveAssignment(consumerId, coordinator, singletonList(t1p)); @@ -1306,8 +1321,10 @@ public class ConsumerCoordinatorTest { @Test public void testAutoCommitRetryBackoff() { final String consumerId = "consumer"; - ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors, - true, groupInstanceId); + ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, + new Metrics(), + assignors, + true); subscriptions.subscribe(singleton(topic1), rebalanceListener); joinAsFollowerAndReceiveAssignment(consumerId, coordinator, singletonList(t1p)); @@ -1340,7 +1357,10 @@ public class ConsumerCoordinatorTest { @Test public void testAutoCommitAwaitsInterval() { final String consumerId = "consumer"; - ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors, true, groupInstanceId); + ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, + new Metrics(), + assignors, + true); subscriptions.subscribe(singleton(topic1), rebalanceListener); joinAsFollowerAndReceiveAssignment(consumerId, coordinator, singletonList(t1p)); @@ -1378,8 +1398,10 @@ public class ConsumerCoordinatorTest { public void testAutoCommitDynamicAssignmentRebalance() { final String consumerId = "consumer"; - ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors, - true, groupInstanceId); + ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, + new Metrics(), + assignors, + true); subscriptions.subscribe(singleton(topic1), rebalanceListener); @@ -1404,8 +1426,10 @@ public class ConsumerCoordinatorTest { @Test public void testAutoCommitManualAssignment() { - ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors, - true, groupInstanceId); + ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, + new Metrics(), + assignors, + true); subscriptions.assignFromUser(singleton(t1p)); subscriptions.seek(t1p, 100); @@ -1421,8 +1445,10 @@ public class ConsumerCoordinatorTest { @Test public void testAutoCommitManualAssignmentCoordinatorUnknown() { - ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors, - true, groupInstanceId); + ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, + new Metrics(), + assignors, + true); subscriptions.assignFromUser(singleton(t1p)); subscriptions.seek(t1p, 100); @@ -2066,8 +2092,10 @@ public class ConsumerCoordinatorTest { @Test public void testAutoCommitAfterCoordinatorBackToService() { - ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors, - true, groupInstanceId); + ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, + new Metrics(), + assignors, + true); subscriptions.assignFromUser(Collections.singleton(t1p)); subscriptions.seek(t1p, 100L); @@ -2125,8 +2153,11 @@ public class ConsumerCoordinatorTest { final boolean autoCommit, final Optional<String> groupInstanceId) { final String consumerId = "consumer"; - ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors, - autoCommit, groupInstanceId); + rebalanceConfig = buildRebalanceConfig(groupInstanceId); + ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, + new Metrics(), + assignors, + autoCommit); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); if (useGroupManagement) { @@ -2216,30 +2247,23 @@ public class ConsumerCoordinatorTest { assertEquals("leaveGroupRequested should be " + shouldLeaveGroup, shouldLeaveGroup, leaveGroupRequested.get()); } - private ConsumerCoordinator buildCoordinator(final Metrics metrics, + private ConsumerCoordinator buildCoordinator(final GroupRebalanceConfig rebalanceConfig, + final Metrics metrics, final List<PartitionAssignor> assignors, - final boolean autoCommitEnabled, - final Optional<String> groupInstanceId) { + final boolean autoCommitEnabled) { return new ConsumerCoordinator( + rebalanceConfig, new LogContext(), consumerClient, - groupId, - groupInstanceId, - rebalanceTimeoutMs, - sessionTimeoutMs, - heartbeat, assignors, metadata, subscriptions, metrics, "consumer" + groupId, time, - retryBackoffMs, autoCommitEnabled, autoCommitIntervalMs, - null, - !groupInstanceId.isPresent() - ); + null); } private FindCoordinatorResponse groupCoordinatorResponse(Node node, Errors error) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java index c382de6..b014bec 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java @@ -16,23 +16,38 @@ */ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.clients.GroupRebalanceConfig; import org.apache.kafka.common.utils.MockTime; +import org.junit.Before; import org.junit.Test; +import java.util.Optional; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; public class HeartbeatTest { - private int sessionTimeoutMs = 300; private int heartbeatIntervalMs = 100; private int maxPollIntervalMs = 900; private long retryBackoffMs = 10L; private MockTime time = new MockTime(); - private Heartbeat heartbeat = new Heartbeat(time, sessionTimeoutMs, heartbeatIntervalMs, - maxPollIntervalMs, retryBackoffMs); + + private Heartbeat heartbeat; + + @Before + public void setUp() { + GroupRebalanceConfig rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs, + maxPollIntervalMs, + heartbeatIntervalMs, + "group_id", + Optional.empty(), + retryBackoffMs, + true); + heartbeat = new Heartbeat(rebalanceConfig, time); + } @Test public void testShouldHeartbeat() { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java index c7ff6d8..bc3bc19 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java @@ -39,13 +39,13 @@ public class DistributedConfig extends WorkerConfig { /** * <code>group.id</code> */ - public static final String GROUP_ID_CONFIG = "group.id"; + public static final String GROUP_ID_CONFIG = CommonClientConfigs.GROUP_ID_CONFIG; private static final String GROUP_ID_DOC = "A unique string that identifies the Connect cluster group this worker belongs to."; /** * <code>session.timeout.ms</code> */ - public static final String SESSION_TIMEOUT_MS_CONFIG = "session.timeout.ms"; + public static final String SESSION_TIMEOUT_MS_CONFIG = CommonClientConfigs.SESSION_TIMEOUT_MS_CONFIG; private static final String SESSION_TIMEOUT_MS_DOC = "The timeout used to detect worker failures. " + "The worker sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are " + "received by the broker before the expiration of this session timeout, then the broker will remove the " + @@ -56,7 +56,7 @@ public class DistributedConfig extends WorkerConfig { /** * <code>heartbeat.interval.ms</code> */ - public static final String HEARTBEAT_INTERVAL_MS_CONFIG = "heartbeat.interval.ms"; + public static final String HEARTBEAT_INTERVAL_MS_CONFIG = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG; private static final String HEARTBEAT_INTERVAL_MS_DOC = "The expected time between heartbeats to the group " + "coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the " + "worker's session stays active and to facilitate rebalancing when new members join or leave the group. " + @@ -66,11 +66,8 @@ public class DistributedConfig extends WorkerConfig { /** * <code>rebalance.timeout.ms</code> */ - public static final String REBALANCE_TIMEOUT_MS_CONFIG = "rebalance.timeout.ms"; - private static final String REBALANCE_TIMEOUT_MS_DOC = "The maximum allowed time for each worker to join the group " + - "once a rebalance has begun. This is basically a limit on the amount of time needed for all tasks to " + - "flush any pending data and commit offsets. If the timeout is exceeded, then the worker will be removed " + - "from the group, which will cause offset commit failures."; + public static final String REBALANCE_TIMEOUT_MS_CONFIG = CommonClientConfigs.REBALANCE_TIMEOUT_MS_CONFIG; + private static final String REBALANCE_TIMEOUT_MS_DOC = CommonClientConfigs.REBALANCE_TIMEOUT_MS_DOC; /** * <code>worker.sync.timeout.ms</code> diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java index 230a272..0b855a0 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java @@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime.distributed; import org.apache.kafka.clients.consumer.internals.AbstractCoordinator; import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient; +import org.apache.kafka.clients.GroupRebalanceConfig; import org.apache.kafka.common.metrics.Measurable; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; @@ -39,7 +40,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Optional; import static org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection; import static org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember; @@ -70,33 +70,23 @@ public class WorkerCoordinator extends AbstractCoordinator implements Closeable /** * Initialize the coordination manager. */ - public WorkerCoordinator(LogContext logContext, + public WorkerCoordinator(GroupRebalanceConfig config, + LogContext logContext, ConsumerNetworkClient client, - String groupId, - int rebalanceTimeoutMs, - int sessionTimeoutMs, - int heartbeatIntervalMs, Metrics metrics, String metricGrpPrefix, Time time, - long retryBackoffMs, String restUrl, ConfigBackingStore configStorage, WorkerRebalanceListener listener, ConnectProtocolCompatibility protocolCompatibility, int maxDelay) { - super(logContext, + super(config, + logContext, client, - groupId, - Optional.empty(), - rebalanceTimeoutMs, - sessionTimeoutMs, - heartbeatIntervalMs, metrics, metricGrpPrefix, - time, - retryBackoffMs, - true); + time); this.log = logContext.logger(WorkerCoordinator.class); this.restUrl = restUrl; this.configStorage = configStorage; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java index 99ea3a4..94cf97d 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java @@ -23,6 +23,7 @@ import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.NetworkClient; import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient; +import org.apache.kafka.clients.GroupRebalanceConfig; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.internals.ClusterResourceListeners; import org.apache.kafka.common.metrics.JmxReporter; @@ -79,8 +80,6 @@ public class WorkerGroupMember { this.clientId = clientId; this.log = logContext.logger(WorkerGroupMember.class); - String groupId = config.getString(DistributedConfig.GROUP_ID_CONFIG); - Map<String, String> metricsTags = new LinkedHashMap<>(); metricsTags.put("client-id", clientId); MetricConfig metricConfig = new MetricConfig().samples(config.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG)) @@ -124,16 +123,12 @@ public class WorkerGroupMember { config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG), Integer.MAX_VALUE); this.coordinator = new WorkerCoordinator( + new GroupRebalanceConfig(config, GroupRebalanceConfig.ProtocolType.CONNECT), logContext, this.client, - groupId, - config.getInt(DistributedConfig.REBALANCE_TIMEOUT_MS_CONFIG), - config.getInt(DistributedConfig.SESSION_TIMEOUT_MS_CONFIG), - config.getInt(DistributedConfig.HEARTBEAT_INTERVAL_MS_CONFIG), metrics, metricGrpPrefix, this.time, - retryBackoffMs, restUrl, configStorage, listener, diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java index f06976a..b1d3ec6 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.connect.runtime.distributed; +import org.apache.kafka.clients.GroupRebalanceConfig; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.MockClient; import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient; @@ -45,6 +46,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import static org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol; import static org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection; @@ -93,6 +95,7 @@ public class WorkerCoordinatorIncrementalTest { private MockRebalanceListener rebalanceListener; @Mock private KafkaConfigBackingStore configStorage; + private GroupRebalanceConfig rebalanceConfig; private WorkerCoordinator coordinator; private int rebalanceDelay = DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_DEFAULT; @@ -150,22 +153,24 @@ public class WorkerCoordinatorIncrementalTest { this.configStorageCalls = 0; - this.coordinator = new WorkerCoordinator( - loggerFactory, - consumerClient, - groupId, - rebalanceTimeoutMs, - sessionTimeoutMs, - heartbeatIntervalMs, - metrics, - "worker" + groupId, - time, - retryBackoffMs, - expectedUrl(leaderId), - configStorage, - rebalanceListener, - compatibility, - rebalanceDelay); + this.rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs, + rebalanceTimeoutMs, + heartbeatIntervalMs, + groupId, + Optional.empty(), + retryBackoffMs, + true); + this.coordinator = new WorkerCoordinator(rebalanceConfig, + loggerFactory, + consumerClient, + metrics, + "worker" + groupId, + time, + expectedUrl(leaderId), + configStorage, + rebalanceListener, + compatibility, + rebalanceDelay); configState1 = clusterConfigState(offset, 2, 4); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java index 182d6bd..eac13d1 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.connect.runtime.distributed; +import org.apache.kafka.clients.GroupRebalanceConfig; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.MockClient; import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient; @@ -56,6 +57,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -90,6 +92,7 @@ public class WorkerCoordinatorTest { private ConsumerNetworkClient consumerClient; private MockRebalanceListener rebalanceListener; @Mock private KafkaConfigBackingStore configStorage; + private GroupRebalanceConfig rebalanceConfig; private WorkerCoordinator coordinator; private ClusterConfigState configState1; @@ -125,23 +128,24 @@ public class WorkerCoordinatorTest { this.metrics = new Metrics(time); this.rebalanceListener = new MockRebalanceListener(); this.configStorage = PowerMock.createMock(KafkaConfigBackingStore.class); - - this.coordinator = new WorkerCoordinator( - logContext, - consumerClient, - groupId, - rebalanceTimeoutMs, - sessionTimeoutMs, - heartbeatIntervalMs, - metrics, - "consumer" + groupId, - time, - retryBackoffMs, - LEADER_URL, - configStorage, - rebalanceListener, - compatibility, - 0); + this.rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs, + rebalanceTimeoutMs, + heartbeatIntervalMs, + groupId, + Optional.empty(), + retryBackoffMs, + true); + this.coordinator = new WorkerCoordinator(rebalanceConfig, + logContext, + consumerClient, + metrics, + "consumer" + groupId, + time, + LEADER_URL, + configStorage, + rebalanceListener, + compatibility, + 0); configState1 = new ClusterConfigState( 1L,