Hi guys,

I have a question about the Kafka endpoint.

I can't find a way to force a starting offset for a consumer group when 
starting a route.
I would like to set the offset for every new consumer group to the one that 
matches the current timestamp (or, better, to be able to specify custom logic 
that evaluates the starting point).

The option “autoOffsetReset=latest” doesn’t work for me because the group is a 
new one and, in any case, I would like to skip to the latest available offset 
regardless of any committed offsets.

In other words, I’d like to skip all the old messages for a topic once a new 
group is added.

Something like:

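KafkaConsumer<String, Object> kafkaConsumer = ….
….
// Look up, per assigned partition, the earliest offset at or after the timestamp
Map<TopicPartition, Long> mapTopicPartition = new HashMap<>();
Set<TopicPartition> partitions = kafkaConsumer.assignment();
partitions.forEach(partition -> mapTopicPartition.put(partition, timestamp));
Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(mapTopicPartition);
// Move the consumer there; a null entry means no message at or after the timestamp
offsets.forEach((partition, found) -> {
    if (found != null) kafkaConsumer.seek(partition, found.offset());
    else kafkaConsumer.seekToEnd(Collections.singleton(partition));
});
initialized = true;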
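
To make the intended starting-point logic concrete, here is a minimal sketch in plain Java with no Kafka dependency. It only models the lookup rule "earliest offset whose timestamp is greater than or equal to the requested timestamp, otherwise the end of the log". The class name StartOffsetSketch, the method startingOffsets, and the partition names are hypothetical and exist only for illustration; partitions are represented as plain strings and each log as a list of record timestamps indexed by offset.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Kafka or Camel: it only models how a
// starting offset could be chosen from a timestamp.
public class StartOffsetSketch {

    // timestampsByPartition: for each partition, the record timestamps in
    // offset order (index i holds the timestamp of the record at offset i).
    static Map<String, Long> startingOffsets(Map<String, List<Long>> timestampsByPartition,
                                             long startTimestamp) {
        Map<String, Long> result = new HashMap<>();
        timestampsByPartition.forEach((partition, timestamps) -> {
            // Default to the end of the log, i.e. skip every existing record.
            long offset = timestamps.size();
            for (int i = 0; i < timestamps.size(); i++) {
                if (timestamps.get(i) >= startTimestamp) {
                    offset = i; // earliest record at or after the timestamp
                    break;
                }
            }
            result.put(partition, offset);
        });
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<Long>> log = new HashMap<>();
        log.put("orders-0", List.of(100L, 200L, 300L));
        log.put("orders-1", List.of(50L, 60L, 70L));
        System.out.println(startingOffsets(log, 250L));
    }
}
```

With a start timestamp of 250, "orders-0" would start at offset 2 (the record stamped 300), while "orders-1" has nothing that recent and would start at offset 3, the end of its log. That second case is the "skip all the old messages" behaviour I am after for a new group.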


Any suggestion will be appreciated!
Thanks!

Riccardo
