Can you try posting your code into a Gist (gist.github.com) or Pastebin, so
it's formatted and easier to read?
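
In the meantime, here is my reading of the consumer loop once it is reflowed.
This is only a sketch under assumptions: FakeConsumer is a hypothetical
stand-in for KafkaConsumer so it runs without a broker, records are plain
Strings instead of ConsumerRecords<Long, ITupleApi>, and maxOuterIterations
is added only so the sketch terminates.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;

public class PollLoopSketch {

    /** Hypothetical stand-in for KafkaConsumer: hands out pre-loaded batches, then empty lists. */
    static final class FakeConsumer {
        private final Queue<List<String>> batches = new ArrayDeque<>();
        int pollCalls = 0;

        FakeConsumer(List<List<String>> preloaded) {
            batches.addAll(preloaded);
        }

        List<String> poll(long timeoutMs) {
            pollCalls++;
            List<String> next = batches.poll();
            return next == null ? Collections.<String>emptyList() : next;
        }
    }

    /** Same nested-loop control flow as run() in the quoted mail; the outer loop is bounded here. */
    static int runLoop(FakeConsumer consumer, AtomicBoolean closed, int maxOuterIterations) {
        int processed = 0;
        boolean end = false;
        int outer = 0;
        while (!closed.get() && outer++ < maxOuterIterations) {
            while (!end) {
                List<String> records = consumer.poll(1000);
                if (records.isEmpty()) {
                    System.err.println("poll received nothing");
                } else {
                    processed += records.size(); // stands in for "some processing on records"
                    end = true;                  // never reset, so the inner loop is skipped from now on
                }
            }
        }
        return processed;
    }

    /** Runs the loop over three batches (one empty, two non-empty); returns {processed, pollCalls}. */
    static int[] demo() {
        FakeConsumer consumer = new FakeConsumer(Arrays.asList(
                Collections.<String>emptyList(),
                Arrays.asList("a", "b"),
                Arrays.asList("c")));
        int processed = runLoop(consumer, new AtomicBoolean(false), 5);
        return new int[] {processed, consumer.pollCalls};
    }

    public static void main(String[] args) {
        int[] result = demo();
        System.out.println("processed=" + result[0] + " pollCalls=" + result[1]);
    }
}
```

If I have the structure right, the third batch is never fetched: once "end" is
set, the outer loop keeps spinning without calling poll() again. That would not
by itself explain poll() returning nothing, but it may be worth checking.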

On Tue, Mar 1, 2016 at 8:49 AM, Péricé Robin <perice.ro...@gmail.com> wrote:

> Hello everybody,
>
> I'm having trouble using the KafkaConsumer 0.9.0.0 API. My Consumer class
> doesn't consume messages properly.
>
> -------------------------
> | *Consumer*.java |
> -------------------------
>
> public final void run() {
>     try {
>         *consumer.subscribe(Collections.singletonList(topicName));*
>         boolean end = false;
>         while (!closed.get()) {
>             while (!end) {
>                 *final ConsumerRecords<Long, ITupleApi> records = consumer.poll(1000);*
>                 if (records == null || records.count() == 0) {
>                     System.err.println("In ConsumerThread consumer.poll received nothing on " + topicName);
>                 } else {
>                     /* some processing on records here */
>                     end = true;
>                 }
>             }
>         }
>     } catch (final WakeupException e) {
>         if (!closed.get()) {
>             throw e;
>         }
>     } finally {
>         consumer.close();
>     }
> }
>
> ------------------------
> | *Producer*.java |
> ------------------------
>
>
> public final void run() {
>     while (true) {
>         *producer.send(new ProducerRecord<Long, ITupleApi>(topicName, timems, tuple), callback);*
>     }
> }
>
> ----------------------------------------------
>
> I run this exactly the same way as in the GitHub example:
>
> https://github.com/apache/kafka/blob/trunk/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
>
> But I only get "In ConsumerThread consumer.poll received nothing". poll()
> never returns any messages, yet when I use the command-line tools I can see my
> messages on the topic.
>
> When I run the basic example from GitHub everything works fine, so it
> seems like I'm missing something.
>
> *CONSOLE*
>
> [2016-03-01 14:42:14,280] INFO [GroupCoordinator 0]: Preparing to restabilize group KafkaChannelBasicTestConsumer with old generation 1 (kafka.coordinator.GroupCoordinator)
> [2016-03-01 14:42:14,281] INFO [GroupCoordinator 0]: Group KafkaChannelBasicTestConsumer generation 1 is dead and removed (kafka.coordinator.GroupCoordinator)
> [2016-03-01 14:42:22,788] INFO [GroupCoordinator 0]: Preparing to restabilize group KafkaChannelBasicTestConsumer with old generation 0 (kafka.coordinator.GroupCoordinator)
> [2016-03-01 14:42:22,788] INFO [GroupCoordinator 0]: Stabilized group KafkaChannelBasicTestConsumer generation 1 (kafka.coordinator.GroupCoordinator)
> [2016-03-01 14:42:22,797] INFO [GroupCoordinator 0]: Assignment received from leader for group KafkaChannelBasicTestConsumer for generation 1 (kafka.coordinator.GroupCoordinator)
> [2016-03-01 14:42:55,808] INFO [GroupCoordinator 0]: Preparing to restabilize group KafkaChannelBasicTestConsumer with old generation 1 (kafka.coordinator.GroupCoordinator)
> [2016-03-01 14:42:55,809] INFO [GroupCoordinator 0]: Group KafkaChannelBasicTestConsumer generation 1 is dead and removed (kafka.coordinator.GroupCoordinator)
>
>
> I really need help with this!
>
> Regards,
>
> Robin
>



-- 

https://github.com/mindscratch
https://www.google.com/+CraigWickesser
https://twitter.com/mind_scratch
https://twitter.com/craig_links
