Can you provide a bit more information ?

Release of Kafka
Java / Scala version

Thanks

On Wed, Oct 25, 2017 at 6:40 PM, Susheel Kumar <susheel2...@gmail.com>
wrote:

> Hello Kafka Users,
>
> I am trying to run below sample code mentioned in Kafka documentation under
> Automatic Offset Committing for a topic with 1 partition  (tried with 3 and
> more partition as well). Create command as follows
>
> bin/kafka-topics.sh --create --zookeeper <ZK>:2181 --replication-factor 3
> --partitions 1 --topic test --config cleanup.policy=compact,delete
>
> but the sample code always returns 0 records unless I provide a custom
> ConsumerRebalanceListener (below) which sets consumer to beginning.
>
> I wonder if the sample code given at Kafka documentation is wrong or am I
> missing something?
>
> https://kafka.apache.org/0101/javadoc/index.html?org/apache/
> kafka/clients/consumer/KafkaConsumer.html
>
>
> *Automatic Offset Committing*
>
> This example demonstrates a simple usage of Kafka's consumer api that
> relying on automatic offset committing.
>
>      Properties props = new Properties();
>      props.put("bootstrap.servers", "localhost:9092");
>      props.put("group.id", "test");
>      props.put("enable.auto.commit", "true");
>      props.put("auto.commit.interval.ms", "1000");
>      props.put("key.deserializer",
> "org.apache.kafka.common.serialization.StringDeserializer");
>      props.put("value.deserializer",
> "org.apache.kafka.common.serialization.StringDeserializer");
>      KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
>      consumer.subscribe(Arrays.asList("foo", "bar"));
>      while (true) {
>          ConsumerRecords<String, String> records = consumer.poll(100);
>          for (ConsumerRecord<String, String> record : records)
>              System.out.printf("offset = %d, key = %s, value = %s%n",
> record.offset(), record.key(), record.value());
>      }
>
>
>
> ====
>
> public class SeekToBeginingConsumerRebalancerListener implements
> org.apache.kafka.clients.consumer.ConsumerRebalanceListener {
>
>      private Consumer<String, String> consumer;
>      public SeekToBeginingConsumerRebalancerListener(KafkaConsumer<String,
> String> consumer2) {
>              this.consumer = consumer2;
>      }
>      public void onPartitionsRevoked(Collection<TopicPartition>
> partitions) {
>              for (TopicPartition partition : partitions) {
>
> //offsetManager.saveOffsetInExternalStore(partition.topic(),
> partition.partition(),consumer.position(partition));
>              }
>      }
>      public void onPartitionsAssigned(Collection<TopicPartition>
> partitions) {
>             /* for (TopicPartition partition : partitions) {
>                      consumer.seek(partition,seekTo));
>              }*/
>          consumer.seekToBeginning(partitions);
>      }
> }
>

Reply via email to