[ 
https://issues.apache.org/jira/browse/KAFKA-4024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15458639#comment-15458639
 ] 

Yuto Kawamura edited comment on KAFKA-4024 at 9/7/16 1:46 AM:
--------------------------------------------------------------

I reconsidered this issue and found that it is much worse than I 
explained before.

IIUC, in short, setting {{retry.backoff.ms}} to a larger value can delay 
KafkaProducer's updates of outdated metadata.
That is, when we set {{retry.backoff.ms}} to 1 second for example, and a 
partition leadership failover happens, the producer can take up to 1 second to 
fire a metadata request in the worst case, even though it has already detected 
the broker disconnection or the outdated partition leadership information.
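The gate lives in {{Metadata.timeToNextUpdate}}. A simplified standalone sketch of that logic (my own paraphrase as a static method for illustration, not the exact trunk source) shows why an explicitly requested update is still deferred:

```java
public class TimeToNextUpdateSketch {
    // Simplified from org.apache.kafka.clients.Metadata#timeToNextUpdate:
    // even when an update is explicitly requested (needUpdate == true), the
    // request is deferred until refreshBackoffMs (= retry.backoff.ms) has
    // elapsed since the last refresh attempt.
    static long timeToNextUpdate(boolean needUpdate, long lastSuccessfulRefreshMs,
                                 long metadataExpireMs, long lastRefreshMs,
                                 long refreshBackoffMs, long nowMs) {
        long timeToExpire = needUpdate
                ? 0
                : Math.max(lastSuccessfulRefreshMs + metadataExpireMs - nowMs, 0);
        long timeToAllowUpdate = lastRefreshMs + refreshBackoffMs - nowMs;
        return Math.max(timeToExpire, timeToAllowUpdate);
    }

    public static void main(String[] args) {
        // Leader failover detected 3 s after the last refresh, with
        // retry.backoff.ms = 10000: the producer still waits another 7 s.
        System.out.println(timeToNextUpdate(true, 0, 300_000, 0, 10_000, 3_000)); // prints 7000
    }
}
```

So even with needUpdate set, timeToAllowUpdate keeps the result positive until the backoff has elapsed.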

Here's the result of my experiment. I modified 
{{KafkaProducerMetadataUpdateDurationTest}} and observed DEBUG logs of 
NetworkClient and Metadata.

clients/src/main/java/KafkaProducerMetadataUpdateDurationTest.java:
{code}
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public final class KafkaProducerMetadataUpdateDurationTest {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "HOST-1:9092,HOST-2:9092,HOST-3:9092");
        props.setProperty(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "1000");
        props.setProperty(ProducerConfig.RETRIES_CONFIG, String.valueOf(Integer.MAX_VALUE));
        String retryBackoffMs = System.getProperty("retry.backoff.ms");
        System.err.println("Experimenting with retry.backoff.ms = " + retryBackoffMs);
        props.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoffMs);

        Producer<String, String> producer =
                new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

        try {
            int i = 0;
            while (true) {
                final int produceSeq = i++;
                final long t0 = System.nanoTime();
                producer.send(new ProducerRecord<>("test", produceSeq % 3, "key", "value"),
                              new Callback() {
                                  @Override
                                  public void onCompletion(RecordMetadata metadata, Exception exception) {
                                      long produceDuration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t0);
                                      System.err.printf("Produce[%d]: duration=%d, exception=%s\n", produceSeq, produceDuration, exception);
                                  }
                              });
                long sendDuration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t0);
                System.err.printf("Send[%d]: duration=%d\n", produceSeq, sendDuration);
                Thread.sleep(1000);
            }
        } finally {
            producer.close();
        }
    }
}
{code}

log4j.properties:
{code}
log4j.rootLogger=INFO, stdout

log4j.logger.org.apache.kafka.clients.Metadata=DEBUG, stdout
log4j.additivity.org.apache.kafka.clients.Metadata=false
log4j.logger.org.apache.kafka.clients.NetworkClient=DEBUG, stdout
log4j.additivity.org.apache.kafka.clients.NetworkClient=false
log4j.logger.org.apache.kafka.clients.producer.internals.Sender=DEBUG, stdout
log4j.additivity.org.apache.kafka.clients.producer.internals.Sender=false

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
{code}

Topic "test" has 3 replicas and 3 partitions.
Then I started KafkaProducerMetadataUpdateDurationTest, and stopped broker 1 
manually at (*2). Here's the log:

{code}
./bin/kafka-run-class.sh -Dlog4j.configuration=file:./log4j.properties 
-Dretry.backoff.ms=10000 KafkaProducerMetadataUpdateDurationTest
Experimenting with retry.backoff.ms = 10000
...
[2016-09-02 22:36:29,839] INFO Kafka version : 0.10.1.0-SNAPSHOT 
(org.apache.kafka.common.utils.AppInfoParser)
[2016-09-02 22:36:29,839] INFO Kafka commitId : 8f3462552fa4d6a6 
(org.apache.kafka.common.utils.AppInfoParser)
[2016-09-02 22:36:39,826] DEBUG Initialize connection to node -2 for sending 
metadata request (org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:36:39,826] DEBUG Initiating connection to node -2 at 
HOST-2:9092. (org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:36:39,883] DEBUG Completed connection to node -2 
(org.apache.kafka.clients.NetworkClient)

# *1 The first metadata request
[2016-09-02 22:36:39,902] DEBUG Sending metadata request {topics=[test]} to 
node -2 (org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:36:39,929] DEBUG Updated cluster metadata version 2 to 
Cluster(nodes = [HOST-2:9092 (id: 2 rack: null), HOST-1:9092 (id: 1 rack: 
null), HOST-3:9092 (id: 3 rack: null)], partitions = [Partition(topic = test, 
partition = 1, leader = 1, replicas = [1,2,3,], isr = [2,3,1,]), 
Partition(topic = test, partition = 0, leader = 3, replicas = [1,2,3,], isr = 
[3,2,1,]), Partition(topic = test, partition = 2, leader = 2, replicas = 
[1,2,3,], isr = [3,2,1,])]) (org.apache.kafka.clients.Metadata)
Send[0]: duration=10104
[2016-09-02 22:36:39,944] DEBUG Initiating connection to node 3 at HOST-3:9092. 
(org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:36:39,947] DEBUG Completed connection to node 3 
(org.apache.kafka.clients.NetworkClient)
Produce[0]: duration=10117, exception=null
Send[1]: duration=0
[2016-09-02 22:36:40,950] DEBUG Initiating connection to node 1 at HOST-1:9092. 
(org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:36:40,952] DEBUG Completed connection to node 1 
(org.apache.kafka.clients.NetworkClient)
Produce[1]: duration=12, exception=null
Send[2]: duration=0
[2016-09-02 22:36:41,955] DEBUG Initiating connection to node 2 at HOST-2:9092. 
(org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:36:41,958] DEBUG Completed connection to node 2 
(org.apache.kafka.clients.NetworkClient)
Produce[2]: duration=5, exception=null
Send[3]: duration=0
Produce[3]: duration=4, exception=null

# *2 I stopped broker 1 at this moment

[2016-09-02 22:36:43,134] DEBUG Node 1 disconnected. 
(org.apache.kafka.clients.NetworkClient)
Send[4]: duration=0
[2016-09-02 22:36:44,137] DEBUG Initiating connection to node 1 at HOST-1:9092. 
(org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:36:44,139] DEBUG Node 1 disconnected. 
(org.apache.kafka.clients.NetworkClient)
Send[5]: duration=0
Produce[5]: duration=4, exception=null
[2016-09-02 22:36:45,141] DEBUG Initiating connection to node 1 at HOST-1:9092. 
(org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:36:45,143] DEBUG Node 1 disconnected. 
(org.apache.kafka.clients.NetworkClient)
Send[6]: duration=0
Produce[6]: duration=3, exception=null
[2016-09-02 22:36:46,148] DEBUG Initiating connection to node 1 at HOST-1:9092. 
(org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:36:46,150] DEBUG Node 1 disconnected. 
(org.apache.kafka.clients.NetworkClient)
Send[7]: duration=0
[2016-09-02 22:36:47,154] DEBUG Initiating connection to node 1 at HOST-1:9092. 
(org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:36:47,156] DEBUG Node 1 disconnected. 
(org.apache.kafka.clients.NetworkClient)
Send[8]: duration=0
Produce[8]: duration=5, exception=null
[2016-09-02 22:36:48,159] DEBUG Initiating connection to node 1 at HOST-1:9092. 
(org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:36:48,161] DEBUG Node 1 disconnected. 
(org.apache.kafka.clients.NetworkClient)
Send[9]: duration=0
Produce[9]: duration=3, exception=null
[2016-09-02 22:36:49,165] DEBUG Initiating connection to node 1 at HOST-1:9092. 
(org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:36:49,168] DEBUG Node 1 disconnected. 
(org.apache.kafka.clients.NetworkClient)

# *3 The second metadata update exactly after 10 seconds since the first update.
[2016-09-02 22:36:49,914] DEBUG Sending metadata request {topics=[test]} to 
node 3 (org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:36:49,918] DEBUG Updated cluster metadata version 3 to 
Cluster(nodes = [HOST-2:9092 (id: 2 rack: null), HOST-3:9092 (id: 3 rack: 
null)], partitions = [Partition(topic = test, partition = 1, leader = 2, 
replicas = [1,2,3,], isr = [2,3,]), Partition(topic = test, partition = 0, 
leader = 3, replicas = [1,2,3,], isr = [3,2,]), Partition(topic = test, 
partition = 2, leader = 2, replicas = [1,2,3,], isr = [3,2,])]) 
(org.apache.kafka.clients.Metadata)
Produce[4]: duration=5957, exception=null
Produce[7]: duration=2946, exception=null
Send[10]: duration=0
Produce[10]: duration=4, exception=null
{code}

First, as I explained already, the first send() blocked insanely long due to 
the unintentionally applied refreshBackoffMs (*1).
Then I stopped broker 1 at (*2). What we expect here is that KafkaProducer 
immediately tries to update its metadata in order to fail over the producing 
target to the new leader, but it doesn't do so until 10 
seconds (= retry.backoff.ms) have elapsed since the first update at (*3).

This leads to the following bad effects:
- Increased produce latency
- Buffer exhaustion due to accumulated records
- Batch expiration once {{request.timeout.ms}} elapses: 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java#L153-L156
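The ~6-second stall of Produce[4] in the log above follows from this gate with simple arithmetic. A small illustrative sketch (the method name is mine, not a Kafka API): the next metadata request is deferred until retry.backoff.ms has elapsed since the last refresh, no matter when the disconnection was detected.

```java
public class BackoffDelaySketch {
    // Remaining wait before the producer may send a metadata request: the
    // request is deferred until retryBackoffMs has elapsed since the last
    // refresh, regardless of when the failure was detected.
    static long remainingWaitMs(long lastRefreshMs, long retryBackoffMs, long failureDetectedMs) {
        return Math.max(lastRefreshMs + retryBackoffMs - failureDetectedMs, 0);
    }

    public static void main(String[] args) {
        // Offsets in ms from the first metadata update (22:36:39.9 in the log):
        // broker 1 disconnection detected ~3.2 s later, retry.backoff.ms = 10000.
        System.out.println(remainingWaitMs(0, 10_000, 3_200)); // prints 6800
    }
}
```

That remaining ~6.8 s of enforced idleness matches the second metadata request arriving at 22:36:49.9 and the Produce[4] callback firing only then.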



> First metadata update always take retry.backoff.ms milliseconds to complete
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-4024
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4024
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 0.9.0.1, 0.10.0.0
>            Reporter: Yuto Kawamura
>            Assignee: Yuto Kawamura
>
> Recently I updated our KafkaProducer configuration; specifically, we adjusted 
> {{retry.backoff.ms}} from the default (100ms) to 1000ms.
> After that we observed that the first {{send()}} started taking longer than 
> before; I investigated and found the following facts.
> Environment:
> - Kafka broker 0.9.0.1
> - Kafka producer 0.9.0.1
> Our current version is 0.9.0.1 but it reproduced with latest build from trunk 
> branch as well.
> h2. TL;DR
> The first {{KafkaProducer.send()}} always blocks for {{retry.backoff.ms}} 
> milliseconds, due to an unintentionally applied backoff on the first metadata update.
> h2. Proof
> I wrote the following test code and placed it under clients/main/java/
> {code}
> import java.util.Properties;
> import java.util.concurrent.TimeUnit;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.Producer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.common.serialization.ByteArraySerializer;
> public final class KafkaProducerMetadataUpdateDurationTest {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         props.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "30000");
>         String retryBackoffMs = System.getProperty("retry.backoff.ms");
>         System.err.println("Experimenting with retry.backoff.ms = " + retryBackoffMs);
>         props.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoffMs);
>         Producer<byte[], byte[]> producer =
>                 new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer());
>         long t0 = System.nanoTime();
>         try {
>             producer.partitionsFor("test");
>             long duration = System.nanoTime() - t0;
>             System.err.println("Duration = " + TimeUnit.NANOSECONDS.toMillis(duration) + " ms");
>         } finally {
>             producer.close();
>         }
>     }
> }
> {code}
> Here's experiment log:
> {code}
> # Start zookeeper & kafka broker
> ./bin/zookeeper-server-start.sh config/zookeeper.properties
> ./bin/kafka-server-start.sh config/server.properties
> # Create test topic
> ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test 
> --replication-factor 1 --partitions 1
> $ ./bin/kafka-run-class.sh -Dretry.backoff.ms=100 
> KafkaProducerMetadataUpdateDurationTest
> Experimenting with retry.backoff.ms = 100
> Duration = 175 ms
> $ ./bin/kafka-run-class.sh -Dretry.backoff.ms=1000 
> KafkaProducerMetadataUpdateDurationTest
> Experimenting with retry.backoff.ms = 1000
> Duration = 1066 ms
> $ ./bin/kafka-run-class.sh -Dretry.backoff.ms=10000 
> KafkaProducerMetadataUpdateDurationTest
> Experimenting with retry.backoff.ms = 10000
> Duration = 10070 ms
> {code}
> As you can see, the duration of {{partitionsFor()}} increases linearly in 
> proportion to the value of {{retry.backoff.ms}}.
> Here I describe the scenario that leads to this behavior:
> 1. KafkaProducer initializes its metadata with the {{bootstrap.servers}} list and 
> the current timestamp: 
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L276
> 2. On the first {{send()}}, KafkaProducer requests a metadata update due to the 
> missing partition info: 
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L527
> 3. But DefaultMetadataUpdater doesn't actually send a MetadataRequest, because 
> {{metadata.timeToNextUpdate}} returns a value larger than zero: 
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L541-L548
> 4. {{Metadata.timeToNextUpdate}} returns the larger of the time until metadata 
> expiration and the time until the backoff expires, but in practice needUpdate is 
> always true the first time, so timeToAllowUpdate is always adopted, and it 
> doesn't reach zero until {{retry.backoff.ms}} has elapsed since the first 
> {{metadata.update()}}: 
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L116
> This is because the kafka client tries to keep an interval of 
> {{retry.backoff.ms}} between metadata updates. That basically works fine from 
> the second update onward, but for the first update, since the client has never 
> obtained the actual metadata (which is obtained by a MetadataRequest), this 
> backoff makes no sense, and in fact it harms our application by blocking 
> the first {{send()}} insanely long.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
