Hi 志恒,
> When I use AdminClient to delete the topic, and immediately create the
> topic that was just deleted, it always indicates that the topic already
> exists

In Kafka, topic metadata is cached, and the cache is not updated
immediately after a topic is deleted. That is why you saw this behavior.
I don't think deleting a topic and immediately recreating it under the
same name is a good idea.
You may want to change your code accordingly.
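
If you really do need to recreate the topic right away, one option is to
retry the create call for a short while. Below is a rough sketch of that
idea, not something taken from Kafka itself: RetryingCaller, callWithRetry,
and all of their parameters are names I made up for illustration, and the
attempt count and pause are values you would have to tune yourself.

```java
import java.util.concurrent.Callable;
import java.util.function.Predicate;

/** Hypothetical helper: retries an action with a fixed pause between attempts. */
final class RetryingCaller {

    private RetryingCaller() {}

    /**
     * Runs the action up to maxAttempts times. A failure is retried only when
     * isRetriable accepts it; any other failure, and the failure of the last
     * attempt, is rethrown to the caller unchanged.
     */
    static <T> T callWithRetry(Callable<T> action,
                               Predicate<Exception> isRetriable,
                               int maxAttempts,
                               long pauseMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        for (int attempt = 1; ; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                // Give up on the last attempt or on a failure we should not retry.
                if (attempt >= maxAttempts || !isRetriable.test(e)) {
                    throw e;
                }
                // Wait a little so the pending deletion has time to complete.
                Thread.sleep(pauseMillis);
            }
        }
    }
}
```

In your initTopic() the action would be
() -> client.createTopics(Collections.singletonList(newTopic)).all().get()
and the predicate would accept an ExecutionException whose getCause() is an
org.apache.kafka.common.errors.TopicExistsException. I have not run this
against your cluster, so treat it as a starting point only.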

Thank you.
Luke

On Mon, Nov 22, 2021 at 11:30 PM 王 志恒 <zhi.heng.w...@outlook.com> wrote:

> Hi,
> When I use AdminClient to delete the topic, and immediately create the
> topic that was just deleted, it always indicates that the topic already
> exists. But when I list all the existing topics, the topic is not there.
> The code I tested follows.
>
> package org.wzh.three2.kafka.producer;
>
> import org.apache.kafka.clients.admin.*;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.clients.producer.RecordMetadata;
> import org.apache.kafka.common.serialization.StringSerializer;
>
> import java.util.Collections;
> import java.util.Properties;
> import java.util.concurrent.ExecutionException;
> import java.util.concurrent.Future;
> import java.util.concurrent.TimeUnit;
> import java.util.concurrent.locks.LockSupport;
>
> public class ProducerClient {
>
>     public static final String TOPIC = "topic-three2-kafka";
>
>     public static void main(String[] args) throws ExecutionException, InterruptedException {
>         initTopic();
>
> //        KafkaProducer<String, String> producer = new KafkaProducer<>(initConfig());
> //        for (int i = 0; i < Integer.MAX_VALUE; i++) {
> //            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, String.valueOf(i));
> //            try {
> //                Future<RecordMetadata> future = producer.send(record);
> //                RecordMetadata metadata = future.get();
> //                System.out.println(metadata.topic() + " - " + metadata.partition() + ":" + metadata.offset());
> //            } catch (InterruptedException e) {
> //                e.printStackTrace();
> //            } catch (ExecutionException e) {
> //                e.printStackTrace();
> //            }
> //            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(3));
> //        }
>     }
>
>     private static void listTopics(AdminClient client) throws ExecutionException, InterruptedException {
>         System.out.println("+------Topics-------+");
>         client.listTopics().names().get().forEach(System.out::println);
>         System.out.println("+-------------------+");
>     }
>
>     // Create the topic
>     public static void initTopic() throws ExecutionException, InterruptedException {
>         Properties props = new Properties();
>         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.111.101:9092,192.168.111.102:9092,192.168.111.103:9092");
>         AdminClient client = KafkaAdminClient.create(props);
>
>         listTopics(client);
>
>         NewTopic newTopic = new NewTopic(TOPIC, 10, (short) 3);
>         if(client.listTopics().names().get().contains(TOPIC)) {
>             System.out.println("Will delete topic: " + TOPIC);
>             try {
>                 DeleteTopicsResult deleteTopicsResult = client.deleteTopics(Collections.singletonList(newTopic.name()));
>                 deleteTopicsResult.all().get();
>                 deleteTopicsResult.values().forEach((k, v) -> System.out.println(k + "\t" + v));
>             } catch (InterruptedException e) {
>                 e.printStackTrace();
>             } catch (ExecutionException e) {
>                 e.printStackTrace();
>             }
>         }
>
>         listTopics(client);
>
>         try {
>             System.out.println("Will create topic: " + TOPIC);
>             CreateTopicsResult result = client.createTopics(Collections.singletonList(newTopic));
>             result.all().get();
>             result.values().forEach((k, v) -> System.out.println(k + "\t" + v));
>         } catch (InterruptedException e) {
>             e.printStackTrace();
>         } catch (ExecutionException e) {
>             e.printStackTrace();
>         }
>         client.close();
>     }
>
>     public static Properties initConfig() {
>         Properties props = new Properties();
>         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.111.101:9092,192.168.111.102:9092,192.168.111.103:9092");
>         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
>         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
>         props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.demo");
>         return props;
>     }
> }
>
> Brs/newbie
> Sent from Mail for Windows<https://go.microsoft.com/fwlink/?LinkId=550986>
>
>
