I'm writing a Flink connector to write a stream of events from Kafka to Elasticsearch. It is a typical metrics ingestion pipeline, where the latest metrics are preferred over stale data. What I mean by that: suppose the Elasticsearch cluster has an outage for about 20 minutes, and all the metrics produced during that period back up in Kafka. Once ES is available again, the Flink stream will resume from the last checkpointed offset (correctly so) and try to catch up. Instead, is there a way to customize the Flink stream so that, if it detects a significant backlog, it skips ahead and consumes from the latest offset, and schedules a separate backfill task for the backlogged 20 minutes?
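To make the intent concrete, here is a rough sketch of the staleness check I have in mind. The class name `BacklogRouter` and the 10-minute lag budget are made up for illustration, not Flink APIs; in a real job a predicate like this might live in a `ProcessFunction` that sends stale records to a side output feeding a separate backfill sink, while fresh records continue to the Elasticsearch sink:

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper: decides, per record, whether it is "stale" and should be
// diverted to a backfill path instead of the hot write path to Elasticsearch.
public class BacklogRouter {
    private final Duration maxLag;

    public BacklogRouter(Duration maxLag) {
        this.maxLag = maxLag;
    }

    // A record is "stale" when its event time trails the current processing
    // time by more than the configured lag budget; such records belong to the
    // backfill path rather than the live stream.
    public boolean isStale(Instant eventTime, Instant now) {
        return Duration.between(eventTime, now).compareTo(maxLag) > 0;
    }

    public static void main(String[] args) {
        BacklogRouter router = new BacklogRouter(Duration.ofMinutes(10));
        Instant now = Instant.parse("2024-01-01T12:00:00Z");
        // 30 minutes old exceeds the 10-minute budget -> backfill path.
        System.out.println(router.isStale(Instant.parse("2024-01-01T11:30:00Z"), now));
        // 5 minutes old is fresh -> live path.
        System.out.println(router.isStale(Instant.parse("2024-01-01T11:55:00Z"), now));
    }
}
```

What I haven't figured out is the offset side: having the Kafka source itself jump to the latest offset when the lag crosses a threshold, and recording the skipped offset range so a second job can backfill it.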
Regards,
Ananth.P