kirktrue commented on code in PR #17411: URL: https://github.com/apache/kafka/pull/17411#discussion_r1800266192
########## checkstyle/import-control-core.xml: ########## @@ -142,4 +142,8 @@ <allow pkg="org.apache.directory" /> <allow pkg="org.apache.mina.core.service" /> </subpackage> + + <subpackage name="clients"> + <allow pkg="org.apache.kafka.clients" /> + </subpackage> Review Comment: Is this necessary? We have other, existing integration tests that reference the `clients` sub-package and don't complain. Is this a new requirement? ########## core/src/test/java/kafka/clients/consumer/AsyncKafkaConsumerIntegrationTest.java: ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.clients.consumer; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.GroupProtocol; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.test.TestUtils; +import org.apache.kafka.common.test.api.ClusterConfigProperty; +import org.apache.kafka.common.test.api.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestExtensions; + +import org.junit.jupiter.api.extension.ExtendWith; + +import java.time.Duration; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; + +@ExtendWith(ClusterTestExtensions.class) +public class AsyncKafkaConsumerIntegrationTest { + + @ClusterTest(serverProperties = { + @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"), + @ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic") Review Comment: Is it possible to use the enum here? I know annotations usually require compile-time evaluation, so maybe it's not possible. 
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java: ########## @@ -378,11 +378,17 @@ private void onErrorResponse(final R response, final long currentTimeMs) { case INVALID_REQUEST: case GROUP_MAX_SIZE_REACHED: case UNSUPPORTED_ASSIGNOR: - case UNSUPPORTED_VERSION: logger.error("{} failed due to {}: {}", heartbeatRequestName(), error, errorMessage); handleFatalFailure(error.exception(errorMessage)); break; + case UNSUPPORTED_VERSION: + message = "The cluster doesn't yet support the new consumer group protocol." + + " Set group.protocol=classic to revert to the classic protocol until the cluster is upgraded."; Review Comment: Can we use the constants from `GroupProtocol` instead of hard-coding the strings here? Just helps when code spelunking. ########## core/src/test/java/kafka/clients/consumer/AsyncKafkaConsumerIntegrationTest.java: ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.clients.consumer; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.GroupProtocol; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.test.TestUtils; +import org.apache.kafka.common.test.api.ClusterConfigProperty; +import org.apache.kafka.common.test.api.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestExtensions; + +import org.junit.jupiter.api.extension.ExtendWith; + +import java.time.Duration; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; + +@ExtendWith(ClusterTestExtensions.class) +public class AsyncKafkaConsumerIntegrationTest { + + @ClusterTest(serverProperties = { + @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"), + @ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic") + }) + public void testAsyncConsumerWithoutConsumerRebalanceProtocol(ClusterInstance clusterInstance) throws Exception { + checkUnsupportedConsumerGroupHeartbeat(clusterInstance); + } + + @ClusterTest(serverProperties = { + @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"), + @ClusterConfigProperty(key = "group.coordinator.new.enable", value = "false") + }) + public void testAsyncConsumerWithOldGroupCoordinator(ClusterInstance clusterInstance) throws Exception { + 
checkUnsupportedConsumerGroupHeartbeat(clusterInstance); + } + + private void createTopic(ClusterInstance clusterInstance, String topic) { + try (Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) { + assertDoesNotThrow(() -> admin.createTopics(Collections.singletonList(new NewTopic(topic, 1, (short) 1))).topicId(topic).get()); + } + + assertDoesNotThrow(() -> clusterInstance.waitForTopic(topic, 1)); + } Review Comment: Nit-picky, but does `createTopic()` need to be separate from `checkUnsupportedConsumerGroupHeartbeat()`, or could it be inline? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java: ########## @@ -378,11 +378,17 @@ private void onErrorResponse(final R response, final long currentTimeMs) { case INVALID_REQUEST: case GROUP_MAX_SIZE_REACHED: case UNSUPPORTED_ASSIGNOR: - case UNSUPPORTED_VERSION: logger.error("{} failed due to {}: {}", heartbeatRequestName(), error, errorMessage); handleFatalFailure(error.exception(errorMessage)); break; + case UNSUPPORTED_VERSION: + message = "The cluster doesn't yet support the new consumer group protocol." + + " Set group.protocol=classic to revert to the classic protocol until the cluster is upgraded."; + logger.error("{} failed due to {}: {}", heartbeatRequestName(), error, message); Review Comment: Does the `handleFatalFailure()` method bubble this up to the user? If so, do we need to log the error here too? 
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java: ########## @@ -378,11 +378,17 @@ private void onErrorResponse(final R response, final long currentTimeMs) { case INVALID_REQUEST: case GROUP_MAX_SIZE_REACHED: case UNSUPPORTED_ASSIGNOR: - case UNSUPPORTED_VERSION: logger.error("{} failed due to {}: {}", heartbeatRequestName(), error, errorMessage); handleFatalFailure(error.exception(errorMessage)); break; + case UNSUPPORTED_VERSION: + message = "The cluster doesn't yet support the new consumer group protocol." + + " Set group.protocol=classic to revert to the classic protocol until the cluster is upgraded."; Review Comment: If I'm not mistaken, there are multiple cases that could cause this error: 1. The cluster is < 4.0.0 and doesn't support the new `CONSUMER` group protocol 2. The cluster is >= 4.0.0 but isn't configured to use the new coordinator 3. The cluster is >= 4.0.0 and is configured to use the new coordinator but _isn't_ configured to support the new `CONSUMER` group protocol The wording in the proposed error message doesn't distinguish the cases, but perhaps that's a good thing? ########## core/src/test/java/kafka/clients/consumer/AsyncKafkaConsumerIntegrationTest.java: ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.clients.consumer; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.GroupProtocol; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.test.TestUtils; +import org.apache.kafka.common.test.api.ClusterConfigProperty; +import org.apache.kafka.common.test.api.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestExtensions; + +import org.junit.jupiter.api.extension.ExtendWith; + +import java.time.Duration; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; + +@ExtendWith(ClusterTestExtensions.class) +public class AsyncKafkaConsumerIntegrationTest { + + @ClusterTest(serverProperties = { + @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"), + @ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic") + }) + public void testAsyncConsumerWithoutConsumerRebalanceProtocol(ClusterInstance clusterInstance) throws Exception { + checkUnsupportedConsumerGroupHeartbeat(clusterInstance); + } + + @ClusterTest(serverProperties = { + @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"), + @ClusterConfigProperty(key = "group.coordinator.new.enable", value = "false") + }) + public void 
testAsyncConsumerWithOldGroupCoordinator(ClusterInstance clusterInstance) throws Exception { + checkUnsupportedConsumerGroupHeartbeat(clusterInstance); + } + + private void createTopic(ClusterInstance clusterInstance, String topic) { + try (Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) { + assertDoesNotThrow(() -> admin.createTopics(Collections.singletonList(new NewTopic(topic, 1, (short) 1))).topicId(topic).get()); + } + + assertDoesNotThrow(() -> clusterInstance.waitForTopic(topic, 1)); + } + + private KafkaConsumer<String, String> createAsyncKafkaConsumer(ClusterInstance clusterInstance, String groupId) { + Map<String, Object> configs = new HashMap<>(); + configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()); + configs.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name()); + return new KafkaConsumer<>(configs); Review Comment: There are utility methods in other integration tests that eliminate the need for the boilerplate of `createAsyncKafkaConsumer()`. Is it possible we could leverage those? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org