Yeah, I think I understood what you were saying. What I'm saying is that if there were a way to just fetch metadata without doing the rest of the work poll() does, it wouldn't be necessary. I guess I can do listTopics to get all metadata for all topics and then parse it.
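To make that concrete, here is a toy sketch of what a metadata-only call would enable. The `ToyConsumer`/`PartitionInfo` classes below are invented stand-ins for illustration, not the real Kafka API: a listTopics-style call returns partition metadata without assigning partitions or fetching messages, so you can build the full partition list, assign manually, and seek without ever calling poll() first.

```java
import java.util.*;

// Invented stand-ins for illustration only; NOT org.apache.kafka classes.
class PartitionInfo {
    final String topic;
    final int partition;
    PartitionInfo(String t, int p) { topic = t; partition = p; }
}

class ToyConsumer {
    private final List<String> assigned = new ArrayList<>();
    private final Map<String, Long> positions = new HashMap<>();

    // A metadata-only fetch: returns partition info without assigning
    // partitions, resetting offsets, or fetching any messages.
    Map<String, List<PartitionInfo>> listTopics() {
        Map<String, List<PartitionInfo>> md = new HashMap<>();
        md.put("testtwo", Arrays.asList(new PartitionInfo("testtwo", 0),
                                        new PartitionInfo("testtwo", 1)));
        return md;
    }

    void assign(List<String> partitions) { assigned.addAll(partitions); }

    // Seeking only affects partitions that are already assigned.
    void seekToBeginning() { for (String p : assigned) positions.put(p, 0L); }

    Map<String, Long> positions() { return positions; }
}

public class MetadataFirst {
    public static void main(String[] args) {
        ToyConsumer consumer = new ToyConsumer();
        // 1. get metadata up front, 2. assign every partition manually,
        // 3. seek -- no poll() is needed before the seek takes effect
        List<String> parts = new ArrayList<>();
        for (List<PartitionInfo> infos : consumer.listTopics().values())
            for (PartitionInfo p : infos)
                parts.add(p.topic + "-" + p.partition);
        consumer.assign(parts);
        consumer.seekToBeginning();
        System.out.println(consumer.positions().get("testtwo-0"));
        System.out.println(consumer.positions().get("testtwo-1"));
    }
}
```

This mirrors the manual-assignment workaround discussed below, which only works when a single consumer instance owns all partitions.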
Regarding running a single instance, that is the case for what I'm talking about.

On Wed, Mar 9, 2016 at 2:02 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> Cody,
>
> What I meant for a special case of `seekToXX` is that, today, when the
> function is called with no partition parameters, it will try to execute the
> logic on all "assigned" partitions for the consumer. And once that is done,
> the subsequent poll() will not throw the exception, since it knows those
> partitions need their offsets reset.
>
> However, in your case there are no assigned partitions yet, and hence
> `seekToXX` will not take effect on any partitions. The assignment is
> wrapped in the poll() call, as you mentioned. One way to solve it is to
> let `seekToXX()` with no parameters do the coordination and get the
> assigned partitions if there are any subscribed topics, so that the
> subsequent poll() will know those partitions need their offsets reset. Does
> that make sense?
>
> As for now, another way I can think of is to get the partition info
> beforehand and call `seekToBeginning` on all partitions. But that only
> works if the consumer knows it is going to get all the partitions assigned
> to itself (i.e. you are only running a single instance).
>
> Guozhang
>
>
> On Wed, Mar 9, 2016 at 6:22 AM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> Another unfortunate thing about ConsumerRebalanceListener is that in
>> order to do meaningful work in the callback, you need a reference to
>> the consumer that called it. But that reference isn't provided to the
>> callback, which means the listener implementation needs to hold a
>> reference to the consumer. This makes it unnecessarily awkward to
>> serialize the listener or give it a 0-arg constructor.
>>
>> On Wed, Mar 9, 2016 at 7:28 AM, Cody Koeninger <c...@koeninger.org> wrote:
>> > I thought about ConsumerRebalanceListener, but seeking to the
>> > beginning any time there's a rebalance, for whatever reason, is not
>> > necessarily the same thing as seeking to the beginning before first
>> > starting the consumer.
>> >
>> > On Wed, Mar 9, 2016 at 2:24 AM, Kamal C <kamaltar...@gmail.com> wrote:
>> >> Cody,
>> >>
>> >> Use a ConsumerRebalanceListener to achieve that:
>> >>
>> >>     ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
>> >>
>> >>         @Override
>> >>         public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
>> >>         }
>> >>
>> >>         @Override
>> >>         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>> >>             consumer.seekToBeginning(partitions.toArray(new TopicPartition[0]));
>> >>         }
>> >>     };
>> >>
>> >>     consumer.subscribe(topics, listener);
>> >>
>> >> On Wed, Mar 9, 2016 at 12:05 PM, Cody Koeninger <c...@koeninger.org> wrote:
>> >>
>> >>> That suggestion doesn't work, for pretty much the same reason - at the
>> >>> time poll is first called, there is no reset policy and no committed
>> >>> offset, so NoOffsetForPartitionException is thrown.
>> >>>
>> >>> I feel like the underlying problem isn't so much that seekToEnd needs
>> >>> special-case behavior. It's more that topic metadata fetches,
>> >>> consumer position fetches, and message fetches are all lumped together
>> >>> under a single poll() call, with no way to do them individually if
>> >>> necessary.
>> >>>
>> >>> What does "work" in this situation is to just catch the exception
>> >>> (which leaves the consumer in a state where topics are assigned) and
>> >>> then seek. But that is not exactly an elegant interface.
>> >>>
>> >>>     consumer.subscribe(topics)
>> >>>     try {
>> >>>       consumer.poll(0)
>> >>>     } catch {
>> >>>       case x: Throwable =>
>> >>>     }
>> >>>     consumer.seekToBeginning()
>> >>>     consumer.poll(0)
>> >>>
>> >>> On Tue, Mar 8, 2016 at 11:22 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>> >>> > Hi Cody,
>> >>> >
>> >>> > The problem with that code is in `subscribe(topic)` followed by
>> >>> > `seekToBeginning()`.
>> >>> >
>> >>> > Since the `subscribe` call is lazily evaluated, by the time `seekToBeginning()`
>> >>> > is called no partition is assigned yet, and hence it is effectively a
>> >>> > no-op.
>> >>> >
>> >>> > Try
>> >>> >
>> >>> >     consumer.subscribe(topics)
>> >>> >     consumer.poll(0)  // get assigned partitions
>> >>> >     consumer.seekToBeginning()
>> >>> >     consumer.poll(0)
>> >>> >
>> >>> > to see if that works.
>> >>> >
>> >>> > I think it is a valid issue that can be fixed in the new consumer: upon
>> >>> > calling seekToEnd/Beginning with no parameters, while no assignment has
>> >>> > been done yet, do the coordination behind the scenes. It would, though,
>> >>> > change the behavior of those functions, as they would no longer always
>> >>> > be lazily evaluated.
>> >>> >
>> >>> > Guozhang
>> >>> >
>> >>> > On Tue, Mar 8, 2016 at 2:08 PM, Cody Koeninger <c...@koeninger.org> wrote:
>> >>> >
>> >>> >> Using the 0.9 consumer, I would like to start consuming at the
>> >>> >> beginning or end, without specifying auto.offset.reset.
>> >>> >>
>> >>> >> This does not seem to be possible:
>> >>> >>
>> >>> >>     val kafkaParams = Map[String, Object](
>> >>> >>       "bootstrap.servers" -> conf.getString("kafka.brokers"),
>> >>> >>       "key.deserializer" -> classOf[StringDeserializer],
>> >>> >>       "value.deserializer" -> classOf[StringDeserializer],
>> >>> >>       "group.id" -> "example",
>> >>> >>       "auto.offset.reset" -> "none"
>> >>> >>     ).asJava
>> >>> >>     val topics = conf.getString("kafka.topics").split(",").toList.asJava
>> >>> >>     val consumer = new KafkaConsumer[String, String](kafkaParams)
>> >>> >>     consumer.subscribe(topics)
>> >>> >>     consumer.seekToBeginning()
>> >>> >>     consumer.poll(0)
>> >>> >>
>> >>> >> Results in:
>> >>> >>
>> >>> >>     Exception in thread "main" org.apache.kafka.clients.consumer.NoOffsetForPartitionException:
>> >>> >>     Undefined offset with no reset policy for partition: testtwo-4
>> >>> >>       at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:288)
>> >>> >>       at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:167)
>> >>> >>       at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1302)
>> >>> >>       at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:895)
>> >>> >>       at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
>> >>> >>       at example.BasicKafkaConsumer$.main(BasicKafkaConsumer.scala:25)
>> >>> >>
>> >>> >> I'm assuming this is because, at the time seekToBeginning() is called,
>> >>> >> subscriptions.assignedPartitions isn't populated. But polling in
>> >>> >> order to assign topicpartitions results in an error, which creates a
>> >>> >> chicken-or-the-egg situation.
>> >>> >>
>> >>> >> I don't want to set auto.offset.reset, because I want a hard error if
>> >>> >> the offsets are out of range at any other time during consumption.
>> >>> >>
>> >>> >
>> >>> > --
>> >>> > -- Guozhang
>
> --
> -- Guozhang
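The behavior the thread circles around can be sketched in runnable form. The classes below are invented toys, not the real Kafka consumer; they only model the ordering described above: subscribe() is lazy, seekToBeginning() touches only already-assigned partitions (so it is a no-op before the first poll()), the first poll() performs the assignment and then throws when a partition has no position and no reset policy, and catching that exception before seeking again is the inelegant workaround that does succeed.

```java
import java.util.*;

// Toy model for illustration only; NOT the org.apache.kafka classes.
class NoOffsetForPartitionException extends RuntimeException {
    NoOffsetForPartitionException(String p) {
        super("Undefined offset with no reset policy for partition: " + p);
    }
}

class LazyConsumer {
    private final List<String> subscribed = new ArrayList<>();
    private final List<String> assigned = new ArrayList<>();   // empty until first poll()
    private final Map<String, Long> positions = new HashMap<>();

    // Lazy: records the topics, assigns nothing yet.
    void subscribe(List<String> topics) { subscribed.addAll(topics); }

    // Only touches *assigned* partitions -> no-op before the first poll().
    void seekToBeginning() { for (String p : assigned) positions.put(p, 0L); }

    void poll() {
        if (assigned.isEmpty())                      // first poll does the assignment
            for (String t : subscribed) assigned.add(t + "-0");
        for (String p : assigned)                    // then every partition needs a position
            if (!positions.containsKey(p))           // no committed offset, reset policy "none"
                throw new NoOffsetForPartitionException(p);
    }

    Map<String, Long> positions() { return positions; }
}

public class SeekDemo {
    public static void main(String[] args) {
        LazyConsumer c = new LazyConsumer();
        c.subscribe(Arrays.asList("testtwo"));
        c.seekToBeginning();                 // no-op: nothing is assigned yet
        boolean threw = false;
        try { c.poll(); } catch (NoOffsetForPartitionException e) { threw = true; }
        System.out.println("first poll threw: " + threw);

        // The workaround from the thread: the failed poll() still performed the
        // assignment, so a seek followed by another poll now succeeds.
        c.seekToBeginning();
        c.poll();
        System.out.println("position of testtwo-0: " + c.positions().get("testtwo-0"));
    }
}
```

The second half of main() is exactly the catch-then-seek pattern shown earlier in Scala; the toy makes it visible why the first seek is silently ignored while the second one works.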