Can you try posting your code into a Gist (gist.github.com) or Pastebin, so it's formatted and easier to read?
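From what I can make out of the consumer code, `end` is never reset to false. Once the inner loop has processed a first batch, the outer `while (!closed.get())` loop would spin without ever calling poll() again. In the 0.9 consumer, heartbeats are only sent from inside poll(), so a consumer that stops polling is eventually dropped from its group. I can't say whether that is your actual problem, since you report never receiving anything at all, but it is worth ruling out. Below is a rough sketch of a single-loop shape. It uses a BlockingQueue as a stand-in for the consumer so it runs without Kafka; `PollLoopSketch`, `pollUntilClosed` and the handler are made-up names for illustration, not part of the Kafka API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Plain-Java sketch: a BlockingQueue stands in for KafkaConsumer so the
// loop shape can be run without a broker. All names here are hypothetical.
class PollLoopSketch {

    // Polls `source` on every iteration until `closed` is set.
    // Returns how many times poll was called.
    static int pollUntilClosed(BlockingQueue<String> source,
                               AtomicBoolean closed,
                               long pollTimeoutMs,
                               Consumer<String> handler) {
        int polls = 0;
        while (!closed.get()) {
            String msg;
            try {
                // Stand-in for consumer.poll(timeout): waits up to the timeout.
                msg = source.poll(pollTimeoutMs, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status
                break;
            }
            polls++;
            if (msg == null) {
                System.err.println("poll received nothing");
                continue; // nothing yet: go straight back to polling
            }
            handler.accept(msg); // process, then the loop polls again
        }
        return polls;
    }

    public static void main(String[] args) {
        BlockingQueue<String> source =
                new LinkedBlockingQueue<>(Arrays.asList("a", "b", "c"));
        AtomicBoolean closed = new AtomicBoolean(false);
        List<String> seen = new ArrayList<>();
        int polls = pollUntilClosed(source, closed, 100, msg -> {
            seen.add(msg);
            if (seen.size() == 3) {
                closed.set(true); // stands in for an external shutdown signal
            }
        });
        System.out.println(seen + " after " + polls + " polls");
    }
}
```

The point of the single loop is that every iteration reaches the poll call, whether or not the previous one returned data, and only the `closed` flag ends it.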
On Tue, Mar 1, 2016 at 8:49 AM, Péricé Robin <perice.ro...@gmail.com> wrote:
> Hello everybody,
>
> I'm having trouble using the KafkaConsumer 0.9.0.0 API. My Consumer class
> doesn't consume messages properly.
>
> -------------------------
> | *Consumer*.java |
> -------------------------
>
> public final void run() {
>     try {
>         *consumer.subscribe(Collections.singletonList(topicName));*
>         boolean end = false;
>         while (!closed.get()) {
>             while (!end) {
>                 *final ConsumerRecords<Long, ITupleApi> records = consumer.poll(1000);*
>                 if (records == null || records.count() == 0) {
>                     System.err.println("In ConsumerThread consumer.poll received nothing on " + topicName);
>                 } else {
>                     /* some processing on records here */
>                     end = true;
>                 }
>             }
>         }
>     } catch (final WakeupException e) {
>         if (!closed.get()) {
>             throw e;
>         }
>     } finally {
>         consumer.close();
>     }
> }
>
> ------------------------
> | *Producer*.java |
> ------------------------
>
> public final void run() {
>     while (true) {
>         *producer.send(new ProducerRecord<Long, ITupleApi>(topicName, timems, tuple), callback);*
>     }
> }
>
> ----------------------------------------------
>
> I run this exactly the same way as in the GitHub example:
>
> https://github.com/apache/kafka/blob/trunk/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
>
> But I only get "In ConsumerThread consumer.poll received nothing". poll()
> never sends me messages ... But when I use the command line tools I can see
> my messages on the topic.
>
> When I run the basic example from GitHub everything works fine ... So it
> seems like I'm missing something.
>
> *CONSOLE*
>
> [2016-03-01 14:42:14,280] INFO [GroupCoordinator 0]: Preparing to restabilize group KafkaChannelBasicTestConsumer with old generation 1 (kafka.coordinator.GroupCoordinator)
> [2016-03-01 14:42:14,281] INFO [GroupCoordinator 0]: Group KafkaChannelBasicTestConsumer generation 1 is dead and removed (kafka.coordinator.GroupCoordinator)
> [2016-03-01 14:42:22,788] INFO [GroupCoordinator 0]: Preparing to restabilize group KafkaChannelBasicTestConsumer with old generation 0 (kafka.coordinator.GroupCoordinator)
> [2016-03-01 14:42:22,788] INFO [GroupCoordinator 0]: Stabilized group KafkaChannelBasicTestConsumer generation 1 (kafka.coordinator.GroupCoordinator)
> [2016-03-01 14:42:22,797] INFO [GroupCoordinator 0]: Assignment received from leader for group KafkaChannelBasicTestConsumer for generation 1 (kafka.coordinator.GroupCoordinator)
> [2016-03-01 14:42:55,808] INFO [GroupCoordinator 0]: Preparing to restabilize group KafkaChannelBasicTestConsumer with old generation 1 (kafka.coordinator.GroupCoordinator)
> [2016-03-01 14:42:55,809] INFO [GroupCoordinator 0]: Group KafkaChannelBasicTestConsumer generation 1 is dead and removed (kafka.coordinator.GroupCoordinator)
>
> I really need help on this!
>
> Regards,
>
> Robin

--
https://github.com/mindscratch
https://www.google.com/+CraigWickesser
https://twitter.com/mind_scratch
https://twitter.com/craig_links