Yeah, I think I understood what you were saying.  What I'm saying is
that if there were a way to just fetch metadata without doing the rest
of the work poll() does, it wouldn't be necessary.  I guess I can do
listTopics to get all metadata for all topics and then parse it.

Regarding running a single instance, that is the case for what I'm
talking about.
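For what it's worth, here is a minimal sketch of that metadata-then-assign approach (a hypothetical example against the 0.9 kafka-clients API; the broker address, group id, and topic name are placeholders, and it assumes a single consumer instance that should own every partition):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class MetadataOnlyExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "example");                 // placeholder
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "none");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        try {
            // listTopics() fetches cluster metadata without running the
            // offset-reset logic that poll() runs, so it does not throw
            // NoOffsetForPartitionException.
            Map<String, List<PartitionInfo>> metadata = consumer.listTopics();

            List<TopicPartition> partitions = new ArrayList<>();
            for (PartitionInfo info : metadata.get("testtwo")) { // topic name is a placeholder
                partitions.add(new TopicPartition(info.topic(), info.partition()));
            }

            // With a single instance we can assign() all partitions directly,
            // then seek before the first poll(), so poll() finds valid
            // positions and never needs a reset policy.
            consumer.assign(partitions);
            consumer.seekToBeginning(partitions.toArray(new TopicPartition[0]));
            consumer.poll(0);
        } finally {
            consumer.close();
        }
    }
}
```

The tradeoff is the one noted below: assign() bypasses the group coordinator entirely, so this only works when one instance is meant to own every partition.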

On Wed, Mar 9, 2016 at 2:02 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> Cody,
>
> What I meant about a special case of `seekToXX` is that, today, when the
> function is called with no partition parameters, it will try to execute
> the logic on all "assigned" partitions for the consumer. And once that is
> done, the subsequent poll() will not throw the exception, since it knows
> those partitions need their offsets reset.
>
> However, in your case there are no assigned partitions yet, and hence
> `seekToXX` will not take effect on any partitions. The assignment is
> wrapped in the poll() call, as you mentioned. One way to solve it is to
> let `seekToXX()` with no parameters do the coordination and get the
> assigned partitions if there are any subscribed topics, so that the
> subsequent poll() will know those partitions need their offsets reset.
> Does that make sense?
>
> For now, another way I can think of is to get the partition info
> beforehand and call `seekToBeginning` on all partitions. But that only
> works if the consumer knows it is going to get all the partitions
> assigned to itself (i.e. you are only running a single instance).
>
> Guozhang
>
>
> On Wed, Mar 9, 2016 at 6:22 AM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> Another unfortunate thing about ConsumerRebalanceListener is that in
>> order to do meaningful work in the callback, you need a reference to
>> the consumer that called it.  But that reference isn't provided to the
>> callback, which means the listener implementation needs to hold a
>> reference to the consumer.  Seems like this makes it unnecessarily
>> awkward to serialize or provide a zero-arg constructor for the listener.
>>
>> On Wed, Mar 9, 2016 at 7:28 AM, Cody Koeninger <c...@koeninger.org> wrote:
>> > I thought about ConsumerRebalanceListener, but seeking to the
>> > beginning any time there's a rebalance for whatever reason is not
>> > necessarily the same thing as seeking to the beginning before first
>> > starting the consumer.
>> >
>> > On Wed, Mar 9, 2016 at 2:24 AM, Kamal C <kamaltar...@gmail.com> wrote:
>> >> Cody,
>> >>
>> >> Use ConsumerRebalanceListener to achieve that,
>> >>
>> >> ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
>> >>
>> >>     @Override
>> >>     public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
>> >>     }
>> >>
>> >>     @Override
>> >>     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>> >>         consumer.seekToBeginning(partitions.toArray(new TopicPartition[0]));
>> >>     }
>> >> };
>> >>
>> >> consumer.subscribe(topics, listener);
>> >>
>> >> On Wed, Mar 9, 2016 at 12:05 PM, Cody Koeninger <c...@koeninger.org> wrote:
>> >>
>> >>> That suggestion doesn't work, for pretty much the same reason - at the
>> >>> time poll is first called, there is no reset policy and no committed
>> >>> offset, so NoOffsetForPartitionException is thrown
>> >>>
>> >>> I feel like the underlying problem isn't so much that seekToEnd needs
>> >>> special-case behavior.  It's more that topic metadata fetches,
>> >>> consumer position fetches, and message fetches are all lumped together
>> >>> under a single poll() call, with no way to do them individually if
>> >>> necessary.
>> >>>
>> >>> What does "work" in this situation is to just catch the exception
>> >>> (which leaves the consumer in a state where topics are assigned) and
>> >>> then seek.  But that is not exactly an elegant interface.
>> >>>
>> >>>     consumer.subscribe(topics)
>> >>>     try {
>> >>>       consumer.poll(0)
>> >>>     } catch {
>> >>>       case x: Throwable =>
>> >>>     }
>> >>>     consumer.seekToBeginning()
>> >>>     consumer.poll(0)
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> On Tue, Mar 8, 2016 at 11:22 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>> >>> > Hi Cody,
>> >>> >
>> >>> > The problem with that code is `subscribe(topics)` followed by
>> >>> > `seekToBeginning()`.
>> >>> >
>> >>> > Since the `subscribe` call is lazily evaluated, by the time
>> >>> > `seekToBeginning()` is called no partitions are assigned yet, and
>> >>> > hence it is effectively a no-op.
>> >>> >
>> >>> > Try
>> >>> >
>> >>> >     consumer.subscribe(topics)
>> >>> >     consumer.poll(0);  // get assigned partitions
>> >>> >     consumer.seekToBeginning()
>> >>> >     consumer.poll(0)
>> >>> >
>> >>> > to see if that works.
>> >>> >
>> >>> > I think it is a valid issue that can be fixed in the new consumer:
>> >>> > upon calling seekToEnd/Beginning with no parameters while no
>> >>> > assignment has been done yet, do the coordination behind the scenes.
>> >>> > It would, though, change the behavior of the functions, as they
>> >>> > would no longer always be lazily evaluated.
>> >>> >
>> >>> >
>> >>> > Guozhang
>> >>> >
>> >>> >
>> >>> > On Tue, Mar 8, 2016 at 2:08 PM, Cody Koeninger <c...@koeninger.org> wrote:
>> >>> >
>> >>> >> Using the 0.9 consumer, I would like to start consuming at the
>> >>> >> beginning or end, without specifying auto.offset.reset.
>> >>> >>
>> >>> >> This does not seem to be possible:
>> >>> >>
>> >>> >>     val kafkaParams = Map[String, Object](
>> >>> >>       "bootstrap.servers" -> conf.getString("kafka.brokers"),
>> >>> >>       "key.deserializer" -> classOf[StringDeserializer],
>> >>> >>       "value.deserializer" -> classOf[StringDeserializer],
>> >>> >>       "group.id" -> "example",
>> >>> >>       "auto.offset.reset" -> "none"
>> >>> >>     ).asJava
>> >>> >>     val topics = conf.getString("kafka.topics").split(",").toList.asJava
>> >>> >>     val consumer = new KafkaConsumer[String, String](kafkaParams)
>> >>> >>     consumer.subscribe(topics)
>> >>> >>     consumer.seekToBeginning()
>> >>> >>     consumer.poll(0)
>> >>> >>
>> >>> >>
>> >>> >> Results in:
>> >>> >>
>> >>> >> Exception in thread "main"
>> >>> >> org.apache.kafka.clients.consumer.NoOffsetForPartitionException:
>> >>> >> Undefined offset with no reset policy for partition: testtwo-4
>> >>> >>         at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:288)
>> >>> >>         at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:167)
>> >>> >>         at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1302)
>> >>> >>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:895)
>> >>> >>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
>> >>> >>         at example.BasicKafkaConsumer$.main(BasicKafkaConsumer.scala:25)
>> >>> >>
>> >>> >>
>> >>> >> I'm assuming this is because, at the time seekToBeginning() is
>> >>> >> called, subscriptions.assignedPartitions isn't populated.  But
>> >>> >> polling in order to assign topic partitions results in an error,
>> >>> >> which creates a chicken-or-the-egg situation.
>> >>> >>
>> >>> >> I don't want to set auto.offset.reset, because I want a hard error
>> >>> >> if the offsets are out of range at any other time during consumption.
>> >>> >>
>> >>> >
>> >>> >
>> >>> >
>> >>> > --
>> >>> > -- Guozhang
>> >>>
>>
>
>
>
> --
> -- Guozhang
