I am using Kafka 0.9.0.1. The first time I start up my application, it takes 20-30 seconds to retrieve the "latest" message from the topic.
I've used different Kafka brokers (with different configs) yet I still see this behaviour. There is usually no slowness for subsequent messages. Is this expected behaviour? you can clearly see this below by running this sample application (perhaps a few times as it's somewhat intermittent) and changing the broker/topic name to your own settings

/**
 * Sample application: starts a consumer on a background thread, then publishes a single
 * message to the same topic from the main thread.
 */
public class KafkaProducerConsumerTest {

    public static final String KAFKA_BROKERS = "...";
    public static final String TOPIC = "...";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        new KafkaProducerConsumerTest().run();
    }

    /**
     * Starts the background consumer, registers a shutdown hook that stops it cleanly, and
     * publishes one test message.
     *
     * @throws ExecutionException   if the broker rejects the published record
     * @throws InterruptedException if the calling thread is interrupted while waiting for the send
     */
    public void run() throws ExecutionException, InterruptedException {
        Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", KAFKA_BROKERS);
        consumerProperties.setProperty("group.id", "Test");
        consumerProperties.setProperty("auto.offset.reset", "latest");
        consumerProperties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProperties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        MyKafkaConsumer kafkaConsumer = new MyKafkaConsumer(consumerProperties, TOPIC);

        // Keep a handle on the executor so the shutdown hook can wait for the consumer thread.
        java.util.concurrent.ExecutorService consumerExecutor = Executors.newFixedThreadPool(1);
        consumerExecutor.submit(() -> kafkaConsumer.consume());

        // The consumer loop runs until the JVM is asked to stop. On shutdown, wake the consumer
        // out of poll() and wait for it to close, so that it leaves the group cleanly.
        // NOTE(review): an unclosed consumer from a previous run is a likely contributor to the
        // slow first message, because the group rebalance on the next start may have to wait for
        // the stale member's session to expire (session.timeout.ms) -- confirm against the
        // broker/coordinator logs.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            kafkaConsumer.shutdown();
            consumerExecutor.shutdown();
            try {
                consumerExecutor.awaitTermination(10, java.util.concurrent.TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));

        Properties producerProperties = new Properties();
        producerProperties.setProperty("bootstrap.servers", KAFKA_BROKERS);
        producerProperties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProperties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        MyKafkaProducer kafkaProducer = new MyKafkaProducer(producerProperties, TOPIC);
        try {
            // NOTE(review): nothing here waits for the consumer to receive its partition
            // assignment before publishing; with auto.offset.reset=latest that looks like a
            // race that could explain the intermittency -- confirm.
            kafkaProducer.publish("Test Message");
        } finally {
            // Release the producer's resources once the single message has been sent.
            kafkaProducer.close();
        }
    }
}

/**
 * Thin wrapper around a {@link KafkaConsumer} subscribed to a single topic.
 *
 * <p>{@link #consume()} is meant to run on one dedicated thread; {@link #shutdown()} may be
 * called from any other thread to stop it.
 */
class MyKafkaConsumer {

    private static final Logger logger = LoggerFactory.getLogger(MyKafkaConsumer.class);

    private final KafkaConsumer<String, Object> kafkaConsumer;

    /** Set by {@link #shutdown()}; volatile so the polling thread sees the update. */
    private volatile boolean closed;

    /**
     * Creates the underlying consumer and subscribes it to {@code topic}.
     *
     * @param properties consumer configuration passed straight to {@link KafkaConsumer}
     * @param topic      the topic to subscribe to
     */
    public MyKafkaConsumer(Properties properties, String topic) {
        kafkaConsumer = new KafkaConsumer<String, Object>(properties);
        kafkaConsumer.subscribe(Lists.newArrayList(topic));
    }

    /**
     * Polls the topic and logs the value of every record received until {@link #shutdown()}
     * is called, then closes the underlying consumer.
     */
    public void consume() {
        try {
            while (!closed) {
                logger.info("Started listening...");
                ConsumerRecords<String, Object> consumerRecords = kafkaConsumer.poll(Long.MAX_VALUE);
                // Iterate instead of calling iterator().next(): an empty batch would otherwise
                // throw NoSuchElementException and silently kill the consumer thread.
                consumerRecords.forEach(record -> logger.info("Received records {}", record.value()));
            }
        } catch (org.apache.kafka.common.errors.WakeupException e) {
            // Expected when shutdown() interrupts a blocking poll(); anything else is a real error.
            if (!closed) {
                throw e;
            }
        } finally {
            kafkaConsumer.close();
        }
    }

    /**
     * Asks the polling loop to stop. Sets the closed flag and calls
     * {@link KafkaConsumer#wakeup()} to abort a blocking {@code poll}.
     */
    public void shutdown() {
        closed = true;
        kafkaConsumer.wakeup();
    }
}

/**
 * Thin wrapper around a {@link KafkaProducer} that publishes to a single topic.
 */
class MyKafkaProducer {

    private final KafkaProducer<String, Object> kafkaProducer;
    private final String topic;

    /**
     * @param properties producer configuration passed straight to {@link KafkaProducer}
     * @param topic      the topic every record is published to
     */
    public MyKafkaProducer(Properties properties, String topic) {
        this.kafkaProducer = new KafkaProducer<String, Object>(properties);
        this.topic = topic;
    }

    /**
     * Sends {@code object} under the fixed key {@code "key"} and blocks until the send completes.
     *
     * @param object the record value
     * @throws ExecutionException   if the send fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void publish(Object object) throws ExecutionException, InterruptedException {
        ProducerRecord<String, Object> producerRecord = new ProducerRecord<>(topic, "key", object);
        Future<RecordMetadata> response = kafkaProducer.send(producerRecord);
        response.get();
    }

    /** Closes the underlying producer. */
    public void close() {
        kafkaProducer.close();
    }
}