[ https://issues.apache.org/jira/browse/KAFKA-7318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16587689#comment-16587689 ]
Matthias J. Sax commented on KAFKA-7318: ---------------------------------------- > {{latest}} (the default) , if a consumer subscribe a new topic and then >close, during these times, there are some message was produced, the consumer >can not poll these messages. If you commit offsets, as you describe later, you get the behavior you want. Committing offsets is the right thing to do, to pick up where you left off. Note, that `auto.offset.reset` only triggers if there is no committed offsets. Thus, a now policy would not resolve the issue. You are right, that `poll()` semantics were changed – for a good reason. Also note, that you example code above might miss records, as `poll(0)` could return data that you would not process. What you could do is, to use `endOffsets(...)` API (after you subscribed) to get the current end-offsets, commit those, call seekToEnd() and start processing. > Should introduce a offset reset policy to consume only the messages after > subscribing > ------------------------------------------------------------------------------------- > > Key: KAFKA-7318 > URL: https://issues.apache.org/jira/browse/KAFKA-7318 > Project: Kafka > Issue Type: Improvement > Components: consumer > Affects Versions: 1.1.0, 1.1.1, 2.0.0 > Reporter: joechen8...@gmail.com > Priority: Major > > On our situation, we want the consumers only consume the messages which was > produced after subscribing. > Currently, kafka support 3 stategies with auto.offset.reset, but seems both > of them can not support the feature we want. > * {{latest}} (the default) , if a consumer subscribe a new topic and then > close, during these times, there are some message was produced, the consumer > can not poll these messages. > * earliest , consumer may consume all the messages on the topic before > subscribing. > * none, not in this scope. > Before version 1.1.0, we make the consumer poll and commit after subscribe > as below, this can mark the offset to 0 and works (enable.auto.commit is > false) . > > {code:java} > consumer.subscribe(topics, consumerRebalanceListener); > if(consumer.assignment().isEmpty()) { > consumer.poll(0); > consumer.commitSync(); > } > {code} > After upgrade the clients to >=1.1.0, it is broke. Seems it was broke by the > fix > [KAFKA-6397|https://github.com/apache/kafka/commit/677881afc52485aef94150be50d6258d7a340071#diff-267b7c1e68156c1301c56be63ae41dd0], > but I am not sure about that. Then I try to invoke the > consumer.position(partitions) in onPartitionsAssigned of > ConsumerRebalanceListener, it works again. but it looks strangely that get > the position but do nothing. > > so we want to know whether there is a formal way to do this, maybe introduce > another stategy for auto.offset.reset to only consume the message after the > consumer subscribing is perfect. > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)