[
https://issues.apache.org/jira/browse/KAFKA-18818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18036358#comment-18036358
]
Erik van Oosten commented on KAFKA-18818:
-----------------------------------------
One thing I'd like to add: the zio-kafka unit tests that I mentioned in my
previous comment run against a single shared embedded Kafka broker (running in
the same JVM). Network latency should be practically zero. There might be some
disk latency.
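
For anyone who wants to measure the delay without a busy loop, the
create-then-describe check from the issue description below could be wrapped in
a bounded polling helper. This is only a rough sketch: MetadataPoll,
pollUntilVisible, PollResult and the fake lookup in main are hypothetical names
made up for illustration, and the lookup is a stand-in that does not talk to
Kafka at all.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class MetadataPoll {

    /** Outcome of a successful poll: the value found, attempts made, time spent. */
    static final class PollResult<T> {
        final T value;
        final int attempts;
        final long elapsedMillis;

        PollResult(T value, int attempts, long elapsedMillis) {
            this.value = value;
            this.attempts = attempts;
            this.elapsedMillis = elapsedMillis;
        }
    }

    /**
     * Calls lookup until it returns a non-empty Optional or the timeout has
     * passed, sleeping for interval between attempts. Returns an empty
     * Optional on timeout.
     */
    static <T> Optional<PollResult<T>> pollUntilVisible(
            Supplier<Optional<T>> lookup, Duration timeout, Duration interval)
            throws InterruptedException {
        long start = System.nanoTime();
        long deadline = start + timeout.toNanos();
        int attempts = 0;
        while (true) {
            attempts++;
            Optional<T> found = lookup.get();
            long now = System.nanoTime();
            if (found.isPresent()) {
                long elapsedMillis = (now - start) / 1_000_000;
                return Optional.of(new PollResult<>(found.get(), attempts, elapsedMillis));
            }
            if (now - deadline >= 0) {
                return Optional.empty(); // gave up: never became visible in time
            }
            Thread.sleep(interval.toMillis());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Hypothetical stand-in for a describeTopics call: reports "not found"
        // for the first three calls, then returns a description.
        AtomicInteger calls = new AtomicInteger();
        Supplier<Optional<String>> fakeDescribe =
            () -> calls.incrementAndGet() > 3 ? Optional.of("topic-a") : Optional.empty();

        Optional<PollResult<String>> result =
            pollUntilVisible(fakeDescribe, Duration.ofSeconds(5), Duration.ofMillis(10));

        System.out.println(result
            .map(r -> r.value + " visible after " + r.attempts + " attempts, "
                + r.elapsedMillis + " ms")
            .orElse("timed out"));
    }
}
```

With a real Admin client, the lookup would presumably wrap the describeTopics
call from the reproduction and map an ExecutionException caused by
UnknownTopicOrPartitionException to Optional.empty(). I have not tried that
here.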
> Significant delay in Metadata Update After Admin Client Operations in KRaft
> Mode
> --------------------------------------------------------------------------------
>
> Key: KAFKA-18818
> URL: https://issues.apache.org/jira/browse/KAFKA-18818
> Project: Kafka
> Issue Type: Bug
> Components: kraft
> Affects Versions: 3.9.0
> Reporter: Shubham Raj
> Assignee: José Armando García Sancio
> Priority: Major
> Fix For: 4.1.0, 4.2.0
>
> Attachments: old-batch-error.patch
>
>
> We have observed a significant delay in metadata updates after performing
> asynchronous Admin Client operations (such as {{createTopics}},
> {{deleteTopics}}, {{createAcls}}, {{deleteAcls}}, and
> {{changeTopicConfigs}}) using both the Confluent Python library and the
> Apache Kafka Java library when running Kafka in KRaft mode.
> *Issue Details:*
> When using the Admin Client to perform operations, we receive a map of
> {{Future}} objects, as the calls are asynchronous. After waiting on the
> {{Future}} results, we expect the operations to have succeeded and the changes
> to be immediately reflected in the metadata. This behavior is consistent with
> what we observe when running Kafka in ZooKeeper mode.
> However, in KRaft mode, even after {{future.result()}} completes, the
> operation results are not instantly reflected in the metadata. We have
> observed an additional delay before the metadata is updated to reflect the
> changes. In our case, this delay is approximately 500ms, but it could vary
> depending on the cluster load.
> *Impact:*
> This delay is affecting our workflows where Kafka users need to perform Admin
> Client operations and expect immediate metadata updates. The additional wait
> time introduces latency and could potentially impact time-sensitive
> operations.
> *Steps performed:*
> * Called {{AdminClient.createTopics}} to create a test topic and waited on
> its KafkaFuture result; the future returns null to denote a successful topic
> creation ( *[CreateTopicsResult
> ref|https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/admin/CreateTopicsResult.html]*
> ).
> * Started a loop that repeatedly describes the newly created topic (to
> validate that the topic is present in Kafka's metadata).
> Noted that the topic is not immediately reflected in metadata, as we
> receive an _*{{UnknownTopicOrPartitionException}}*_.
> * Counted the number of loop iterations it takes for the newly created topic
> to be reflected in metadata.
> In the test below, it takes *407 ms* after the createTopics operation
> completes for the topic to appear in metadata (additional time the client
> needs to wait after the KafkaFuture resolves).
>
> * *Kafka version: 3.9.0*
> * *Java library version: 2.8.0*
> {code:java}
> package kafkautils.trycode;
> import java.util.*;
> import local.kafka.KafkaConfig;
> import java.util.concurrent.Callable;
> import java.util.concurrent.ExecutionException;
> import java.util.function.Function;
> import java.util.logging.Logger;
> import org.apache.kafka.clients.admin.*;
> import org.apache.kafka.common.acl.*;
> import org.apache.kafka.common.config.ConfigResource;
> import org.apache.kafka.common.resource.PatternType;
> import org.apache.kafka.common.resource.ResourcePattern;
> import org.apache.kafka.common.resource.ResourcePatternFilter;
> import org.apache.kafka.common.resource.ResourceType;
> public class TryAdminClient {
>     /** The Logger. **/
>     private static final Logger LOG = LogFactory.getLogger(TryAdminClient.class);
>     private static final String CLUSTER = "alpha";
>     private static final String MAIN_RESOURCE_PREFIX =
>         "testkafka.test_cluster_operations_delay.java_client.";
>     private static final Admin adminClient;
>
>     static {
>         Properties adminProps = new Properties();
>         adminProps.put(
>             AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
>             KafkaConfig.makeKafkaConfig(CLUSTER).getBootStrapServersWithPort()
>         );
>         adminProps.put("security.protocol", "SASL_PLAINTEXT");
>         adminProps.put("sasl.mechanism", "GSSAPI");
>         adminProps.put("sasl.kerberos.service.name", "kafka");
>         adminProps.put("sasl.kerberos.init.cmd", "/usr/local/bin/skinit --quiet");
>         adminProps.put("sun.security.jgss.native", "true");
>         adminProps.put("sun.security.jgss.lib", "/usr/libexec/libgsswrap.so");
>         adminProps.put("javax.security.auth.useSubjectCredsOnly", "false");
>         adminProps.put("java.security.auth.login.config",
>             "/data/middleware/kafkautils/etc/config/kafka/kafka_client_jaas.conf");
>         adminClient = Admin.create(adminProps);
>     }
>
>     public static void main(String[] args)
>             throws ExecutionException, InterruptedException {
>         TryAdminClient client = new TryAdminClient();
>         String topic = MAIN_RESOURCE_PREFIX +
>             "single_topic_operation_with_admin_client_test.a.1";
>         var newTopic = new NewTopic(topic, 1, (short) 3)
>             .configs(Map.of("cleanup.policy", "delete"));
>
>         // Block until the createTopics future completes.
>         var createTopicsResult =
>             adminClient.createTopics(List.of(newTopic)).all().get();
>         LOG.info("CreateTopics finished with result : " + createTopicsResult);
>
>         // Poll describeTopics in a tight loop until the topic shows up.
>         long queryStartTimestamp = System.currentTimeMillis();
>         while (true) {
>             try {
>                 var describeTopicsResult =
>                     adminClient.describeTopics(List.of(topic)).all().get().get(topic);
>                 LOG.info("DescribeTopics returned result : " + describeTopicsResult);
>                 if (describeTopicsResult != null)
>                     break;
>             } catch (ExecutionException err) {
>                 LOG.warning("DescribeTopics operation failed with error : " + err);
>             }
>         }
>
>         // Time spent polling after the createTopics future resolved.
>         LOG.info(String.format(
>             "Describe operation took = %d ms",
>             System.currentTimeMillis() - queryStartTimestamp
>         ));
>     }
> }
> {code}
>
> *logs we got*
> {code:java}
> [20250214 07:11:43.514 EST (main) kafkautils.trycode.TryAdminClient#main
> INFO] CreateTopics finished with result : null
> [20250214 07:11:43.538 EST (main) kafkautils.trycode.TryAdminClient#main
> WARNING] DescribeTopics operation failed with error :
> java.util.concurrent.ExecutionException:
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server
> does not host this topic-partition... repeated 146 times
> [20250214 07:11:43.938 EST (main) kafkautils.trycode.TryAdminClient#main
> INFO] DescribeTopics returned result :
> (name=testkafka.test_cluster_operations_delay.java_client.single_topic_operation_with_admin_client_test.a.1,
> internal=false, partitions=(partition=0,
> leader=qtekafka-alpha-02.tbd.local.com:9092 (id: 6 rack: tbd.local.com),
> replicas=qtekafka-alpha-02.tbd.local.com:9092 (id: 6 rack: tbd.local.com),
> qtekafka-alpha-02.dr.local.com:9092 (id: 4 rack: dr.local.com),
> qtekafka-alpha-02.nyc.local.com:9092 (id: 2 rack: nyc.local.com),
> isr=qtekafka-alpha-02.tbd.local.com:9092 (id: 6 rack: tbd.local.com),
> qtekafka-alpha-02.dr.local.com:9092 (id: 4 rack: dr.local.com),
> qtekafka-alpha-02.nyc.local.com:9092 (id: 2 rack: nyc.local.com)),
> authorizedOperations=null)
> [20250214 07:11:43.939 EST (main) kafkautils.trycode.TryAdminClient#main
> INFO] Describe operation took = 407 ms{code}