[ 
https://issues.apache.org/jira/browse/KAFKA-6774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16689175#comment-16689175
 ] 

ASF GitHub Bot commented on KAFKA-6774:
---------------------------------------

hachikuji closed pull request #5877: KAFKA-6774: Improve the default group id 
behavior in KafkaConsumer (KIP-289)
URL: https://github.com/apache/kafka/pull/5877
 
 
   

This pull request was merged from a forked repository (a foreign pull request).
Because GitHub hides the original diff of a fork-based pull request once it
has been merged, the full diff is reproduced below for the sake of provenance:

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 795a762a494..9cd5766ea3e 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -277,7 +277,7 @@
                                            
ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString()),
                                         Importance.MEDIUM,
                                         
CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC)
-                                .define(GROUP_ID_CONFIG, Type.STRING, "", 
Importance.HIGH, GROUP_ID_DOC)
+                                .define(GROUP_ID_CONFIG, Type.STRING, null, 
Importance.HIGH, GROUP_ID_DOC)
                                 .define(SESSION_TIMEOUT_MS_CONFIG,
                                         Type.INT,
                                         10000,
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 3a756721fd8..5c673a58c10 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -37,6 +37,8 @@
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.errors.InvalidGroupIdException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.metrics.JmxReporter;
@@ -557,6 +559,7 @@
 
     private final Logger log;
     private final String clientId;
+    private String groupId;
     private final ConsumerCoordinator coordinator;
     private final Deserializer<K> keyDeserializer;
     private final Deserializer<V> valueDeserializer;
@@ -654,18 +657,23 @@ public KafkaConsumer(Properties properties,
     }
 
     @SuppressWarnings("unchecked")
-    private KafkaConsumer(ConsumerConfig config,
-                          Deserializer<K> keyDeserializer,
-                          Deserializer<V> valueDeserializer) {
+    private KafkaConsumer(ConsumerConfig config, Deserializer<K> 
keyDeserializer, Deserializer<V> valueDeserializer) {
         try {
             String clientId = 
config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
             if (clientId.isEmpty())
                 clientId = "consumer-" + 
CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
             this.clientId = clientId;
-            String groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG);
-
+            this.groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG);
             LogContext logContext = new LogContext("[Consumer clientId=" + 
clientId + ", groupId=" + groupId + "] ");
             this.log = logContext.logger(getClass());
+            boolean enableAutoCommit = 
config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
+            if (groupId == null) { // overwrite in case of default group id 
where the config is not explicitly provided
+                if 
(!config.originals().containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG))
+                    enableAutoCommit = false;
+                else if (enableAutoCommit)
+                    throw new 
InvalidConfigurationException(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " 
cannot be set to true when default group id (null) is used.");
+            } else if (groupId.isEmpty())
+                log.warn("Support for using the empty group id by consumers is 
deprecated and will be removed in the next major release.");
 
             log.debug("Initializing the Kafka consumer");
             this.requestTimeoutMs = 
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
@@ -678,8 +686,7 @@ private KafkaConsumer(ConsumerConfig config,
                     
.recordLevel(Sensor.RecordingLevel.forName(config.getString(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
                     .tags(metricsTags);
             List<MetricsReporter> reporters = 
config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
-                    MetricsReporter.class,
-                    Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, 
clientId));
+                    MetricsReporter.class, 
Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
             reporters.add(new JmxReporter(JMX_PREFIX));
             this.metrics = new Metrics(metricConfig, reporters, time);
             this.retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
@@ -691,16 +698,14 @@ private KafkaConsumer(ConsumerConfig config,
                     ConsumerInterceptor.class);
             this.interceptors = new ConsumerInterceptors<>(interceptorList);
             if (keyDeserializer == null) {
-                this.keyDeserializer = 
config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-                        Deserializer.class);
+                this.keyDeserializer = 
config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
Deserializer.class);
                 this.keyDeserializer.configure(config.originals(), true);
             } else {
                 config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
                 this.keyDeserializer = keyDeserializer;
             }
             if (valueDeserializer == null) {
-                this.valueDeserializer = 
config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-                        Deserializer.class);
+                this.valueDeserializer = 
config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
Deserializer.class);
                 this.valueDeserializer.configure(config.originals(), false);
             } else {
                 config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
@@ -710,17 +715,14 @@ private KafkaConsumer(ConsumerConfig config,
             this.metadata = new Metadata(retryBackoffMs, 
config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG),
                     true, false, clusterResourceListeners);
             List<InetSocketAddress> addresses = 
ClientUtils.parseAndValidateAddresses(
-                    config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
-                    config.getString(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG));
+                    config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG), 
config.getString(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG));
             this.metadata.bootstrap(addresses, time.milliseconds());
             String metricGrpPrefix = "consumer";
             ConsumerMetrics metricsRegistry = new 
