Hi Ananth,

> if it detects a significant backlog, skip over and consume from the latest
offset, and schedule a separate backfill task for the backlogged 20 minutes?

Fabian is right, there are no built-in operators for this.
If you don't care about watermarks, I think you can implement it with a
custom source which can either sleep or consume data within a time range.

The job looks like:
Source1(s1) ->
                         Union -> Sink
Source2(s2) ->
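
In DataStream API terms, the wiring might look roughly like the following
(GatedKafkaSource, Metric, and elasticsearchSink are hypothetical placeholders
to show the shape of the job, not a tested implementation):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// the two sources take turns: one consumes while the other sleeps (see the timeline below)
DataStream<Metric> s1 = env.addSource(new GatedKafkaSource("s1"));
DataStream<Metric> s2 = env.addSource(new GatedKafkaSource("s2"));

// union both streams and write everything to Elasticsearch (sink construction omitted)
s1.union(s2).addSink(elasticsearchSink);

env.execute("metrics-ingestion");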

The job works as follows:
- t1: s1 is working, s2 sleeps
- t2: there is an outage of the Elasticsearch cluster
- t3: ES is available again. s1 resumes from t1 and catches up to t3; s2 starts
directly from t3.
- t4: s1 sleeps, s2 is working

To achieve this, we also need a way to exchange progress between the two
sources, e.g., by syncing the source status through an HBase or MySQL table,
as in the sketch below.
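
A very rough sketch of such a gated source, assuming the two sources agree on
their assigned time ranges through that shared table (ProgressStore, TimeRange,
and Metric are hypothetical placeholders, and the Kafka-reading part is
omitted):

import java.util.Collections;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;

public class GatedKafkaSource extends RichSourceFunction<Metric> {

    private final String sourceId;            // "s1" or "s2"
    private volatile boolean running = true;
    private transient ProgressStore progress; // hypothetical client for the shared HBase/MySQL table

    public GatedKafkaSource(String sourceId) {
        this.sourceId = sourceId;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        progress = ProgressStore.connect();   // hypothetical helper
    }

    @Override
    public void run(SourceContext<Metric> ctx) throws Exception {
        while (running) {
            // ask the shared table which time range (if any) this source should consume
            TimeRange range = progress.assignedRange(sourceId);
            if (range == null) {
                Thread.sleep(10_000L);        // nothing assigned: sleep and check again
                continue;
            }
            // consume only records whose timestamps fall within the assigned range
            for (Metric m : readFromKafka(range)) {
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(m);
                }
            }
            // record that this range is done so the other source can take over
            progress.markDone(sourceId, range);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    private Iterable<Metric> readFromKafka(TimeRange range) {
        // Kafka consumption is omitted; KafkaConsumer#offsetsForTimes() can be used
        // to translate the time range into start/end offsets.
        return Collections.emptyList();
    }
}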

Best, Hequn


On Mon, Oct 1, 2018 at 5:17 PM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Ananth,
>
> You can certainly do this with Flink, but there are no built-in operators
> for this.
> What you probably want to do is to compare the timestamp of the event with
> the current processing time and drop the record if it is too old.
> If the timestamp is encoded in the record, you can do this in a
> FilterFunction or a FlatMapFunction. If the timestamp is attached as the
> event-time timestamp, you can access it in a ProcessFunction.
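>
> A minimal sketch of the FilterFunction variant (assuming the record is a POJO
> with a hypothetical getTimestamp() returning the event time in milliseconds;
> the 20-minute threshold is just an example):
>
> import org.apache.flink.api.common.functions.FilterFunction;
>
> public class DropStaleRecords implements FilterFunction<Metric> {
>
>     private static final long MAX_LAG_MS = 20 * 60 * 1000L;
>
>     @Override
>     public boolean filter(Metric record) {
>         // keep the record only if it is not older than the allowed lag
>         return System.currentTimeMillis() - record.getTimestamp() <= MAX_LAG_MS;
>     }
> }
>
> stream.filter(new DropStaleRecords()).addSink(elasticsearchSink);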
>
> Best, Fabian
>
> On Sat, Sep 29, 2018 at 9:11 PM Ananth Durai <
> vanant...@gmail.com> wrote:
>
>>
>> I'm writing a Flink connector to write a stream of events from Kafka to
>> Elasticsearch. It is a typical metrics ingestion pipeline, where the
>> latest metrics are preferred over stale data.
>> What I mean by that is: let's assume there was an outage of the Elasticsearch
>> cluster for about 20 minutes, and all the metrics backlogged in Kafka during
>> that period. Once ES is available, the Flink stream will resume from the
>> last offset checkpoint (correctly so) and try to catch up. Instead, is there
>> a way we can customize the Flink stream so that, if it detects a significant
>> backlog, it skips over and consumes from the latest offset, and schedules a
>> separate backfill task for the backlogged 20 minutes?
>>
>> Regards,
>> Ananth.P,
>>
