Hi All,

I was using the new Kafka Consumer to fetch messages in this way:

while (true) {
    ConsumerRecords<Object, T> records = kafkaConsumer.poll(Long.MAX_VALUE);
    // do nothing if records are empty
    ....
}

Then I realized that blocking until new messages fetched might be a little
overhead. So I looked into the KafkaConsumer code to figure out get a
reasonable timeout.

do {
    Map<TopicPartition, List<ConsumerRecord<K, V>>> records =
pollOnce(remaining);
    if (!records.isEmpty()) {
        // if data is available, then return it, but first send off the
        // next round of fetches to enable pipelining while the user is
        // handling the fetched records.
        fetcher.initFetches(metadata.fetch());
        client.poll(0);
        return new ConsumerRecords<>(records);
    }

    long elapsed = time.milliseconds() - start;
    remaining = timeout - elapsed;
} while (remaining > 0);

It seems that even if I set a much lower timeout, like 1000ms, my code will
still keep fetching messages, as I use while(true) and the code won't do
anything with an empty message set. So the only difference between a high
timeout and a low one is that the code is looping in the while loop I wrote
or the one in poll(). But in terms of connections to Kafka, setting a low
or high timeout won't affect much in my case.

I might misunderstand the code completely. Anyone is able to shed some
light on this topic?

Thanks.

-- 
Yifan

Reply via email to