[ 
https://issues.apache.org/jira/browse/KAFKA-9534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17033997#comment-17033997
 ] 

Boyang Chen commented on KAFKA-9534:
------------------------------------

The reproduction is not very consistent; it will take some time to debug further.

> Topics could not be deleted when there is a concurrent create topic request 
> loop
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-9534
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9534
>             Project: Kafka
>          Issue Type: Bug
>          Components: admin
>    Affects Versions: 2.3.0, 2.5.0
>            Reporter: Boyang Chen
>            Priority: Major
>
> Steps to reproduce:
>  # start local ZK
>  # start local broker
>  # Run the following script, which keeps retrying topic creation until it 
> succeeds:
>  
> {code:java}
> package kafka.examples;
> import org.apache.kafka.clients.admin.Admin;
> import org.apache.kafka.clients.admin.NewTopic;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.common.errors.TopicExistsException;
> import java.util.Arrays;
> import java.util.List;
> import java.util.Properties;
> import java.util.concurrent.CountDownLatch;
> import java.util.concurrent.ExecutionException;
> public class Reproduce {
>     public static void main(String[] args) throws ExecutionException, 
> InterruptedException {
>         Properties props = new Properties();
>         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
>             KafkaProperties.KAFKA_SERVER_URL + ":" + 
> KafkaProperties.KAFKA_SERVER_PORT);
>         Admin adminClient = Admin.create(props);
>         createTopics(adminClient);
>         CountDownLatch createSucceed = new CountDownLatch(1);
>         Thread deleteTopicThread = new Thread(() -> {
>             List<String> topicsToDelete = Arrays.asList("input-topic", 
> "output-topic");
>             while (true) {
>                 try {
>                     Thread.sleep(1000);
>                     adminClient.deleteTopics(topicsToDelete).all().get();
>                     if (createSucceed.getCount() == 0) {
>                         break;
>                     }
>                 } catch (ExecutionException | InterruptedException e) {
>                     System.out.println("Encountered exception during topic 
> deletion: " + e.getCause());
>                 }
>             }
>             System.out.println("Deleted old topics: " + topicsToDelete);
>          });
>         deleteTopicThread.start();
>         while (true) {
>             try {
>                 createTopics(adminClient);
>                 System.out.println("Created new topic!");
>                 break;
>             } catch (ExecutionException | InterruptedException e) {
>                 if (!(e.getCause() instanceof TopicExistsException)) {
>                     throw e;
>                 }
>                 System.out.println("Metadata of the old topics are not 
> cleared yet... " + e.getMessage());
>                 Thread.sleep(1000);
>             }
>         }
>         createSucceed.countDown();
>         deleteTopicThread.join();
>     }
>     private static void createTopics(Admin adminClient) throws 
> InterruptedException, ExecutionException {
>         adminClient.createTopics(Arrays.asList(
>             new NewTopic("input-topic", 1, (short) 1),
>             new NewTopic("output-topic", 1, (short) 1))).all().get();
>     }
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to