[ https://issues.apache.org/jira/browse/KAFKA-16259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhifeng Chen updated KAFKA-16259:
---------------------------------
    Description: 
TL;DR: a Kafka client produce-latency issue was identified, caused by synchronized lock contention on metadata cache reads/writes in the native Kafka producer.

Trigger condition: a producer needs to produce to a large number of topics, such as in kafka-rest-proxy.

 

 

What is the producer metadata cache

The Kafka producer maintains an in-memory copy of the cluster metadata, so it does not have to fetch metadata on every produce; this keeps produce latency low.
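
For context, below is a minimal usage sketch of the client API path that reads this cache (the topic name, broker address, and serializer settings are placeholders). Both partitionsFor() and send() are served from the producer's cached cluster metadata; only an unknown or expired topic forces a blocking metadata fetch.

{code:java}
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;

public class ProducerMetadataUsage {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Both calls below go through the producer's cached cluster metadata
            // (Metadata.fetch() in the stack traces further down); only the first
            // access to an unknown topic triggers a blocking metadata refresh.
            List<PartitionInfo> partitions = producer.partitionsFor("example-topic");
            producer.send(new ProducerRecord<>("example-topic", "key", "value"));
        }
    }
}
{code}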

 

What is the synchronized lock contention problem

The Kafka producer metadata cache is a *mutable* object, and reads/writes are guarded by a synchronized lock. This means that while the metadata cache is being updated, all read requests are blocked.

Topic metadata expiration frequency increases linearly with the number of topics. In a Kafka cluster with a large number of topic partitions, topic metadata expiration and refresh trigger very frequent metadata updates. While a read operation is blocked by an update, producer threads stall, causing high produce latency.
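
To make the contention concrete, here is a simplified sketch of the pattern (an illustration, not the actual org.apache.kafka.clients.Metadata source): every reader and the updating network thread synchronize on the same monitor, so a slow update stalls all producer threads.

{code:java}
import org.apache.kafka.common.Cluster;

// Simplified illustration of the contended pattern: the read path (producer
// threads calling fetch(), e.g. via partitionsFor()) and the write path (the
// network thread applying a metadata response) share one monitor, so every
// reader blocks for the full duration of an update.
public class SynchronizedMetadata {
    private Cluster cluster = Cluster.empty();

    public synchronized Cluster fetch() {
        return cluster;                         // blocks while update() holds the lock
    }

    public synchronized void update(Cluster newCluster) {
        // In the real client this step also rebuilds the cluster view from the
        // metadata response, which is expensive with ~250k topic partitions.
        this.cluster = newCluster;
    }
}
{code}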

 

*Proposed solution*

TL;DR: optimize the metadata cache read path of the native Kafka producer with a copy-on-write strategy.

What is the copy-on-write strategy

It reduces synchronized lock contention by making the object immutable and creating a new instance on every update. Because the object is immutable, read operations never wait on a lock, so produce latency drops significantly.

Beyond performance, it also protects the metadata cache from unexpected modification, reducing the chance of bugs caused by incorrect synchronization.
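
A minimal sketch of the copy-on-write idea, assuming a single writer (the producer network thread); this illustrates the strategy rather than the actual patch: readers do a lock-free volatile read of an immutable snapshot, while the writer builds a new snapshot off to the side and swaps the reference.

{code:java}
import org.apache.kafka.common.Cluster;

// Copy-on-write sketch: the cache is an immutable snapshot published through a
// volatile reference, so reads never take a lock and only the (single) writer
// pays the cost of building a new snapshot.
public class CopyOnWriteMetadata {
    private volatile Cluster snapshot = Cluster.empty();

    public Cluster fetch() {
        return snapshot;                        // lock-free volatile read
    }

    public void update(Cluster newSnapshot) {
        // Build the new immutable view first, then publish it with a single
        // reference swap; in-flight readers keep using the previous snapshot.
        this.snapshot = newSnapshot;
    }
}
{code}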

 

{*}Test result{*}:

Environment: Kafka-rest-proxy

Client version: 2.8.0

Number of topic partitions: 250k

Test results show a 90%+ latency reduction on the test cluster.

!image-2024-02-14-12-11-07-366.png!

P99 produce latency on the deployed instances dropped from ~200 ms to ~5 ms (the upper chart shows latency after the improvement, the lower chart shows latency before).

A thread dump shows the details of the synchronization contention.

Threads waiting to acquire the lock:
 Kafka-rest-proxy-jetty-thread-pool-199 waiting to acquire [ 0x00007f77d70121a0 ]
 Kafka-rest-proxy-jetty-thread-pool-200 waiting to acquire [ 0x00007f77d70121a0 ]
 Kafka-rest-proxy-jetty-thread-pool-202 waiting to acquire [ 0x00007f77d70121a0 ]
 Kafka-rest-proxy-jetty-thread-pool-203 waiting to acquire [ 0x00007f77d70121a0 ]
 Kafka-rest-proxy-jetty-thread-pool-204 waiting to acquire [ 0x00007f77d70121a0 ]
 Kafka-rest-proxy-jetty-thread-pool-205 waiting to acquire [ 0x00007f77d70121a0 ]
 Kafka-rest-proxy-jetty-thread-pool-207 waiting to acquire [ 0x00007f77d70121a0 ]
 Kafka-rest-proxy-jetty-thread-pool-212 waiting to acquire [ 0x00007f77d70121a0 ]
 Kafka-rest-proxy-jetty-thread-pool-214 waiting to acquire [ 0x00007f77d70121a0 ]
 Kafka-rest-proxy-jetty-thread-pool-215 waiting to acquire [ 0x00007f77d70121a0 ]
 Kafka-rest-proxy-jetty-thread-pool-217 waiting to acquire [ 0x00007f77d70121a0 ]
 Kafka-rest-proxy-jetty-thread-pool-218 waiting to acquire [ 0x00007f77d70121a0 ]
 Kafka-rest-proxy-jetty-thread-pool-219 waiting to acquire [ 0x00007f77d70121a0 ]
 Kafka-rest-proxy-jetty-thread-pool-222 waiting to acquire [ 0x00007f77d70121a0 ]
 ...
 ...
at org.apache.kafka.clients.Metadata.fetch(Metadata.java:111)
at org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:1019)
at org.apache.kafka.clients.producer.KafkaProducer.partitionsFor(KafkaProducer.java:1144)
at io.confluent.kafkarest.producer.internal.MetadataImpl.maybeUpdate(MetadataImpl.java:39)
at io.confluent.kafkarest.producer.ResilientProducer.send(ResilientProducer.java:117)

Thread holding the lock:
 kafka-producer-network-thread | kafka-rest-proxy running, holding [ 0x00007f77d70121a0 ]
at java.util.stream.ReferencePipeline$3$1.accept(java.base@11.0.18/ReferencePipeline.java:195)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(java.base@11.0.18/ArrayList.java:1655)
at java.util.stream.AbstractPipeline.copyInto(java.base@11.0.18/AbstractPipeline.java:484)
at org.apache.kafka.common.requests.MetadataResponse.convertToNodeArray(MetadataResponse.java:162)
at org.apache.kafka.common.requests.MetadataResponse.toPartitionInfo(MetadataResponse.java:152)
at org.apache.kafka.clients.MetadataCache.lambda$computeClusterView$1(MetadataCache.java:177)
at org.apache.kafka.clients.MetadataCache$$Lambda$695/0x00007f75da3ddcb0.apply(Unknown Source)
at java.util.stream.ReferencePipeline$3$1.accept(java.base@11.0.18/ReferencePipeline.java:195)
at org.apache.kafka.clients.MetadataCache.computeClusterView(MetadataCache.java:178)
at java.lang.Thread.run(java.base@11.0.18/Thread.java:829)



> Immutable MetadataCache to improve client performance
> -----------------------------------------------------
>
>                 Key: KAFKA-16259
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16259
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>    Affects Versions: 2.8.0
>            Reporter: Zhifeng Chen
>            Priority: Major
>         Attachments: image-2024-02-14-12-11-07-366.png
>
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
