Hi Ananth,

> if it detects significant backlog, skip over and consume from the latest
> offset, and schedule a separate backfill task for the backlogged 20 minutes?
Fabian is right, there are no built-in operators for this. If you don't care
about watermarks, I think we can implement it with a custom source which can
sleep or consume data within a time range. The job looks like:

Source1(s1) ->
               Union -> Sink
Source2(s2) ->

The job works as follows:
- t1: s1 working, s2 sleeping
- t2: there is an outage of the Elasticsearch cluster
- t3: ES is available again. s1 resumes from t1 and ends at t3. s2 starts
  directly from t3.
- t4: s1 sleeping, s2 working

To achieve this, we should also find a way to exchange progress between the
two sources, for example by syncing the source status through an HBase or
MySQL table.

Best,
Hequn

On Mon, Oct 1, 2018 at 5:17 PM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Ananth,
>
> You can certainly do this with Flink, but there are no built-in operators
> for this.
> What you probably want to do is compare the timestamp of the event with
> the current processing time and drop the record if it is too old.
> If the timestamp is encoded in the record, you can do this in a
> FilterFunction or a FlatMapFunction. If the timestamp is attached as the
> event-time timestamp, you can access it in a ProcessFunction.
>
> Best,
> Fabian
>
> Am Sa., 29. Sep. 2018 um 21:11 Uhr schrieb Ananth Durai <vanant...@gmail.com>:
>
>> I'm writing a Flink connector to write a stream of events from Kafka to
>> Elasticsearch. It is a typical metrics ingestion pipeline, where the
>> latest metrics are preferred over stale data.
>> What I mean by that: let's assume there was an outage of the Elasticsearch
>> cluster for about 20 minutes, and all the metrics got backlogged in Kafka
>> during that period. Once ES is available again, the Flink stream will
>> resume from the last checkpointed offset (correctly so) and try to catch
>> up. Instead, is there a way we can customize the Flink stream so that, if
>> it detects a significant backlog, it skips over and consumes from the
>> latest offset, and schedules a separate backfill task for the backlogged
>> 20 minutes?
>>
>> Regards,
>> Ananth.P,
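
A minimal sketch of the "sleep or consume within a time range" source Hequn
outlines above, written against Flink's SourceFunction interface. The class
name RangeAwareSource, the TimeRange type, and the fetchAssignedRange /
consumeRange / markRangeDone hooks are assumptions for illustration only; in
practice those hooks would read and write the shared status kept in the HBase
or MySQL table he mentions.

import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Hypothetical sketch: a source that either sleeps or consumes a bounded time
// range, coordinated with its sibling source through an external status store.
public abstract class RangeAwareSource<T> implements SourceFunction<T> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<T> ctx) throws Exception {
        while (running) {
            TimeRange range = fetchAssignedRange();   // read shared status (e.g. HBase/MySQL)
            if (range == null) {
                Thread.sleep(1_000L);                 // nothing assigned yet: the source "sleeps"
                continue;
            }
            consumeRange(range, ctx);                 // emit the records that fall in this range
            markRangeDone(range);                     // publish progress so the other source can take over
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    // Assumed hooks, not Flink API: backed by the shared HBase/MySQL status table.
    protected abstract TimeRange fetchAssignedRange() throws Exception;
    protected abstract void consumeRange(TimeRange range, SourceContext<T> ctx) throws Exception;
    protected abstract void markRangeDone(TimeRange range) throws Exception;

    public static class TimeRange {
        public final long startMillis;
        public final long endMillis;
        public TimeRange(long startMillis, long endMillis) {
            this.startMillis = startMillis;
            this.endMillis = endMillis;
        }
    }
}

The two instances (backfill s1 and live s2) would then be wired together
roughly as env.addSource(s1).union(env.addSource(s2)).addSink(esSink), where
esSink stands in for the Elasticsearch sink.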
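
And a minimal sketch of the staleness check Fabian suggests, in the
ProcessFunction variant (i.e. assuming event-time timestamps have already been
assigned upstream). The class name StalenessGate and the 20-minute threshold
are placeholders; for timestamps encoded in the record itself, the same
comparison could be done in a plain FilterFunction instead.

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical sketch: drop elements whose attached event-time timestamp lags
// current processing time by more than maxLagMillis.
public class StalenessGate<T> extends ProcessFunction<T, T> {

    private final long maxLagMillis;

    public StalenessGate(long maxLagMillis) {
        this.maxLagMillis = maxLagMillis;
    }

    @Override
    public void processElement(T value, Context ctx, Collector<T> out) {
        Long eventTime = ctx.timestamp();   // null if no event-time timestamp is attached
        if (eventTime == null
                || ctx.timerService().currentProcessingTime() - eventTime <= maxLagMillis) {
            out.collect(value);             // fresh enough: forward to the ES sink
        }
        // otherwise drop the stale record; a separate backfill job could pick it up later
    }
}

Placed just before the sink, roughly:
stream.process(new StalenessGate<>(20 * 60 * 1000L)).addSink(esSink);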