Re: [PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]

2024-06-04 Thread via GitHub


chia7712 commented on PR #15821:
URL: https://github.com/apache/kafka/pull/15821#issuecomment-2146925116

   @FrankYang0529 Could you please merge `ListConsumerGroupUnitTest` into 
`ListConsumerGroupTest`? Having multiple classes in a single class file is not 
common in this code base.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]

2024-05-30 Thread via GitHub


chia7712 merged PR #15821:
URL: https://github.com/apache/kafka/pull/15821





Re: [PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]

2024-05-29 Thread via GitHub


FrankYang0529 commented on code in PR #15821:
URL: https://github.com/apache/kafka/pull/15821#discussion_r1619707209


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java:
##
@@ -50,14 +51,13 @@ class ConsumerGroupCommandTestUtils {
 private ConsumerGroupCommandTestUtils() {
 }
 
-static List<ClusterConfig> generator() {
+static List<ClusterConfig> generator(boolean onlyConsumerGroupCoordinator) {

Review Comment:
   Updated it. Thanks.






Re: [PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]

2024-05-29 Thread via GitHub


chia7712 commented on code in PR #15821:
URL: https://github.com/apache/kafka/pull/15821#discussion_r1619254777


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java:
##
@@ -50,14 +51,13 @@ class ConsumerGroupCommandTestUtils {
 private ConsumerGroupCommandTestUtils() {
 }
 
-static List<ClusterConfig> generator() {
+static List<ClusterConfig> generator(boolean onlyConsumerGroupCoordinator) {

Review Comment:
   Could we avoid using a single `boolean` argument? It is hard to read at the 
call site. We can do a small refactor:
   ```java
   // Default template: one config per coordinator type.
   static List<ClusterConfig> generator() {
       return Stream.concat(forConsumerGroupCoordinator().stream(), forClassicGroupCoordinator().stream())
           .collect(Collectors.toList());
   }
   
   // New (consumer) group coordinator: KRaft-only cluster types.
   static List<ClusterConfig> forConsumerGroupCoordinator() {
       Map<String, String> serverProperties = new HashMap<>();
       serverProperties.put(OFFSETS_TOPIC_PARTITIONS_CONFIG, "1");
       serverProperties.put(OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
       serverProperties.put(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "true");
   
       return Collections.singletonList(ClusterConfig.defaultBuilder()
           .setTypes(Stream.of(KRAFT, CO_KRAFT).collect(Collectors.toSet()))
           .setServerProperties(serverProperties)
           .setTags(Collections.singletonList("consumerGroupCoordinator"))
           .build());
   }
   
   // Classic group coordinator: default cluster types.
   static List<ClusterConfig> forClassicGroupCoordinator() {
       Map<String, String> serverProperties = new HashMap<>();
       serverProperties.put(OFFSETS_TOPIC_PARTITIONS_CONFIG, "1");
       serverProperties.put(OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
       serverProperties.put(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "false");
   
       return Collections.singletonList(ClusterConfig.defaultBuilder()
           .setServerProperties(serverProperties)
           .setTags(Collections.singletonList("classicGroupCoordinator"))
           .build());
   }
   ```
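
For readers without the Kafka test framework at hand, the refactor suggested in this comment can be sketched in a self-contained form. This is only an illustration under stated assumptions: `SketchConfig` and `GeneratorSketch` are hypothetical stand-ins (they are not `ClusterConfig` or `ConsumerGroupCommandTestUtils`), and the property keys are written out as plain strings instead of the constants used in the PR.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a cluster configuration; NOT kafka.test.ClusterConfig.
final class SketchConfig {
    final String tag;
    final Map<String, String> serverProperties;

    SketchConfig(String tag, Map<String, String> serverProperties) {
        this.tag = tag;
        this.serverProperties = Collections.unmodifiableMap(new HashMap<>(serverProperties));
    }
}

// Hypothetical utility class mirroring the shape of the suggested refactor.
final class GeneratorSketch {
    private GeneratorSketch() { }

    // Shared setup: both factories differ only in the tag and the coordinator flag value.
    private static SketchConfig build(String tag, String newCoordinatorEnabled) {
        Map<String, String> props = new HashMap<>();
        props.put("offsets.topic.num.partitions", "1");
        props.put("offsets.topic.replication.factor", "1");
        props.put("group.coordinator.new.enable", newCoordinatorEnabled);
        return new SketchConfig(tag, props);
    }

    // Named factory: the intent is visible at the call site, unlike generator(true).
    static List<SketchConfig> forConsumerGroupCoordinator() {
        return Collections.singletonList(build("consumerGroupCoordinator", "true"));
    }

    // Named factory for the classic coordinator.
    static List<SketchConfig> forClassicGroupCoordinator() {
        return Collections.singletonList(build("classicGroupCoordinator", "false"));
    }

    // The default generator is just the concatenation of the two named factories.
    static List<SketchConfig> generator() {
        List<SketchConfig> all = new ArrayList<>(forConsumerGroupCoordinator());
        all.addAll(forClassicGroupCoordinator());
        return all;
    }
}
```

A test that needs only the new coordinator would reference `forConsumerGroupCoordinator` by name, so no caller has to decode what a bare `true` or `false` means.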






Re: [PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]

2024-05-28 Thread via GitHub


FrankYang0529 commented on PR #15821:
URL: https://github.com/apache/kafka/pull/15821#issuecomment-2135260400

   The CI run has finished, and the failed test cases are unrelated to this PR:
   
   MirrorConnectorsIntegrationSSLTest.testReplicateSourceDefault: https://issues.apache.org/jira/browse/KAFKA-15927
   ReplicationQuotasTest.shouldThrottleOldSegments: https://issues.apache.org/jira/browse/KAFKA-16635
   ConsumerBounceTest.testConsumptionWithBrokerFailures: https://issues.apache.org/jira/browse/KAFKA-15146
   DelegationTokenEndToEndAuthorizationWithOwnerTest: https://issues.apache.org/jira/browse/KAFKA-15411
   QuorumControllerMetricsIntegrationTest.testTimeoutMetrics: https://issues.apache.org/jira/browse/KAFKA-16173





Re: [PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]

2024-05-27 Thread via GitHub


FrankYang0529 commented on code in PR #15821:
URL: https://github.com/apache/kafka/pull/15821#discussion_r1616179624


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java:
##
@@ -17,449 +17,502 @@
 package org.apache.kafka.tools.consumer.group;
 
 import joptsimple.OptionException;
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTemplate;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.ConsumerGroupListing;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.RangeAssignor;
 import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.GroupType;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
 import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.apache.kafka.tools.ToolsTestUtils;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.Properties;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Stream;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
-import static 
org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES;
+import static java.util.Collections.emptyMap;
 import static org.apache.kafka.common.utils.Utils.mkSet;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
 
-public class ListConsumerGroupTest extends ConsumerGroupCommandTest {
-@ParameterizedTest(name = 
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
-@MethodSource("getTestQuorumAndGroupProtocolParametersAll")
-public void testListConsumerGroupsWithoutFilters(String quorum, String 
groupProtocol) throws Exception {
-String simpleGroup = "simple-group";
-
-createOffsetsTopic(listenerName(), new Properties());
-
-addSimpleGroupExecutor(simpleGroup);
-addConsumerGroupExecutor(1);
-addConsumerGroupExecutor(1, PROTOCOL_GROUP, groupProtocol);
+@Tag("integration")
+@ExtendWith(ClusterTestExtensions.class)
+public class ListConsumerGroupTest {
+private final static String TOPIC = "foo";
+private final static String SIMPLE_GROUP = "test.simple.group";
+private final static String DEFAULT_GROUP = "test.default.group";
+private final static String PROTOCOL_GROUP = "test.protocol.group";
+private final ClusterInstance clusterInstance;
+
+ListConsumerGroupTest(ClusterInstance clusterInstance) {
+this.clusterInstance = clusterInstance;
+}
 
-String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list"};
-ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(cgcArgs);
+private static List defaultGenerator() {
+return ConsumerGroupCommandTestUtils.generator();
+}
 
-Set expectedGroups = set(Arrays.asList(GROUP, simpleGroup, 
PROTOCOL_GROUP));
-final AtomicReference foundGroups = new AtomicReference<>();
+private static List consumerProtocolOnlyGenerator() {

Review Comment:
   Hi @chia7712, thanks for the suggestion. I found that I had not followed the 
old cases, which test `group.coordinator.new.enable=true` with both the classic 
and the consumer group protocols. I have updated the PR to match them.
   
   BTW, do we want to follow this comment and reduce the test cases, as in the 
old framework? Thanks.
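
The coverage described in this comment can be pictured as a small matrix. The sketch below is an illustration only, not code from the PR: `ProtocolMatrixSketch` is a hypothetical name, and it assumes that the `consumer` group protocol is only available when `group.coordinator.new.enable=true`, while the `classic` protocol works with either coordinator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical helper that enumerates coordinator/protocol combinations to test.
final class ProtocolMatrixSketch {
    enum Protocol { CLASSIC, CONSUMER }

    private ProtocolMatrixSketch() { }

    static List<String> combinations() {
        List<String> result = new ArrayList<>();
        for (boolean newCoordinator : new boolean[] {false, true}) {
            for (Protocol protocol : Protocol.values()) {
                // Assumption: the consumer protocol needs the new coordinator.
                if (protocol == Protocol.CONSUMER && !newCoordinator) {
                    continue;
                }
                result.add("group.coordinator.new.enable=" + newCoordinator
                    + ", group.protocol=" + protocol.name().toLowerCase(Locale.ROOT));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        combinations().forEach(System.out::println);
    }
}
```

Under that assumption the new coordinator contributes two cases (classic and consumer protocol) and the classic coordinator contributes one.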
   
   

Re: [PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]

2024-05-27 Thread via GitHub


chia7712 commented on code in PR #15821:
URL: https://github.com/apache/kafka/pull/15821#discussion_r1616213225


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java:
##
@@ -17,449 +17,502 @@
 package org.apache.kafka.tools.consumer.group;
 
 import joptsimple.OptionException;
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTemplate;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.ConsumerGroupListing;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.RangeAssignor;
 import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.GroupType;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
 import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.apache.kafka.tools.ToolsTestUtils;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.Properties;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Stream;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
-import static 
org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES;
+import static java.util.Collections.emptyMap;
 import static org.apache.kafka.common.utils.Utils.mkSet;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
 
-public class ListConsumerGroupTest extends ConsumerGroupCommandTest {
-@ParameterizedTest(name = 
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
-@MethodSource("getTestQuorumAndGroupProtocolParametersAll")
-public void testListConsumerGroupsWithoutFilters(String quorum, String 
groupProtocol) throws Exception {
-String simpleGroup = "simple-group";
-
-createOffsetsTopic(listenerName(), new Properties());
-
-addSimpleGroupExecutor(simpleGroup);
-addConsumerGroupExecutor(1);
-addConsumerGroupExecutor(1, PROTOCOL_GROUP, groupProtocol);
+@Tag("integration")
+@ExtendWith(ClusterTestExtensions.class)
+public class ListConsumerGroupTest {
+private final static String TOPIC = "foo";
+private final static String SIMPLE_GROUP = "test.simple.group";
+private final static String DEFAULT_GROUP = "test.default.group";
+private final static String PROTOCOL_GROUP = "test.protocol.group";
+private final ClusterInstance clusterInstance;
+
+ListConsumerGroupTest(ClusterInstance clusterInstance) {
+this.clusterInstance = clusterInstance;
+}
 
-String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list"};
-ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(cgcArgs);
+private static List defaultGenerator() {
+return ConsumerGroupCommandTestUtils.generator();
+}
 
-Set expectedGroups = set(Arrays.asList(GROUP, simpleGroup, 
PROTOCOL_GROUP));
-final AtomicReference foundGroups = new AtomicReference<>();
+private static List consumerProtocolOnlyGenerator() {

Review Comment:
   > BTW, do we want to follow this comment to reduce test cases like old 
framework? Thanks.
   
   That is a good suggestion. Could you please file a ticket for it? We should 
have a separate PR for it.




Re: [PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]

2024-05-27 Thread via GitHub


chia7712 commented on code in PR #15821:
URL: https://github.com/apache/kafka/pull/15821#discussion_r1615797305


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java:
##
@@ -17,449 +17,502 @@
 package org.apache.kafka.tools.consumer.group;
 
 import joptsimple.OptionException;
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTemplate;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.ConsumerGroupListing;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.RangeAssignor;
 import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.GroupType;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
 import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.apache.kafka.tools.ToolsTestUtils;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.Properties;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Stream;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
-import static 
org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES;
+import static java.util.Collections.emptyMap;
 import static org.apache.kafka.common.utils.Utils.mkSet;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
 
-public class ListConsumerGroupTest extends ConsumerGroupCommandTest {
-@ParameterizedTest(name = 
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
-@MethodSource("getTestQuorumAndGroupProtocolParametersAll")
-public void testListConsumerGroupsWithoutFilters(String quorum, String 
groupProtocol) throws Exception {
-String simpleGroup = "simple-group";
-
-createOffsetsTopic(listenerName(), new Properties());
-
-addSimpleGroupExecutor(simpleGroup);
-addConsumerGroupExecutor(1);
-addConsumerGroupExecutor(1, PROTOCOL_GROUP, groupProtocol);
+@Tag("integration")
+@ExtendWith(ClusterTestExtensions.class)
+public class ListConsumerGroupTest {
+private final static String TOPIC = "foo";
+private final static String SIMPLE_GROUP = "test.simple.group";
+private final static String DEFAULT_GROUP = "test.default.group";
+private final static String PROTOCOL_GROUP = "test.protocol.group";
+private final ClusterInstance clusterInstance;
+
+ListConsumerGroupTest(ClusterInstance clusterInstance) {
+this.clusterInstance = clusterInstance;
+}
 
-String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--list"};
-ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(cgcArgs);
+private static List defaultGenerator() {
+return ConsumerGroupCommandTestUtils.generator();
+}
 
-Set expectedGroups = set(Arrays.asList(GROUP, simpleGroup, 
PROTOCOL_GROUP));
-final AtomicReference foundGroups = new AtomicReference<>();
+private static List consumerProtocolOnlyGenerator() {

Review Comment:
   How about moving this helper to `ConsumerGroupCommandTestUtils` and reusing 
it to rewrite `ConsumerGroupCommandTestUtils.generator`?




Re: [PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]

2024-05-24 Thread via GitHub


chia7712 commented on code in PR #15821:
URL: https://github.com/apache/kafka/pull/15821#discussion_r1612965039


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java:
##
@@ -68,15 +68,31 @@ static List<ClusterConfig> generator() {
 ClusterConfig consumerGroupCoordinator = ClusterConfig.defaultBuilder()
 .setTypes(Stream.of(KRAFT, CO_KRAFT).collect(Collectors.toSet()))
 .setServerProperties(serverProperties)
-.setTags(Collections.singletonList("newGroupCoordinator"))
+.setTags(Collections.singletonList("consumerGroupCoordinator"))
 .build();
 return Arrays.asList(classicGroupCoordinator, consumerGroupCoordinator);
 }
 
+static <T> AutoCloseable buildConsumers(int numberOfConsumers,
+boolean syncCommit,

Review Comment:
   In this case the consumers don't use consumer offsets, so `syncCommit` must 
be `false` to avoid an error.



##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java:
##
@@ -17,449 +17,503 @@
 package org.apache.kafka.tools.consumer.group;
 
 import joptsimple.OptionException;
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTemplate;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.ConsumerGroupListing;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.RangeAssignor;
 import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.GroupType;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
 import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.apache.kafka.tools.ToolsTestUtils;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.Properties;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Stream;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
-import static 
org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES;
+import static java.util.Collections.emptyMap;
 import static org.apache.kafka.common.utils.Utils.mkSet;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
 
-public class ListConsumerGroupTest extends ConsumerGroupCommandTest {
-@ParameterizedTest(name = 
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
-@MethodSource("getTestQuorumAndGroupProtocolParametersAll")
-public void testListConsumerGroupsWithoutFilters(String quorum, String 
groupProtocol) throws Exception {
-String simpleGroup = "simple-group";
-
-createOffsetsTopic(listenerName(), new Properties());
-
-addSimpleGroupExecutor(simpleGroup);
-addConsumerGroupExecutor(1);
-addConsumerGroupExecutor(1, PROTOCOL_GROUP, groupProtocol);
+@Tag("integration")
+@ExtendWith(ClusterTestExtensions.class)
+public class ListConsumerGroupTest {
+private final static String TOPIC = "foo";
+private final static String SIMPLE_GROUP = "test.simple.group";
+private final static String DEFAULT_GROUP = "test.default.group";
+private final static String PROTOCOL_GROUP = "test.protocol.group";
+private final ClusterInstance clusterInstance;
+
+ListConsumerGroupTest(ClusterInstance clusterInstance) {
+this.clusterInstance = 

Re: [PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]

2024-05-15 Thread via GitHub


FrankYang0529 commented on code in PR #15821:
URL: https://github.com/apache/kafka/pull/15821#discussion_r1601924534


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java:
##
@@ -93,6 +95,27 @@ static <T> AutoCloseable buildConsumers(int numberOfConsumers,
 }
 }
 
+static <T> AutoCloseable buildConsumers(int numberOfConsumers,

Review Comment:
   Thanks for the great suggestion. Updated it.






Re: [PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]

2024-05-14 Thread via GitHub


chia7712 commented on code in PR #15821:
URL: https://github.com/apache/kafka/pull/15821#discussion_r1600210615


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java:
##
@@ -93,6 +95,27 @@ static <T> AutoCloseable buildConsumers(int numberOfConsumers,
 }
 }
 
+static <T> AutoCloseable buildConsumers(int numberOfConsumers,

Review Comment:
   Please DON'T generate a bunch of duplicate code...
   ```java
   static <T> AutoCloseable buildConsumers(int numberOfConsumers,
                                           boolean syncCommit,
                                           Set<TopicPartition> partitions,
                                           Supplier<KafkaConsumer<T, T>> consumerSupplier) {
       return buildConsumers(numberOfConsumers, syncCommit, consumerSupplier,
               consumer -> consumer.assign(partitions));
   }

   static <T> AutoCloseable buildConsumers(int numberOfConsumers,
                                           boolean syncCommit,
                                           String topic,
                                           Supplier<KafkaConsumer<T, T>> consumerSupplier) {
       return buildConsumers(numberOfConsumers, syncCommit, consumerSupplier,
               consumer -> consumer.subscribe(Collections.singleton(topic)));
   }

   private static <T> AutoCloseable buildConsumers(int numberOfConsumers,
                                                   boolean syncCommit,
                                                   Supplier<KafkaConsumer<T, T>> consumerSupplier,
                                                   Consumer<KafkaConsumer<T, T>> setPartitions) {
       List<KafkaConsumer<T, T>> consumers = new ArrayList<>(numberOfConsumers);
       ExecutorService executor = Executors.newFixedThreadPool(numberOfConsumers);
       AtomicBoolean closed = new AtomicBoolean(false);
       final AutoCloseable closeable = () -> releaseConsumers(closed, consumers, executor);
       try {
           for (int i = 0; i < numberOfConsumers; i++) {
               KafkaConsumer<T, T> consumer = consumerSupplier.get();
               consumers.add(consumer);
               executor.execute(() -> initConsumer(syncCommit, () -> {
                   setPartitions.accept(consumer);
                   return consumer;
               }, closed));
           }
           return closeable;
       } catch (Throwable e) {
           Utils.closeQuietly(closeable, "Release Consumer");
           throw e;
       }
   }

   private static <T> void releaseConsumers(AtomicBoolean closed,
                                            List<KafkaConsumer<T, T>> consumers,
                                            ExecutorService executor) throws InterruptedException {
       closed.set(true);
       consumers.forEach(KafkaConsumer::wakeup);
       executor.shutdown();
       executor.awaitTermination(1, TimeUnit.MINUTES);
   }

   private static <T> void initConsumer(boolean syncCommit,
                                        Supplier<KafkaConsumer<T, T>> consumerSupplier,
                                        AtomicBoolean closed) {
       try (KafkaConsumer<T, T> kafkaConsumer = consumerSupplier.get()) {
           while (!closed.get()) {
               kafkaConsumer.poll(Duration.ofMillis(Long.MAX_VALUE));
               if (syncCommit)
                   kafkaConsumer.commitSync();
           }
       } catch (WakeupException e) {
           // OK
       }
   }
   ```
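
For readers without the Kafka test classpath, the delegation pattern suggested above can be tried in isolation. The following is only a rough sketch under stated assumptions: `FakeConsumer` and `BuildConsumersSketch` are hypothetical stand-ins invented for illustration (they are not Kafka APIs), partitions are modeled as plain integers, and the threading, polling, and wakeup handling of the real helper are deliberately left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical stand-in for KafkaConsumer: it only records how it was configured.
final class FakeConsumer {
    final List<String> calls = new ArrayList<>();
    boolean closed;

    void assign(Set<Integer> partitions) {
        calls.add("assign" + new TreeSet<>(partitions));
    }

    void subscribe(Set<String> topics) {
        calls.add("subscribe" + new TreeSet<>(topics));
    }

    void close() {
        closed = true;
    }
}

// Hypothetical helper: two thin overloads share one private implementation.
final class BuildConsumersSketch {
    private BuildConsumersSketch() {
    }

    // Overload for manual partition assignment.
    static AutoCloseable buildConsumers(int numberOfConsumers,
                                        Set<Integer> partitions,
                                        Supplier<FakeConsumer> consumerSupplier) {
        return buildConsumers(numberOfConsumers, consumerSupplier,
                consumer -> consumer.assign(partitions));
    }

    // Overload for topic subscription.
    static AutoCloseable buildConsumers(int numberOfConsumers,
                                        String topic,
                                        Supplier<FakeConsumer> consumerSupplier) {
        return buildConsumers(numberOfConsumers, consumerSupplier,
                consumer -> consumer.subscribe(Collections.singleton(topic)));
    }

    // Shared logic: the only varying step is passed in as a callback.
    private static AutoCloseable buildConsumers(int numberOfConsumers,
                                                Supplier<FakeConsumer> consumerSupplier,
                                                Consumer<FakeConsumer> setPartitions) {
        List<FakeConsumer> consumers = new ArrayList<>(numberOfConsumers);
        for (int i = 0; i < numberOfConsumers; i++) {
            FakeConsumer consumer = consumerSupplier.get();
            setPartitions.accept(consumer);
            consumers.add(consumer);
        }
        // Closing the returned handle releases every consumer created above.
        return () -> consumers.forEach(FakeConsumer::close);
    }
}
```

The point of the sketch is that the "assign" and "subscribe" variants differ by a single lambda, so creation and cleanup are written once.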






Re: [PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]

2024-05-13 Thread via GitHub


chia7712 commented on PR #15821:
URL: https://github.com/apache/kafka/pull/15821#issuecomment-2107364174

   @FrankYang0529 please rebase code to include 
https://github.com/apache/kafka/commit/334d5d58bb73ca04d04be90dec8c4e49000577ec





Re: [PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]

2024-05-11 Thread via GitHub


FrankYang0529 commented on code in PR #15821:
URL: https://github.com/apache/kafka/pull/15821#discussion_r1597470470


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java:
##
@@ -92,6 +94,7 @@ static void generator(ClusterGenerator clusterGenerator) {
 static <T> AutoCloseable buildConsumers(int numberOfConsumers,

Review Comment:
   Thank you. I updated the code to define two `buildConsumers` methods. Could you 
please take a look? If we're good with the current design, I will update 
https://github.com/apache/kafka/pull/15908 accordingly.






Re: [PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]

2024-05-09 Thread via GitHub


chia7712 commented on code in PR #15821:
URL: https://github.com/apache/kafka/pull/15821#discussion_r1596196298


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java:
##
@@ -92,6 +94,7 @@ static void generator(ClusterGenerator clusterGenerator) {
 static <T> AutoCloseable buildConsumers(int numberOfConsumers,

Review Comment:
   Could we have two `buildConsumers` overloads to handle "assign" and 
"subscribe" separately?






Re: [PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]

2024-05-09 Thread via GitHub


FrankYang0529 commented on PR #15821:
URL: https://github.com/apache/kafka/pull/15821#issuecomment-2102879454

   > > This is used by ListConsumerGroupTest and DescribeConsumerGroupTest. I think we can create a new class SimpleConsumerGroupExecutorTestUtils for it. WDYT? Thank you.
   > 
   > That is addressed already. see https://github.com/apache/kafka/blob/trunk/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java#L92
   
   Thanks. The `SimpleConsumerGroupExecutor` subscribes to topic partitions. I 
have updated `ConsumerGroupCommandTestUtils` to support it.
   
   
https://github.com/apache/kafka/blob/f4fdaa702a2e718bdb44b9c5fec254f32a33f0d8/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTest.java#L274-L287





Re: [PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]

2024-05-07 Thread via GitHub


chia7712 commented on PR #15821:
URL: https://github.com/apache/kafka/pull/15821#issuecomment-2099700974

   > This is used by ListConsumerGroupTest and DescribeConsumerGroupTest. I think we can create a new class SimpleConsumerGroupExecutorTestUtils for it. WDYT? Thank you.
   
   That is addressed already. see 
   
https://github.com/apache/kafka/blob/trunk/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java#L92





Re: [PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]

2024-05-07 Thread via GitHub


FrankYang0529 commented on PR #15821:
URL: https://github.com/apache/kafka/pull/15821#issuecomment-2098714976

   Hi @chia7712, I rebased onto the latest trunk branch, so we have 
`ConsumerGroupCommandTestUtils` now. The only remaining part is 
`SimpleConsumerGroupExecutor`. This is used by `ListConsumerGroupTest` and 
`DescribeConsumerGroupTest`. I think we can create a new class 
`SimpleConsumerGroupExecutorTestUtils` for it. WDYT? Thank you.
   
   
https://github.com/apache/kafka/blob/21bf715622e9d05984fa8a2a1f9f12d54b76ce41/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTest.java#L327-L335





Re: [PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]

2024-04-26 Thread via GitHub


FrankYang0529 commented on PR #15821:
URL: https://github.com/apache/kafka/pull/15821#issuecomment-2080374723

   This PR relies on https://github.com/apache/kafka/pull/15766.





[PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]

2024-04-26 Thread via GitHub


FrankYang0529 opened a new pull request, #15821:
URL: https://github.com/apache/kafka/pull/15821

   By using `ClusterTestExtensions`, `ListConsumerGroupTest` can drop its 
`KafkaServerTestHarness` dependency.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

