Hello everybody,

I'm having troubles using KafkaConsumer 0.9.0.0 API. My Consumer class
doesn't consumer messages properly.

-------------------------
| *Consumer*.java |
-------------------------

public final void run() { try {
*consumer.subscribe(Collections.singletonList(topicName));* boolean end =
false; while (!closed.get()) { while (!end) { *final ConsumerRecords<Long,
ITupleApi> records = consumer.poll(1000);* if (records == null || records.
count() == 0) { System.err.println("In ConsumerThread consumer.poll
received nothing on " + topicName); } else { /* some processing on records
here */ end = true; } } } } catch (final WakeupException e) { if (!closed.
get()) { throw e; } } finally { consumer.close(); } }

------------------------
| *Producer*.java |
------------------------


public final void run() {
while (true) {
*producer.send(new ProducerRecord<Long, ITupleApi>(topicName, timems,
tuple), callback);*
}
}

----------------------------------------------

I run this exactly the same way as in Github example :
https://github.com/apache/kafka/blob/trunk/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java

But I only get "In ConsumerThread consumer.poll received nothing". Poll()
never send me messages ... But when I use command line tools I can see my
messages on the topic.

When I run the basic example from GitHub everything works fine ... So it's
seems like I'm missing something.






*CONSOLE*

[2016-03-01 14:42:14,280] INFO [GroupCoordinator 0]: Preparing to
restabilize group KafkaChannelBasicTestConsumer with old generation 1
(kafka.coordin
ator.GroupCoordinator)
[2016-03-01 14:42:14,281] INFO [GroupCoordinator 0]: Group
KafkaChannelBasicTestConsumer generation 1 is dead and removed
(kafka.coordinator.GroupCoor
dinator)
[2016-03-01 14:42:22,788] INFO [GroupCoordinator 0]: Preparing to
restabilize group KafkaChannelBasicTestConsumer with old generation 0
(kafka.coordin
ator.GroupCoordinator)
[2016-03-01 14:42:22,788] INFO [GroupCoordinator 0]: Stabilized group
KafkaChannelBasicTestConsumer generation 1
(kafka.coordinator.GroupCoordinator)
[2016-03-01 14:42:22,797] INFO [GroupCoordinator 0]: Assignment received
from leader for group KafkaChannelBasicTestConsumer for generation 1
(kafka.c
oordinator.GroupCoordinator)
[2016-03-01 14:42:55,808] INFO [GroupCoordinator 0]: Preparing to
restabilize group KafkaChannelBasicTestConsumer with old generation 1
(kafka.coordin
ator.GroupCoordinator)
[2016-03-01 14:42:55,809] INFO [GroupCoordinator 0]: Group
KafkaChannelBasicTestConsumer generation 1 is dead and removed
(kafka.coordinator.GroupCoor
dinator)


I really need help on this !

Regards,

Robin

Reply via email to