[
https://issues.apache.org/jira/browse/KAFKA-7318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16594524#comment-16594524
]
[email protected] edited comment on KAFKA-7318 at 8/28/18 5:06 AM:
-----------------------------------------------------------------------
I tried your solution as shown below, but it does not work. After subscribing,
the partition assignment has not been triggered.
{code:java}
consumer.subscribe(kafkaTopics, consumerRebalanceListener);
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(new ArrayList<>());
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (TopicPartition p : endOffsets.keySet()) {
    offsets.put(p, new OffsetAndMetadata(endOffsets.get(p)));
}
consumer.commitSync(offsets);
consumer.seekToEnd(new ArrayList<>());
{code}
was (Author: joechen):
I tried your solution as shown below, but it does not work.
{code:java}
consumer.subscribe(kafkaTopics, consumerRebalanceListener);
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(new ArrayList<>());
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (TopicPartition p : endOffsets.keySet()) {
    offsets.put(p, new OffsetAndMetadata(endOffsets.get(p)));
}
consumer.commitSync(offsets);
consumer.seekToEnd(new ArrayList<>());
{code}
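One likely reason the snippet above has no effect: right after subscribe() and before any poll(), the consumer has no assigned partitions. endOffsets() is called with an empty list, so nothing is committed, and seekToEnd() with an empty collection only applies to currently assigned partitions, of which there are none. A minimal sketch of doing the same work inside the rebalance callback, where the real partitions are known, could look like this. The class name StartAtEndListener and the helper pinUncommittedToEnd are hypothetical, and committed(Set) assumes kafka-clients 2.4 or newer (older clients only offer committed(TopicPartition)).

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

/** Hypothetical listener: pins partitions that have no committed offset to their current end. */
class StartAtEndListener implements ConsumerRebalanceListener {
    private final Consumer<?, ?> consumer;

    StartAtEndListener(Consumer<?, ?> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Nothing to do in this sketch.
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        pinUncommittedToEnd(consumer, partitions);
    }

    /** Commits and seeks to the end offset for every given partition that has no committed offset. */
    static void pinUncommittedToEnd(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(new HashSet<>(partitions));

        // Only touch partitions the group has never committed for, so that a
        // later rebalance or restart does not skip unread messages.
        List<TopicPartition> fresh = new ArrayList<>();
        for (TopicPartition tp : partitions) {
            if (committed.get(tp) == null) {
                fresh.add(tp);
            }
        }
        if (fresh.isEmpty()) {
            return;
        }

        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(fresh);
        Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
        for (Map.Entry<TopicPartition, Long> e : endOffsets.entrySet()) {
            toCommit.put(e.getKey(), new OffsetAndMetadata(e.getValue()));
            consumer.seek(e.getKey(), e.getValue());
        }
        consumer.commitSync(toCommit);
    }
}
```

It would be registered with consumer.subscribe(kafkaTopics, new StartAtEndListener(consumer)) and takes effect on the first poll(). The end offset is read at assignment time, which can be slightly later than the subscribe() call, so this is an approximation of "after subscribing" rather than an exact cut.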
> Should introduce a offset reset policy to consume only the messages after
> subscribing
> -------------------------------------------------------------------------------------
>
> Key: KAFKA-7318
> URL: https://issues.apache.org/jira/browse/KAFKA-7318
> Project: Kafka
> Issue Type: Improvement
> Components: consumer
> Affects Versions: 1.1.0, 1.1.1, 2.0.0
> Reporter: [email protected]
> Priority: Major
>
> In our situation, we want consumers to consume only the messages that were
> produced after subscribing.
> Currently, Kafka supports 3 strategies for auto.offset.reset, but it seems
> none of them supports the behavior we want.
> * {{latest}} (the default): if a consumer subscribes to a new topic and then
> closes, and some messages are produced in the meantime, the consumer cannot
> poll these messages.
> * {{earliest}}: the consumer may consume all the messages that were on the
> topic before subscribing.
> * {{none}}: out of scope here.
> Before version 1.1.0, we made the consumer poll and commit right after
> subscribing, as below. This marked the offset as 0 and worked
> (enable.auto.commit is false).
>
> {code:java}
> consumer.subscribe(topics, consumerRebalanceListener);
> if (consumer.assignment().isEmpty()) {
>     consumer.poll(0);
>     consumer.commitSync();
> }
> {code}
> After upgrading the clients to >=1.1.0, this is broken. It seems to have been
> broken by the fix
> [KAFKA-6397|https://github.com/apache/kafka/commit/677881afc52485aef94150be50d6258d7a340071#diff-267b7c1e68156c1301c56be63ae41dd0],
> but I am not sure about that. I then tried invoking
> consumer.position(partitions) in onPartitionsAssigned of
> ConsumerRebalanceListener, and it works again. However, it looks strange to
> get the position and then do nothing with it.
>
> So we want to know whether there is a formal way to do this. Introducing
> another strategy for auto.offset.reset that consumes only the messages
> produced after the consumer subscribes would be ideal.
>
>
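To make the gap described in the issue concrete, here is a toy model of how the start position for a single partition is chosen. It is a simplified illustration and not Kafka's actual code. The class StartPositionModel, its parameters, and the offsets used below are all made up for the example.

```java
import java.util.OptionalLong;

/** Toy model (not Kafka's implementation) of where a consumer starts reading one partition. */
final class StartPositionModel {
    enum Policy { LATEST, EARLIEST, NONE }

    /**
     * @param policy    value of auto.offset.reset
     * @param committed offset committed by the group for this partition, if any
     * @param logStart  first offset still available in the log
     * @param logEnd    end offset at the moment the position is resolved
     */
    static long startPosition(Policy policy, OptionalLong committed, long logStart, long logEnd) {
        // A committed offset always wins; the reset policy only applies without one.
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        switch (policy) {
            case LATEST:
                return logEnd;
            case EARLIEST:
                return logStart;
            default:
                throw new IllegalStateException("no committed offset and policy is none");
        }
    }
}
```

Suppose the end offset is 100 when the consumer subscribes, the consumer closes without committing, and 5 more messages arrive before it restarts (end offset 105). With no committed offset, latest resolves to 105 and skips offsets 100 to 104, while earliest resolves to 0 and replays everything from before the subscription. Only a committed offset of 100, recorded at subscribe time, yields the requested behavior, which is why the workarounds in this issue all revolve around forcing an early commit.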