[ 
https://issues.apache.org/jira/browse/KAFKA-9534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-9534:
-------------------------------
    Description: 
The reproduce steps:
 # start local ZK
 # start local broker
 #  Run the following script which keeps creating an input topic until success:


 
{code:java}
package kafka.examples;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.errors.TopicExistsException;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

public class Reproduce {

    public static void main(String[] args) throws ExecutionException, 
InterruptedException {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            KafkaProperties.KAFKA_SERVER_URL + ":" + 
KafkaProperties.KAFKA_SERVER_PORT);

        Admin adminClient = Admin.create(props);

        createTopics(adminClient);
        CountDownLatch createSucceed = new CountDownLatch(1);
        Thread deleteTopicThread = new Thread(() -> {
            List<String> topicsToDelete = Arrays.asList("input-topic", 
"output-topic");
            while (true) {
                try {
                    Thread.sleep(1000);
                    adminClient.deleteTopics(topicsToDelete).all().get();
                    if (createSucceed.getCount() == 0) {
                        break;
                    }
                } catch (ExecutionException | InterruptedException e) {
                    System.out.println("Encountered exception during topic 
deletion: " + e.getCause());
                }
            }
            System.out.println("Deleted old topics: " + topicsToDelete);
         });
        deleteTopicThread.start();

        while (true) {
            try {
                createTopics(adminClient);
                System.out.println("Created new topic!");
                break;
            } catch (ExecutionException | InterruptedException e) {
                if (!(e.getCause() instanceof TopicExistsException)) {
                    throw e;
                }
                System.out.println("Metadata of the old topics are not cleared 
yet... " + e.getMessage());

                Thread.sleep(1000);
            }
        }
        createSucceed.countDown();
        deleteTopicThread.join();
    }

    private static void createTopics(Admin adminClient) throws 
InterruptedException, ExecutionException {
        adminClient.createTopics(Arrays.asList(
            new NewTopic("input-topic", 1, (short) 1),
            new NewTopic("output-topic", 1, (short) 1))).all().get();
    }
}

{code}

  was:Not sure this is indeed a bug, just making it under track.


> Topics could not be deleted when there is a concurrent create topic request 
> loop
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-9534
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9534
>             Project: Kafka
>          Issue Type: Bug
>          Components: admin
>    Affects Versions: 2.3.0, 2.5.0
>            Reporter: Boyang Chen
>            Priority: Major
>
> The reproduce steps:
>  # start local ZK
>  # start local broker
>  #  Run the following script which keeps creating an input topic until 
> success:
>  
> {code:java}
> package kafka.examples;
> import org.apache.kafka.clients.admin.Admin;
> import org.apache.kafka.clients.admin.NewTopic;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.common.errors.TopicExistsException;
> import java.util.Arrays;
> import java.util.List;
> import java.util.Properties;
> import java.util.concurrent.CountDownLatch;
> import java.util.concurrent.ExecutionException;
> public class Reproduce {
>     public static void main(String[] args) throws ExecutionException, 
> InterruptedException {
>         Properties props = new Properties();
>         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
>             KafkaProperties.KAFKA_SERVER_URL + ":" + 
> KafkaProperties.KAFKA_SERVER_PORT);
>         Admin adminClient = Admin.create(props);
>         createTopics(adminClient);
>         CountDownLatch createSucceed = new CountDownLatch(1);
>         Thread deleteTopicThread = new Thread(() -> {
>             List<String> topicsToDelete = Arrays.asList("input-topic", 
> "output-topic");
>             while (true) {
>                 try {
>                     Thread.sleep(1000);
>                     adminClient.deleteTopics(topicsToDelete).all().get();
>                     if (createSucceed.getCount() == 0) {
>                         break;
>                     }
>                 } catch (ExecutionException | InterruptedException e) {
>                     System.out.println("Encountered exception during topic 
> deletion: " + e.getCause());
>                 }
>             }
>             System.out.println("Deleted old topics: " + topicsToDelete);
>          });
>         deleteTopicThread.start();
>         while (true) {
>             try {
>                 createTopics(adminClient);
>                 System.out.println("Created new topic!");
>                 break;
>             } catch (ExecutionException | InterruptedException e) {
>                 if (!(e.getCause() instanceof TopicExistsException)) {
>                     throw e;
>                 }
>                 System.out.println("Metadata of the old topics are not 
> cleared yet... " + e.getMessage());
>                 Thread.sleep(1000);
>             }
>         }
>         createSucceed.countDown();
>         deleteTopicThread.join();
>     }
>     private static void createTopics(Admin adminClient) throws 
> InterruptedException, ExecutionException {
>         adminClient.createTopics(Arrays.asList(
>             new NewTopic("input-topic", 1, (short) 1),
>             new NewTopic("output-topic", 1, (short) 1))).all().get();
>     }
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to