Hi Robin,

Sorry for the late reply. I'm a little puzzled by your consumer code. Once the "end" flag is set to true, you won't ever hit the poll() call again, or am I missing something? Do you even need that inner loop?
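To make the control-flow question concrete, here is a minimal sketch with no Kafka dependency. Everything in it is a hypothetical stand-in: FakeConsumer imitates consumer.poll() by handing out pre-canned batch sizes, and the bounded for loop replaces while (!closed.get()) so the example terminates. It only illustrates how the "end" flag affects the number of poll() calls; it does not claim to explain why poll() returned nothing in the first place.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

class PollLoopSketch {

    /** Hypothetical stand-in for a consumer: returns canned batch sizes, then 0 forever. */
    static final class FakeConsumer {
        private final Deque<Integer> batches;
        int pollCalls = 0;

        FakeConsumer(List<Integer> batchSizes) {
            this.batches = new ArrayDeque<>(batchSizes);
        }

        /** Returns the number of records in the next batch (0 if none are left). */
        int poll() {
            pollCalls++;
            Integer next = batches.poll();
            return next == null ? 0 : next;
        }
    }

    /** Same shape as the posted run(): an inner loop guarded by an "end" flag. */
    static int consumeWithEndFlag(FakeConsumer consumer, int outerIterations) {
        int processed = 0;
        boolean end = false;
        // Bounded loop stands in for while (!closed.get()).
        for (int i = 0; i < outerIterations; i++) {
            // Note: this inner loop only exits once a non-empty batch arrives.
            while (!end) {
                int count = consumer.poll();
                if (count > 0) {
                    processed += count;
                    end = true; // after this, the inner loop is skipped for good
                }
            }
        }
        return processed;
    }

    /** Single loop: poll() is called on every iteration until shutdown. */
    static int consumeWithSingleLoop(FakeConsumer consumer, int iterations) {
        int processed = 0;
        for (int i = 0; i < iterations; i++) {
            processed += consumer.poll();
        }
        return processed;
    }

    public static void main(String[] args) {
        FakeConsumer a = new FakeConsumer(Arrays.asList(0, 3, 5));
        int nested = consumeWithEndFlag(a, 5);
        System.out.println("end-flag loop: processed=" + nested + " polls=" + a.pollCalls);

        FakeConsumer b = new FakeConsumer(Arrays.asList(0, 3, 5));
        int single = consumeWithSingleLoop(b, 5);
        System.out.println("single loop:   processed=" + single + " polls=" + b.pollCalls);
    }
}
```

With the end flag, the sketch stops polling after the first non-empty batch and the outer loop just spins; with a single loop, every batch is picked up. In a real consumer the spinning outer loop would also mean poll() is no longer being called at all, which may or may not be what you want.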
-Jason

On Tue, Mar 1, 2016 at 6:06 AM, Péricé Robin <perice.ro...@gmail.com> wrote:

> Producer : https://gist.github.com/r0perice/9ce2bece76dd4113a44a
> Consumer : https://gist.github.com/r0perice/8dcee160017ccd779d59
> Console : https://gist.github.com/r0perice/5a8e2b2939651b1ac893
>
> 2016-03-01 14:50 GMT+01:00 craig w <codecr...@gmail.com>:
>
> > Can you try posting your code into a Gist (gist.github.com) or Pastebin,
> > so it's formatted and easier to read?
> >
> > On Tue, Mar 1, 2016 at 8:49 AM, Péricé Robin <perice.ro...@gmail.com>
> > wrote:
> >
> > > Hello everybody,
> > >
> > > I'm having trouble using the KafkaConsumer 0.9.0.0 API. My Consumer class
> > > doesn't consume messages properly.
> > >
> > > -------------------------
> > > | *Consumer*.java |
> > > -------------------------
> > >
> > > public final void run() {
> > >   try {
> > >     consumer.subscribe(Collections.singletonList(topicName));
> > >     boolean end = false;
> > >     while (!closed.get()) {
> > >       while (!end) {
> > >         final ConsumerRecords<Long, ITupleApi> records = consumer.poll(1000);
> > >         if (records == null || records.count() == 0) {
> > >           System.err.println("In ConsumerThread consumer.poll received nothing on " + topicName);
> > >         } else {
> > >           /* some processing on records here */
> > >           end = true;
> > >         }
> > >       }
> > >     }
> > >   } catch (final WakeupException e) {
> > >     if (!closed.get()) {
> > >       throw e;
> > >     }
> > >   } finally {
> > >     consumer.close();
> > >   }
> > > }
> > >
> > > ------------------------
> > > | *Producer*.java |
> > > ------------------------
> > >
> > > public final void run() {
> > >   while (true) {
> > >     producer.send(new ProducerRecord<Long, ITupleApi>(topicName, timems, tuple), callback);
> > >   }
> > > }
> > >
> > > ----------------------------------------------
> > >
> > > I run this exactly the same way as in the GitHub example:
> > >
> > > https://github.com/apache/kafka/blob/trunk/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
> > >
> > > But I only get "In ConsumerThread consumer.poll received nothing".
> > > Poll() never sends me messages ... But when I use the command line tools
> > > I can see my messages on the topic.
> > >
> > > When I run the basic example from GitHub everything works fine ... So it
> > > seems like I'm missing something.
> > >
> > > *CONSOLE*
> > >
> > > [2016-03-01 14:42:14,280] INFO [GroupCoordinator 0]: Preparing to restabilize group KafkaChannelBasicTestConsumer with old generation 1 (kafka.coordinator.GroupCoordinator)
> > > [2016-03-01 14:42:14,281] INFO [GroupCoordinator 0]: Group KafkaChannelBasicTestConsumer generation 1 is dead and removed (kafka.coordinator.GroupCoordinator)
> > > [2016-03-01 14:42:22,788] INFO [GroupCoordinator 0]: Preparing to restabilize group KafkaChannelBasicTestConsumer with old generation 0 (kafka.coordinator.GroupCoordinator)
> > > [2016-03-01 14:42:22,788] INFO [GroupCoordinator 0]: Stabilized group KafkaChannelBasicTestConsumer generation 1 (kafka.coordinator.GroupCoordinator)
> > > [2016-03-01 14:42:22,797] INFO [GroupCoordinator 0]: Assignment received from leader for group KafkaChannelBasicTestConsumer for generation 1 (kafka.coordinator.GroupCoordinator)
> > > [2016-03-01 14:42:55,808] INFO [GroupCoordinator 0]: Preparing to restabilize group KafkaChannelBasicTestConsumer with old generation 1 (kafka.coordinator.GroupCoordinator)
> > > [2016-03-01 14:42:55,809] INFO [GroupCoordinator 0]: Group KafkaChannelBasicTestConsumer generation 1 is dead and removed (kafka.coordinator.GroupCoordinator)
> > >
> > > I really need help on this!
> > >
> > > Regards,
> > >
> > > Robin
> >
> > --
> >
> > https://github.com/mindscratch
> > https://www.google.com/+CraigWickesser
> > https://twitter.com/mind_scratch
> > https://twitter.com/craig_links