FrankYang0529 commented on code in PR #15908:
URL: https://github.com/apache/kafka/pull/15908#discussion_r1639415070


##########
tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java:
##########
@@ -16,834 +16,986 @@
  */
 package org.apache.kafka.tools.consumer.group;
 
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTemplate;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.admin.ConsumerGroupDescription;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.RangeAssignor;
 import org.apache.kafka.clients.consumer.RoundRobinAssignor;
 import org.apache.kafka.common.ConsumerGroupState;
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.tools.ToolsTestUtils;
 
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.MethodSource;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.kafka.test.TestUtils.RANDOM;
-import static 
org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
 
-public class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
+@Tag("integration")
+@ExtendWith(value = ClusterTestExtensions.class)
+public class DescribeConsumerGroupTest {
+    private static final String TOPIC_PREFIX = "test.topic.";
+    private static final String GROUP_PREFIX = "test.group.";
     private static final List<List<String>> DESCRIBE_TYPE_OFFSETS = 
Arrays.asList(Collections.singletonList(""), 
Collections.singletonList("--offsets"));
     private static final List<List<String>> DESCRIBE_TYPE_MEMBERS = 
Arrays.asList(Collections.singletonList("--members"), 
Arrays.asList("--members", "--verbose"));
     private static final List<List<String>> DESCRIBE_TYPE_STATE = 
Collections.singletonList(Collections.singletonList("--state"));
-    private static final List<List<String>> DESCRIBE_TYPES;
-
-    static {
-        List<List<String>> describeTypes = new ArrayList<>();
-
-        describeTypes.addAll(DESCRIBE_TYPE_OFFSETS);
-        describeTypes.addAll(DESCRIBE_TYPE_MEMBERS);
-        describeTypes.addAll(DESCRIBE_TYPE_STATE);
+    private static final List<List<String>> DESCRIBE_TYPES = 
Stream.of(DESCRIBE_TYPE_OFFSETS, DESCRIBE_TYPE_MEMBERS, 
DESCRIBE_TYPE_STATE).flatMap(Collection::stream).collect(Collectors.toList());
+    private ClusterInstance clusterInstance;
 
-        DESCRIBE_TYPES = describeTypes;
+    private static List<ClusterConfig> generator() {
+        return ConsumerGroupCommandTestUtils.generator();
     }
 
-    @ParameterizedTest(name = 
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
-    @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
-    public void testDescribeNonExistingGroup(String quorum, String 
groupProtocol) {
-        createOffsetsTopic(listenerName(), new Properties());
+    @ClusterTemplate("generator")
+    public void testDescribeNonExistingGroup(ClusterInstance clusterInstance) {
+        this.clusterInstance = clusterInstance;
         String missingGroup = "missing.group";
 
         for (List<String> describeType : DESCRIBE_TYPES) {
             // note the group to be queried is a different (non-existing) group
-            List<String> cgcArgs = new 
ArrayList<>(Arrays.asList("--bootstrap-server", 
bootstrapServers(listenerName()), "--describe", "--group", missingGroup));
+            List<String> cgcArgs = new 
ArrayList<>(Arrays.asList("--bootstrap-server", 
clusterInstance.bootstrapServers(), "--describe", "--group", missingGroup));
             cgcArgs.addAll(describeType);
-            ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(cgcArgs.toArray(new String[0]));
-
-            String output = 
ToolsTestUtils.grabConsoleOutput(describeGroups(service));
-            assertTrue(output.contains("Consumer group '" + missingGroup + "' 
does not exist."),
-                "Expected error was not detected for describe option '" + 
String.join(" ", describeType) + "'");
+            try (ConsumerGroupCommand.ConsumerGroupService service = 
consumerGroupService(cgcArgs.toArray(new String[0]));) {
+                String output = 
ToolsTestUtils.grabConsoleOutput(describeGroups(service));
+                assertTrue(output.contains("Consumer group '" + missingGroup + 
"' does not exist."),
+                        "Expected error was not detected for describe option 
'" + String.join(" ", describeType) + "'");
+            }
         }
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testDescribeWithMultipleSubActions(String quorum) {
-        AtomicInteger exitStatus = new AtomicInteger(0);
-        AtomicReference<String> exitMessage = new AtomicReference<>("");
-        Exit.setExitProcedure((status, err) -> {
-            exitStatus.set(status);
-            exitMessage.set(err);
-            throw new RuntimeException();
-        });
-        String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--describe", "--group", GROUP, "--members", 
"--state"};
-        try {
-            assertThrows(RuntimeException.class, () -> 
ConsumerGroupCommand.main(cgcArgs));
-        } finally {
-            Exit.resetExitProcedure();
+    @ClusterTemplate("generator")
+    public void testDescribeOffsetsOfNonExistingGroup(ClusterInstance 
clusterInstance) throws Exception {
+        this.clusterInstance = clusterInstance;
+        String missingGroup = "missing.group";
+        for (GroupProtocol groupProtocol: 
clusterInstance.supportedGroupProtocols()) {
+            String topic = TOPIC_PREFIX + groupProtocol.name();
+            String group = GROUP_PREFIX + groupProtocol.name();
+            createTopic(topic);
+
+            // run one consumer in the group consuming from a single-partition 
topic
+            try (AutoCloseable protocolConsumerGroupExecutor = 
consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap());
+                 // note the group to be queried is a different (non-existing) 
group
+                 ConsumerGroupCommand.ConsumerGroupService service = 
consumerGroupService(new String[]{"--bootstrap-server", 
clusterInstance.bootstrapServers(), "--describe", "--group", missingGroup});
+            ) {
+                Entry<Optional<ConsumerGroupState>, 
Optional<Collection<PartitionAssignmentState>>> res = 
service.collectGroupOffsets(missingGroup);

Review Comment:
   Hi @chia7712, The `res` value is used by next line for assertion. It's not 
unused variable. Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to