[ https://issues.apache.org/jira/browse/KAFKA-9534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Boyang Chen updated KAFKA-9534: ------------------------------- Description: The reproduce steps: # start local ZK # start local broker # Run the following script which keeps creating an input topic until success: {code:java} package kafka.examples; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.errors.TopicExistsException; import java.util.Arrays; import java.util.List; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; public class Reproduce { public static void main(String[] args) throws ExecutionException, InterruptedException { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT); Admin adminClient = Admin.create(props); createTopics(adminClient); CountDownLatch createSucceed = new CountDownLatch(1); Thread deleteTopicThread = new Thread(() -> { List<String> topicsToDelete = Arrays.asList("input-topic", "output-topic"); while (true) { try { Thread.sleep(1000); adminClient.deleteTopics(topicsToDelete).all().get(); if (createSucceed.getCount() == 0) { break; } } catch (ExecutionException | InterruptedException e) { System.out.println("Encountered exception during topic deletion: " + e.getCause()); } } System.out.println("Deleted old topics: " + topicsToDelete); }); deleteTopicThread.start(); while (true) { try { createTopics(adminClient); System.out.println("Created new topic!"); break; } catch (ExecutionException | InterruptedException e) { if (!(e.getCause() instanceof TopicExistsException)) { throw e; } System.out.println("Metadata of the old topics are not cleared yet... " + e.getMessage()); Thread.sleep(1000); } } createSucceed.countDown(); deleteTopicThread.join(); } private static void createTopics(Admin adminClient) throws InterruptedException, ExecutionException { adminClient.createTopics(Arrays.asList( new NewTopic("input-topic", 1, (short) 1), new NewTopic("output-topic", 1, (short) 1))).all().get(); } } {code} was:Not sure this is indeed a bug, just making it under track. > Topics could not be deleted when there is a concurrent create topic request > loop > -------------------------------------------------------------------------------- > > Key: KAFKA-9534 > URL: https://issues.apache.org/jira/browse/KAFKA-9534 > Project: Kafka > Issue Type: Bug > Components: admin > Affects Versions: 2.3.0, 2.5.0 > Reporter: Boyang Chen > Priority: Major > > The reproduce steps: > # start local ZK > # start local broker > # Run the following script which keeps creating an input topic until > success: > > {code:java} > package kafka.examples; > import org.apache.kafka.clients.admin.Admin; > import org.apache.kafka.clients.admin.NewTopic; > import org.apache.kafka.clients.consumer.ConsumerConfig; > import org.apache.kafka.common.errors.TopicExistsException; > import java.util.Arrays; > import java.util.List; > import java.util.Properties; > import java.util.concurrent.CountDownLatch; > import java.util.concurrent.ExecutionException; > public class Reproduce { > public static void main(String[] args) throws ExecutionException, > InterruptedException { > Properties props = new Properties(); > props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, > KafkaProperties.KAFKA_SERVER_URL + ":" + > KafkaProperties.KAFKA_SERVER_PORT); > Admin adminClient = Admin.create(props); > createTopics(adminClient); > CountDownLatch createSucceed = new CountDownLatch(1); > Thread deleteTopicThread = new Thread(() -> { > List<String> topicsToDelete = Arrays.asList("input-topic", > "output-topic"); > while (true) { > try { > Thread.sleep(1000); > adminClient.deleteTopics(topicsToDelete).all().get(); > if (createSucceed.getCount() == 0) { > break; > } > } catch (ExecutionException | InterruptedException e) { > System.out.println("Encountered exception during topic > deletion: " + e.getCause()); > } > } > System.out.println("Deleted old topics: " + topicsToDelete); > }); > deleteTopicThread.start(); > while (true) { > try { > createTopics(adminClient); > System.out.println("Created new topic!"); > break; > } catch (ExecutionException | InterruptedException e) { > if (!(e.getCause() instanceof TopicExistsException)) { > throw e; > } > System.out.println("Metadata of the old topics are not > cleared yet... " + e.getMessage()); > Thread.sleep(1000); > } > } > createSucceed.countDown(); > deleteTopicThread.join(); > } > private static void createTopics(Admin adminClient) throws > InterruptedException, ExecutionException { > adminClient.createTopics(Arrays.asList( > new NewTopic("input-topic", 1, (short) 1), > new NewTopic("output-topic", 1, (short) 1))).all().get(); > } > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)