Thanks Timo for your response and the references.

I will try BoundedOutOfOrdernessTimestampExtractor, and if it doesn't work
as expected, I will handle it as a separate pipeline.
Also, is there a way to retrieve the last watermark before/after a failure?
Then maybe I could persist the watermark to external storage and resume as
a separate pipeline?
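
To make the difference between the two strategies concrete, here is a minimal,
dependency-free sketch of the reasoning. It does not use the Flink API: the names
WatermarkSketch, timeLagWatermark, BoundedOutOfOrderness and isLate are
hypothetical, and "late" is simplified to mean "timestamp at or below the
current watermark". It assumes a 3-hour lag/bound and a 12-hour outage, as in
my original question.

```scala
// Sketch only: plain Scala that imitates the two watermark strategies.
object WatermarkSketch {
  val HourMs: Long = 60L * 60L * 1000L

  // Processing-time strategy: watermark = wall clock minus a fixed lag.
  def timeLagWatermark(nowMs: Long, maxTimeLagMs: Long): Long =
    nowMs - maxTimeLagMs

  // Event-time strategy: watermark trails the highest event timestamp
  // seen so far by a fixed bound, independent of the wall clock.
  final class BoundedOutOfOrderness(maxOutOfOrdernessMs: Long) {
    // Start low enough that the first watermark is Long.MinValue.
    private var maxSeenTs: Long = Long.MinValue + maxOutOfOrdernessMs

    def onEvent(eventTsMs: Long): Unit =
      maxSeenTs = math.max(maxSeenTs, eventTsMs)

    def currentWatermark: Long = maxSeenTs - maxOutOfOrdernessMs
  }

  // Simplified lateness check (assumption, see text above).
  def isLate(eventTsMs: Long, watermarkMs: Long): Boolean =
    eventTsMs <= watermarkMs

  def main(args: Array[String]): Unit = {
    val failedAt    = 1522700000000L          // event time of the first unprocessed record
    val restartedAt = failedAt + 12 * HourMs  // job comes back 12 hours later

    // Wall clock has moved on by 12h, so the watermark is already 9h past the record.
    val lagWm = timeLagWatermark(restartedAt, 3 * HourMs)
    println(s"time-lag watermark:   late = ${isLate(failedAt, lagWm)}")

    // The watermark only advances with the data, so the record is still on time.
    val bounded = new BoundedOutOfOrderness(3 * HourMs)
    bounded.onEvent(failedAt)
    println(s"bounded out-of-order: late = ${isLate(failedAt, bounded.currentWatermark)}")
  }
}
```

With the time-lag strategy the first record after the restart is already behind
the watermark, while with the event-time-based strategy the watermark only moves
as records are read, so catching up after an outage does not by itself make
records late.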

Best,
Chengzhi


On Tue, Apr 3, 2018 at 7:58 AM, Timo Walther <twal...@apache.org> wrote:

> Hi Chengzhi,
>
> if you emit a watermark even though there is still data with a lower
> timestamp, you generate "late data" that either needs to be processed in a
> separate branch of your pipeline (see sideOutputLateData() [1]) or should
> force your existing operators to update their previously emitted results.
> The latter means holding state or the contents of your windows longer (see
> allowedLateness() [1]). I think in general a processing time watermark
> strategy might not be suitable for reprocessing. Either you parameterize
> your watermark generator such that you can pass information through job
> parameters or you use another strategy such as
> BoundedOutOfOrdernessTimestampExtractor [2] and sinks that allow
> idempotent updates.
>
> I hope this helps.
>
> Regards,
> Timo
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/windows.html#windows
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamp_extractors.html
>
> On 02.04.18 at 23:51, Chengzhi Zhao wrote:
>
> Hello, Flink community,
>
> I am using periodic watermarks and extracting the event time from each record
> in files in S3. I am using the `TimeLagWatermarkGenerator` as described in
> the Flink documentation.
>
> Currently, a new watermark is generated from processing time minus a
> fixed amount:
>
> override def getCurrentWatermark: Watermark = {
>     new Watermark(System.currentTimeMillis() - maxTimeLag)
> }
>
> This works fine as long as the process is running. However, in case of
> failures, for example bad data or an out-of-memory error, I need
> to stop the process, and it takes me time to get it running again. Say
> maxTimeLag=3 hours and it took me 12 hours to notice and fix the problem.
>
> My question is: since I am using processing time as part of the watermark,
> when Flink resumes after a failure, might some records be ignored because of
> the watermark? And what is the best practice to catch up and continue without
> losing data? Thanks!
>
> Best,
> Chengzhi
>
>
>
