I am attempting to send messages to two topics with a newly created
producer.

The first message is sent successfully, but the producer does not fetch
metadata for the second topic before attempting to send to it, so the
first send to the second topic fails. The producer fetches metadata for
the second topic only after that first failed attempt.

Here is my test (Groovy/Java 8):


    // Helper function

    def send(KafkaProducer producer, ProducerRecord record) {
        CompletableFuture<Void> future = new CompletableFuture<>()
        producer.send(record, { meta, e ->
            if (e != null) {
                println "send failed exceptionally ${e.message}"
                future.completeExceptionally(e)
            } else {
                println "send succeeded"
                future.complete(null)
            }
        })
        future
    }


    // The test

    def "test"() {

        Properties config = new Properties();
        config.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 'localhost:9092');
        config.put(ProducerConfig.ACKS_CONFIG, 'all');
        config.put(ProducerConfig.RETRIES_CONFIG, 0);

        def producer = new KafkaProducer(config,
                new ByteArraySerializer(), new ByteArraySerializer())

        def topic1 = 'topic-1'
        def topic2 = 'topic-2'
        def key1 = 'key-1'.bytes
        def key2 = 'key-2'.bytes
        def message1 = 'some bytes'.bytes
        def message2 = 'more bytes'.bytes

        println "==== SENDING 1 ===="
        try {
            send(producer, new ProducerRecord(topic1, key1, message1))
              .thenCompose({
                println "==== SENDING 2 ===="
                send(producer, new ProducerRecord(topic2, key2, message2))
              })
              .toCompletableFuture().get()
        }
        catch (Exception e) {
            // The above try clause throws. Catch here so that the sleep
            // below happens. During the sleep, metadata for `topic-2` is
            // retrieved. Subsequent attempts to send to `topic-2` will
            // succeed, but the first attempt fails.
        }

        println 'before sleeping'
        sleep 5000
        println 'after sleeping'


        expect:
        true
    }


Result:

[2017-04-30 19:21:08,977] INFO (org.apache.kafka.clients.producer.ProducerConfig:180) ProducerConfig values:
acks = all
batch.size = 16384
block.on.buffer.full = false
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 0
max.block.ms = 5000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.fetch.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
timeout.ms = 30000
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

[2017-04-30 19:21:09,192] INFO (org.apache.kafka.common.utils.AppInfoParser:83) Kafka version : 0.10.2.1
[2017-04-30 19:21:09,193] INFO (org.apache.kafka.common.utils.AppInfoParser:84) Kafka commitId : e89bffd6b2eff799
==== SENDING 1 ====
[2017-04-30 19:21:09,358] DEBUG (org.apache.kafka.clients.NetworkClient:767) Initialize connection to node -1 for sending metadata request
[2017-04-30 19:21:09,358] DEBUG (org.apache.kafka.clients.NetworkClient:627) Initiating connection to node -1 at localhost:9092.
[2017-04-30 19:21:09,694] DEBUG (org.apache.kafka.clients.NetworkClient:590) Completed connection to node -1.  Fetching API versions.
[2017-04-30 19:21:09,695] DEBUG (org.apache.kafka.clients.NetworkClient:603) Initiating API versions fetch from node -1.
[2017-04-30 19:21:09,833] DEBUG (org.apache.kafka.clients.NetworkClient:558) Recorded API versions for node -1: (Produce(0): 0 to 2 [usable: 2], Fetch(1): 0 to 3 [usable: 3], Offsets(2): 0 to 1 [usable: 1], Metadata(3): 0 to 2 [usable: 2], LeaderAndIsr(4): 0 [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 [usable: 3], ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 2 [usable: 2], OffsetFetch(9): 0 to 2 [usable: 2], GroupCoordinator(10): 0 [usable: 0], JoinGroup(11): 0 to 1 [usable: 1], Heartbeat(12): 0 [usable: 0], LeaveGroup(13): 0 [usable: 0], SyncGroup(14): 0 [usable: 0], DescribeGroups(15): 0 [usable: 0], ListGroups(16): 0 [usable: 0], SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 [usable: 0], CreateTopics(19): 0 to 1 [usable: 1], DeleteTopics(20): 0 [usable: 0])
[2017-04-30 19:21:09,834] DEBUG (org.apache.kafka.clients.NetworkClient:751) Sending metadata request (type=MetadataRequest, topics=topic-1) to node -1
[2017-04-30 19:21:09,962] DEBUG (org.apache.kafka.clients.NetworkClient:627) Initiating connection to node 0 at 127.0.0.1:9092.
[2017-04-30 19:21:09,974] DEBUG (org.apache.kafka.clients.NetworkClient:590) Completed connection to node 0.  Fetching API versions.
[2017-04-30 19:21:09,974] DEBUG (org.apache.kafka.clients.NetworkClient:603) Initiating API versions fetch from node 0.
[2017-04-30 19:21:09,977] DEBUG (org.apache.kafka.clients.NetworkClient:558) Recorded API versions for node 0: (Produce(0): 0 to 2 [usable: 2], Fetch(1): 0 to 3 [usable: 3], Offsets(2): 0 to 1 [usable: 1], Metadata(3): 0 to 2 [usable: 2], LeaderAndIsr(4): 0 [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 [usable: 3], ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 2 [usable: 2], OffsetFetch(9): 0 to 2 [usable: 2], GroupCoordinator(10): 0 [usable: 0], JoinGroup(11): 0 to 1 [usable: 1], Heartbeat(12): 0 [usable: 0], LeaveGroup(13): 0 [usable: 0], SyncGroup(14): 0 [usable: 0], DescribeGroups(15): 0 [usable: 0], ListGroups(16): 0 [usable: 0], SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 [usable: 0], CreateTopics(19): 0 to 1 [usable: 1], DeleteTopics(20): 0 [usable: 0])
send succeeded
==== SENDING 2 ====
send failed exceptionally Failed to update metadata after 5000 ms.
before sleeping
[2017-04-30 19:21:15,026] DEBUG (org.apache.kafka.clients.NetworkClient:751) Sending metadata request (type=MetadataRequest, topics=topic-1,topic-2) to node 0
after sleeping
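
One hypothesis that would fit the timing in the log (the metadata request
for topic-2 goes out about five seconds after the first send completes,
which matches max.block.ms = 5000): the callback passed to producer.send(),
and with it the thenCompose stage, may run on the producer's I/O thread, so
the second send() would block the very thread that has to fetch the
metadata. This is an assumption; the log does not confirm it. The plain-Java
sketch below reproduces only that pattern with a single-threaded executor.
It does not use Kafka, and the names SingleThreadBlockingSketch,
blockingSend and sendFrom are made up for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SingleThreadBlockingSketch {

    // Hypothetical stand-in for a blocking send(): it waits up to maxBlockMs
    // for a "metadata" task that only the single I/O thread can execute.
    static boolean blockingSend(ExecutorService ioThread, long maxBlockMs) throws Exception {
        Future<?> metadata = ioThread.submit(() -> { /* pretend to fetch metadata */ });
        try {
            metadata.get(maxBlockMs, TimeUnit.MILLISECONDS);
            return true;   // metadata arrived in time
        } catch (TimeoutException e) {
            return false;  // analogous to "Failed to update metadata after N ms"
        }
    }

    // Calls blockingSend either from the I/O thread itself (as a completion
    // callback would, under the assumption above) or from a separate thread.
    static boolean sendFrom(boolean fromIoThread, long maxBlockMs) throws Exception {
        ExecutorService ioThread = Executors.newSingleThreadExecutor();
        ExecutorService other = Executors.newSingleThreadExecutor();
        try {
            ExecutorService caller = fromIoThread ? ioThread : other;
            Callable<Boolean> task = () -> blockingSend(ioThread, maxBlockMs);
            return caller.submit(task).get();
        } finally {
            ioThread.shutdownNow();
            other.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        // The I/O thread waits on work that only it can run, so this times out.
        System.out.println("from I/O thread:   " + sendFrom(true, 200));
        // A different thread waits while the I/O thread does the work.
        System.out.println("from other thread: " + sendFrom(false, 200));
    }
}
```

If this is what is happening in the test, running the second send on a
different thread, for example with thenComposeAsync instead of thenCompose,
might avoid the failure. That remains untested here.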
