[ https://issues.apache.org/jira/browse/KAFKA-9534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17033997#comment-17033997 ]
Boyang Chen commented on KAFKA-9534: ------------------------------------ The issue does not reproduce consistently, so it will take some time to debug further. > Topics could not be deleted when there is a concurrent create topic request > loop > -------------------------------------------------------------------------------- > > Key: KAFKA-9534 > URL: https://issues.apache.org/jira/browse/KAFKA-9534 > Project: Kafka > Issue Type: Bug > Components: admin > Affects Versions: 2.3.0, 2.5.0 > Reporter: Boyang Chen > Priority: Major > > Steps to reproduce: > # Start local ZK > # Start local broker > # Run the following script, which keeps trying to create an input topic until it > succeeds: > > {code:java} > package kafka.examples; > import org.apache.kafka.clients.admin.Admin; > import org.apache.kafka.clients.admin.NewTopic; > import org.apache.kafka.clients.consumer.ConsumerConfig; > import org.apache.kafka.common.errors.TopicExistsException; > import java.util.Arrays; > import java.util.List; > import java.util.Properties; > import java.util.concurrent.CountDownLatch; > import java.util.concurrent.ExecutionException; > public class Reproduce { > public static void main(String[] args) throws ExecutionException, > InterruptedException { > Properties props = new Properties(); > props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, > KafkaProperties.KAFKA_SERVER_URL + ":" + > KafkaProperties.KAFKA_SERVER_PORT); > Admin adminClient = Admin.create(props); > createTopics(adminClient); > CountDownLatch createSucceed = new CountDownLatch(1); > Thread deleteTopicThread = new Thread(() -> { > List<String> topicsToDelete = Arrays.asList("input-topic", > "output-topic"); > while (true) { > try { > Thread.sleep(1000); > adminClient.deleteTopics(topicsToDelete).all().get(); > if (createSucceed.getCount() == 0) { > break; > } > } catch (ExecutionException | InterruptedException e) { > System.out.println("Encountered exception during topic > deletion: " + e.getCause()); > } > } > 
System.out.println("Deleted old topics: " + topicsToDelete); > }); > deleteTopicThread.start(); > while (true) { > try { > createTopics(adminClient); > System.out.println("Created new topic!"); > break; > } catch (ExecutionException | InterruptedException e) { > if (!(e.getCause() instanceof TopicExistsException)) { > throw e; > } > System.out.println("Metadata of the old topics are not > cleared yet... " + e.getMessage()); > Thread.sleep(1000); > } > } > createSucceed.countDown(); > deleteTopicThread.join(); > } > private static void createTopics(Admin adminClient) throws > InterruptedException, ExecutionException { > adminClient.createTopics(Arrays.asList( > new NewTopic("input-topic", 1, (short) 1), > new NewTopic("output-topic", 1, (short) 1))).all().get(); > } > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)