Hi all,
I have created 6 Kafka topics, each with 2 partitions, and now I want to
consume from all partitions in a Java app.
I have created an Executors.newScheduledThreadPool(12) for this, and then
submit my KafkaConsumer implementations to this thread pool.
In KafkaConsumer, I do:
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    while (it.hasNext()) {
        MessageAndMetadata<byte[], byte[]> current = it.next();
        ... process record ...
    }
When I run this Java app, everything appears to work at first, but after
about 30 seconds the threads grind to a halt (I think because it.hasNext()
is blocking indefinitely), and from then on no progress is made.
Messages are constantly being published to Kafka, and there is a lot left
to process, so why would the iterator block indefinitely? (I have to
restart the Java app to consume more messages.)
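
One way to check the guess that the worker threads are parked inside it.hasNext() is to look at their stack traces while the app is stalled, either with the JDK's jstack tool or from inside the JVM. Below is a minimal sketch of the in-JVM variant using only the JDK. The class name BlockedThreadCheck and the helper describeThreads are hypothetical, and the empty BlockingQueue is only a stand-in for a Kafka stream that never returns; it is not the consumer code from this post.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockedThreadCheck {

    // Hypothetical helper: one "name [STATE] at top-frame" line per live
    // thread whose name starts with the given prefix.
    static List<String> describeThreads(String namePrefix) {
        List<String> lines = new ArrayList<>();
        for (Map.Entry<Thread, StackTraceElement[]> e
                : Thread.getAllStackTraces().entrySet()) {
            Thread t = e.getKey();
            if (!t.getName().startsWith(namePrefix)) {
                continue;
            }
            StackTraceElement[] stack = e.getValue();
            String top = stack.length > 0 ? stack[0].toString() : "(no frames)";
            lines.add(t.getName() + " [" + t.getState() + "] at " + top);
        }
        return lines;
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for a stream with nothing to deliver (assumption):
        // take() on an empty queue blocks until interrupted.
        BlockingQueue<byte[]> emptyQueue = new LinkedBlockingQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 2; i++) {
            pool.submit(() -> {
                emptyQueue.take();
                return null;
            });
        }

        // Give the workers a moment to reach the blocking call.
        Thread.sleep(500);

        // Threads from the default factory are named "pool-N-thread-M".
        describeThreads("pool-").forEach(System.out::println);

        // Interrupt the blocked workers so the JVM can exit.
        pool.shutdownNow();
    }
}
```

In the real app, the same helper could be called from a separate monitoring thread with the prefix of the consumer threads' names. If the top frames point into the iterator's blocking call, that would support the hasNext() theory; if the threads are idle in the pool instead, the consumer tasks have probably exited, for example because of an exception.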
Thanks for any help with this!
Josh