[ https://issues.apache.org/jira/browse/KAFKA-6774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16689175#comment-16689175 ]
ASF GitHub Bot commented on KAFKA-6774: --------------------------------------- hachikuji closed pull request #5877: KAFKA-6774: Improve the default group id behavior in KafkaConsumer (KIP-289) URL: https://github.com/apache/kafka/pull/5877 This PR was merged from a forked repository. Because GitHub hides the original diff of such a pull request on merge, the diff is reproduced below for the sake of provenance: diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index 795a762a494..9cd5766ea3e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -277,7 +277,7 @@ ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString()), Importance.MEDIUM, CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC) - .define(GROUP_ID_CONFIG, Type.STRING, "", Importance.HIGH, GROUP_ID_DOC) + .define(GROUP_ID_CONFIG, Type.STRING, null, Importance.HIGH, GROUP_ID_DOC) .define(SESSION_TIMEOUT_MS_CONFIG, Type.INT, 10000, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 3a756721fd8..5c673a58c10 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -37,6 +37,8 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.errors.InvalidConfigurationException; +import org.apache.kafka.common.errors.InvalidGroupIdException; import org.apache.kafka.common.errors.TimeoutException; import 
org.apache.kafka.common.internals.ClusterResourceListeners; import org.apache.kafka.common.metrics.JmxReporter; @@ -557,6 +559,7 @@ private final Logger log; private final String clientId; + private String groupId; private final ConsumerCoordinator coordinator; private final Deserializer<K> keyDeserializer; private final Deserializer<V> valueDeserializer; @@ -654,18 +657,23 @@ public KafkaConsumer(Properties properties, } @SuppressWarnings("unchecked") - private KafkaConsumer(ConsumerConfig config, - Deserializer<K> keyDeserializer, - Deserializer<V> valueDeserializer) { + private KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) { try { String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG); if (clientId.isEmpty()) clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement(); this.clientId = clientId; - String groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG); - + this.groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG); LogContext logContext = new LogContext("[Consumer clientId=" + clientId + ", groupId=" + groupId + "] "); this.log = logContext.logger(getClass()); + boolean enableAutoCommit = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG); + if (groupId == null) { // overwrite in case of default group id where the config is not explicitly provided + if (!config.originals().containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) + enableAutoCommit = false; + else if (enableAutoCommit) + throw new InvalidConfigurationException(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " cannot be set to true when default group id (null) is used."); + } else if (groupId.isEmpty()) + log.warn("Support for using the empty group id by consumers is deprecated and will be removed in the next major release."); log.debug("Initializing the Kafka consumer"); this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG); @@ -678,8 +686,7 @@ private 
KafkaConsumer(ConsumerConfig config, .recordLevel(Sensor.RecordingLevel.forName(config.getString(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG))) .tags(metricsTags); List<MetricsReporter> reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, - MetricsReporter.class, - Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)); + MetricsReporter.class, Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)); reporters.add(new JmxReporter(JMX_PREFIX)); this.metrics = new Metrics(metricConfig, reporters, time); this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); @@ -691,16 +698,14 @@ private KafkaConsumer(ConsumerConfig config, ConsumerInterceptor.class); this.interceptors = new ConsumerInterceptors<>(interceptorList); if (keyDeserializer == null) { - this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, - Deserializer.class); + this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class); this.keyDeserializer.configure(config.originals(), true); } else { config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG); this.keyDeserializer = keyDeserializer; } if (valueDeserializer == null) { - this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - Deserializer.class); + this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class); this.valueDeserializer.configure(config.originals(), false); } else { config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG); @@ -710,17 +715,14 @@ private KafkaConsumer(ConsumerConfig config, this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG), true, false, clusterResourceListeners); List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses( - 
config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG), - config.getString(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG)); + config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG), config.getString(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG)); this.metadata.bootstrap(addresses, time.milliseconds()); String metricGrpPrefix = "consumer"; ConsumerMetrics metricsRegistry = new ConsumerMetrics(metricsTags.keySet(), "consumer"); ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config, time); - IsolationLevel isolationLevel = IsolationLevel.valueOf( config.getString(ConsumerConfig.ISOLATION_LEVEL_CONFIG).toUpperCase(Locale.ROOT)); Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics, metricsRegistry.fetcherMetrics); - int heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG); NetworkClient netClient = new NetworkClient( @@ -755,24 +757,26 @@ private KafkaConsumer(ConsumerConfig config, int maxPollIntervalMs = config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG); int sessionTimeoutMs = config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG); - this.coordinator = new ConsumerCoordinator(logContext, - this.client, - groupId, - maxPollIntervalMs, - sessionTimeoutMs, - new Heartbeat(time, sessionTimeoutMs, heartbeatIntervalMs, maxPollIntervalMs, retryBackoffMs), - assignors, - this.metadata, - this.subscriptions, - metrics, - metricGrpPrefix, - this.time, - retryBackoffMs, - config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), - config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG), - this.interceptors, - config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG), - config.getBoolean(ConsumerConfig.LEAVE_GROUP_ON_CLOSE_CONFIG)); + // no coordinator will be constructed for the default (null) group id + this.coordinator = groupId == null ? 
null : + new ConsumerCoordinator(logContext, + this.client, + groupId, + maxPollIntervalMs, + sessionTimeoutMs, + new Heartbeat(time, sessionTimeoutMs, heartbeatIntervalMs, maxPollIntervalMs, retryBackoffMs), + assignors, + this.metadata, + this.subscriptions, + metrics, + metricGrpPrefix, + this.time, + retryBackoffMs, + enableAutoCommit, + config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG), + this.interceptors, + config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG), + config.getBoolean(ConsumerConfig.LEAVE_GROUP_ON_CLOSE_CONFIG)); this.fetcher = new Fetcher<>( logContext, this.client, @@ -795,11 +799,9 @@ private KafkaConsumer(ConsumerConfig config, config.logUnused(); AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics); - log.debug("Kafka consumer initialized"); } catch (Throwable t) { - // call close methods if internal objects are already constructed - // this is to prevent resource leak. see KAFKA-2121 + // call close methods if internal objects are already constructed; this is to prevent resource leak. 
see KAFKA-2121 close(0, true); // now propagate the exception throw new KafkaException("Failed to construct kafka consumer", t); @@ -822,7 +824,8 @@ private KafkaConsumer(ConsumerConfig config, long retryBackoffMs, long requestTimeoutMs, int defaultApiTimeoutMs, - List<PartitionAssignor> assignors) { + List<PartitionAssignor> assignors, + String groupId) { this.log = logContext.logger(getClass()); this.clientId = clientId; this.coordinator = coordinator; @@ -839,6 +842,7 @@ private KafkaConsumer(ConsumerConfig config, this.requestTimeoutMs = requestTimeoutMs; this.defaultApiTimeoutMs = defaultApiTimeoutMs; this.assignors = assignors; + this.groupId = groupId; } /** @@ -911,9 +915,10 @@ private KafkaConsumer(ConsumerConfig config, public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) { acquireAndEnsureOpen(); try { - if (topics == null) { + maybeThrowInvalidGroupIdException(); + if (topics == null) throw new IllegalArgumentException("Topic collection to subscribe to cannot be null"); - } else if (topics.isEmpty()) { + if (topics.isEmpty()) { // treat subscribing to empty topic list as the same as unsubscribing this.unsubscribe(); } else { @@ -980,6 +985,7 @@ public void subscribe(Collection<String> topics) { */ @Override public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) { + maybeThrowInvalidGroupIdException(); if (pattern == null) throw new IllegalArgumentException("Topic pattern to subscribe to cannot be null"); @@ -1026,7 +1032,8 @@ public void unsubscribe() { try { fetcher.clearBufferedDataForUnassignedPartitions(Collections.emptySet()); this.subscriptions.unsubscribe(); - this.coordinator.maybeLeaveGroup(); + if (this.coordinator != null) + this.coordinator.maybeLeaveGroup(); this.metadata.needMetadataForAllTopics(false); log.info("Unsubscribed all topics or patterns and assigned partitions"); } finally { @@ -1073,7 +1080,8 @@ public void assign(Collection<TopicPartition> partitions) { // make sure the 
offsets of topic partitions the consumer is unsubscribing from // are committed since there will be no following rebalance - this.coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds()); + if (coordinator != null) + this.coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds()); log.debug("Subscribed to partition(s): {}", Utils.join(partitions, ", ")); this.subscriptions.assignFromUser(new HashSet<>(partitions)); @@ -1211,7 +1219,7 @@ public void assign(Collection<TopicPartition> partitions) { * Visible for testing */ boolean updateAssignmentMetadataIfNeeded(final Timer timer) { - if (!coordinator.poll(timer)) { + if (coordinator != null && !coordinator.poll(timer)) { return false; } @@ -1219,7 +1227,8 @@ boolean updateAssignmentMetadataIfNeeded(final Timer timer) { } private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) { - long pollTimeout = Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs()); + long pollTimeout = coordinator == null ? 
timer.remainingMs() : + Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs()); // if data is available already, return it immediately final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords(); @@ -1249,7 +1258,7 @@ boolean updateAssignmentMetadataIfNeeded(final Timer timer) { // after the long poll, we should check whether the group needs to rebalance // prior to returning data so that the group can stabilize faster - if (coordinator.rejoinNeededOrPending()) { + if (coordinator != null && coordinator.rejoinNeededOrPending()) { return Collections.emptyMap(); } @@ -1324,6 +1333,7 @@ public void commitSync() { public void commitSync(Duration timeout) { acquireAndEnsureOpen(); try { + maybeThrowInvalidGroupIdException(); if (!coordinator.commitOffsetsSync(subscriptions.allConsumed(), time.timer(timeout))) { throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before successfully " + "committing the current consumed offsets"); @@ -1406,6 +1416,7 @@ public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) { public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout) { acquireAndEnsureOpen(); try { + maybeThrowInvalidGroupIdException(); if (!coordinator.commitOffsetsSync(new HashMap<>(offsets), time.timer(timeout))) { throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before successfully " + "committing offsets " + offsets); @@ -1475,6 +1486,7 @@ public void commitAsync(OffsetCommitCallback callback) { public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) { acquireAndEnsureOpen(); try { + maybeThrowInvalidGroupIdException(); log.debug("Committing offsets: {}", offsets); coordinator.commitOffsetsAsync(new HashMap<>(offsets), callback); } finally { @@ -1686,6 +1698,7 @@ public OffsetAndMetadata committed(TopicPartition partition) { public OffsetAndMetadata 
committed(TopicPartition partition, final Duration timeout) { acquireAndEnsureOpen(); try { + maybeThrowInvalidGroupIdException(); Map<TopicPartition, OffsetAndMetadata> offsets = coordinator.fetchCommittedOffsets( Collections.singleton(partition), time.timer(timeout)); if (offsets == null) { @@ -2163,7 +2176,7 @@ private boolean updateFetchPositions(final Timer timer) { // coordinator lookup if there are partitions which have missing positions, so // a consumer with manually assigned partitions can avoid a coordinator dependence // by always ensuring that assigned partitions have an initial position. - if (!coordinator.refreshCommittedOffsetsIfNeeded(timer)) return false; + if (coordinator != null && !coordinator.refreshCommittedOffsetsIfNeeded(timer)) return false; // If there are partitions still needing a position and a reset policy is defined, // request reset using the default policy. If no reset strategy is defined and there @@ -2216,6 +2229,12 @@ private void throwIfNoAssignorsConfigured() { ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG + " configuration property"); } + private void maybeThrowInvalidGroupIdException() { + if (groupId == null) + throw new InvalidGroupIdException("To use the group management or offset commit APIs, you must " + + "provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in the consumer configuration."); + } + // Visible for testing String getClientId() { return clientId; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index d9830877ba7..335e0f21f9f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -121,7 +121,7 @@ private Generation generation = Generation.NO_GENERATION; private RequestFuture<Void> findCoordinatorFuture = 
null; - + /** * Initialize the coordination manager. */ @@ -139,7 +139,8 @@ public AbstractCoordinator(LogContext logContext, this.log = logContext.logger(AbstractCoordinator.class); this.client = client; this.time = time; - this.groupId = groupId; + this.groupId = Objects.requireNonNull(groupId, + "Expected a non-null group id for coordinator construction"); this.rebalanceTimeoutMs = rebalanceTimeoutMs; this.sessionTimeoutMs = sessionTimeoutMs; this.leaveGroupOnClose = leaveGroupOnClose; diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 987bad22219..3657077d033 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -35,6 +35,8 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.errors.InvalidConfigurationException; +import org.apache.kafka.common.errors.InvalidGroupIdException; import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.metrics.Metrics; @@ -138,6 +140,8 @@ // a concurrent heartbeat request private final int autoCommitIntervalMs = 500; + private final String groupId = "mock-group"; + @Rule public ExpectedException expectedException = ExpectedException.none(); @@ -203,7 +207,7 @@ public void testInvalidSocketReceiveBufferSize() { @Test public void testSubscription() { - KafkaConsumer<byte[], byte[]> consumer = newConsumer(); + KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId); consumer.subscribe(singletonList(topic)); assertEquals(singleton(topic), consumer.subscription()); @@ -226,21 +230,21 @@ public void testSubscription() { @Test(expected = 
IllegalArgumentException.class) public void testSubscriptionOnNullTopicCollection() { - try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) { + try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId)) { consumer.subscribe((List<String>) null); } } @Test(expected = IllegalArgumentException.class) public void testSubscriptionOnNullTopic() { - try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) { + try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId)) { consumer.subscribe(singletonList((String) null)); } } @Test(expected = IllegalArgumentException.class) public void testSubscriptionOnEmptyTopic() { - try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) { + try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId)) { String emptyTopic = " "; consumer.subscribe(singletonList(emptyTopic)); } @@ -248,7 +252,7 @@ public void testSubscriptionOnEmptyTopic() { @Test(expected = IllegalArgumentException.class) public void testSubscriptionOnNullPattern() { - try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) { + try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId)) { consumer.subscribe((Pattern) null); } } @@ -258,6 +262,7 @@ public void testSubscriptionWithEmptyPartitionAssignment() { Properties props = new Properties(); props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); props.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, ""); + props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId); try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(props)) { consumer.subscribe(singletonList(topic)); } @@ -265,7 +270,7 @@ public void testSubscriptionWithEmptyPartitionAssignment() { @Test(expected = IllegalArgumentException.class) public void testSeekNegative() { - try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) { + try (KafkaConsumer<byte[], byte[]> consumer = newConsumer((String) null)) { consumer.assign(singleton(new 
TopicPartition("nonExistTopic", 0))); consumer.seek(new TopicPartition("nonExistTopic", 0), -1); } @@ -273,14 +278,14 @@ public void testSeekNegative() { @Test(expected = IllegalArgumentException.class) public void testAssignOnNullTopicPartition() { - try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) { + try (KafkaConsumer<byte[], byte[]> consumer = newConsumer((String) null)) { consumer.assign(null); } } @Test public void testAssignOnEmptyTopicPartition() { - try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) { + try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId)) { consumer.assign(Collections.<TopicPartition>emptyList()); assertTrue(consumer.subscription().isEmpty()); assertTrue(consumer.assignment().isEmpty()); @@ -289,14 +294,14 @@ public void testAssignOnEmptyTopicPartition() { @Test(expected = IllegalArgumentException.class) public void testAssignOnNullTopicInPartition() { - try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) { + try (KafkaConsumer<byte[], byte[]> consumer = newConsumer((String) null)) { consumer.assign(singleton(new TopicPartition(null, 0))); } } @Test(expected = IllegalArgumentException.class) public void testAssignOnEmptyTopicInPartition() { - try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) { + try (KafkaConsumer<byte[], byte[]> consumer = newConsumer((String) null)) { consumer.assign(singleton(new TopicPartition(" ", 0))); } } @@ -328,7 +333,7 @@ public void testInterceptorConstructorClose() throws Exception { @Test public void testPause() { - KafkaConsumer<byte[], byte[]> consumer = newConsumer(); + KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId); consumer.assign(singletonList(tp0)); assertEquals(singleton(tp0), consumer.assignment()); @@ -346,11 +351,19 @@ public void testPause() { consumer.close(); } - private KafkaConsumer<byte[], byte[]> newConsumer() { + private KafkaConsumer<byte[], byte[]> newConsumer(String groupId) { + return newConsumer(groupId, 
Optional.empty()); + } + + private KafkaConsumer<byte[], byte[]> newConsumer(String groupId, Optional<Boolean> enableAutoCommit) { Properties props = new Properties(); props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "my.consumer"); props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName()); + if (groupId != null) + props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId); + if (enableAutoCommit.isPresent()) + props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit.get().toString()); return newConsumer(props); } @@ -554,7 +567,7 @@ public void testMissingOffsetNoResetPolicy() { PartitionAssignor assignor = new RoundRobinAssignor(); KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, - OffsetResetStrategy.NONE, true); + OffsetResetStrategy.NONE, true, groupId); consumer.assign(singletonList(tp0)); client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node); @@ -577,7 +590,7 @@ public void testResetToCommittedOffset() { PartitionAssignor assignor = new RoundRobinAssignor(); KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, - OffsetResetStrategy.NONE, true); + OffsetResetStrategy.NONE, true, groupId); consumer.assign(singletonList(tp0)); client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node); @@ -601,7 +614,7 @@ public void testResetUsingAutoResetPolicy() { PartitionAssignor assignor = new RoundRobinAssignor(); KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, - OffsetResetStrategy.LATEST, true); + OffsetResetStrategy.LATEST, true, groupId); consumer.assign(singletonList(tp0)); client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node); @@ -1210,14 +1223,14 @@ public void testOffsetOfPausedPartitions() { @Test(expected = IllegalStateException.class) 
public void testPollWithNoSubscription() { - try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) { + try (KafkaConsumer<byte[], byte[]> consumer = newConsumer((String) null)) { consumer.poll(Duration.ZERO); } } @Test(expected = IllegalStateException.class) public void testPollWithEmptySubscription() { - try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) { + try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId)) { consumer.subscribe(Collections.<String>emptyList()); consumer.poll(Duration.ZERO); } @@ -1225,7 +1238,7 @@ public void testPollWithEmptySubscription() { @Test(expected = IllegalStateException.class) public void testPollWithEmptyUserAssignment() { - try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) { + try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId)) { consumer.assign(Collections.<TopicPartition>emptySet()); consumer.poll(Duration.ZERO); } @@ -1265,12 +1278,77 @@ public void testCloseInterrupt() throws Exception { @Test public void closeShouldBeIdempotent() { - KafkaConsumer<byte[], byte[]> consumer = newConsumer(); + KafkaConsumer<byte[], byte[]> consumer = newConsumer((String) null); consumer.close(); consumer.close(); consumer.close(); } + @Test + public void testOperationsBySubscribingConsumerWithDefaultGroupId() { + try { + newConsumer(null, Optional.of(Boolean.TRUE)); + fail("Expected an InvalidConfigurationException"); + } catch (KafkaException e) { + assertEquals(InvalidConfigurationException.class, e.getCause().getClass()); + } + + try { + newConsumer((String) null).subscribe(Collections.singleton(topic)); + fail("Expected an InvalidGroupIdException"); + } catch (InvalidGroupIdException e) { + // OK, expected + } + + try { + newConsumer((String) null).committed(tp0); + fail("Expected an InvalidGroupIdException"); + } catch (InvalidGroupIdException e) { + // OK, expected + } + + try { + newConsumer((String) null).commitAsync(); + fail("Expected an InvalidGroupIdException"); + } catch 
(InvalidGroupIdException e) { + // OK, expected + } + + try { + newConsumer((String) null).commitSync(); + fail("Expected an InvalidGroupIdException"); + } catch (InvalidGroupIdException e) { + // OK, expected + } + } + + @Test + public void testOperationsByAssigningConsumerWithDefaultGroupId() { + KafkaConsumer<byte[], byte[]> consumer = newConsumer((String) null); + consumer.assign(singleton(tp0)); + + try { + consumer.committed(tp0); + fail("Expected an InvalidGroupIdException"); + } catch (InvalidGroupIdException e) { + // OK, expected + } + + try { + consumer.commitAsync(); + fail("Expected an InvalidGroupIdException"); + } catch (InvalidGroupIdException e) { + // OK, expected + } + + try { + consumer.commitSync(); + fail("Expected an InvalidGroupIdException"); + } catch (InvalidGroupIdException e) { + // OK, expected + } + } + @Test public void testMetricConfigRecordingLevel() { Properties props = new Properties(); @@ -1681,13 +1759,20 @@ private FetchResponse fetchResponse(TopicPartition partition, long fetchOffset, Metadata metadata, PartitionAssignor assignor, boolean autoCommitEnabled) { - return newConsumer(time, client, metadata, assignor, OffsetResetStrategy.EARLIEST, autoCommitEnabled); + return newConsumer(time, client, metadata, assignor, OffsetResetStrategy.EARLIEST, autoCommitEnabled, groupId); } private KafkaConsumer<String, String> newConsumerNoAutoCommit(Time time, KafkaClient client, Metadata metadata) { - return newConsumer(time, client, metadata, new RangeAssignor(), OffsetResetStrategy.EARLIEST, false); + return newConsumer(time, client, metadata, new RangeAssignor(), OffsetResetStrategy.EARLIEST, false, groupId); + } + + private KafkaConsumer<String, String> newConsumer(Time time, + KafkaClient client, + Metadata metadata, + String groupId) { + return newConsumer(time, client, metadata, new RangeAssignor(), OffsetResetStrategy.LATEST, true, groupId); } private KafkaConsumer<String, String> newConsumer(Time time, @@ -1695,9 +1780,9 @@ 
private FetchResponse fetchResponse(TopicPartition partition, long fetchOffset, Metadata metadata, PartitionAssignor assignor, OffsetResetStrategy resetStrategy, - boolean autoCommitEnabled) { + boolean autoCommitEnabled, + String groupId) { String clientId = "mock-consumer"; - String groupId = "mock-group"; String metricGroupPrefix = "consumer"; long retryBackoffMs = 100; int requestTimeoutMs = 30000; @@ -1782,7 +1867,8 @@ private FetchResponse fetchResponse(TopicPartition partition, long fetchOffset, retryBackoffMs, requestTimeoutMs, defaultApiTimeoutMs, - assignors); + assignors, + groupId); } private static class FetchInfo { diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java index 938fe948b1f..61606ab8e89 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java @@ -84,6 +84,7 @@ public void teardown() throws Exception { public void testConsumerWithInvalidCredentials() { Map<String, Object> props = new HashMap<>(saslClientConfigs); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + server.port()); + props.put(ConsumerConfig.GROUP_ID_CONFIG, ""); StringDeserializer deserializer = new StringDeserializer(); try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props, deserializer, deserializer)) { diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala index 0e2797a5de1..5a200055742 100644 --- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala +++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala @@ -113,10 +113,12 @@ abstract class 
IntegrationTestHarness extends KafkaServerTestHarness { def createConsumer[K, V](keyDeserializer: Deserializer[K] = new ByteArrayDeserializer, valueDeserializer: Deserializer[V] = new ByteArrayDeserializer, - configOverrides: Properties = new Properties): KafkaConsumer[K, V] = { + configOverrides: Properties = new Properties, + configsToRemove: List[String] = List()): KafkaConsumer[K, V] = { val props = new Properties props ++= consumerConfig props ++= configOverrides + configsToRemove.foreach(props.remove(_)) val consumer = new KafkaConsumer[K, V](props, keyDeserializer, valueDeserializer) consumers += consumer consumer diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala index c06a796040e..c11fc12d16d 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala @@ -14,6 +14,7 @@ package kafka.api import java.time.Duration import java.util +import java.util.Arrays.asList import java.util.regex.Pattern import java.util.{Collections, Locale, Optional, Properties} @@ -23,7 +24,7 @@ import kafka.utils.TestUtils import org.apache.kafka.clients.consumer._ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} import org.apache.kafka.common.{MetricName, TopicPartition} -import org.apache.kafka.common.errors.InvalidTopicException +import org.apache.kafka.common.errors.{InvalidGroupIdException, InvalidTopicException} import org.apache.kafka.common.header.Headers import org.apache.kafka.common.record.{CompressionType, TimestampType} import org.apache.kafka.common.serialization._ @@ -1814,4 +1815,129 @@ class PlaintextConsumerTest extends BaseConsumerTest { s"The current assignment is ${consumer.assignment()}") } + @Test + def testConsumingWithNullGroupId(): Unit = { + val topic = "test_topic" + val partition = 0; + val tp = new 
TopicPartition(topic, partition) + createTopic(topic, 1, 1) + + TestUtils.waitUntilTrue(() => { + this.zkClient.topicExists(topic) + }, "Failed to create topic") + + val producer = createProducer() + producer.send(new ProducerRecord(topic, partition, "k1".getBytes, "v1".getBytes)).get() + producer.send(new ProducerRecord(topic, partition, "k2".getBytes, "v2".getBytes)).get() + producer.send(new ProducerRecord(topic, partition, "k3".getBytes, "v3".getBytes)).get() + producer.close() + + // consumer 1 uses the default group id and consumes from earliest offset + val consumer1Config = new Properties(consumerConfig) + consumer1Config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + consumer1Config.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer1") + val consumer1 = createConsumer( + configOverrides = consumer1Config, + configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG)) + + // consumer 2 uses the default group id and consumes from latest offset + val consumer2Config = new Properties(consumerConfig) + consumer2Config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest") + consumer2Config.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer2") + val consumer2 = createConsumer( + configOverrides = consumer2Config, + configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG)) + + // consumer 3 uses the default group id and starts from an explicit offset + val consumer3Config = new Properties(consumerConfig) + consumer3Config.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer3") + val consumer3 = createConsumer( + configOverrides = consumer3Config, + configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG)) + + consumer1.assign(asList(tp)) + consumer2.assign(asList(tp)) + consumer3.assign(asList(tp)) + consumer3.seek(tp, 1) + + val numRecords1 = consumer1.poll(Duration.ofMillis(5000)).count() + + try { + consumer1.commitSync() + fail("Expected offset commit to fail due to null group id") + } catch { + case e: InvalidGroupIdException => // OK + } + + try { + 
consumer2.committed(tp) + fail("Expected committed offset fetch to fail due to null group id") + } catch { + case e: InvalidGroupIdException => // OK + } + + val numRecords2 = consumer2.poll(Duration.ofMillis(5000)).count() + val numRecords3 = consumer3.poll(Duration.ofMillis(5000)).count() + + consumer1.unsubscribe() + consumer2.unsubscribe() + consumer3.unsubscribe() + + consumer1.close() + consumer2.close() + consumer3.close() + + assertEquals("Expected consumer1 to consume from earliest offset", 3, numRecords1) + assertEquals("Expected consumer2 to consume from latest offset", 0, numRecords2) + assertEquals("Expected consumer3 to consume from offset 1", 2, numRecords3) + } + + @Test + def testConsumingWithEmptyGroupId(): Unit = { + val topic = "test_topic" + val partition = 0; + val tp = new TopicPartition(topic, partition) + createTopic(topic, 1, 1) + + TestUtils.waitUntilTrue(() => { + this.zkClient.topicExists(topic) + }, "Failed to create topic") + + val producer = createProducer() + producer.send(new ProducerRecord(topic, partition, "k1".getBytes, "v1".getBytes)).get() + producer.send(new ProducerRecord(topic, partition, "k2".getBytes, "v2".getBytes)).get() + producer.close() + + // consumer 1 uses the empty group id + val consumer1Config = new Properties(consumerConfig) + consumer1Config.put(ConsumerConfig.GROUP_ID_CONFIG, "") + consumer1Config.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer1") + consumer1Config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1") + val consumer1 = createConsumer(configOverrides = consumer1Config) + + // consumer 2 uses the empty group id and consumes from latest offset if there is no committed offset + val consumer2Config = new Properties(consumerConfig) + consumer2Config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest") + consumer2Config.put(ConsumerConfig.GROUP_ID_CONFIG, "") + consumer2Config.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer2") + consumer2Config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1") + val 
consumer2 = createConsumer(configOverrides = consumer2Config) + + consumer1.assign(asList(tp)) + consumer2.assign(asList(tp)) + + val records1 = consumer1.poll(Duration.ofMillis(5000)) + consumer1.commitSync() + + val records2 = consumer2.poll(Duration.ofMillis(5000)) + consumer2.commitSync() + + consumer1.close() + consumer2.close() + + assertTrue("Expected consumer1 to consume one message from offset 0", + records1.count() == 1 && records1.records(tp).asScala.head.offset == 0) + assertTrue("Expected consumer2 to consume one message from offset 1, which is the committed offset of consumer1", + records2.count() == 1 && records2.records(tp).asScala.head.offset == 1) + } } diff --git a/docs/upgrade.html b/docs/upgrade.html index 33d9964113a..154547b10a0 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -19,6 +19,15 @@ <script id="upgrade-template" type="text/x-handlebars-template"> +<h4><a id="upgrade_2_2_0" href="#upgrade_2_2_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, 1.1.x, 2.0.x or 2.1.x to 2.2.0</a></h4> +<h5><a id="upgrade_220_notable" href="#upgrade_220_notable">Notable changes in 2.2.0</a></h5> +<ul> + <li>The default consumer group id has been changed from the empty string (<code>""</code>) to <code>null</code>. Consumers who use the new default group id will not be able to subscribe to topics, + and fetch or commit offsets. The empty string as consumer group id is deprecated but will be supported until a future major release. Old clients that rely on the empty string group id will now + have to explicitly provide it as part of their consumer config. 
For more information see + <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-289%3A+Improve+the+default+group+id+behavior+in+KafkaConsumer">KIP-289</a>.</li> +</ul> + <h4><a id="upgrade_2_1_0" href="#upgrade_2_1_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, 1.1.x, or 2.0.0 to 2.1.0</a></h4> <p><b>Note that 2.1.x contains a change to the internal schema used to store consumer offsets. Once the upgrade is ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Improve default groupId behavior in consumer > -------------------------------------------- > > Key: KAFKA-6774 > URL: https://issues.apache.org/jira/browse/KAFKA-6774 > Project: Kafka > Issue Type: Improvement > Components: consumer > Reporter: Jason Gustafson > Assignee: Vahid Hashemian > Priority: Major > Labels: needs-kip > Fix For: 2.2.0 > > > At the moment, the default groupId in the consumer is "". If you try to use > this to subscribe() to a topic, the broker will reject the group as invalid. > On the other hand, if you use it with assign(), then the user will be able to > fetch and commit offsets using the empty groupId. Probably 99% of the time, > this is not what the user expects. Instead you would probably expect that if > no groupId is provided, then no committed offsets will be fetched at all and > we'll just use the auto reset behavior if we don't have a current position. > Here are two potential solutions (both requiring a KIP): > 1. Change the default to null. We will preserve the current behavior for > subscribe(). When using assign(), we will not bother fetching committed > offsets for the null groupId, and any attempt to commit offsets will raise an > error. 
The user can still use the empty groupId, but they have to specify it > explicitly. > 2. Keep the current default, but change the consumer to treat this value as > if it were null as described in option 1. The argument for this behavior is > that using the empty groupId to commit offsets is inherently a dangerous > practice and should not be permitted. We'd first have to convince ourselves that > it is acceptable to stop allowing the empty groupId despite the impact on > backwards compatibility. -- This message was sent by Atlassian JIRA (v7.6.3#76005)