Hi guys, I have a question about the Kafka endpoint. I can't find a way to force a starting offset for a consumer group when starting a route. I would like to set the offset to the current timestamp for every new consumer group (or, better, to be able to specify custom logic to evaluate the starting point).
The option “autoOffsetReset=latest” doesn’t work because the group is a new one and, in any case, I would like to skip to the latest available offset regardless of any existing commits. In other words, I’d like to skip all the old messages for a topic once a new group is added.

Something like:

    KafkaConsumer<String, Object> kafkaConsumer = ….
    ….
    Map<TopicPartition, Long> mapTopicPartition = new HashMap<>();
    Set<TopicPartition> partitions = kafkaConsumer.assignment();
    partitions.forEach(partition -> mapTopicPartition.put(partition, timestamp));
    kafkaConsumer.offsetsForTimes(mapTopicPartition);
    initialized = true;

Any suggestion will be appreciated!

Thanks!
Riccardo
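
For reference, the snippet in the question only looks up offsets by timestamp; to take effect, the result would still have to be applied with a seek. Below is a minimal sketch, in plain Java with no Kafka dependency, of the one decision that step involves: which offset to start from per partition. The class `StartOffsets` and the method `resolve` are hypothetical names, not part of Camel or the Kafka client, and the type parameter `P` stands in for `TopicPartition`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of Camel or the Kafka client.
final class StartOffsets {

    private StartOffsets() {}

    /**
     * Chooses the offset to seek to for each partition.
     *
     * @param timestampHits offset of the first record at or after the chosen
     *                      timestamp, per partition; a null value or a missing
     *                      key means no such record exists yet
     * @param endOffsets    log-end offset per partition
     * @return one target offset for every partition present in endOffsets
     */
    static <P> Map<P, Long> resolve(Map<P, Long> timestampHits, Map<P, Long> endOffsets) {
        Map<P, Long> targets = new HashMap<>();
        for (Map.Entry<P, Long> end : endOffsets.entrySet()) {
            Long hit = timestampHits.get(end.getKey());
            // No record at or after the timestamp: start at the end of the log,
            // so that only messages produced from now on are consumed.
            targets.put(end.getKey(), hit != null ? hit : end.getValue());
        }
        return targets;
    }
}
```

With the plain Kafka client, `timestampHits` would presumably be built from `kafkaConsumer.offsetsForTimes(...)` (taking `offset()` from each non-null `OffsetAndTimestamp`), `endOffsets` from `kafkaConsumer.endOffsets(partitions)`, and each target applied with `kafkaConsumer.seek(partition, offset)`. A natural place for this is `ConsumerRebalanceListener.onPartitionsAssigned`, because `assignment()` stays empty until the consumer has joined the group. Whether the Camel endpoint offers a hook for this is not covered by the sketch.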