Can you provide a bit more information ? Release of Kafka Java / Scala version
Thanks On Wed, Oct 25, 2017 at 6:40 PM, Susheel Kumar <susheel2...@gmail.com> wrote: > Hello Kafka Users, > > I am trying to run below sample code mentioned in Kafka documentation under > Automatic Offset Committing for a topic with 1 partition (tried with 3 and > more partition as well). Create command as follows > > bin/kafka-topics.sh --create --zookeeper <ZK>:2181 --replication-factor 3 > --partitions 1 --topic test --config cleanup.policy=compact,delete > > but the sample code always returns 0 records unless I provide a custom > ConsumerRebalanceListener (below) which sets consumer to beginning. > > I wonder if the sample code given at Kafka documentation is wrong or am I > missing something? > > https://kafka.apache.org/0101/javadoc/index.html?org/apache/ > kafka/clients/consumer/KafkaConsumer.html > > > *Automatic Offset Committing* > > This example demonstrates a simple usage of Kafka's consumer api that > relying on automatic offset committing. > > Properties props = new Properties(); > props.put("bootstrap.servers", "localhost:9092"); > props.put("group.id", "test"); > props.put("enable.auto.commit", "true"); > props.put("auto.commit.interval.ms", "1000"); > props.put("key.deserializer", > "org.apache.kafka.common.serialization.StringDeserializer"); > props.put("value.deserializer", > "org.apache.kafka.common.serialization.StringDeserializer"); > KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); > consumer.subscribe(Arrays.asList("foo", "bar")); > while (true) { > ConsumerRecords<String, String> records = consumer.poll(100); > for (ConsumerRecord<String, String> record : records) > System.out.printf("offset = %d, key = %s, value = %s%n", > record.offset(), record.key(), record.value()); > } > > > > ==== > > public class SeekToBeginingConsumerRebalancerListener implements > org.apache.kafka.clients.consumer.ConsumerRebalanceListener { > > private Consumer<String, String> consumer; > public SeekToBeginingConsumerRebalancerListener(KafkaConsumer<String, > String> consumer2) { > this.consumer = consumer2; > } > public void onPartitionsRevoked(Collection<TopicPartition> > partitions) { > for (TopicPartition partition : partitions) { > > //offsetManager.saveOffsetInExternalStore(partition.topic(), > partition.partition(),consumer.position(partition)); > } > } > public void onPartitionsAssigned(Collection<TopicPartition> > partitions) { > /* for (TopicPartition partition : partitions) { > consumer.seek(partition,seekTo)); > }*/ > consumer.seekToBeginning(partitions); > } > } >