kirktrue commented on code in PR #17411:
URL: https://github.com/apache/kafka/pull/17411#discussion_r1800266192


##########
checkstyle/import-control-core.xml:
##########
@@ -142,4 +142,8 @@
     <allow pkg="org.apache.directory" />
     <allow pkg="org.apache.mina.core.service" />
   </subpackage>
+
+  <subpackage name="clients">
+    <allow pkg="org.apache.kafka.clients" />
+  </subpackage>

Review Comment:
   Is this necessary? We have other, existing integration tests that reference 
the `clients` sub-package and don't complain. Is this a new requirement?



##########
core/src/test/java/kafka/clients/consumer/AsyncKafkaConsumerIntegrationTest.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.clients.consumer;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.test.TestUtils;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestExtensions;
+
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+
+@ExtendWith(ClusterTestExtensions.class)
+public class AsyncKafkaConsumerIntegrationTest {
+
+    @ClusterTest(serverProperties = {
+        @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+        @ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1"),
+        @ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", 
value = "classic")

Review Comment:
   Is it possible to use the enum here? I know annotations usually require 
compile-time evaluation, so maybe it's not possible.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java:
##########
@@ -378,11 +378,17 @@ private void onErrorResponse(final R response, final long 
currentTimeMs) {
             case INVALID_REQUEST:
             case GROUP_MAX_SIZE_REACHED:
             case UNSUPPORTED_ASSIGNOR:
-            case UNSUPPORTED_VERSION:
                 logger.error("{} failed due to {}: {}", 
heartbeatRequestName(), error, errorMessage);
                 handleFatalFailure(error.exception(errorMessage));
                 break;
 
+            case UNSUPPORTED_VERSION:
+                message = "The cluster doesn't yet support the new consumer 
group protocol." +
+                        " Set group.protocol=classic to revert to the classic 
protocol until the cluster is upgraded.";

Review Comment:
   Can we use the constants from `GroupProtocol` instead of hard-coding the 
strings here? Just helps when code spelunking.



##########
core/src/test/java/kafka/clients/consumer/AsyncKafkaConsumerIntegrationTest.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.clients.consumer;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.test.TestUtils;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestExtensions;
+
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+
+@ExtendWith(ClusterTestExtensions.class)
+public class AsyncKafkaConsumerIntegrationTest {
+
+    @ClusterTest(serverProperties = {
+        @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+        @ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1"),
+        @ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", 
value = "classic")
+    })
+    public void 
testAsyncConsumerWithoutConsumerRebalanceProtocol(ClusterInstance 
clusterInstance) throws Exception {
+        checkUnsupportedConsumerGroupHeartbeat(clusterInstance);
+    }
+
+    @ClusterTest(serverProperties = {
+        @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+        @ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1"),
+        @ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"false")
+    })
+    public void testAsyncConsumerWithOldGroupCoordinator(ClusterInstance 
clusterInstance) throws Exception {
+        checkUnsupportedConsumerGroupHeartbeat(clusterInstance);
+    }
+
+    private void createTopic(ClusterInstance clusterInstance, String topic) {
+        try (Admin admin = 
Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
 clusterInstance.bootstrapServers()))) {
+            assertDoesNotThrow(() -> 
admin.createTopics(Collections.singletonList(new NewTopic(topic, 1, (short) 
1))).topicId(topic).get());
+        }
+
+        assertDoesNotThrow(() -> clusterInstance.waitForTopic(topic, 1));
+    }

Review Comment:
   Nit-picky, but does `createTopic()` need to be separate from 
`checkUnsupportedConsumerGroupHeartbeat()`, or could it be inline?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java:
##########
@@ -378,11 +378,17 @@ private void onErrorResponse(final R response, final long 
currentTimeMs) {
             case INVALID_REQUEST:
             case GROUP_MAX_SIZE_REACHED:
             case UNSUPPORTED_ASSIGNOR:
-            case UNSUPPORTED_VERSION:
                 logger.error("{} failed due to {}: {}", 
heartbeatRequestName(), error, errorMessage);
                 handleFatalFailure(error.exception(errorMessage));
                 break;
 
+            case UNSUPPORTED_VERSION:
+                message = "The cluster doesn't yet support the new consumer 
group protocol." +
+                        " Set group.protocol=classic to revert to the classic 
protocol until the cluster is upgraded.";
+                logger.error("{} failed due to {}: {}", 
heartbeatRequestName(), error, message);

Review Comment:
   Does the `handleFatalFailure()` method bubble this up to the user? If so, do 
we need to log the error here too?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java:
##########
@@ -378,11 +378,17 @@ private void onErrorResponse(final R response, final long 
currentTimeMs) {
             case INVALID_REQUEST:
             case GROUP_MAX_SIZE_REACHED:
             case UNSUPPORTED_ASSIGNOR:
-            case UNSUPPORTED_VERSION:
                 logger.error("{} failed due to {}: {}", 
heartbeatRequestName(), error, errorMessage);
                 handleFatalFailure(error.exception(errorMessage));
                 break;
 
+            case UNSUPPORTED_VERSION:
+                message = "The cluster doesn't yet support the new consumer 
group protocol." +
+                        " Set group.protocol=classic to revert to the classic 
protocol until the cluster is upgraded.";

Review Comment:
   If I'm not mistaken, there are multiple cases that could cause this error:
   
   1. The cluster is < 4.0.0 and doesn't support the new `CONSUMER` group 
protocol
   2. The cluster is >= 4.0.0 but isn't configured to use the new coordinator
   3. The cluster is >= 4.0.0 and is configured to use the new coordinator but 
_isn't_ configured to support the new `CONSUMER` group protocol
   
   The wording in the proposed error message doesn't distinguish the cases, but 
perhaps that's a good thing?



##########
core/src/test/java/kafka/clients/consumer/AsyncKafkaConsumerIntegrationTest.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.clients.consumer;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.test.TestUtils;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestExtensions;
+
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+
+@ExtendWith(ClusterTestExtensions.class)
+public class AsyncKafkaConsumerIntegrationTest {
+
+    @ClusterTest(serverProperties = {
+        @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+        @ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1"),
+        @ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", 
value = "classic")
+    })
+    public void 
testAsyncConsumerWithoutConsumerRebalanceProtocol(ClusterInstance 
clusterInstance) throws Exception {
+        checkUnsupportedConsumerGroupHeartbeat(clusterInstance);
+    }
+
+    @ClusterTest(serverProperties = {
+        @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+        @ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1"),
+        @ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"false")
+    })
+    public void testAsyncConsumerWithOldGroupCoordinator(ClusterInstance 
clusterInstance) throws Exception {
+        checkUnsupportedConsumerGroupHeartbeat(clusterInstance);
+    }
+
+    private void createTopic(ClusterInstance clusterInstance, String topic) {
+        try (Admin admin = 
Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
 clusterInstance.bootstrapServers()))) {
+            assertDoesNotThrow(() -> 
admin.createTopics(Collections.singletonList(new NewTopic(topic, 1, (short) 
1))).topicId(topic).get());
+        }
+
+        assertDoesNotThrow(() -> clusterInstance.waitForTopic(topic, 1));
+    }
+
+    private KafkaConsumer<String, String> 
createAsyncKafkaConsumer(ClusterInstance clusterInstance, String groupId) {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
clusterInstance.bootstrapServers());
+        configs.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
+        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
+        configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name());
+        return new KafkaConsumer<>(configs);

Review Comment:
   There are utility methods in other integration tests that eliminate the need 
for the boilerplate of `createAsyncKafkaConsumer()`. Is it possible we could 
leverage those?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to