ConsumerMetrics(metricsTags.keySet(), "consumer");
             ChannelBuilder channelBuilder = 
ClientUtils.createChannelBuilder(config, time);
-
             IsolationLevel isolationLevel = IsolationLevel.valueOf(
                     
config.getString(ConsumerConfig.ISOLATION_LEVEL_CONFIG).toUpperCase(Locale.ROOT));
             Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics, 
metricsRegistry.fetcherMetrics);
-
             int heartbeatIntervalMs = 
config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);
 
             NetworkClient netClient = new NetworkClient(
@@ -755,24 +757,26 @@ private KafkaConsumer(ConsumerConfig config,
 
             int maxPollIntervalMs = 
config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG);
             int sessionTimeoutMs = 
config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG);
-            this.coordinator = new ConsumerCoordinator(logContext,
-                    this.client,
-                    groupId,
-                    maxPollIntervalMs,
-                    sessionTimeoutMs,
-                    new Heartbeat(time, sessionTimeoutMs, heartbeatIntervalMs, 
maxPollIntervalMs, retryBackoffMs),
-                    assignors,
-                    this.metadata,
-                    this.subscriptions,
-                    metrics,
-                    metricGrpPrefix,
-                    this.time,
-                    retryBackoffMs,
-                    
config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
-                    
config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
-                    this.interceptors,
-                    
config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG),
-                    
config.getBoolean(ConsumerConfig.LEAVE_GROUP_ON_CLOSE_CONFIG));
+            // no coordinator will be constructed for the default (null) group 
id
+            this.coordinator = groupId == null ? null :
+                new ConsumerCoordinator(logContext,
+                        this.client,
+                        groupId,
+                        maxPollIntervalMs,
+                        sessionTimeoutMs,
+                        new Heartbeat(time, sessionTimeoutMs, 
heartbeatIntervalMs, maxPollIntervalMs, retryBackoffMs),
+                        assignors,
+                        this.metadata,
+                        this.subscriptions,
+                        metrics,
+                        metricGrpPrefix,
+                        this.time,
+                        retryBackoffMs,
+                        enableAutoCommit,
+                        
config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
+                        this.interceptors,
+                        
config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG),
+                        
config.getBoolean(ConsumerConfig.LEAVE_GROUP_ON_CLOSE_CONFIG));
             this.fetcher = new Fetcher<>(
                     logContext,
                     this.client,
@@ -795,11 +799,9 @@ private KafkaConsumer(ConsumerConfig config,
 
             config.logUnused();
             AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics);
-
             log.debug("Kafka consumer initialized");
         } catch (Throwable t) {
-            // call close methods if internal objects are already constructed
-            // this is to prevent resource leak. see KAFKA-2121
+            // call close methods if internal objects are already constructed; 
this is to prevent resource leak. see KAFKA-2121
             close(0, true);
             // now propagate the exception
             throw new KafkaException("Failed to construct kafka consumer", t);
@@ -822,7 +824,8 @@ private KafkaConsumer(ConsumerConfig config,
                   long retryBackoffMs,
                   long requestTimeoutMs,
                   int defaultApiTimeoutMs,
-                  List<PartitionAssignor> assignors) {
+                  List<PartitionAssignor> assignors,
+                  String groupId) {
         this.log = logContext.logger(getClass());
         this.clientId = clientId;
         this.coordinator = coordinator;
@@ -839,6 +842,7 @@ private KafkaConsumer(ConsumerConfig config,
         this.requestTimeoutMs = requestTimeoutMs;
         this.defaultApiTimeoutMs = defaultApiTimeoutMs;
         this.assignors = assignors;
+        this.groupId = groupId;
     }
 
     /**
@@ -911,9 +915,10 @@ private KafkaConsumer(ConsumerConfig config,
     public void subscribe(Collection<String> topics, ConsumerRebalanceListener 
listener) {
         acquireAndEnsureOpen();
         try {
-            if (topics == null) {
+            maybeThrowInvalidGroupIdException();
+            if (topics == null)
                 throw new IllegalArgumentException("Topic collection to 
subscribe to cannot be null");
-            } else if (topics.isEmpty()) {
+            if (topics.isEmpty()) {
                 // treat subscribing to empty topic list as the same as 
unsubscribing
                 this.unsubscribe();
             } else {
@@ -980,6 +985,7 @@ public void subscribe(Collection<String> topics) {
      */
     @Override
     public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) 
{
+        maybeThrowInvalidGroupIdException();
         if (pattern == null)
             throw new IllegalArgumentException("Topic pattern to subscribe to 
cannot be null");
 
@@ -1026,7 +1032,8 @@ public void unsubscribe() {
         try {
             
fetcher.clearBufferedDataForUnassignedPartitions(Collections.emptySet());
             this.subscriptions.unsubscribe();
-            this.coordinator.maybeLeaveGroup();
+            if (this.coordinator != null)
+                this.coordinator.maybeLeaveGroup();
             this.metadata.needMetadataForAllTopics(false);
             log.info("Unsubscribed all topics or patterns and assigned 
partitions");
         } finally {
@@ -1073,7 +1080,8 @@ public void assign(Collection<TopicPartition> partitions) 
{
 
                 // make sure the offsets of topic partitions the consumer is 
unsubscribing from
                 // are committed since there will be no following rebalance
-                
this.coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());
+                if (coordinator != null)
+                    
this.coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());
 
                 log.debug("Subscribed to partition(s): {}", 
Utils.join(partitions, ", "));
                 this.subscriptions.assignFromUser(new HashSet<>(partitions));
@@ -1211,7 +1219,7 @@ public void assign(Collection<TopicPartition> partitions) 
{
      * Visible for testing
      */
     boolean updateAssignmentMetadataIfNeeded(final Timer timer) {
-        if (!coordinator.poll(timer)) {
+        if (coordinator != null && !coordinator.poll(timer)) {
             return false;
         }
 
@@ -1219,7 +1227,8 @@ boolean updateAssignmentMetadataIfNeeded(final Timer 
timer) {
     }
 
     private Map<TopicPartition, List<ConsumerRecord<K, V>>> 
pollForFetches(Timer timer) {
-        long pollTimeout = 
Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), 
timer.remainingMs());
+        long pollTimeout = coordinator == null ? timer.remainingMs() :
+                Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), 
timer.remainingMs());
 
         // if data is available already, return it immediately
         final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = 
fetcher.fetchedRecords();
@@ -1249,7 +1258,7 @@ boolean updateAssignmentMetadataIfNeeded(final Timer 
timer) {
 
         // after the long poll, we should check whether the group needs to 
rebalance
         // prior to returning data so that the group can stabilize faster
-        if (coordinator.rejoinNeededOrPending()) {
+        if (coordinator != null && coordinator.rejoinNeededOrPending()) {
             return Collections.emptyMap();
         }
 
@@ -1324,6 +1333,7 @@ public void commitSync() {
     public void commitSync(Duration timeout) {
         acquireAndEnsureOpen();
         try {
+            maybeThrowInvalidGroupIdException();
             if (!coordinator.commitOffsetsSync(subscriptions.allConsumed(), 
time.timer(timeout))) {
                 throw new TimeoutException("Timeout of " + timeout.toMillis() 
+ "ms expired before successfully " +
                         "committing the current consumed offsets");
@@ -1406,6 +1416,7 @@ public void commitSync(final Map<TopicPartition, 
OffsetAndMetadata> offsets) {
     public void commitSync(final Map<TopicPartition, OffsetAndMetadata> 
offsets, final Duration timeout) {
         acquireAndEnsureOpen();
         try {
+            maybeThrowInvalidGroupIdException();
             if (!coordinator.commitOffsetsSync(new HashMap<>(offsets), 
time.timer(timeout))) {
                 throw new TimeoutException("Timeout of " + timeout.toMillis() 
+ "ms expired before successfully " +
                         "committing offsets " + offsets);
@@ -1475,6 +1486,7 @@ public void commitAsync(OffsetCommitCallback callback) {
     public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> 
offsets, OffsetCommitCallback callback) {
         acquireAndEnsureOpen();
         try {
+            maybeThrowInvalidGroupIdException();
             log.debug("Committing offsets: {}", offsets);
             coordinator.commitOffsetsAsync(new HashMap<>(offsets), callback);
         } finally {
@@ -1686,6 +1698,7 @@ public OffsetAndMetadata committed(TopicPartition 
partition) {
     public OffsetAndMetadata committed(TopicPartition partition, final 
Duration timeout) {
         acquireAndEnsureOpen();
         try {
+            maybeThrowInvalidGroupIdException();
             Map<TopicPartition, OffsetAndMetadata> offsets = 
coordinator.fetchCommittedOffsets(
                     Collections.singleton(partition), time.timer(timeout));
             if (offsets == null) {
@@ -2163,7 +2176,7 @@ private boolean updateFetchPositions(final Timer timer) {
         // coordinator lookup if there are partitions which have missing 
positions, so
         // a consumer with manually assigned partitions can avoid a 
coordinator dependence
         // by always ensuring that assigned partitions have an initial 
position.
-        if (!coordinator.refreshCommittedOffsetsIfNeeded(timer)) return false;
+        if (coordinator != null && 
!coordinator.refreshCommittedOffsetsIfNeeded(timer)) return false;
 
         // If there are partitions still needing a position and a reset policy 
is defined,
         // request reset using the default policy. If no reset strategy is 
defined and there
@@ -2216,6 +2229,12 @@ private void throwIfNoAssignorsConfigured() {
                 ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG + " 
configuration property");
     }
 
+    private void maybeThrowInvalidGroupIdException() {
+        if (groupId == null)
+            throw new InvalidGroupIdException("To use the group management or 
offset commit APIs, you must " +
+                    "provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in 
the consumer configuration.");
+    }
+
     // Visible for testing
     String getClientId() {
         return clientId;
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index d9830877ba7..335e0f21f9f 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -121,7 +121,7 @@
     private Generation generation = Generation.NO_GENERATION;
 
     private RequestFuture<Void> findCoordinatorFuture = null;
-    
+
     /**
      * Initialize the coordination manager.
      */
@@ -139,7 +139,8 @@ public AbstractCoordinator(LogContext logContext,
         this.log = logContext.logger(AbstractCoordinator.class);
         this.client = client;
         this.time = time;
-        this.groupId = groupId;
+        this.groupId = Objects.requireNonNull(groupId,
+                "Expected a non-null group id for coordinator construction");
         this.rebalanceTimeoutMs = rebalanceTimeoutMs;
         this.sessionTimeoutMs = sessionTimeoutMs;
         this.leaveGroupOnClose = leaveGroupOnClose;
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 987bad22219..3657077d033 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -35,6 +35,8 @@
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.errors.InvalidGroupIdException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.metrics.Metrics;
@@ -138,6 +140,8 @@
     // a concurrent heartbeat request
     private final int autoCommitIntervalMs = 500;
 
+    private final String groupId = "mock-group";
+
     @Rule
     public ExpectedException expectedException = ExpectedException.none();
 
@@ -203,7 +207,7 @@ public void testInvalidSocketReceiveBufferSize() {
 
     @Test
     public void testSubscription() {
-        KafkaConsumer<byte[], byte[]> consumer = newConsumer();
+        KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId);
 
         consumer.subscribe(singletonList(topic));
         assertEquals(singleton(topic), consumer.subscription());
@@ -226,21 +230,21 @@ public void testSubscription() {
 
     @Test(expected = IllegalArgumentException.class)
     public void testSubscriptionOnNullTopicCollection() {
-        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId)) {
             consumer.subscribe((List<String>) null);
         }
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testSubscriptionOnNullTopic() {
-        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId)) {
             consumer.subscribe(singletonList((String) null));
         }
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testSubscriptionOnEmptyTopic() {
-        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId)) {
             String emptyTopic = "  ";
             consumer.subscribe(singletonList(emptyTopic));
         }
@@ -248,7 +252,7 @@ public void testSubscriptionOnEmptyTopic() {
 
     @Test(expected = IllegalArgumentException.class)
     public void testSubscriptionOnNullPattern() {
-        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId)) {
             consumer.subscribe((Pattern) null);
         }
     }
@@ -258,6 +262,7 @@ public void testSubscriptionWithEmptyPartitionAssignment() {
         Properties props = new Properties();
         props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9999");
         props.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
"");
+        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
         try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(props)) {
             consumer.subscribe(singletonList(topic));
         }
@@ -265,7 +270,7 @@ public void testSubscriptionWithEmptyPartitionAssignment() {
 
     @Test(expected = IllegalArgumentException.class)
     public void testSeekNegative() {
-        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer((String) 
null)) {
             consumer.assign(singleton(new TopicPartition("nonExistTopic", 0)));
             consumer.seek(new TopicPartition("nonExistTopic", 0), -1);
         }
@@ -273,14 +278,14 @@ public void testSeekNegative() {
 
     @Test(expected = IllegalArgumentException.class)
     public void testAssignOnNullTopicPartition() {
-        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer((String) 
null)) {
             consumer.assign(null);
         }
     }
 
     @Test
     public void testAssignOnEmptyTopicPartition() {
-        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId)) {
             consumer.assign(Collections.<TopicPartition>emptyList());
             assertTrue(consumer.subscription().isEmpty());
             assertTrue(consumer.assignment().isEmpty());
@@ -289,14 +294,14 @@ public void testAssignOnEmptyTopicPartition() {
 
     @Test(expected = IllegalArgumentException.class)
     public void testAssignOnNullTopicInPartition() {
-        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer((String) 
null)) {
             consumer.assign(singleton(new TopicPartition(null, 0)));
         }
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testAssignOnEmptyTopicInPartition() {
-        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer((String) 
null)) {
             consumer.assign(singleton(new TopicPartition("  ", 0)));
         }
     }
@@ -328,7 +333,7 @@ public void testInterceptorConstructorClose() throws 
Exception {
 
     @Test
     public void testPause() {
-        KafkaConsumer<byte[], byte[]> consumer = newConsumer();
+        KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId);
 
         consumer.assign(singletonList(tp0));
         assertEquals(singleton(tp0), consumer.assignment());
@@ -346,11 +351,19 @@ public void testPause() {
         consumer.close();
     }
 
-    private KafkaConsumer<byte[], byte[]> newConsumer() {
+    private KafkaConsumer<byte[], byte[]> newConsumer(String groupId) {
+        return newConsumer(groupId, Optional.empty());
+    }
+
+    private KafkaConsumer<byte[], byte[]> newConsumer(String groupId, 
Optional<Boolean> enableAutoCommit) {
         Properties props = new Properties();
         props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "my.consumer");
         props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9999");
         props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, 
MockMetricsReporter.class.getName());
+        if (groupId != null)
+            props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        if (enableAutoCommit.isPresent())
+            props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
enableAutoCommit.get().toString());
         return newConsumer(props);
     }
 
@@ -554,7 +567,7 @@ public void testMissingOffsetNoResetPolicy() {
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, 
metadata, assignor,
-                OffsetResetStrategy.NONE, true);
+                OffsetResetStrategy.NONE, true, groupId);
         consumer.assign(singletonList(tp0));
 
         client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, 
node), node);
@@ -577,7 +590,7 @@ public void testResetToCommittedOffset() {
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, 
metadata, assignor,
-                OffsetResetStrategy.NONE, true);
+                OffsetResetStrategy.NONE, true, groupId);
         consumer.assign(singletonList(tp0));
 
         client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, 
node), node);
@@ -601,7 +614,7 @@ public void testResetUsingAutoResetPolicy() {
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, 
metadata, assignor,
-                OffsetResetStrategy.LATEST, true);
+                OffsetResetStrategy.LATEST, true, groupId);
         consumer.assign(singletonList(tp0));
 
         client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, 
node), node);
@@ -1210,14 +1223,14 @@ public void testOffsetOfPausedPartitions() {
 
     @Test(expected = IllegalStateException.class)
     public void testPollWithNoSubscription() {
-        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer((String) 
null)) {
             consumer.poll(Duration.ZERO);
         }
     }
 
     @Test(expected = IllegalStateException.class)
     public void testPollWithEmptySubscription() {
-        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId)) {
             consumer.subscribe(Collections.<String>emptyList());
             consumer.poll(Duration.ZERO);
         }
@@ -1225,7 +1238,7 @@ public void testPollWithEmptySubscription() {
 
     @Test(expected = IllegalStateException.class)
     public void testPollWithEmptyUserAssignment() {
-        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId)) {
             consumer.assign(Collections.<TopicPartition>emptySet());
             consumer.poll(Duration.ZERO);
         }
@@ -1265,12 +1278,77 @@ public void testCloseInterrupt() throws Exception {
 
     @Test
     public void closeShouldBeIdempotent() {
-        KafkaConsumer<byte[], byte[]> consumer = newConsumer();
+        KafkaConsumer<byte[], byte[]> consumer = newConsumer((String) null);
         consumer.close();
         consumer.close();
         consumer.close();
     }
 
+    @Test
+    public void testOperationsBySubscribingConsumerWithDefaultGroupId() {
+        try {
+            newConsumer(null, Optional.of(Boolean.TRUE));
+            fail("Expected an InvalidConfigurationException");
+        } catch (KafkaException e) {
+            assertEquals(InvalidConfigurationException.class, 
e.getCause().getClass());
+        }
+
+        try {
+            newConsumer((String) null).subscribe(Collections.singleton(topic));
+            fail("Expected an InvalidGroupIdException");
+        } catch (InvalidGroupIdException e) {
+            // OK, expected
+        }
+
+        try {
+            newConsumer((String) null).committed(tp0);
+            fail("Expected an InvalidGroupIdException");
+        } catch (InvalidGroupIdException e) {
+            // OK, expected
+        }
+
+        try {
+            newConsumer((String) null).commitAsync();
+            fail("Expected an InvalidGroupIdException");
+        } catch (InvalidGroupIdException e) {
+            // OK, expected
+        }
+
+        try {
+            newConsumer((String) null).commitSync();
+            fail("Expected an InvalidGroupIdException");
+        } catch (InvalidGroupIdException e) {
+            // OK, expected
+        }
+    }
+
+    @Test
+    public void testOperationsByAssigningConsumerWithDefaultGroupId() {
+        KafkaConsumer<byte[], byte[]> consumer = newConsumer((String) null);
+        consumer.assign(singleton(tp0));
+
+        try {
+            consumer.committed(tp0);
+            fail("Expected an InvalidGroupIdException");
+        } catch (InvalidGroupIdException e) {
+            // OK, expected
+        }
+
+        try {
+            consumer.commitAsync();
+            fail("Expected an InvalidGroupIdException");
+        } catch (InvalidGroupIdException e) {
+            // OK, expected
+        }
+
+        try {
+            consumer.commitSync();
+            fail("Expected an InvalidGroupIdException");
+        } catch (InvalidGroupIdException e) {
+            // OK, expected
+        }
+    }
+
     @Test
     public void testMetricConfigRecordingLevel() {
         Properties props = new Properties();
@@ -1681,13 +1759,20 @@ private FetchResponse fetchResponse(TopicPartition 
partition, long fetchOffset,
                                                       Metadata metadata,
                                                       PartitionAssignor 
assignor,
                                                       boolean 
autoCommitEnabled) {
-        return newConsumer(time, client, metadata, assignor, 
OffsetResetStrategy.EARLIEST, autoCommitEnabled);
+        return newConsumer(time, client, metadata, assignor, 
OffsetResetStrategy.EARLIEST, autoCommitEnabled, groupId);
     }
 
     private KafkaConsumer<String, String> newConsumerNoAutoCommit(Time time,
                                                                   KafkaClient 
client,
                                                                   Metadata 
metadata) {
-        return newConsumer(time, client, metadata, new RangeAssignor(), 
OffsetResetStrategy.EARLIEST, false);
+        return newConsumer(time, client, metadata, new RangeAssignor(), 
OffsetResetStrategy.EARLIEST, false, groupId);
+    }
+
+    private KafkaConsumer<String, String> newConsumer(Time time,
+                                                      KafkaClient client,
+                                                      Metadata metadata,
+                                                      String groupId) {
+        return newConsumer(time, client, metadata, new RangeAssignor(), 
OffsetResetStrategy.LATEST, true, groupId);
     }
 
     private KafkaConsumer<String, String> newConsumer(Time time,
@@ -1695,9 +1780,9 @@ private FetchResponse fetchResponse(TopicPartition 
partition, long fetchOffset,
                                                       Metadata metadata,
                                                       PartitionAssignor 
assignor,
                                                       OffsetResetStrategy 
resetStrategy,
-                                                      boolean 
autoCommitEnabled) {
+                                                      boolean 
autoCommitEnabled,
+                                                      String groupId) {
         String clientId = "mock-consumer";
-        String groupId = "mock-group";
         String metricGroupPrefix = "consumer";
         long retryBackoffMs = 100;
         int requestTimeoutMs = 30000;
@@ -1782,7 +1867,8 @@ private FetchResponse fetchResponse(TopicPartition 
partition, long fetchOffset,
                 retryBackoffMs,
                 requestTimeoutMs,
                 defaultApiTimeoutMs,
-                assignors);
+                assignors,
+                groupId);
     }
 
     private static class FetchInfo {
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
index 938fe948b1f..61606ab8e89 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
@@ -84,6 +84,7 @@ public void teardown() throws Exception {
     public void testConsumerWithInvalidCredentials() {
         Map<String, Object> props = new HashMap<>(saslClientConfigs);
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + 
server.port());
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "");
         StringDeserializer deserializer = new StringDeserializer();
 
         try (KafkaConsumer<String, String> consumer = new 
KafkaConsumer<>(props, deserializer, deserializer)) {
diff --git 
a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala 
b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 0e2797a5de1..5a200055742 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -113,10 +113,12 @@ abstract class IntegrationTestHarness extends 
KafkaServerTestHarness {
 
   def createConsumer[K, V](keyDeserializer: Deserializer[K] = new 
ByteArrayDeserializer,
                            valueDeserializer: Deserializer[V] = new 
ByteArrayDeserializer,
-                           configOverrides: Properties = new Properties): 
KafkaConsumer[K, V] = {
+                           configOverrides: Properties = new Properties,
+                           configsToRemove: List[String] = List()): 
KafkaConsumer[K, V] = {
     val props = new Properties
     props ++= consumerConfig
     props ++= configOverrides
+    configsToRemove.foreach(props.remove(_))
     val consumer = new KafkaConsumer[K, V](props, keyDeserializer, 
valueDeserializer)
     consumers += consumer
     consumer
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index c06a796040e..c11fc12d16d 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -14,6 +14,7 @@ package kafka.api
 
 import java.time.Duration
 import java.util
+import java.util.Arrays.asList
 import java.util.regex.Pattern
 import java.util.{Collections, Locale, Optional, Properties}
 
@@ -23,7 +24,7 @@ import kafka.utils.TestUtils
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, 
ProducerRecord}
 import org.apache.kafka.common.{MetricName, TopicPartition}
-import org.apache.kafka.common.errors.InvalidTopicException
+import org.apache.kafka.common.errors.{InvalidGroupIdException, 
InvalidTopicException}
 import org.apache.kafka.common.header.Headers
 import org.apache.kafka.common.record.{CompressionType, TimestampType}
 import org.apache.kafka.common.serialization._
@@ -1814,4 +1815,129 @@ class PlaintextConsumerTest extends BaseConsumerTest {
         s"The current assignment is ${consumer.assignment()}")
   }
 
+  @Test
+  def testConsumingWithNullGroupId(): Unit = {
+    val topic = "test_topic"
+    val partition = 0;
+    val tp = new TopicPartition(topic, partition)
+    createTopic(topic, 1, 1)
+
+    TestUtils.waitUntilTrue(() => {
+      this.zkClient.topicExists(topic)
+    }, "Failed to create topic")
+
+    val producer = createProducer()
+    producer.send(new ProducerRecord(topic, partition, "k1".getBytes, 
"v1".getBytes)).get()
+    producer.send(new ProducerRecord(topic, partition, "k2".getBytes, 
"v2".getBytes)).get()
+    producer.send(new ProducerRecord(topic, partition, "k3".getBytes, 
"v3".getBytes)).get()
+    producer.close()
+
+    // consumer 1 uses the default group id and consumes from earliest offset
+    val consumer1Config = new Properties(consumerConfig)
+    consumer1Config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+    consumer1Config.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer1")
+    val consumer1 = createConsumer(
+      configOverrides = consumer1Config,
+      configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
+
+    // consumer 2 uses the default group id and consumes from latest offset
+    val consumer2Config = new Properties(consumerConfig)
+    consumer2Config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
+    consumer2Config.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer2")
+    val consumer2 = createConsumer(
+      configOverrides = consumer2Config,
+      configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
+
+    // consumer 3 uses the default group id and starts from an explicit offset
+    val consumer3Config = new Properties(consumerConfig)
+    consumer3Config.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer3")
+    val consumer3 = createConsumer(
+      configOverrides = consumer3Config,
+      configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
+
+    consumer1.assign(asList(tp))
+    consumer2.assign(asList(tp))
+    consumer3.assign(asList(tp))
+    consumer3.seek(tp, 1)
+
+    val numRecords1 = consumer1.poll(Duration.ofMillis(5000)).count()
+
+    try {
+      consumer1.commitSync()
+      fail("Expected offset commit to fail due to null group id")
+    } catch {
+      case e: InvalidGroupIdException => // OK
+    }
+
+    try {
+      consumer2.committed(tp)
+      fail("Expected committed offset fetch to fail due to null group id")
+    } catch {
+      case e: InvalidGroupIdException => // OK
+    }
+
+    val numRecords2 = consumer2.poll(Duration.ofMillis(5000)).count()
+    val numRecords3 = consumer3.poll(Duration.ofMillis(5000)).count()
+
+    consumer1.unsubscribe()
+    consumer2.unsubscribe()
+    consumer3.unsubscribe()
+
+    consumer1.close()
+    consumer2.close()
+    consumer3.close()
+
+    assertEquals("Expected consumer1 to consume from earliest offset", 3, 
numRecords1)
+    assertEquals("Expected consumer2 to consume from latest offset", 0, 
numRecords2)
+    assertEquals("Expected consumer3 to consume from offset 1", 2, numRecords3)
+  }
+
+  @Test
+  def testConsumingWithEmptyGroupId(): Unit = {
+    val topic = "test_topic"
+    val partition = 0;
+    val tp = new TopicPartition(topic, partition)
+    createTopic(topic, 1, 1)
+
+    TestUtils.waitUntilTrue(() => {
+      this.zkClient.topicExists(topic)
+    }, "Failed to create topic")
+
+    val producer = createProducer()
+    producer.send(new ProducerRecord(topic, partition, "k1".getBytes, 
"v1".getBytes)).get()
+    producer.send(new ProducerRecord(topic, partition, "k2".getBytes, 
"v2".getBytes)).get()
+    producer.close()
+
+    // consumer 1 uses the empty group id
+    val consumer1Config = new Properties(consumerConfig)
+    consumer1Config.put(ConsumerConfig.GROUP_ID_CONFIG, "")
+    consumer1Config.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer1")
+    consumer1Config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1")
+    val consumer1 = createConsumer(configOverrides = consumer1Config)
+
+    // consumer 2 uses the empty group id and consumes from latest offset if 
there is no committed offset
+    val consumer2Config = new Properties(consumerConfig)
+    consumer2Config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
+    consumer2Config.put(ConsumerConfig.GROUP_ID_CONFIG, "")
+    consumer2Config.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer2")
+    consumer2Config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1")
+    val consumer2 = createConsumer(configOverrides = consumer2Config)
+
+    consumer1.assign(asList(tp))
+    consumer2.assign(asList(tp))
+
+    val records1 = consumer1.poll(Duration.ofMillis(5000))
+    consumer1.commitSync()
+
+    val records2 = consumer2.poll(Duration.ofMillis(5000))
+    consumer2.commitSync()
+
+    consumer1.close()
+    consumer2.close()
+
+    assertTrue("Expected consumer1 to consume one message from offset 0",
+      records1.count() == 1 && records1.records(tp).asScala.head.offset == 0)
+    assertTrue("Expected consumer2 to consume one message from offset 1, which 
is the committed offset of consumer1",
+      records2.count() == 1 && records2.records(tp).asScala.head.offset == 1)
+  }
 }
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 33d9964113a..154547b10a0 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -19,6 +19,15 @@
 
 <script id="upgrade-template" type="text/x-handlebars-template">
 
+<h4><a id="upgrade_2_2_0" href="#upgrade_2_2_0">Upgrading from 0.8.x, 0.9.x, 
0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, 1.1.x, 2.0.x or 2.1.x to 
2.2.0</a></h4>
+<h5><a id="upgrade_220_notable" href="#upgrade_220_notable">Notable changes in 
2.2.0</a></h5>
+<ul>
+    <li>The default consumer group id has been changed from the empty string 
(<code>""</code>) to <code>null</code>. Consumers who use the new default group 
id will not be able to subscribe to topics,
+        or fetch or commit offsets. The empty string as consumer group id is 
deprecated but will be supported until a future major release. Old clients that 
rely on the empty string group id will now
+        have to explicitly provide it as part of their consumer config. For 
more information see
+        <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-289%3A+Improve+the+default+group+id+behavior+in+KafkaConsumer">KIP-289</a>.</li>
+</ul>
+
 <h4><a id="upgrade_2_1_0" href="#upgrade_2_1_0">Upgrading from 0.8.x, 0.9.x, 
0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, 1.1.x, or 2.0.0 to 2.1.0</a></h4>
 
 <p><b>Note that 2.1.x contains a change to the internal schema used to store 
consumer offsets. Once the upgrade is


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve default groupId behavior in consumer
> --------------------------------------------
>
>                 Key: KAFKA-6774
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6774
>             Project: Kafka
>          Issue Type: Improvement
>          Components: consumer
>            Reporter: Jason Gustafson
>            Assignee: Vahid Hashemian
>            Priority: Major
>              Labels: needs-kip
>             Fix For: 2.2.0
>
>
> At the moment, the default groupId in the consumer is "". If you try to use 
> this to subscribe() to a topic, the broker will reject the group as invalid. 
> On the other hand, if you use it with assign(), then the user will be able to 
> fetch and commit offsets using the empty groupId. Probably 99% of the time, 
> this is not what the user expects. Instead you would probably expect that if 
> no groupId is provided, then no committed offsets will be fetched at all and 
> we'll just use the auto reset behavior if we don't have a current position.
> Here are two potential solutions (both requiring a KIP):
> 1. Change the default to null. We will preserve the current behavior for 
> subscribe(). When using assign(), we will not bother fetching committed 
> offsets for the null groupId, and any attempt to commit offsets will raise an 
> error. The user can still use the empty groupId, but they have to specify it 
> explicitly.
> 2. Keep the current default, but change the consumer to treat this value as 
> if it were null as described in option 1. The argument for this behavior is 
> that using the empty groupId to commit offsets is inherently a dangerous 
> practice and should not be permitted. We'd have to convince ourselves that 
> we're fine not needing to allow the empty groupId for backwards compatibility 
> though.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to