This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.3 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.3 by this push: new 8dee2fe KAFKA-8399: bring back internal.leave.group.on.close config for KStream (#6779) 8dee2fe is described below commit 8dee2fe1f59bb96840e67beb0493c8254521a67f Author: Boyang Chen <boy...@confluent.io> AuthorDate: Wed May 22 08:03:00 2019 -0700 KAFKA-8399: bring back internal.leave.group.on.close config for KStream (#6779) As title states. We plan to merge this to both trunk and 2.3 if it could fix the stream system tests globally. Reference implementation: #6673 Reviewers: Guozhang Wang <wangg...@gmail.com>, Matthias J. Sax <mj...@apache.org> --- .../org/apache/kafka/clients/consumer/ConsumerConfig.java | 15 +++++++++++++++ .../org/apache/kafka/clients/consumer/KafkaConsumer.java | 3 ++- .../clients/consumer/internals/AbstractCoordinator.java | 14 ++++++++++---- .../clients/consumer/internals/ConsumerCoordinator.java | 6 ++++-- .../apache/kafka/clients/consumer/KafkaConsumerTest.java | 3 ++- .../consumer/internals/AbstractCoordinatorTest.java | 2 +- .../consumer/internals/ConsumerCoordinatorTest.java | 3 ++- .../connect/runtime/distributed/WorkerCoordinator.java | 3 ++- .../main/java/org/apache/kafka/streams/StreamsConfig.java | 1 + .../java/org/apache/kafka/streams/StreamsConfigTest.java | 8 ++++++++ 10 files changed, 47 insertions(+), 11 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index ff2e5cd..010fff8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -254,6 +254,17 @@ public class ConsumerConfig extends AbstractConfig { "be excluded from the subscription. It is always possible to explicitly subscribe to an internal topic."; public static final boolean DEFAULT_EXCLUDE_INTERNAL_TOPICS = true; + /** + * <code>internal.leave.group.on.close</code> + * Whether or not the consumer should leave the group on close. If set to <code>false</code> then a rebalance + * won't occur until <code>session.timeout.ms</code> expires. + * + * <p> + * Note: this is an internal configuration and could be changed in the future in a backward incompatible way + * + */ + static final String LEAVE_GROUP_ON_CLOSE_CONFIG = "internal.leave.group.on.close"; + /** <code>isolation.level</code> */ public static final String ISOLATION_LEVEL_CONFIG = "isolation.level"; public static final String ISOLATION_LEVEL_DOC = "<p>Controls how to read messages written transactionally. If set to <code>read_committed</code>, consumer.poll() will only return" + @@ -476,6 +487,10 @@ public class ConsumerConfig extends AbstractConfig { DEFAULT_EXCLUDE_INTERNAL_TOPICS, Importance.MEDIUM, EXCLUDE_INTERNAL_TOPICS_DOC) + .defineInternal(LEAVE_GROUP_ON_CLOSE_CONFIG, + Type.BOOLEAN, + true, + Importance.LOW) .define(ISOLATION_LEVEL_CONFIG, Type.STRING, DEFAULT_ISOLATION_LEVEL, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index ad7ae82..c33a52e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -792,7 +792,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { retryBackoffMs, enableAutoCommit, config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG), - this.interceptors); + this.interceptors, + config.getBoolean(ConsumerConfig.LEAVE_GROUP_ON_CLOSE_CONFIG)); this.fetcher = new Fetcher<>( logContext, this.client, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 3af6d05..54678f7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -133,6 +133,7 @@ public abstract class AbstractCoordinator implements Closeable { private Generation generation = Generation.NO_GENERATION; private RequestFuture<Void> findCoordinatorFuture = null; + private final boolean leaveGroupOnClose; /** * Initialize the coordination manager. @@ -147,7 +148,8 @@ public abstract class AbstractCoordinator implements Closeable { Metrics metrics, String metricGrpPrefix, Time time, - long retryBackoffMs) { + long retryBackoffMs, + boolean leaveGroupOnClose) { this.log = logContext.logger(AbstractCoordinator.class); this.client = client; this.time = time; @@ -159,6 +161,7 @@ public abstract class AbstractCoordinator implements Closeable { this.heartbeat = heartbeat; this.sensors = new GroupCoordinatorMetrics(metrics, metricGrpPrefix); this.retryBackoffMs = retryBackoffMs; + this.leaveGroupOnClose = leaveGroupOnClose; } public AbstractCoordinator(LogContext logContext, @@ -171,10 +174,11 @@ public abstract class AbstractCoordinator implements Closeable { Metrics metrics, String metricGrpPrefix, Time time, - long retryBackoffMs) { + long retryBackoffMs, + boolean leaveGroupOnClose) { this(logContext, client, groupId, groupInstanceId, rebalanceTimeoutMs, sessionTimeoutMs, new Heartbeat(time, sessionTimeoutMs, heartbeatIntervalMs, rebalanceTimeoutMs, retryBackoffMs), - metrics, metricGrpPrefix, time, retryBackoffMs); + metrics, metricGrpPrefix, time, retryBackoffMs, leaveGroupOnClose); } /** @@ -845,7 +849,9 @@ public abstract class AbstractCoordinator implements Closeable { // Synchronize after closing the heartbeat thread since heartbeat thread // needs this lock to complete and terminate after close flag is set. synchronized (this) { - maybeLeaveGroup(); + if (leaveGroupOnClose) { + maybeLeaveGroup(); + } // At this point, there may be pending commits (async commits or sync commits that were // interrupted using wakeup) and the leave group request which have been queued, but not diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 3aef0c5..b03af74 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -137,7 +137,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator { long retryBackoffMs, boolean autoCommitEnabled, int autoCommitIntervalMs, - ConsumerInterceptors<?, ?> interceptors) { + ConsumerInterceptors<?, ?> interceptors, + boolean leaveGroupOnClose) { super(logContext, client, groupId, @@ -148,7 +149,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator { metrics, metricGrpPrefix, time, - retryBackoffMs); + retryBackoffMs, + leaveGroupOnClose); this.log = logContext.logger(ConsumerCoordinator.class); this.metadata = metadata; this.metadataSnapshot = new MetadataSnapshot(subscriptions, metadata.fetch(), metadata.updateVersion()); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 9012ea2..42cccd4 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -1902,7 +1902,8 @@ public class KafkaConsumerTest { retryBackoffMs, autoCommitEnabled, autoCommitIntervalMs, - interceptors); + interceptors, + true); Fetcher<String, String> fetcher = new Fetcher<>( loggerFactory, diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java index 31328b3..0fc5f62 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java @@ -857,7 +857,7 @@ public class AbstractCoordinatorTest { int retryBackoffMs, Optional<String> groupInstanceId) { super(new LogContext(), client, GROUP_ID, groupInstanceId, rebalanceTimeoutMs, - SESSION_TIMEOUT_MS, HEARTBEAT_INTERVAL_MS, metrics, METRIC_GROUP_PREFIX, time, retryBackoffMs); + SESSION_TIMEOUT_MS, HEARTBEAT_INTERVAL_MS, metrics, METRIC_GROUP_PREFIX, time, retryBackoffMs, !groupInstanceId.isPresent()); } @Override diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index f0214d2..86032c4 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -2219,7 +2219,8 @@ public class ConsumerCoordinatorTest { retryBackoffMs, autoCommitEnabled, autoCommitIntervalMs, - null + null, + !groupInstanceId.isPresent() ); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java index 706742a..fd7c7a4 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java @@ -95,7 +95,8 @@ public class WorkerCoordinator extends AbstractCoordinator implements Closeable metrics, metricGrpPrefix, time, - retryBackoffMs); + retryBackoffMs, + true); this.log = logContext.logger(WorkerCoordinator.class); this.restUrl = restUrl; this.configStorage = configStorage; diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 5024c28..6d93b99 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -720,6 +720,7 @@ public class StreamsConfig extends AbstractConfig { tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000"); tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); tempConsumerDefaultOverrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + tempConsumerDefaultOverrides.put("internal.leave.group.on.close", false); CONSUMER_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempConsumerDefaultOverrides); } diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index c202c93..5f053bc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -31,6 +31,7 @@ import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.FailOnInvalidTimestamp; import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor; +import org.hamcrest.CoreMatchers; import org.junit.Before; import org.junit.Test; @@ -425,6 +426,13 @@ public class StreamsConfigTest { } @Test + public void shouldSetInternalLeaveGroupOnCloseConfigToFalseInConsumer() { + final StreamsConfig streamsConfig = new StreamsConfig(props); + final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx); + assertThat(consumerConfigs.get("internal.leave.group.on.close"), CoreMatchers.equalTo(false)); + } + + @Test public void shouldAcceptAtLeastOnce() { // don't use `StreamsConfig.AT_LEAST_ONCE` to actually do a useful test props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "at_least_once");