[
https://issues.apache.org/jira/browse/KAFKA-16259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Zhifeng Chen updated KAFKA-16259:
---------------------------------
Description:
TL;DR, A Kafka client produce latency issue is identified caused by
synchronized lock contention of metadata cache read/write in the native kafka
producer.
Trigger Condition: A producer need to produce to large number of topics. such
as in kafka rest-proxy
What is producer metadata cache
Kafka producer maintains a in-memory copy of cluster metadata, and it avoided
fetch metadata every time when produce message to reduce latency
What’s the synchronized lock contention problem
Kafka producer metadata cache is a *mutable* object, read/write are isolated by
a synchronized lock. Which means when the metadata cache is being updated, all
read requests are blocked.
Topic metadata expiration frequency increase liner with number of topics. In a
kafka cluster with large number of topic partitions, topic metadata expiration
and refresh triggers high frequent metadata update. When read operation blocked
by update, producer threads are blocked and caused high produce latency issue.
*Proposed solution*
TL;DR Optimize performance of metadata cache read operation of native kafka
producer with copy-on-write strategy
What is copy-on-write strategy
It’s a solution to reduce synchronized lock contention by making the object
immutable, and always create a new instance when updating, but since the object
is immutable, read operation will be free from waiting, thus produce latency
reduced significantly
Besides performance, it can also make the metadata cache immutable from
unexpected modification, reduce occurrence of code bugs due to incorrect
synchronization
{*}Test result{*}:
Environment: Kafka-rest-proxy
Client version: 2.8.0
Number of topic partitions: 250k
test result show 90%+ latency reduction on test cluster
!image-2024-02-14-12-11-07-366.png!
P99 produce latency on deployed instances reduced from 200ms -> 5ms (upper part
show latency after the improvement, lower part show before improvement)
Thread dump show details of the synchronization contention
Threads acquiring lock
Kafka-rest-proxy-jetty-thread-pool-199waiting to acquire [ 0x00007f77d70121a0 ]
Kafka-rest-proxy-jetty-thread-pool-200waiting to acquire [ 0x00007f77d70121a0 ]
Kafka-rest-proxy-jetty-thread-pool-202waiting to acquire [ 0x00007f77d70121a0 ]
Kafka-rest-proxy-jetty-thread-pool-203waiting to acquire [ 0x00007f77d70121a0 ]
Kafka-rest-proxy-jetty-thread-pool-204waiting to acquire [ 0x00007f77d70121a0 ]
Kafka-rest-proxy-jetty-thread-pool-205waiting to acquire [ 0x00007f77d70121a0 ]
Kafka-rest-proxy-jetty-thread-pool-207waiting to acquire [ 0x00007f77d70121a0 ]
Kafka-rest-proxy-jetty-thread-pool-212waiting to acquire [ 0x00007f77d70121a0 ]
Kafka-rest-proxy-jetty-thread-pool-214waiting to acquire [ 0x00007f77d70121a0 ]
Kafka-rest-proxy-jetty-thread-pool-215waiting to acquire [ 0x00007f77d70121a0 ]
Kafka-rest-proxy-jetty-thread-pool-217waiting to acquire [ 0x00007f77d70121a0 ]
Kafka-rest-proxy-jetty-thread-pool-218waiting to acquire [ 0x00007f77d70121a0 ]
Kafka-rest-proxy-jetty-thread-pool-219waiting to acquire [ 0x00007f77d70121a0 ]
Kafka-rest-proxy-jetty-thread-pool-222waiting to acquire [ 0x00007f77d70121a0 ]
...
at org.apache.kafka.clients.Metadata.fetch(Metadata.java:111)
at
org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:1019)
at
org.apache.kafka.clients.producer.KafkaProducer.partitionsFor(KafkaProducer.java:1144)
at
io.confluent.kafkarest.producer.internal.MetadataImpl.maybeUpdate(MetadataImpl.java:39)
at
io.confluent.kafkarest.producer.ResilientProducer.send(ResilientProducer.java:117)
Threads hodl the lock
kafka-producer-network-thread | kafka-rest-proxyrunning , holding [
0x00007f77d70121a0 ]
at
java.util.stream.ReferencePipeline$3$1.accept([email protected]/ReferencePipeline.java:195)
at
java.util.ArrayList$ArrayListSpliterator.forEachRemaining([email protected]/ArrayList.java:1655)
at
java.util.stream.AbstractPipeline.copyInto([email protected]/AbstractPipeline.java:484)
at
org.apache.kafka.common.requests.MetadataResponse.convertToNodeArray(MetadataResponse.java:162)
at
org.apache.kafka.common.requests.MetadataResponse.toPartitionInfo(MetadataResponse.java:152)
at
org.apache.kafka.clients.MetadataCache.lambda$computeClusterView$1(MetadataCache.java:177)
at
org.apache.kafka.clients.MetadataCache$$Lambda$695/0x00007f75da3ddcb0.apply(Unknown
Source)
at
java.util.stream.ReferencePipeline$3$1.accept([email protected]/ReferencePipeline.java:195)
at
org.apache.kafka.clients.MetadataCache.computeClusterView(MetadataCache.java:178)
at java.lang.Thread.run([email protected]/Thread.java:829)
was:
TL;DR, A Kafka client produce latency issue is identified caused by
synchronized lock contention of metadata cache read/write in the native kafka
producer.
Trigger Condition: A producer need to produce to large number of topics. such
as in kafka rest-proxy
What is producer metadata cache
Kafka producer maintains a in-memory copy of cluster metadata, and it avoided
fetch metadata every time when produce message to reduce latency
What’s the synchronized lock contention problem
Kafka producer metadata cache is a *mutable* object, read/write are isolated by
a synchronized lock. Which means when the metadata cache is being updated, all
read requests are blocked.
Topic metadata expiration frequency increase liner with number of topics. In a
kafka cluster with large number of topic partitions, topic metadata expiration
and refresh triggers high frequent metadata update. When read operation blocked
by update, producer threads are blocked and caused high produce latency issue.
*Proposed solution*
TL;DR Optimize performance of metadata cache read operation of native kafka
producer with copy-on-write strategy
What is copy-on-write strategy
It’s a solution to reduce synchronized lock contention by making the object
immutable, and always create a new instance when updating, but since the object
is immutable, read operation will be free from waiting, thus produce latency
reduced significantly
Besides performance, it can also make the metadata cache immutable from
unexpected modification, reduce occurrence of code bugs due to incorrect
synchronization
{*}Test result{*}:
Environment: Kafka-rest-proxy
Client version: 2.8.0
Number of topic partitions: 250k
test result show 90%+ latency reduction on test cluster
!image-2024-02-14-12-11-07-366.png!
P99 produce latency on deployed instances reduced from 200ms -> 5ms (upper part
show latency after the improvement, lower part show before improvement)
> Immutable MetadataCache to improve client performance
> -----------------------------------------------------
>
> Key: KAFKA-16259
> URL: https://issues.apache.org/jira/browse/KAFKA-16259
> Project: Kafka
> Issue Type: Improvement
> Components: clients
> Affects Versions: 2.8.0
> Reporter: Zhifeng Chen
> Priority: Major
> Attachments: image-2024-02-14-12-11-07-366.png
>
>
> TL;DR, A Kafka client produce latency issue is identified caused by
> synchronized lock contention of metadata cache read/write in the native kafka
> producer.
> Trigger Condition: A producer need to produce to large number of topics. such
> as in kafka rest-proxy
>
>
> What is producer metadata cache
> Kafka producer maintains a in-memory copy of cluster metadata, and it avoided
> fetch metadata every time when produce message to reduce latency
>
> What’s the synchronized lock contention problem
> Kafka producer metadata cache is a *mutable* object, read/write are isolated
> by a synchronized lock. Which means when the metadata cache is being updated,
> all read requests are blocked.
> Topic metadata expiration frequency increase liner with number of topics. In
> a kafka cluster with large number of topic partitions, topic metadata
> expiration and refresh triggers high frequent metadata update. When read
> operation blocked by update, producer threads are blocked and caused high
> produce latency issue.
>
> *Proposed solution*
> TL;DR Optimize performance of metadata cache read operation of native kafka
> producer with copy-on-write strategy
> What is copy-on-write strategy
> It’s a solution to reduce synchronized lock contention by making the object
> immutable, and always create a new instance when updating, but since the
> object is immutable, read operation will be free from waiting, thus produce
> latency reduced significantly
> Besides performance, it can also make the metadata cache immutable from
> unexpected modification, reduce occurrence of code bugs due to incorrect
> synchronization
>
> {*}Test result{*}:
> Environment: Kafka-rest-proxy
> Client version: 2.8.0
> Number of topic partitions: 250k
> test result show 90%+ latency reduction on test cluster
> !image-2024-02-14-12-11-07-366.png!
> P99 produce latency on deployed instances reduced from 200ms -> 5ms (upper
> part show latency after the improvement, lower part show before improvement)
> Thread dump show details of the synchronization contention
> Threads acquiring lock
> Kafka-rest-proxy-jetty-thread-pool-199waiting to acquire [
> 0x00007f77d70121a0 ]
> Kafka-rest-proxy-jetty-thread-pool-200waiting to acquire [
> 0x00007f77d70121a0 ]
> Kafka-rest-proxy-jetty-thread-pool-202waiting to acquire [
> 0x00007f77d70121a0 ]
> Kafka-rest-proxy-jetty-thread-pool-203waiting to acquire [
> 0x00007f77d70121a0 ]
> Kafka-rest-proxy-jetty-thread-pool-204waiting to acquire [
> 0x00007f77d70121a0 ]
> Kafka-rest-proxy-jetty-thread-pool-205waiting to acquire [
> 0x00007f77d70121a0 ]
> Kafka-rest-proxy-jetty-thread-pool-207waiting to acquire [
> 0x00007f77d70121a0 ]
> Kafka-rest-proxy-jetty-thread-pool-212waiting to acquire [
> 0x00007f77d70121a0 ]
> Kafka-rest-proxy-jetty-thread-pool-214waiting to acquire [
> 0x00007f77d70121a0 ]
> Kafka-rest-proxy-jetty-thread-pool-215waiting to acquire [
> 0x00007f77d70121a0 ]
> Kafka-rest-proxy-jetty-thread-pool-217waiting to acquire [
> 0x00007f77d70121a0 ]
> Kafka-rest-proxy-jetty-thread-pool-218waiting to acquire [
> 0x00007f77d70121a0 ]
> Kafka-rest-proxy-jetty-thread-pool-219waiting to acquire [
> 0x00007f77d70121a0 ]
> Kafka-rest-proxy-jetty-thread-pool-222waiting to acquire [
> 0x00007f77d70121a0 ]
> ...
> at org.apache.kafka.clients.Metadata.fetch(Metadata.java:111)
> at
> org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:1019)
> at
> org.apache.kafka.clients.producer.KafkaProducer.partitionsFor(KafkaProducer.java:1144)
> at
> io.confluent.kafkarest.producer.internal.MetadataImpl.maybeUpdate(MetadataImpl.java:39)
> at
> io.confluent.kafkarest.producer.ResilientProducer.send(ResilientProducer.java:117)
> Threads hodl the lock
> kafka-producer-network-thread | kafka-rest-proxyrunning , holding [
> 0x00007f77d70121a0 ]
> at
> java.util.stream.ReferencePipeline$3$1.accept([email protected]/ReferencePipeline.java:195)
> at
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining([email protected]/ArrayList.java:1655)
> at
> java.util.stream.AbstractPipeline.copyInto([email protected]/AbstractPipeline.java:484)
> at
> org.apache.kafka.common.requests.MetadataResponse.convertToNodeArray(MetadataResponse.java:162)
> at
> org.apache.kafka.common.requests.MetadataResponse.toPartitionInfo(MetadataResponse.java:152)
> at
> org.apache.kafka.clients.MetadataCache.lambda$computeClusterView$1(MetadataCache.java:177)
> at
> org.apache.kafka.clients.MetadataCache$$Lambda$695/0x00007f75da3ddcb0.apply(Unknown
> Source)
> at
> java.util.stream.ReferencePipeline$3$1.accept([email protected]/ReferencePipeline.java:195)
> at
> org.apache.kafka.clients.MetadataCache.computeClusterView(MetadataCache.java:178)
> at java.lang.Thread.run([email protected]/Thread.java:829)
--
This message was sent by Atlassian Jira
(v8.20.10#820010)