[ 
https://issues.apache.org/jira/browse/KAFKA-16259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana reassigned KAFKA-16259:
--------------------------------------

    Assignee: Zhifeng Chen

> Immutable MetadataCache to improve client performance
> -----------------------------------------------------
>
>                 Key: KAFKA-16259
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16259
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>    Affects Versions: 2.8.0
>            Reporter: Zhifeng Chen
>            Assignee: Zhifeng Chen
>            Priority: Major
>         Attachments: image-2024-02-14-12-11-07-366.png
>
>
> TL;DR: We identified a produce latency issue in the Kafka client, caused by 
> synchronized lock contention between metadata cache reads and writes in the 
> native Kafka producer.
> Trigger condition: a producer needs to produce to a large number of topics, 
> as in kafka-rest-proxy.
>  
>  
> What is the producer metadata cache?
> The Kafka producer maintains an in-memory copy of cluster metadata, which 
> avoids fetching metadata on every produce call and so reduces latency.
>  
> What is the synchronized lock contention problem?
> The Kafka producer metadata cache is a *mutable* object whose reads and 
> writes are guarded by a synchronized lock. This means that while the metadata 
> cache is being updated, all read requests are blocked.
> The frequency of topic metadata expiration increases linearly with the number 
> of topics. In a Kafka cluster with a large number of topic partitions, topic 
> metadata expiration and refresh trigger frequent metadata updates. When reads 
> are blocked by an update, producer threads stall, which causes high produce 
> latency.
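
The contention pattern described above can be sketched as follows. This is a simplified illustration, not the actual Kafka client code: the class `SynchronizedMetadata` and its methods are hypothetical names, and the cache is reduced to a map from topic name to partition count.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical mutable cache: every read and every write takes the same lock.
final class SynchronizedMetadata {
    private final Map<String, Integer> partitionCounts = new HashMap<String, Integer>();

    // Readers (producer threads) must acquire the object's monitor.
    synchronized Integer partitionCount(String topic) {
        return partitionCounts.get(topic);
    }

    // The writer holds the same monitor for the whole rebuild, so the
    // larger the metadata, the longer every reader waits.
    synchronized void update(Map<String, Integer> fresh) {
        partitionCounts.clear();
        partitionCounts.putAll(fresh);
    }
}
```

While `update` runs, any thread calling `partitionCount` is parked on the monitor, which matches the shape of the thread dump later in this description.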
>  
> *Proposed solution*
> TL;DR: Optimize metadata cache reads in the native Kafka producer with a 
> copy-on-write strategy.
> What is the copy-on-write strategy?
> It reduces synchronized lock contention by making the object immutable and 
> always creating a new instance on update. Because the object is immutable, 
> reads never wait for a lock, so produce latency drops significantly.
> Besides performance, it also protects the metadata cache from unexpected 
> modification and reduces the occurrence of bugs caused by incorrect 
> synchronization.
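
A minimal sketch of the copy-on-write idea, under stated assumptions: `MetadataSnapshot` and `CopyOnWriteMetadata` are hypothetical names chosen for illustration, the snapshot holds only a topic-to-partition-count map, and this is not the patch proposed for the Kafka client.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical immutable snapshot: topic name -> partition count.
final class MetadataSnapshot {
    private final Map<String, Integer> partitionCounts;

    MetadataSnapshot(Map<String, Integer> partitionCounts) {
        // Defensive copy so later changes to the argument cannot leak in.
        this.partitionCounts =
            Collections.unmodifiableMap(new HashMap<String, Integer>(partitionCounts));
    }

    Integer partitionCount(String topic) {
        return partitionCounts.get(topic);
    }

    // Copy-on-write: build a new snapshot instead of mutating this one.
    MetadataSnapshot withTopic(String topic, int partitions) {
        Map<String, Integer> copy = new HashMap<String, Integer>(partitionCounts);
        copy.put(topic, partitions);
        return new MetadataSnapshot(copy);
    }
}

final class CopyOnWriteMetadata {
    // volatile makes a newly published snapshot visible to reader threads.
    private volatile MetadataSnapshot snapshot =
        new MetadataSnapshot(new HashMap<String, Integer>());

    // Readers take no lock; they see whichever snapshot is current.
    MetadataSnapshot fetch() {
        return snapshot;
    }

    // Writers serialize among themselves only. The copy is built without
    // blocking readers, and publishing it is a single reference write.
    synchronized void update(String topic, int partitions) {
        snapshot = snapshot.withTopic(topic, partitions);
    }
}
```

A reader that called `fetch()` before an update keeps a consistent, unchanged snapshot, and the next `fetch()` returns the new one. The trade-off is an extra copy per update, which is acceptable when reads far outnumber writes.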
>  
> *Test result*
> Environment: Kafka-rest-proxy
> Client version: 2.8.0
> Number of topic partitions: 250k
> Test results show a 90%+ latency reduction on the test cluster.
> !image-2024-02-14-12-11-07-366.png!
> P99 produce latency on deployed instances dropped from 200ms to 5ms (the 
> upper part shows latency after the improvement, the lower part before it).
> *Thread dump showing details of the problem*
> Threads waiting to acquire the lock
>  Kafka-rest-proxy-jetty-thread-pool-199 waiting to acquire [ 0x00007f77d70121a0 ]
>  Kafka-rest-proxy-jetty-thread-pool-200 waiting to acquire [ 0x00007f77d70121a0 ]
>  Kafka-rest-proxy-jetty-thread-pool-202 waiting to acquire [ 0x00007f77d70121a0 ]
>  Kafka-rest-proxy-jetty-thread-pool-203 waiting to acquire [ 0x00007f77d70121a0 ]
>  Kafka-rest-proxy-jetty-thread-pool-204 waiting to acquire [ 0x00007f77d70121a0 ]
>  Kafka-rest-proxy-jetty-thread-pool-205 waiting to acquire [ 0x00007f77d70121a0 ]
>  Kafka-rest-proxy-jetty-thread-pool-207 waiting to acquire [ 0x00007f77d70121a0 ]
>  Kafka-rest-proxy-jetty-thread-pool-212 waiting to acquire [ 0x00007f77d70121a0 ]
>  Kafka-rest-proxy-jetty-thread-pool-214 waiting to acquire [ 0x00007f77d70121a0 ]
>  Kafka-rest-proxy-jetty-thread-pool-215 waiting to acquire [ 0x00007f77d70121a0 ]
>  Kafka-rest-proxy-jetty-thread-pool-217 waiting to acquire [ 0x00007f77d70121a0 ]
>  Kafka-rest-proxy-jetty-thread-pool-218 waiting to acquire [ 0x00007f77d70121a0 ]
>  Kafka-rest-proxy-jetty-thread-pool-219 waiting to acquire [ 0x00007f77d70121a0 ]
>  Kafka-rest-proxy-jetty-thread-pool-222 waiting to acquire [ 0x00007f77d70121a0 ]
>  ...
> at org.apache.kafka.clients.Metadata.fetch(Metadata.java:111)
> at org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:1019)
> at org.apache.kafka.clients.producer.KafkaProducer.partitionsFor(KafkaProducer.java:1144)
> at io.confluent.kafkarest.producer.internal.MetadataImpl.maybeUpdate(MetadataImpl.java:39)
> at io.confluent.kafkarest.producer.ResilientProducer.send(ResilientProducer.java:117)
> Thread holding the lock
>  kafka-producer-network-thread | kafka-rest-proxy running, holding [ 0x00007f77d70121a0 ]
> at java.util.stream.ReferencePipeline$3$1.accept(java.base@11.0.18/ReferencePipeline.java:195)
> at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(java.base@11.0.18/ArrayList.java:1655)
> at java.util.stream.AbstractPipeline.copyInto(java.base@11.0.18/AbstractPipeline.java:484)
> at org.apache.kafka.common.requests.MetadataResponse.convertToNodeArray(MetadataResponse.java:162)
> at org.apache.kafka.common.requests.MetadataResponse.toPartitionInfo(MetadataResponse.java:152)
> at org.apache.kafka.clients.MetadataCache.lambda$computeClusterView$1(MetadataCache.java:177)
> at org.apache.kafka.clients.MetadataCache$$Lambda$695/0x00007f75da3ddcb0.apply(Unknown Source)
> at java.util.stream.ReferencePipeline$3$1.accept(java.base@11.0.18/ReferencePipeline.java:195)
> at org.apache.kafka.clients.MetadataCache.computeClusterView(MetadataCache.java:178)
> at java.lang.Thread.run(java.base@11.0.18/Thread.java:829)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
