TaiJuWu commented on code in PR #16127: URL: https://github.com/apache/kafka/pull/16127#discussion_r1712901628
########## tools/src/test/java/org/apache/kafka/tools/TopicCommandIntegrationTest.java: ########## @@ -85,653 +80,761 @@ import static org.mockito.Mockito.spy; @Tag("integration") -@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for usages of JavaConverters -public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTestHarness implements Logging, RackAwareTest { +@ExtendWith(ClusterTestExtensions.class) +public class TopicCommandIntegrationTest { private final short defaultReplicationFactor = 1; private final int defaultNumPartitions = 1; - private TopicCommand.TopicService topicService; - private Admin adminClient; - private String bootstrapServer; - private String testTopicName; - private Buffer<KafkaBroker> scalaBrokers; - private Seq<ControllerServer> scalaControllers; - /** - * Implementations must override this method to return a set of KafkaConfigs. This method will be invoked for every - * test and should not reuse previous configurations unless they select their ports randomly when servers are started. - * - * Note the replica fetch max bytes is set to `1` in order to throttle the rate of replication for test - * `testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress`. - */ - @Override - public scala.collection.Seq<KafkaConfig> generateConfigs() { - Map<Integer, String> rackInfo = new HashMap<>(); - rackInfo.put(0, "rack1"); - rackInfo.put(1, "rack2"); - rackInfo.put(2, "rack2"); - rackInfo.put(3, "rack1"); - rackInfo.put(4, "rack3"); - rackInfo.put(5, "rack3"); - - List<Properties> brokerConfigs = ToolsTestUtils - .createBrokerProperties(6, zkConnectOrNull(), rackInfo, defaultNumPartitions, defaultReplicationFactor); - - List<KafkaConfig> configs = new ArrayList<>(); - for (Properties props : brokerConfigs) { - props.put(REPLICA_FETCH_MAX_BYTES_CONFIG, "1"); - configs.add(KafkaConfig.fromProps(props)); - } - return JavaConverters.asScalaBuffer(configs).toSeq(); - } + private final ClusterInstance clusterInstance; private TopicCommand.TopicCommandOptions buildTopicCommandOptionsWithBootstrap(String... opts) { + String bootstrapServer = clusterInstance.bootstrapServers(); String[] finalOptions = Stream.concat(Arrays.stream(opts), Stream.of("--bootstrap-server", bootstrapServer) ).toArray(String[]::new); return new TopicCommand.TopicCommandOptions(finalOptions); } - @BeforeEach - public void setUp(TestInfo info) { - super.setUp(info); - // create adminClient - Properties props = new Properties(); - bootstrapServer = bootstrapServers(listenerName()); - props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); - adminClient = Admin.create(props); - topicService = new TopicCommand.TopicService(props, Optional.of(bootstrapServer)); - testTopicName = String.format("%s-%s", info.getTestMethod().get().getName(), org.apache.kafka.test.TestUtils.randomString(10)); - scalaBrokers = brokers(); - scalaControllers = controllerServers(); + static List<ClusterConfig> generate1() { + Map<String, String> serverProp = new HashMap<>(); + serverProp.put(REPLICA_FETCH_MAX_BYTES_CONFIG, "1"); // if config name error, no exception throw + + Map<Integer, Map<String, String>> rackInfo = new HashMap<>(); + Map<String, String> infoPerBroker1 = new HashMap<>(); + infoPerBroker1.put("broker.rack", "rack1"); + Map<String, String> infoPerBroker2 = new HashMap<>(); + infoPerBroker2.put("broker.rack", "rack2"); + Map<String, String> infoPerBroker3 = new HashMap<>(); + infoPerBroker3.put("broker.rack", "rack2"); + Map<String, String> infoPerBroker4 = new HashMap<>(); + infoPerBroker4.put("broker.rack", "rack1"); + Map<String, String> infoPerBroker5 = new HashMap<>(); + infoPerBroker5.put("broker.rack", "rack3"); + Map<String, String> infoPerBroker6 = new HashMap<>(); + infoPerBroker6.put("broker.rack", "rack3"); + + rackInfo.put(0, infoPerBroker1); + rackInfo.put(1, infoPerBroker2); + rackInfo.put(2, infoPerBroker3); + rackInfo.put(3, infoPerBroker4); + rackInfo.put(4, infoPerBroker5); + rackInfo.put(5, infoPerBroker6); + + return Collections.singletonList(ClusterConfig.defaultBuilder() + .setBrokers(6) + .setServerProperties(serverProp) + .setPerServerProperties(rackInfo) + .build() + ); } - @AfterEach - public void close() throws Exception { - if (topicService != null) - topicService.close(); - if (adminClient != null) - adminClient.close(); + TopicCommandIntegrationTest(ClusterInstance clusterInstance) { + this.clusterInstance = clusterInstance; } - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testCreate(String quorum) throws Exception { - TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, 2, 1, - scala.collection.immutable.Map$.MODULE$.empty(), new Properties() - ); - assertTrue(adminClient.listTopics().names().get().contains(testTopicName), - "Admin client didn't see the created topic. It saw: " + adminClient.listTopics().names().get()); + @ClusterTemplate("generate1") + public void testCreate(TestInfo testInfo) throws InterruptedException, ExecutionException { + String testTopicName = testInfo.getTestMethod().get().getName() + "-" + + TestUtils.randomString(10); + + try (Admin adminClient = clusterInstance.createAdminClient()) { + adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor))); + + clusterInstance.waitForTopic(testTopicName, defaultNumPartitions); + Assertions.assertTrue(adminClient.listTopics().names().get().contains(testTopicName), + "Admin client didn't see the created topic. It saw: " + adminClient.listTopics().names().get()); + + adminClient.deleteTopics(Collections.singletonList(testTopicName)); + clusterInstance.waitForTopic(testTopicName, 0); + Assertions.assertTrue(adminClient.listTopics().names().get().isEmpty(), + "Admin client see the created topic. It saw: " + adminClient.listTopics().names().get()); + } } - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testCreateWithDefaults(String quorum) throws Exception { - TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor, - scala.collection.immutable.Map$.MODULE$.empty(), new Properties() - ); - List<TopicPartitionInfo> partitions = adminClient - .describeTopics(Collections.singletonList(testTopicName)) - .allTopicNames() - .get() - .get(testTopicName) - .partitions(); - assertEquals(defaultNumPartitions, partitions.size(), "Unequal partition size: " + partitions.size()); - assertEquals(defaultReplicationFactor, (short) partitions.get(0).replicas().size(), "Unequal replication factor: " + partitions.get(0).replicas().size()); + @ClusterTemplate("generate1") + public void testCreateWithDefaults(TestInfo testInfo) throws InterruptedException, ExecutionException { + String testTopicName = testInfo.getTestMethod().get().getName() + "-" + + TestUtils.randomString(10); + + try (Admin adminClient = clusterInstance.createAdminClient()) { + adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor))); + + clusterInstance.waitForTopic(testTopicName, defaultNumPartitions); + Assertions.assertTrue(adminClient.listTopics().names().get().contains(testTopicName), + "Admin client didn't see the created topic. It saw: " + adminClient.listTopics().names().get()); + + List<TopicPartitionInfo> partitions = adminClient + .describeTopics(Collections.singletonList(testTopicName)) + .allTopicNames() + .get() + .get(testTopicName) + .partitions(); + Assertions.assertEquals(defaultNumPartitions, partitions.size(), "Unequal partition size: " + partitions.size()); + Assertions.assertEquals(defaultReplicationFactor, (short) partitions.get(0).replicas().size(), "Unequal replication factor: " + partitions.get(0).replicas().size()); + + adminClient.deleteTopics(Collections.singletonList(testTopicName)); + clusterInstance.waitForTopic(testTopicName, 0); + Assertions.assertTrue(adminClient.listTopics().names().get().isEmpty(), + "Admin client see the created topic. It saw: " + adminClient.listTopics().names().get()); + } } - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testCreateWithDefaultReplication(String quorum) throws Exception { - TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, 2, defaultReplicationFactor, - scala.collection.immutable.Map$.MODULE$.empty(), new Properties() - ); - List<TopicPartitionInfo> partitions = adminClient - .describeTopics(Collections.singletonList(testTopicName)) - .allTopicNames() - .get() - .get(testTopicName) - .partitions(); - assertEquals(2, partitions.size(), "Unequal partition size: " + partitions.size()); - assertEquals(defaultReplicationFactor, (short) partitions.get(0).replicas().size(), "Unequal replication factor: " + partitions.get(0).replicas().size()); + @ClusterTemplate("generate1") + public void testCreateWithDefaultReplication(TestInfo testInfo) throws InterruptedException, ExecutionException { + String testTopicName = testInfo.getTestMethod().get().getName() + "-" + + TestUtils.randomString(10); + + try (Admin adminClient = clusterInstance.createAdminClient()) { + adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, 2, defaultReplicationFactor))); + clusterInstance.waitForTopic(testTopicName, 2); + List<TopicPartitionInfo> partitions = adminClient + .describeTopics(Collections.singletonList(testTopicName)) + .allTopicNames() + .get() + .get(testTopicName) + .partitions(); + assertEquals(2, partitions.size(), "Unequal partition size: " + partitions.size()); + assertEquals(defaultReplicationFactor, (short) partitions.get(0).replicas().size(), "Unequal replication factor: " + partitions.get(0).replicas().size()); + } } - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testCreateWithDefaultPartitions(String quorum) throws Exception { - TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, defaultNumPartitions, 2, - scala.collection.immutable.Map$.MODULE$.empty(), new Properties() - ); - List<TopicPartitionInfo> partitions = adminClient - .describeTopics(Collections.singletonList(testTopicName)) - .allTopicNames() - .get() - .get(testTopicName) - .partitions(); - - assertEquals(defaultNumPartitions, partitions.size(), "Unequal partition size: " + partitions.size()); - assertEquals(2, (short) partitions.get(0).replicas().size(), "Partitions not replicated: " + partitions.get(0).replicas().size()); + @ClusterTemplate("generate1") + public void testCreateWithDefaultPartitions(TestInfo testInfo) throws InterruptedException, ExecutionException { + String testTopicName = testInfo.getTestMethod().get().getName() + "-" + + TestUtils.randomString(10); + + try (Admin adminClient = clusterInstance.createAdminClient()) { + adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, defaultNumPartitions, (short) 2))); + clusterInstance.waitForTopic(testTopicName, defaultNumPartitions); + List<TopicPartitionInfo> partitions = adminClient + .describeTopics(Collections.singletonList(testTopicName)) + .allTopicNames() + .get() + .get(testTopicName) + .partitions(); + + assertEquals(defaultNumPartitions, partitions.size(), "Unequal partition size: " + partitions.size()); + assertEquals(2, (short) partitions.get(0).replicas().size(), "Partitions not replicated: " + partitions.get(0).replicas().size()); + } } - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testCreateWithConfigs(String quorum) throws Exception { - ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); - Properties topicConfig = new Properties(); - topicConfig.put(TopicConfig.DELETE_RETENTION_MS_CONFIG, "1000"); + @ClusterTemplate("generate1") + public void testCreateWithConfigs(TestInfo testInfo) throws Exception { + String testTopicName = testInfo.getTestMethod().get().getName() + "-" + + TestUtils.randomString(10); - TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, 2, 2, - scala.collection.immutable.Map$.MODULE$.empty(), topicConfig - ); + try (Admin adminClient = clusterInstance.createAdminClient()) { + ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); + Map<String, String> topicConfig = new HashMap<>(); + topicConfig.put(TopicConfig.DELETE_RETENTION_MS_CONFIG, "1000"); - Config configs = adminClient.describeConfigs(Collections.singleton(configResource)).all().get().get(configResource); - assertEquals(1000, Integer.valueOf(configs.get("delete.retention.ms").value()), - "Config not set correctly: " + configs.get("delete.retention.ms").value()); + adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, 2, (short) 2).configs(topicConfig))); + clusterInstance.waitForTopic(testTopicName, 2); + + + Config configs = adminClient.describeConfigs(Collections.singleton(configResource)).all().get().get(configResource); + assertEquals(1000, Integer.valueOf(configs.get("delete.retention.ms").value()), + "Config not set correctly: " + configs.get("delete.retention.ms").value()); + } } - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testCreateWhenAlreadyExists(String quorum) { - // create the topic - TopicCommand.TopicCommandOptions createOpts = buildTopicCommandOptionsWithBootstrap( - "--create", "--partitions", Integer.toString(defaultNumPartitions), "--replication-factor", "1", - "--topic", testTopicName); + @ClusterTemplate("generate1") + public void testCreateWhenAlreadyExists(TestInfo testInfo) throws Exception { + String testTopicName = testInfo.getTestMethod().get().getName() + "-" + + TestUtils.randomString(10); + try (Admin adminClient = clusterInstance.createAdminClient(); + TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient)) { + TopicCommand.TopicCommandOptions createOpts = buildTopicCommandOptionsWithBootstrap( + "--create", "--partitions", Integer.toString(defaultNumPartitions), "--replication-factor", "1", + "--topic", testTopicName); + + adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor))); + clusterInstance.waitForTopic(testTopicName, defaultNumPartitions); + + // try to re-create the topic + assertThrows(TopicExistsException.class, () -> topicService.createTopic(createOpts), + "Expected TopicExistsException to throw"); + } + } - TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor, - scala.collection.immutable.Map$.MODULE$.empty(), new Properties() - ); - // try to re-create the topic - assertThrows(TopicExistsException.class, () -> topicService.createTopic(createOpts), - "Expected TopicExistsException to throw"); + @ClusterTemplate("generate1") + public void testCreateWhenAlreadyExistsWithIfNotExists(TestInfo testInfo) throws Exception { + String testTopicName = testInfo.getTestMethod().get().getName() + "-" + + TestUtils.randomString(10); + try (Admin adminClient = clusterInstance.createAdminClient(); + TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient)) { + adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor))); + clusterInstance.waitForTopic(testTopicName, defaultNumPartitions); + + TopicCommand.TopicCommandOptions createOpts = + buildTopicCommandOptionsWithBootstrap("--create", "--topic", testTopicName, "--if-not-exists"); + topicService.createTopic(createOpts); + } } - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testCreateWhenAlreadyExistsWithIfNotExists(String quorum) throws Exception { - TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor, - scala.collection.immutable.Map$.MODULE$.empty(), new Properties() - ); - TopicCommand.TopicCommandOptions createOpts = - buildTopicCommandOptionsWithBootstrap("--create", "--topic", testTopicName, "--if-not-exists"); - topicService.createTopic(createOpts); + private List<Integer> getPartitionReplicas(List<TopicPartitionInfo> partitions, int partitionNumber) { + return partitions.get(partitionNumber).replicas().stream().map(Node::id).collect(Collectors.toList()); } - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testCreateWithReplicaAssignment(String quorum) throws Exception { - scala.collection.mutable.HashMap<Object, Seq<Object>> replicaAssignmentMap = new scala.collection.mutable.HashMap<>(); + @ClusterTemplate("generate1") + public void testCreateWithReplicaAssignment(TestInfo testInfo) throws Exception { + Map<Integer, List<Integer>> replicaAssignmentMap = new HashMap<>(); + try (Admin adminClient = clusterInstance.createAdminClient()) { + String testTopicName = testInfo.getTestMethod().get().getName() + "-" + + TestUtils.randomString(10); - replicaAssignmentMap.put(0, JavaConverters.asScalaBufferConverter(Arrays.asList((Object) 5, (Object) 4)).asScala().toSeq()); - replicaAssignmentMap.put(1, JavaConverters.asScalaBufferConverter(Arrays.asList((Object) 3, (Object) 2)).asScala().toSeq()); - replicaAssignmentMap.put(2, JavaConverters.asScalaBufferConverter(Arrays.asList((Object) 1, (Object) 0)).asScala().toSeq()); + replicaAssignmentMap.put(0, Arrays.asList(5, 4)); + replicaAssignmentMap.put(1, Arrays.asList(3, 2)); + replicaAssignmentMap.put(2, Arrays.asList(1, 0)); - TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, defaultNumPartitions, - defaultReplicationFactor, replicaAssignmentMap, new Properties() - ); + adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, replicaAssignmentMap))); + clusterInstance.waitForTopic(testTopicName, 3); - List<TopicPartitionInfo> partitions = adminClient - .describeTopics(Collections.singletonList(testTopicName)) - .allTopicNames() - .get() - .get(testTopicName) - .partitions(); - - assertEquals(3, partitions.size(), - "Unequal partition size: " + partitions.size()); - assertEquals(Arrays.asList(5, 4), getPartitionReplicas(partitions, 0), - "Unexpected replica assignment: " + getPartitionReplicas(partitions, 0)); - assertEquals(Arrays.asList(3, 2), getPartitionReplicas(partitions, 1), - "Unexpected replica assignment: " + getPartitionReplicas(partitions, 1)); - assertEquals(Arrays.asList(1, 0), getPartitionReplicas(partitions, 2), - "Unexpected replica assignment: " + getPartitionReplicas(partitions, 2)); - } + List<TopicPartitionInfo> partitions = adminClient + .describeTopics(Collections.singletonList(testTopicName)) + .allTopicNames() + .get() + .get(testTopicName) + .partitions(); - private List<Integer> getPartitionReplicas(List<TopicPartitionInfo> partitions, int partitionNumber) { - return partitions.get(partitionNumber).replicas().stream().map(Node::id).collect(Collectors.toList()); + adminClient.close(); + assertEquals(3, partitions.size(), + "Unequal partition size: " + partitions.size()); + assertEquals(Arrays.asList(5, 4), getPartitionReplicas(partitions, 0), + "Unexpected replica assignment: " + getPartitionReplicas(partitions, 0)); + assertEquals(Arrays.asList(3, 2), getPartitionReplicas(partitions, 1), + "Unexpected replica assignment: " + getPartitionReplicas(partitions, 1)); + assertEquals(Arrays.asList(1, 0), getPartitionReplicas(partitions, 2), + "Unexpected replica assignment: " + getPartitionReplicas(partitions, 2)); + } } - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testCreateWithInvalidReplicationFactor(String quorum) { - TopicCommand.TopicCommandOptions opts = buildTopicCommandOptionsWithBootstrap("--create", "--partitions", "2", "--replication-factor", Integer.toString(Short.MAX_VALUE + 1), - "--topic", testTopicName); - assertThrows(IllegalArgumentException.class, () -> topicService.createTopic(opts), "Expected IllegalArgumentException to throw"); + @ClusterTemplate("generate1") + public void testCreateWithInvalidReplicationFactor(TestInfo testInfo) throws Exception { + String testTopicName = testInfo.getTestMethod().get().getName() + "-" + + TestUtils.randomString(10); + try (Admin adminClient = clusterInstance.createAdminClient(); + TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient)) { + + TopicCommand.TopicCommandOptions opts = buildTopicCommandOptionsWithBootstrap("--create", "--partitions", "2", "--replication-factor", Integer.toString(Short.MAX_VALUE + 1), + "--topic", testTopicName); + assertThrows(IllegalArgumentException.class, () -> topicService.createTopic(opts), "Expected IllegalArgumentException to throw"); + } } - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testCreateWithNegativeReplicationFactor(String quorum) { - TopicCommand.TopicCommandOptions opts = buildTopicCommandOptionsWithBootstrap("--create", - "--partitions", "2", "--replication-factor", "-1", "--topic", testTopicName); - assertThrows(IllegalArgumentException.class, () -> topicService.createTopic(opts), "Expected IllegalArgumentException to throw"); + @ClusterTemplate("generate1") + public void testCreateWithNegativeReplicationFactor(TestInfo testInfo) throws Exception { + String testTopicName = testInfo.getTestMethod().get().getName() + "-" + + TestUtils.randomString(10); + try (Admin adminClient = clusterInstance.createAdminClient(); + TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient)) { + TopicCommand.TopicCommandOptions opts = buildTopicCommandOptionsWithBootstrap("--create", + "--partitions", "2", "--replication-factor", "-1", "--topic", testTopicName); + assertThrows(IllegalArgumentException.class, () -> topicService.createTopic(opts), "Expected IllegalArgumentException to throw"); + } } - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testCreateWithNegativePartitionCount(String quorum) { - TopicCommand.TopicCommandOptions opts = buildTopicCommandOptionsWithBootstrap("--create", "--partitions", "-1", "--replication-factor", "1", "--topic", testTopicName); - assertThrows(IllegalArgumentException.class, () -> topicService.createTopic(opts), "Expected IllegalArgumentException to throw"); + @ClusterTemplate("generate1") + public void testCreateWithNegativePartitionCount(TestInfo testInfo) throws Exception { + String testTopicName = testInfo.getTestMethod().get().getName() + "-" + + TestUtils.randomString(10); + try (Admin adminClient = clusterInstance.createAdminClient(); + TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient)) { + TopicCommand.TopicCommandOptions opts = buildTopicCommandOptionsWithBootstrap("--create", "--partitions", "-1", "--replication-factor", "1", "--topic", testTopicName); + assertThrows(IllegalArgumentException.class, () -> topicService.createTopic(opts), "Expected IllegalArgumentException to throw"); + } } - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testInvalidTopicLevelConfig(String quorum) { - TopicCommand.TopicCommandOptions createOpts = buildTopicCommandOptionsWithBootstrap("--create", - "--partitions", "1", "--replication-factor", "1", "--topic", testTopicName, - "--config", "message.timestamp.type=boom"); - assertThrows(ConfigException.class, () -> topicService.createTopic(createOpts), "Expected ConfigException to throw"); + @ClusterTemplate("generate1") + public void testInvalidTopicLevelConfig(TestInfo testInfo) { + String testTopicName = testInfo.getTestMethod().get().getName() + "-" + + TestUtils.randomString(10); + try (Admin adminClient = clusterInstance.createAdminClient()) { + TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient); + + TopicCommand.TopicCommandOptions createOpts = buildTopicCommandOptionsWithBootstrap("--create", + "--partitions", "1", "--replication-factor", "1", "--topic", testTopicName, + "--config", "message.timestamp.type=boom"); + assertThrows(ConfigException.class, () -> topicService.createTopic(createOpts), "Expected ConfigException to throw"); + } } - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testListTopics(String quorum) { - TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor, - scala.collection.immutable.Map$.MODULE$.empty(), new Properties() - ); + @ClusterTemplate("generate1") + public void testListTopics(TestInfo testInfo) throws InterruptedException { + String testTopicName = testInfo.getTestMethod().get().getName() + "-" + + TestUtils.randomString(10); + try (Admin adminClient = clusterInstance.createAdminClient()) { + adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor))); + clusterInstance.waitForTopic(testTopicName, defaultNumPartitions); - String output = captureListTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--list")); - assertTrue(output.contains(testTopicName), "Expected topic name to be present in output: " + output); + String output = captureListTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--list")); + assertTrue(output.contains(testTopicName), "Expected topic name to be present in output: " + output); + } } - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testListTopicsWithIncludeList(String quorum) { - String topic1 = "kafka.testTopic1"; - String topic2 = "kafka.testTopic2"; - String topic3 = "oooof.testTopic1"; - int partition = 2; - short replicationFactor = 2; - TestUtils.createTopicWithAdmin(adminClient, topic1, scalaBrokers, scalaControllers, partition, replicationFactor, - scala.collection.immutable.Map$.MODULE$.empty(), new Properties() - ); - TestUtils.createTopicWithAdmin(adminClient, topic2, scalaBrokers, scalaControllers, partition, replicationFactor, - scala.collection.immutable.Map$.MODULE$.empty(), new Properties() - ); - TestUtils.createTopicWithAdmin(adminClient, topic3, scalaBrokers, scalaControllers, partition, replicationFactor, - scala.collection.immutable.Map$.MODULE$.empty(), new Properties() - ); + @ClusterTemplate("generate1") + public void testListTopicsWithIncludeList() throws InterruptedException { + try (Admin adminClient = clusterInstance.createAdminClient()) { + String topic1 = "kafka.testTopic1"; + String topic2 = "kafka.testTopic2"; + String topic3 = "oooof.testTopic1"; + int partition = 2; + short replicationFactor = 2; + adminClient.createTopics(Collections.singletonList(new NewTopic(topic1, partition, replicationFactor))); Review Comment: File another PR, https://github.com/apache/kafka/pull/16852, PTAL. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org