I am attempting to send messages to two topics with a newly created producer.
The first message sends fine, but for some reason, the producer does not fetch metadata for the second topic before attempting to send. So sending to the second topic fails. The producer fetches metadata for the second topic only after it fails sending to it for the the first time. Here is my test (Groovy/Java 8): // Helper function def send(KafkaProducer producer, ProducerRecord record) { CompletableFuture<Void> future = new CompletableFuture<>() producer.send(record, { meta, e -> if (e != null) { println "send failed exceptionally ${e.message}" future.completeExceptionally(e) } else { println "send succeeded" future.complete(null) } }) future } // The test def "test"() { Properties config = new Properties(); config.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000); config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 'localhost:9092'); config.put(ProducerConfig.ACKS_CONFIG, 'all'); config.put(ProducerConfig.RETRIES_CONFIG, 0); def producer = new KafkaProducer(config, new ByteArraySerializer(), new ByteArraySerializer()) def topic1 = 'topic-1' def topic2 = 'topic-2' def key1 = 'key-1'.bytes def key2 = 'key-2'.bytes def message1 = 'some bytes'.bytes def message2 = 'more bytes'.bytes println "==== SENDING 1 ====" try { send(producer, new ProducerRecord(topic1, key1, message1)) .thenCompose({ println "==== SENDING 2 ====" send(producer, new ProducerRecord(topic2, key2, message2)) }) .toCompletableFuture().get() } catch (Exception e) { // The above try clause throws. Catch here so that the sleep below happens // During sleep, metadata for `topic-2` is retrieved. Subsequent attempts // to send to `topic-2` will succeed. But the first attempt fails. } println 'before sleeping' sleep 5000 println 'after sleeping' expect: true } Result: [2017-04-30 19:21:08,977] INFO (org.apache.kafka.clients.producer.ProducerConfig:180) ProducerConfig values: acks = all batch.size = 16384 block.on.buffer.full = false bootstrap.servers = [localhost:9092] buffer.memory = 33554432 client.id = compression.type = none connections.max.idle.ms = 540000 interceptor.classes = null key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer linger.ms = 0 max.block.ms = 5000 max.in.flight.requests.per.connection = 5 max.request.size = 1048576 metadata.fetch.timeout.ms = 60000 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.sample.window.ms = 30000 partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner receive.buffer.bytes = 32768 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retries = 0 retry.backoff.ms = 100 sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT send.buffer.bytes = 131072 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS timeout.ms = 30000 value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer [2017-04-30 19:21:09,192] INFO (org.apache.kafka.common.utils.AppInfoParser:83) Kafka version : 0.10.2.1 [2017-04-30 19:21:09,193] INFO (org.apache.kafka.common.utils.AppInfoParser:84) Kafka commitId : e89bffd6b2eff799 ==== SENDING 1 ==== [2017-04-30 19:21:09,358] DEBUG (org.apache.kafka.clients.NetworkClient:767) Initialize connection to node -1 for sending metadata request [2017-04-30 19:21:09,358] DEBUG (org.apache.kafka.clients.NetworkClient:627) Initiating connection to node -1 at localhost:9092. [2017-04-30 19:21:09,694] DEBUG (org.apache.kafka.clients.NetworkClient:590) Completed connection to node -1. Fetching API versions. [2017-04-30 19:21:09,695] DEBUG (org.apache.kafka.clients.NetworkClient:603) Initiating API versions fetch from node -1. [2017-04-30 19:21:09,833] DEBUG (org.apache.kafka.clients.NetworkClient:558) Recorded API versions for node -1: (Produce(0): 0 to 2 [usable: 2], Fetch(1): 0 to 3 [usable: 3], Offsets(2): 0 to 1 [usable: 1], Metadata(3): 0 to 2 [usable: 2], LeaderAndIsr(4): 0 [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 [usable: 3], ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 2 [usable: 2], OffsetFetch(9): 0 to 2 [usable: 2], GroupCoordinator(10): 0 [usable: 0], JoinGroup(11): 0 to 1 [usable: 1], Heartbeat(12): 0 [usable: 0], LeaveGroup(13): 0 [usable: 0], SyncGroup(14): 0 [usable: 0], DescribeGroups(15): 0 [usable: 0], ListGroups(16): 0 [usable: 0], SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 [usable: 0], CreateTopics(19): 0 to 1 [usable: 1], DeleteTopics(20): 0 [usable: 0]) [2017-04-30 19:21:09,834] DEBUG (org.apache.kafka.clients.NetworkClient:751) Sending metadata request (type=MetadataRequest, topics=topic-1) to node -1 [2017-04-30 19:21:09,962] DEBUG (org.apache.kafka.clients.NetworkClient:627) Initiating connection to node 0 at 127.0.0.1:9092. [2017-04-30 19:21:09,974] DEBUG (org.apache.kafka.clients.NetworkClient:590) Completed connection to node 0. Fetching API versions. [2017-04-30 19:21:09,974] DEBUG (org.apache.kafka.clients.NetworkClient:603) Initiating API versions fetch from node 0. [2017-04-30 19:21:09,977] DEBUG (org.apache.kafka.clients.NetworkClient:558) Recorded API versions for node 0: (Produce(0): 0 to 2 [usable: 2], Fetch(1): 0 to 3 [usable: 3], Offsets(2): 0 to 1 [usable: 1], Metadata(3): 0 to 2 [usable: 2], LeaderAndIsr(4): 0 [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 [usable: 3], ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 2 [usable: 2], OffsetFetch(9): 0 to 2 [usable: 2], GroupCoordinator(10): 0 [usable: 0], JoinGroup(11): 0 to 1 [usable: 1], Heartbeat(12): 0 [usable: 0], LeaveGroup(13): 0 [usable: 0], SyncGroup(14): 0 [usable: 0], DescribeGroups(15): 0 [usable: 0], ListGroups(16): 0 [usable: 0], SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 [usable: 0], CreateTopics(19): 0 to 1 [usable: 1], DeleteTopics(20): 0 [usable: 0]) send succeeded ==== SENDING 2 ==== send failed exceptionally Failed to update metadata after 5000 ms. before sleeping [2017-04-30 19:21:15,026] DEBUG (org.apache.kafka.clients.NetworkClient:751) Sending metadata request (type=MetadataRequest, topics=topic-1,topic-2) to node 0 after sleeping