Thanks, Timo, for your response and the references. I will try BoundedOutOfOrdernessTimestampExtractor, and if it doesn't work as expected, I will handle it as a separate pipeline. Also, is there a way to retrieve the last watermark before/after a failure? Then maybe I could persist the watermark to external storage and resume as a separate pipeline.
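For reference, here is a minimal plain-Scala sketch of the idea behind BoundedOutOfOrdernessTimestampExtractor (the class name and fields below are illustrative, not the actual Flink API): the watermark trails the maximum *event* timestamp seen so far by a fixed bound, rather than trailing the wall clock.

```scala
// Sketch only, assuming timestamps are epoch milliseconds.
class BoundedOutOfOrdernessSketch(maxOutOfOrdernessMs: Long) {
  // Start low enough that the first watermark cannot underflow.
  private var maxTimestamp: Long = Long.MinValue + maxOutOfOrdernessMs

  // Called once per record with its event timestamp.
  def extractTimestamp(eventTimeMs: Long): Long = {
    maxTimestamp = math.max(maxTimestamp, eventTimeMs)
    eventTimeMs
  }

  // The watermark lags the largest timestamp seen, not System.currentTimeMillis(),
  // so a job that is stopped and resumed later does not silently jump past
  // old events the way a processing-time watermark would.
  def currentWatermark: Long = maxTimestamp - maxOutOfOrdernessMs
}
```

Because the watermark is derived purely from the data, reprocessing old files produces the same watermarks as the first run, which is why Timo suggests it below for reprocessing scenarios.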
Best,
Chengzhi

On Tue, Apr 3, 2018 at 7:58 AM, Timo Walther <twal...@apache.org> wrote:

> Hi Chengzhi,
>
> if you emit a watermark even though there is still data with a lower
> timestamp, you generate "late data" that either needs to be processed in a
> separate branch of your pipeline (see sideOutputLateData() [1]) or should
> force your existing operators to update their previously emitted results.
> The latter means holding state or the contents of your windows longer (see
> allowedLateness() [1]). I think in general a processing-time watermark
> strategy might not be suitable for reprocessing. Either you parameterize
> your watermark generator such that you can pass information through job
> parameters, or you use another strategy such as
> BoundedOutOfOrdernessTimestampExtractor [2] and sinks that allow
> idempotent updates.
>
> I hope this helps.
>
> Regards,
> Timo
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/windows.html#windows
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamp_extractors.html
>
> On 02.04.18 at 23:51, Chengzhi Zhao wrote:
>
> Hello, Flink community,
>
> I am using a periodic watermark and extracting the event time from each
> record from files in S3. I am using the `TimeLagWatermarkGenerator` as it
> was mentioned in the Flink documentation.
>
> Currently, a new watermark is generated from processing time minus a
> fixed amount:
>
>     override def getCurrentWatermark: Watermark = {
>       new Watermark(System.currentTimeMillis() - maxTimeLag)
>     }
>
> This works fine as long as the process is running. However, in case of
> failures, I mean if there was some bad data or an out-of-memory error
> occurs, I need to stop the process, and it will take me time to get back.
> Say maxTimeLag = 3 hours, and it took me 12 hours to realize and fix it.
> My question is: since I am using processing time as part of the watermark,
> when Flink resumes from a failure, will some records be ignored by the
> watermark? And what's the best practice to catch up and continue without
> losing data? Thanks!
>
> Best,
> Chengzhi
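The risk described in the question above can be made concrete with a small hypothetical helper (not Flink API): a record is late when its event timestamp is at or below the current watermark, and with a processing-time watermark of `now - maxTimeLag`, anything older than the lag is late on resume.

```scala
// Hypothetical helper, not Flink API. With a processing-time watermark of
// (nowMs - maxTimeLagMs), a record whose event time is at or below that
// watermark is considered late. All timestamps are epoch milliseconds.
def isLate(eventTimeMs: Long, nowMs: Long, maxTimeLagMs: Long): Boolean =
  eventTimeMs <= nowMs - maxTimeLagMs
```

So with maxTimeLag = 3 hours and a 12-hour outage, records with event times between 3 and 12 hours old would all be late on restart, which is why the suggestions above (side outputs, allowed lateness, or a data-driven watermark) matter.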