This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new e026384ffb HOTFIX: Only measure in nano when producer metadata refresh is required (#12102) e026384ffb is described below commit e026384ffb3170a2e71053a4163db58b9bd8fba6 Author: Guozhang Wang <wangg...@gmail.com> AuthorDate: Wed Apr 27 11:27:54 2022 -0700 HOTFIX: Only measure in nano when producer metadata refresh is required (#12102) We added the metadata wait time to the total blocked time (#11805). However, we added it in the critical path of send(), which is called per record, whereas a metadata refresh happens only rarely. As a result, the cost of time.nanoseconds() became unnecessarily significant, since it was called twice per record. This PR moves the measurement inside the waitOnMetadata callee, so that it is taken only when we actually need to wait for a metadata refresh round-trip (i.e. we are indeed blocking). Reviewers: Matthias J. Sax <matth...@confluent.io> --- .../main/java/org/apache/kafka/clients/producer/KafkaProducer.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index f36db02a9d..85a3e239e9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -931,7 +931,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> { throwIfProducerClosed(); // first make sure the metadata for the topic is available long nowMs = time.milliseconds(); - long nowNanos = time.nanoseconds(); ClusterAndWaitTime clusterAndWaitTime; try { clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs); @@ -941,7 +940,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> { throw e; } nowMs += clusterAndWaitTime.waitedOnMetadataMs; - producerMetrics.recordMetadataWait(time.nanoseconds() - nowNanos); long remainingWaitMs 
= Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs); Cluster cluster = clusterAndWaitTime.cluster; byte[] serializedKey; @@ -1080,6 +1078,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { // Issue metadata requests until we have metadata for the topic and the requested partition, // or until maxWaitTimeMs is exceeded. This is necessary in case the metadata // is stale and the number of partitions for this topic has increased in the meantime. + long nowNanos = time.nanoseconds(); do { if (partition != null) { log.trace("Requesting metadata update for partition {} of topic {}.", partition, topic); @@ -1111,6 +1110,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> { partitionsCount = cluster.partitionCountForTopic(topic); } while (partitionsCount == null || (partition != null && partition >= partitionsCount)); + producerMetrics.recordMetadataWait(time.nanoseconds() - nowNanos); + return new ClusterAndWaitTime(cluster, elapsed); }