Producer : https://gist.github.com/r0perice/9ce2bece76dd4113a44a Consumer : https://gist.github.com/r0perice/8dcee160017ccd779d59 Console : https://gist.github.com/r0perice/5a8e2b2939651b1ac893
2016-03-01 14:50 GMT+01:00 craig w <codecr...@gmail.com>: > Can you try posting your code into a Gist (gist.github.com) or Pastebin, > so > it's formatted and easier to read? > > On Tue, Mar 1, 2016 at 8:49 AM, Péricé Robin <perice.ro...@gmail.com> > wrote: > > > Hello everybody, > > > > I'm having troubles using KafkaConsumer 0.9.0.0 API. My Consumer class > > doesn't consumer messages properly. > > > > ------------------------- > > | *Consumer*.java | > > ------------------------- > > > > public final void run() { try { > > *consumer.subscribe(Collections.singletonList(topicName));* boolean end = > > false; while (!closed.get()) { while (!end) { *final > ConsumerRecords<Long, > > ITupleApi> records = consumer.poll(1000);* if (records == null || > records. > > count() == 0) { System.err.println("In ConsumerThread consumer.poll > > received nothing on " + topicName); } else { /* some processing on > records > > here */ end = true; } } } } catch (final WakeupException e) { if > (!closed. > > get()) { throw e; } } finally { consumer.close(); } } > > > > ------------------------ > > | *Producer*.java | > > ------------------------ > > > > > > public final void run() { > > while (true) { > > *producer.send(new ProducerRecord<Long, ITupleApi>(topicName, timems, > > tuple), callback);* > > } > > } > > > > ---------------------------------------------- > > > > I run this exactly the same way as in Github example : > > > > > https://github.com/apache/kafka/blob/trunk/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java > > > > But I only get "In ConsumerThread consumer.poll received nothing". Poll() > > never send me messages ... But when I use command line tools I can see my > > messages on the topic. > > > > When I run the basic example from GitHub everything works fine ... So > it's > > seems like I'm missing something. > > > > > > > > > > > > > > *CONSOLE* > > > > [2016-03-01 14:42:14,280] INFO [GroupCoordinator 0]: Preparing to > > restabilize group KafkaChannelBasicTestConsumer with old generation 1 > > (kafka.coordin > > ator.GroupCoordinator) > > [2016-03-01 14:42:14,281] INFO [GroupCoordinator 0]: Group > > KafkaChannelBasicTestConsumer generation 1 is dead and removed > > (kafka.coordinator.GroupCoor > > dinator) > > [2016-03-01 14:42:22,788] INFO [GroupCoordinator 0]: Preparing to > > restabilize group KafkaChannelBasicTestConsumer with old generation 0 > > (kafka.coordin > > ator.GroupCoordinator) > > [2016-03-01 14:42:22,788] INFO [GroupCoordinator 0]: Stabilized group > > KafkaChannelBasicTestConsumer generation 1 > > (kafka.coordinator.GroupCoordinator) > > [2016-03-01 14:42:22,797] INFO [GroupCoordinator 0]: Assignment received > > from leader for group KafkaChannelBasicTestConsumer for generation 1 > > (kafka.c > > oordinator.GroupCoordinator) > > [2016-03-01 14:42:55,808] INFO [GroupCoordinator 0]: Preparing to > > restabilize group KafkaChannelBasicTestConsumer with old generation 1 > > (kafka.coordin > > ator.GroupCoordinator) > > [2016-03-01 14:42:55,809] INFO [GroupCoordinator 0]: Group > > KafkaChannelBasicTestConsumer generation 1 is dead and removed > > (kafka.coordinator.GroupCoor > > dinator) > > > > > > I really need help on this ! > > > > Regards, > > > > Robin > > > > > > -- > > https://github.com/mindscratch > https://www.google.com/+CraigWickesser > https://twitter.com/mind_scratch > https://twitter.com/craig_links >