I am using Kafka 0.9.0.1.

The first time I start up my application, it takes 20-30 seconds to retrieve the 
"latest" message from the topic.

I've tried different Kafka brokers (with different configs), yet I still see this 
behaviour. There is usually no slowness for subsequent messages.

Is this expected behaviour? You can see it clearly by running the sample 
application below (perhaps a few times, as it's somewhat intermittent) and 
changing the broker/topic names to your own settings.

import com.google.common.collect.Lists;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class KafkaProducerConsumerTest {

    public static final String KAFKA_BROKERS = "...";
    public static final String TOPIC = "...";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        new KafkaProducerConsumerTest().run();
    }

    public void run() throws ExecutionException, InterruptedException {
        // Consumer is started on a background thread and reads from the latest offset.
        Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", KAFKA_BROKERS);
        consumerProperties.setProperty("group.id", "Test");
        consumerProperties.setProperty("auto.offset.reset", "latest");
        consumerProperties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProperties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        MyKafkaConsumer kafkaConsumer = new MyKafkaConsumer(consumerProperties, TOPIC);
        Executors.newFixedThreadPool(1).submit(() -> kafkaConsumer.consume());

        // Producer publishes a single message from the main thread.
        Properties producerProperties = new Properties();
        producerProperties.setProperty("bootstrap.servers", KAFKA_BROKERS);
        producerProperties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProperties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        MyKafkaProducer kafkaProducer = new MyKafkaProducer(producerProperties, TOPIC);
        kafkaProducer.publish("Test Message");
    }
}


class MyKafkaConsumer {
    private final Logger logger = LoggerFactory.getLogger(MyKafkaConsumer.class);
    private KafkaConsumer<String, Object> kafkaConsumer;

    public MyKafkaConsumer(Properties properties, String topic) {
        kafkaConsumer = new KafkaConsumer<String, Object>(properties);
        kafkaConsumer.subscribe(Lists.newArrayList(topic));
    }

    public void consume() {
        while (true) {
            logger.info("Started listening...");
            // poll blocks until at least one record is available
            ConsumerRecords<String, Object> consumerRecords = kafkaConsumer.poll(Long.MAX_VALUE);
            logger.info("Received records {}", consumerRecords.iterator().next().value());
        }
    }
}

class MyKafkaProducer {
    private KafkaProducer<String, Object> kafkaProducer;
    private String topic;

    public MyKafkaProducer(Properties properties, String topic) {
        this.kafkaProducer = new KafkaProducer<String, Object>(properties);
        this.topic = topic;
    }

    public void publish(Object object) throws ExecutionException, InterruptedException {
        ProducerRecord<String, Object> producerRecord = new ProducerRecord<>(topic, "key", object);
        // block until the broker has acknowledged the send
        Future<RecordMetadata> response = kafkaProducer.send(producerRecord);
        response.get();
    }
}
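
To make the delay easier to see as a number, here is a rough standalone variant 
of the same test that simply times how long the first poll takes to return a 
record after subscribing. It is only a sketch: the class name, group id and the 
one-message-per-second producer loop are illustrative, and you would again 
substitute your own broker/topic settings.

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Collections;
import java.util.Properties;

public class FirstPollTimingTest {

    public static final String KAFKA_BROKERS = "...";
    public static final String TOPIC = "...";

    public static void main(String[] args) throws Exception {
        Properties consumerProps = new Properties();
        consumerProps.setProperty("bootstrap.servers", KAFKA_BROKERS);
        consumerProps.setProperty("group.id", "TimingTest");
        consumerProps.setProperty("auto.offset.reset", "latest");
        consumerProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Properties producerProps = new Properties();
        producerProps.setProperty("bootstrap.servers", KAFKA_BROKERS);
        producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Background thread: publish one message per second so the consumer
        // always has something recent to pick up once it is ready.
        Thread producerThread = new Thread(() -> {
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
                for (int i = 0; i < 60; i++) {
                    producer.send(new ProducerRecord<>(TOPIC, "key", "Test Message " + i)).get();
                    Thread.sleep(1000);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        producerThread.setDaemon(true);
        producerThread.start();

        // Main thread: subscribe, then measure how long it takes for the
        // first poll to return a record.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            consumer.subscribe(Collections.singletonList(TOPIC));
            long start = System.currentTimeMillis();
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                if (records.count() > 0) {
                    System.out.println("First record arrived after "
                            + (System.currentTimeMillis() - start) + " ms: "
                            + records.iterator().next().value());
                    break;
                }
            }
        }
    }
}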
