[ https://issues.apache.org/jira/browse/KAFKA-18818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17933333#comment-17933333 ]
Shubham Raj commented on KAFKA-18818: ------------------------------------- [~junrao] [~yangpoan] [~dajac] [~cmccabe] Could you please take a look at this? > Significant delay in Metadata Update After Admin Client Operations in KRaft > Mode > -------------------------------------------------------------------------------- > > Key: KAFKA-18818 > URL: https://issues.apache.org/jira/browse/KAFKA-18818 > Project: Kafka > Issue Type: Bug > Components: kraft > Affects Versions: 3.9.0 > Reporter: Shubham Raj > Priority: Major > > We have observed a significant delay in metadata updates after performing > asynchronous Admin Client operations (such as {{createTopics}}, > {{deleteTopics}}, {{createAcls}}, {{deleteAcls}}, and > {{changeTopicConfigs}}) using both the Confluent Python library and the > Apache Kafka Java library when running Kafka in KRaft mode. > *Issue Details:* > When using the Admin Client to perform operations, we receive a map of {{Future}} > objects, since the calls are asynchronous. Once the {{Future}} results complete successfully, > we expect the changes to be immediately > reflected in the metadata. This is the behavior we observe > when running Kafka in ZooKeeper mode. > However, in KRaft mode, even after {{future.result()}} completes, the > results of the operation are not immediately reflected in the metadata. We have > observed an additional delay before the metadata is updated to reflect the > changes. In our case, this delay is approximately 500 ms, but it could vary > depending on the cluster load. > *Impact:* > This delay affects workflows in which Kafka users perform Admin > Client operations and expect the metadata to be updated immediately. The additional wait > time introduces latency and could impact time-sensitive > operations. 
> *Steps performed:* > * Called {{AdminClient.createTopics}} to create a test topic and waited on > its KafkaFuture result; the future resolves to null to indicate that the topic > was created (*[CreateTopicsResult ref|https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/admin/CreateTopicsResult.html]*). > * Started a loop that repeatedly describes the newly created topic (to validate > that the topic is present in Kafka's metadata). > Noted that the topic is not immediately reflected in the metadata: we > receive an _*{{UnknownTopicOrPartitionException}}*_ > * Recorded the number of loop iterations needed before the newly created topic is > reflected in the metadata. > In the test below, the topic takes *407 ms* to appear in the metadata after the > createTopics KafkaFuture resolves (this is additional time the client must wait > after the future completes). > > * *Kafka version: 3.9.0* > * *Java library version: 2.8.0* > {code:java} > package kafkautils.trycode; > import java.util.*; > import local.kafka.KafkaConfig; > import java.util.concurrent.Callable; > import java.util.concurrent.ExecutionException; > import java.util.function.Function; > import java.util.logging.Logger; > import org.apache.kafka.clients.admin.*; > import org.apache.kafka.common.acl.*; > import org.apache.kafka.common.config.ConfigResource; > import org.apache.kafka.common.resource.PatternType; > import org.apache.kafka.common.resource.ResourcePattern; > import org.apache.kafka.common.resource.ResourcePatternFilter; > import org.apache.kafka.common.resource.ResourceType; > public class TryAdminClient { > /** The Logger. 
**/ > private static final Logger LOG > =LogFactory.getLogger(TryAdminClient.class); > private static final String CLUSTER = "alpha"; > private static final String MAIN_RESOURCE_PREFIX = > "testkafka.test_cluster_operations_delay.java_client."; > private static final Admin adminClient; > static { > Properties adminProps = new Properties(); > adminProps.put( > AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, > KafkaConfig.makeKafkaConfig(CLUSTER).getBootStrapServersWithPort() > ); adminProps.put("security.protocol", "SASL_PLAINTEXT"); > adminProps.put("sasl.mechanism", "GSSAPI"); > adminProps.put("sasl.kerberos.service.name", "kafka"); > adminProps.put("sasl.kerberos.init.cmd", "/usr/local/bin/skinit > --quiet"); > adminProps.put("sun.security.jgss.native", "true"); > adminProps.put("sun.security.jgss.lib", "/usr/libexec/libgsswrap.so"); > adminProps.put("javax.security.auth.useSubjectCredsOnly", "false"); > adminProps.put("java.security.auth.login.config", > > "/data/middleware/kafkautils/etc/config/kafka/kafka_client_jaas.conf"); > adminClient = Admin.create(adminProps); > } > public static void main(String[] args) throws ExecutionException, > InterruptedException > { > TryAdminClient client = new TryAdminClient(); > String topic = MAIN_RESOURCE_PREFIX + > "single_topic_operation_with_admin_client_test.a.1"; > var newTopic = new NewTopic(topic, 1, > (short)3).configs(Map.of("cleanup.policy", "delete")); > var createTopicsResult = > adminClient.createTopics(List.of(newTopic)).all().get(); > LOG.info("CreateTopics finished with result : " + createTopicsResult); > long queryStartTimestamp = System.currentTimeMillis(); > while (true) { > try { > var describeTopicsResult = > adminClient.describeTopics(List.of(topic)).all().get().get(topic); > LOG.info("DescribeTopics returned result : " + > describeTopicsResult); > if (describeTopicsResult != null) > break; > } catch (ExecutionException err) { > LOG.warning("DescribeTopics operation failed with error : " + > err); > } > } > 
LOG.info(String.format( > "Describe operation took = %d ms", > System.currentTimeMillis() - queryStartTimestamp > )); > } > } {code} > > *logs we got* > {code:java} > [20250214 07:11:43.514 EST (main) kafkautils.trycode.TryAdminClient#main > INFO] CreateTopics finished with result : null > [20250214 07:11:43.538 EST (main) kafkautils.trycode.TryAdminClient#main > WARNING] DescribeTopics operation failed with error : > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server > does not host this topic-partition... repeated 146 times > [20250214 07:11:43.938 EST (main) kafkautils.trycode.TryAdminClient#main > INFO] DescribeTopics returned result : > (name=testkafka.test_cluster_operations_delay.java_client.single_topic_operation_with_admin_client_test.a.1, > internal=false, partitions=(partition=0, > leader=qtekafka-alpha-02.tbd.local.com:9092 (id: 6 rack: tbd.local.com), > replicas=qtekafka-alpha-02.tbd.local.com:9092 (id: 6 rack: tbd.local.com), > qtekafka-alpha-02.dr.local.com:9092 (id: 4 rack: dr.local.com), > qtekafka-alpha-02.nyc.local.com:9092 (id: 2 rack: nyc.local.com), > isr=qtekafka-alpha-02.tbd.local.com:9092 (id: 6 rack: tbd.local.com), > qtekafka-alpha-02.dr.local.com:9092 (id: 4 rack: dr.local.com), > qtekafka-alpha-02.nyc.local.com:9092 (id: 2 rack: nyc.local.com)), > authorizedOperations=null) > [20250214 07:11:43.939 EST (main) kafkautils.trycode.TryAdminClient#main > INFO] Describe operation took = 407 ms{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)