Yuto Kawamura created KAFKA-4024:
------------------------------------

             Summary: First metadata update always takes retry.backoff.ms milliseconds to complete
                 Key: KAFKA-4024
                 URL: https://issues.apache.org/jira/browse/KAFKA-4024
             Project: Kafka
          Issue Type: Bug
          Components: clients
    Affects Versions: 0.10.0.0, 0.9.0.1
            Reporter: Yuto Kawamura
            Assignee: Yuto Kawamura


Recently I updated our KafkaProducer configuration. Specifically, we changed
{{retry.backoff.ms}} from its default (100ms) to 1000ms.
After that, we observed that the first {{send()}} started taking longer than
before. I investigated and found the following facts.

Environment:
- Kafka broker 0.9.0.1
- Kafka producer 0.9.0.1

Our current version is 0.9.0.1, but the issue also reproduces with the latest
build from the trunk branch.

h2. TL;DR
The first {{KafkaProducer.send()}} always blocks for {{retry.backoff.ms}}
milliseconds, because a backoff is unintentionally applied to the first metadata update.


h2. Proof
I wrote the following test code and placed it under clients/main/java/:

{code}
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public final class KafkaProducerMetadataUpdateDurationTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "30000");
        // Backoff under test, passed as -Dretry.backoff.ms=...; falls back to the
        // 100 ms default so Properties.setProperty() never receives null.
        String retryBackoffMs = System.getProperty("retry.backoff.ms", "100");
        System.err.println("Experimenting with retry.backoff.ms = " + retryBackoffMs);
        props.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoffMs);

        Producer<byte[], byte[]> producer =
                new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer());

        // partitionsFor() blocks until metadata for "test" is available,
        // which is the same wait the first send() goes through.
        long t0 = System.nanoTime();
        try {
            producer.partitionsFor("test");
            long duration = System.nanoTime() - t0;
            System.err.println("Duration = " + TimeUnit.NANOSECONDS.toMillis(duration) + " ms");
        } finally {
            producer.close();
        }
    }
}
{code}

Here's the experiment log:
{code}
# Start zookeeper & kafka broker
./bin/zookeeper-server-start.sh config/zookeeper.properties
./bin/kafka-server-start.sh config/server.properties

# Create test topic
./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --replication-factor 1 --partitions 1

$ ./bin/kafka-run-class.sh -Dretry.backoff.ms=100 KafkaProducerMetadataUpdateDurationTest
Experimenting with retry.backoff.ms = 100
Duration = 175 ms

$ ./bin/kafka-run-class.sh -Dretry.backoff.ms=1000 KafkaProducerMetadataUpdateDurationTest
Experimenting with retry.backoff.ms = 1000
Duration = 1066 ms

$ ./bin/kafka-run-class.sh -Dretry.backoff.ms=10000 KafkaProducerMetadataUpdateDurationTest
Experimenting with retry.backoff.ms = 10000
Duration = 10070 ms
{code}

As you can see, the duration of {{partitionsFor()}} grows linearly with
{{retry.backoff.ms}}: each run takes roughly the configured backoff plus 65 to 75 ms.
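
The linear relationship can be checked directly against the three logged runs: subtracting the configured backoff from each measured duration leaves a small, roughly constant overhead. The class name {{BackoffOverhead}} is purely illustrative; the numbers are copied from the log.

{code:java}
// Illustrative helper: how much of each logged duration is NOT explained by the backoff.
public final class BackoffOverhead {
    // Measured duration minus the configured retry.backoff.ms, both in milliseconds.
    public static long overhead(long durationMs, long backoffMs) {
        return durationMs - backoffMs;
    }

    public static void main(String[] args) {
        // Values copied from the experiment log.
        long[] backoffMs = {100, 1000, 10000};
        long[] durationMs = {175, 1066, 10070};
        for (int i = 0; i < backoffMs.length; i++) {
            System.out.println("retry.backoff.ms=" + backoffMs[i]
                    + " -> overhead " + overhead(durationMs[i], backoffMs[i]) + " ms");
        }
    }
}
{code}

It prints overheads of 75, 66 and 70 ms, so almost the entire wait is the backoff itself.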

Here is the scenario that leads to this behavior:
1. KafkaProducer initializes the metadata with the given {{bootstrap.servers}} and the
current timestamp: 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L276
2. On the first {{send()}}, KafkaProducer requests a metadata update because the
partition info is missing: 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L527
3. However, DefaultMetadataUpdater doesn't actually send a MetadataRequest, because
{{metadata.timeToNextUpdate}} returns a value larger than zero: 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L541-L548
4. {{Metadata.timeToNextUpdate}} returns the larger of the time until metadata
expiration and the time until the backoff expires. In practice {{needUpdate}} is
always true the first time, so the time until expiration is zero and
{{timeToAllowUpdate}} is always the value used. It cannot reach zero until
{{retry.backoff.ms}} has elapsed since the first {{metadata.update()}}: 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L116
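
The arithmetic of steps 1 to 4 can be reproduced with a small stand-alone model. This is a simplified sketch based on the description in step 4, not the real {{org.apache.kafka.clients.Metadata}} class: {{MetadataBackoffModel}} and its fields are hypothetical stand-ins ({{refreshBackoffMs}} plays the role of {{retry.backoff.ms}}, {{metadataExpireMs}} that of {{metadata.max.age.ms}}, whose 300000 ms default is used in {{main}}).

{code:java}
// Hypothetical, simplified model of the backoff arithmetic described in step 4.
// It is NOT the real org.apache.kafka.clients.Metadata class; names are illustrative.
public final class MetadataBackoffModel {
    private final long refreshBackoffMs;   // stands in for retry.backoff.ms
    private final long metadataExpireMs;   // stands in for metadata.max.age.ms
    private long lastRefreshMs;            // time of the last update() call
    private long lastSuccessfulRefreshMs;  // time of the last successful update()
    private boolean needUpdate;            // set when partition info is missing

    public MetadataBackoffModel(long refreshBackoffMs, long metadataExpireMs) {
        this.refreshBackoffMs = refreshBackoffMs;
        this.metadataExpireMs = metadataExpireMs;
    }

    // Step 1: the producer seeds the metadata with bootstrap.servers at nowMs.
    public void update(long nowMs) {
        this.lastRefreshMs = nowMs;
        this.lastSuccessfulRefreshMs = nowMs;
        this.needUpdate = false;
    }

    // Step 2: the first send() finds no partition info and asks for an update.
    public void requestUpdate() {
        this.needUpdate = true;
    }

    // Step 4: the larger of "time until expiration" and "time until backoff ends".
    public long timeToNextUpdate(long nowMs) {
        long timeToExpire = needUpdate
                ? 0
                : Math.max(lastSuccessfulRefreshMs + metadataExpireMs - nowMs, 0);
        long timeToAllowUpdate = lastRefreshMs + refreshBackoffMs - nowMs;
        return Math.max(timeToExpire, timeToAllowUpdate);
    }

    public static void main(String[] args) {
        long backoffMs = 1000;
        MetadataBackoffModel metadata = new MetadataBackoffModel(backoffMs, 300_000);
        long t0 = 0;
        metadata.update(t0);       // bootstrap at t0
        metadata.requestUpdate();  // first send() at (almost) the same instant
        // Step 3: a positive value means no MetadataRequest is sent yet.
        System.out.println("wait at t0:         " + metadata.timeToNextUpdate(t0));
        System.out.println("wait at t0+backoff: " + metadata.timeToNextUpdate(t0 + backoffMs));
    }
}
{code}

Right after the bootstrap the model reports a wait of exactly {{retry.backoff.ms}} (1000 ms here), and the wait only drops to zero once the full backoff has elapsed, matching the measured durations.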


This happens because the Kafka client tries to keep an interval of
{{retry.backoff.ms}} between metadata updates. That works fine from the second
update onward, but on the first update the client cannot yet have the actual
metadata (which is obtained with a MetadataRequest), so backing off makes no
sense. In fact, it harms our application by blocking the first {{send()}} for an
unreasonably long time.
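
To make the argument concrete, here is one hypothetical way to express "no backoff before the first real metadata": seeding with {{bootstrap.servers}} does not start the backoff window, and only real metadata updates do. {{FirstUpdateBackoffSketch}} and its methods are invented for illustration; this is not the Kafka client's code and not a proposed patch.

{code:java}
// Hypothetical sketch: only real metadata updates start the backoff window.
// Neither the class nor its methods exist in the Kafka client; names are illustrative.
public final class FirstUpdateBackoffSketch {
    private final long refreshBackoffMs;  // stands in for retry.backoff.ms
    private long lastRefreshMs;           // time of the last update that counts for backoff
    private boolean hasRealMetadata;      // false while only bootstrap.servers are known

    public FirstUpdateBackoffSketch(long refreshBackoffMs) {
        this.refreshBackoffMs = refreshBackoffMs;
    }

    // Seeding with bootstrap.servers: nothing to back off from yet.
    public void bootstrap() {
        this.hasRealMetadata = false;
    }

    // Metadata obtained via a MetadataRequest: later refreshes are rate-limited.
    public void update(long nowMs) {
        this.lastRefreshMs = nowMs;
        this.hasRealMetadata = true;
    }

    // Time to wait before the next MetadataRequest may be sent.
    public long timeToAllowUpdate(long nowMs) {
        if (!hasRealMetadata) {
            return 0; // first update: send immediately
        }
        return Math.max(lastRefreshMs + refreshBackoffMs - nowMs, 0);
    }

    public static void main(String[] args) {
        FirstUpdateBackoffSketch metadata = new FirstUpdateBackoffSketch(1000);
        metadata.bootstrap();
        System.out.println("first update wait:  " + metadata.timeToAllowUpdate(0));
        metadata.update(10);
        System.out.println("second update wait: " + metadata.timeToAllowUpdate(20));
    }
}
{code}

With this rule the first update is not delayed at all (wait 0), while a refresh requested 10 ms after a real update still waits the remaining 990 ms, so the protection the backoff provides from the second update onward is kept.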



