[ 
https://issues.apache.org/jira/browse/KAFKA-18818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17933333#comment-17933333
 ] 

Shubham Raj commented on KAFKA-18818:
-------------------------------------

[~junrao] [~yangpoan] [~dajac] [~cmccabe] Could you please take a look at this?

> Significant delay in Metadata Update After Admin Client Operations in KRaft 
> Mode
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-18818
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18818
>             Project: Kafka
>          Issue Type: Bug
>          Components: kraft
>    Affects Versions: 3.9.0
>            Reporter: Shubham Raj
>            Priority: Major
>
> We have observed a significant delay in metadata updates after performing 
> asynchronous Admin Client operations (such as {{createTopics}}, 
> {{deleteTopics}}, {{createAcls}}, {{deleteAcls}}, and 
> {{changeTopicConfigs}}) using both the Confluent Python library and the 
> Apache Kafka Java library when running Kafka in KRaft mode.
> *Issue Details:*
> Because Admin Client calls are asynchronous, each operation returns a map of 
> {{Future}} objects. Once those futures complete, we expect the operation to 
> have succeeded and the change to be immediately reflected in the metadata. 
> This is the behavior we observe when running Kafka in ZooKeeper mode.
> However, in KRaft mode, even after {{future.result()}} completes, the 
> operation results are not instantly reflected in the metadata. We have 
> observed an additional delay before the metadata is updated to reflect the 
> changes. In our case, this delay is approximately 500ms, but it could vary 
> depending on the cluster load.
> *Impact:*
> This delay is affecting our workflows where Kafka users need to perform Admin 
> Client operations and expect immediate metadata updates. The additional wait 
> time introduces latency and could potentially impact time-sensitive 
> operations.
> *Steps performed:*
>  * Called {{AdminClient.createTopics}} to create a test topic and waited on 
> its KafkaFuture result. The future returns null to denote a successful topic 
> creation ( *[CreateTopicsResult 
> ref|https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/admin/CreateTopicsResult.html]*
>  ).
>  * Started a loop that describes the newly created topic (to validate 
> that the topic is present in Kafka's metadata).
> The topic is not immediately reflected in metadata: the describe call fails 
> with an _*{{UnknownTopicOrPartitionException}}*_.
>  * Counted the number of loop iterations it takes for the newly created 
> topic to appear in metadata.
> In the test below, it takes *407 ms* after the createTopics KafkaFuture 
> resolves for the topic to appear in metadata (additional time the client 
> needs to wait).
>  
>  * *Kafka version: 3.9.0*
>  * *Java library version: 2.8.0*
> {code:java}
> package kafkautils.trycode;
> import java.util.*;
> import local.kafka.KafkaConfig;
> import java.util.concurrent.Callable;
> import java.util.concurrent.ExecutionException;
> import java.util.function.Function;
> import java.util.logging.Logger;
> import org.apache.kafka.clients.admin.*;
> import org.apache.kafka.common.acl.*;
> import org.apache.kafka.common.config.ConfigResource;
> import org.apache.kafka.common.resource.PatternType;
> import org.apache.kafka.common.resource.ResourcePattern;
> import org.apache.kafka.common.resource.ResourcePatternFilter;
> import org.apache.kafka.common.resource.ResourceType;
> public class TryAdminClient {
>     /** The Logger. **/
>     private static final Logger LOG = LogFactory.getLogger(TryAdminClient.class);
>     private static final String CLUSTER = "alpha";
>     private static final String MAIN_RESOURCE_PREFIX =
>         "testkafka.test_cluster_operations_delay.java_client.";
>     private static final Admin adminClient;
>     static {
>         Properties adminProps = new Properties();
>         adminProps.put(
>             AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
>             KafkaConfig.makeKafkaConfig(CLUSTER).getBootStrapServersWithPort()
>         );
>         adminProps.put("security.protocol", "SASL_PLAINTEXT");
>         adminProps.put("sasl.mechanism", "GSSAPI");
>         adminProps.put("sasl.kerberos.service.name", "kafka");
>         adminProps.put("sasl.kerberos.init.cmd", "/usr/local/bin/skinit --quiet");
>         adminProps.put("sun.security.jgss.native", "true");
>         adminProps.put("sun.security.jgss.lib", "/usr/libexec/libgsswrap.so");
>         adminProps.put("javax.security.auth.useSubjectCredsOnly", "false");
>         adminProps.put("java.security.auth.login.config",
>             "/data/middleware/kafkautils/etc/config/kafka/kafka_client_jaas.conf");
>         adminClient = Admin.create(adminProps);
>     }   
>     public static void main(String[] args) throws ExecutionException, InterruptedException {
>         TryAdminClient client  = new TryAdminClient();
>         String topic           = MAIN_RESOURCE_PREFIX + "single_topic_operation_with_admin_client_test.a.1";
>         var newTopic           = new NewTopic(topic, 1, (short)3).configs(Map.of("cleanup.policy",  "delete"));
>         var createTopicsResult = adminClient.createTopics(List.of(newTopic)).all().get();
>         LOG.info("CreateTopics finished with result : " + createTopicsResult);
>         long queryStartTimestamp = System.currentTimeMillis();
>         while (true) {
>             try {
>                 var describeTopicsResult = adminClient.describeTopics(List.of(topic)).all().get().get(topic);
>                 LOG.info("DescribeTopics returned result : " + describeTopicsResult);
>                 if (describeTopicsResult != null)
>                     break;
>             } catch (ExecutionException err) {
>                 LOG.warning("DescribeTopics operation failed with error : " + err);
>             }
>         }
>         LOG.info(String.format(
>             "Describe operation took = %d ms",
>             System.currentTimeMillis() - queryStartTimestamp
>         ));
>     }
> } {code}
>  
> *Logs we got:*
> {code:java}
> [20250214 07:11:43.514 EST (main) kafkautils.trycode.TryAdminClient#main 
> INFO] CreateTopics finished with result : null
> [20250214 07:11:43.538 EST (main) kafkautils.trycode.TryAdminClient#main 
> WARNING] DescribeTopics operation failed with error : 
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition... repeated 146 times 
> [20250214 07:11:43.938 EST (main) kafkautils.trycode.TryAdminClient#main 
> INFO] DescribeTopics returned result : 
> (name=testkafka.test_cluster_operations_delay.java_client.single_topic_operation_with_admin_client_test.a.1,
>  internal=false, partitions=(partition=0, 
> leader=qtekafka-alpha-02.tbd.local.com:9092 (id: 6 rack: tbd.local.com), 
> replicas=qtekafka-alpha-02.tbd.local.com:9092 (id: 6 rack: tbd.local.com), 
> qtekafka-alpha-02.dr.local.com:9092 (id: 4 rack: dr.local.com), 
> qtekafka-alpha-02.nyc.local.com:9092 (id: 2 rack: nyc.local.com), 
> isr=qtekafka-alpha-02.tbd.local.com:9092 (id: 6 rack: tbd.local.com), 
> qtekafka-alpha-02.dr.local.com:9092 (id: 4 rack: dr.local.com), 
> qtekafka-alpha-02.nyc.local.com:9092 (id: 2 rack: nyc.local.com)), 
> authorizedOperations=null)
> [20250214 07:11:43.939 EST (main) kafkautils.trycode.TryAdminClient#main 
> INFO] Describe operation took = 407 ms{code}
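
For anyone hitting the same delay in the meantime, the retry loop in the reproducer above can be bounded with a deadline and a poll interval instead of spinning. The following is only a minimal sketch of a client-side workaround under stated assumptions, not a fix for the underlying behavior. MetadataPoller and awaitVisible are hypothetical names that are not part of the Kafka API, and the probe in main is a stand-in for a call such as adminClient.describeTopics(List.of(topic)).all().get().get(topic) from the reproducer.

```java
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper (not part of the Kafka API): polls a probe until it succeeds or a deadline passes. */
final class MetadataPoller {

    private MetadataPoller() {}

    /**
     * Calls the probe repeatedly until it returns a non-null value.
     * An ExecutionException from the probe is treated as "not visible yet" and retried;
     * any other exception propagates to the caller immediately.
     */
    static <T> T awaitVisible(Callable<T> probe, Duration timeout, Duration pollInterval)
            throws Exception {
        long deadlineNanos = System.nanoTime() + timeout.toNanos();
        ExecutionException lastFailure = null;
        while (true) {
            try {
                T result = probe.call();
                if (result != null) {
                    return result;
                }
            } catch (ExecutionException e) {
                // In the reproducer this would wrap UnknownTopicOrPartitionException.
                lastFailure = e;
            }
            if (System.nanoTime() - deadlineNanos >= 0) {
                TimeoutException timeoutError =
                        new TimeoutException("probe did not succeed within " + timeout);
                if (lastFailure != null) {
                    timeoutError.initCause(lastFailure);
                }
                throw timeoutError;
            }
            // Pause between attempts so the loop does not hammer the brokers.
            Thread.sleep(pollInterval.toMillis());
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in probe (assumption): fails three times, then succeeds,
        // mimicking a topic that becomes visible after a short delay.
        int[] calls = {0};
        String value = awaitVisible(() -> {
            if (++calls[0] <= 3) {
                throw new ExecutionException(new IllegalStateException("not visible yet"));
            }
            return "topic-description";
        }, Duration.ofSeconds(5), Duration.ofMillis(10));
        System.out.println(value + " after " + calls[0] + " attempts");
    }
}
```

With a real Admin client, the lambda would wrap the describeTopics call and the timeout would be chosen to comfortably exceed the delay observed on the cluster (roughly 400 to 500 ms in this report). The timeout and poll interval values shown here are illustrative only.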



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
