Thanks Fabian! This is very helpful!

Best,
Chengzhi




On Wed, Apr 4, 2018 at 9:02 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Chengzhi,
>
> You can access the current watermark from the Context object of a
> ProcessFunction [1] and store it in operator state [2].
> In case of a restart, the state will be restored with the watermark that
> was active when the checkpoint (or savepoint) was taken. Note, this won't
> be the last watermark before the failure happened.
>
> Best,
> Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/process_function.html
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#operator-state
>
> 2018-04-03 19:39 GMT+02:00 Chengzhi Zhao <w.zhaocheng...@gmail.com>:
>
>> Thanks Timo for your response and the references.
>>
>> I will try BoundedOutOfOrdernessTimestampExtractor, and if it doesn't
>> work as expected, I will handle it as a separate pipeline.
>> Also, is there a way to retrieve the last watermark before/after failure?
>> So maybe I can persist the watermark to external storage and resume as a
>> separate pipeline?
>>
>> Best,
>> Chengzhi
>>
>>
>> On Tue, Apr 3, 2018 at 7:58 AM, Timo Walther <twal...@apache.org> wrote:
>>
>>> Hi Chengzhi,
>>>
>>> if you emit a watermark even though there is still data with a lower
>>> timestamp, you generate "late data" that either needs to be processed in a
>>> separate branch of your pipeline (see sideOutputLateData() [1]) or should
>>> force your existing operators to update their previously emitted results.
>>> The latter means holding state or the contents of your windows longer (see
>>> allowedLateness() [1]). I think in general a processing time watermark
>>> strategy might not be suitable for reprocessing. Either you parameterize
>>> your watermark generator such that you can pass information through job
>>> parameters or you use another strategy such as
>>> BoundedOutOfOrdernessTimestampExtractor [2] and sinks that allow
>>> idempotent updates.
>>>
>>> I hope this helps.
>>>
>>> Regards,
>>> Timo
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/windows.html#windows
>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamp_extractors.html
>>>
>>> On 02.04.18 at 23:51, Chengzhi Zhao wrote:
>>>
>>> Hello, Flink community,
>>>
>>> I am using periodic watermarks and extracting the event time from each
>>> record in files in S3. I am using the `TimeLagWatermarkGenerator` as
>>> described in the Flink documentation.
>>>
>>> Currently, a new watermark is generated from processing time minus a
>>> fixed amount:
>>>
>>> override def getCurrentWatermark: Watermark = {
>>>     new Watermark(System.currentTimeMillis() - maxTimeLag)
>>> }
>>>
>>> This works fine as long as the process is running. However, in case of
>>> a failure, for example bad data or an out-of-memory error, I need to stop
>>> the process, and it takes me time to get it running again. Suppose
>>> maxTimeLag is 3 hours and it takes me 12 hours to notice and fix the
>>> problem.
>>>
>>> My question is: since I am using processing time as part of the
>>> watermark, when Flink resumes from a failure, might some records be
>>> ignored because of the watermark? And what's the best practice to catch up
>>> and continue without losing data? Thanks!
>>>
>>> Best,
>>> Chengzhi
>>>
>>>
>>>
>>
>
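
To make the scenario from the original question concrete, here is a small
worked example in plain Scala (no Flink dependency). It applies the same
"current time minus maxTimeLag" rule as the getCurrentWatermark snippet
quoted above to a 12-hour outage with a 3-hour lag. The object name, the
helper names, the failure time, and the one-record-per-hour backlog are all
made up for illustration.

```scala
object TimeLagDemo {
  val hourMs: Long = 60L * 60L * 1000L

  // Same rule as the quoted getCurrentWatermark: wall-clock time minus a fixed lag.
  def timeLagWatermark(nowMs: Long, maxTimeLagMs: Long): Long =
    nowMs - maxTimeLagMs

  // A record is behind the watermark once the watermark has reached its timestamp.
  def isLate(eventTimeMs: Long, watermarkMs: Long): Boolean =
    eventTimeMs <= watermarkMs

  // Counts how many backlog records (one per hour of downtime, hypothetical)
  // are already behind the first watermark emitted after the restart.
  def lateAfterOutage(outageHours: Long, lagHours: Long): (Int, Int) = {
    val failureAt = 100L * hourMs // arbitrary wall-clock time of the failure
    val restartAt = failureAt + outageHours * hourMs
    val watermark = timeLagWatermark(restartAt, lagHours * hourMs)
    val backlog = (0L until outageHours).map(h => failureAt + h * hourMs)
    (backlog.count(isLate(_, watermark)), backlog.size)
  }

  def main(args: Array[String]): Unit = {
    val (late, total) = lateAfterOutage(outageHours = 12L, lagHours = 3L)
    println(s"$late of $total backlog records are behind the watermark")
  }
}
```

Because the watermark is derived from the wall clock at restart, most of the
backlog is already behind it when it is read, which is the "late data"
situation Timo describes.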

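
A minimal sketch of what Fabian's suggestion might look like, assuming the
Flink 1.4 DataStream API referenced in the thread. It reads the current
watermark from the ProcessFunction Context and keeps it in operator list
state via CheckpointedFunction. The class name WatermarkTracker and the state
name "last-watermark" are hypothetical, and taking the minimum of the
restored values is a conservative choice of this sketch, not something stated
in the thread. It has not been run against a cluster.

```scala
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

// Pass-through function that remembers the watermark seen with the latest element.
class WatermarkTracker[T] extends ProcessFunction[T, T] with CheckpointedFunction {

  // Operator state handle; assigned in initializeState.
  @transient private var checkpointed: ListState[java.lang.Long] = _

  // In-memory copy of the most recently observed watermark.
  private var lastWatermark: Long = Long.MinValue

  override def processElement(
      value: T,
      ctx: ProcessFunction[T, T]#Context,
      out: Collector[T]): Unit = {
    lastWatermark = ctx.timerService().currentWatermark()
    out.collect(value)
  }

  // Called on every checkpoint or savepoint: store the current value.
  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    checkpointed.clear()
    checkpointed.add(lastWatermark)
  }

  // Called on start and on restore: re-create the handle and read back the value.
  override def initializeState(context: FunctionInitializationContext): Unit = {
    checkpointed = context.getOperatorStateStore.getListState(
      new ListStateDescriptor[java.lang.Long]("last-watermark", classOf[java.lang.Long]))
    if (context.isRestored) {
      val restored = checkpointed.get().asScala.map(_.longValue)
      // Assumption: after rescaling a subtask may receive several entries; keep the smallest.
      if (restored.nonEmpty) lastWatermark = restored.min
    }
  }
}
```

As Fabian notes, the restored value is the watermark at the time of the
checkpoint or savepoint, not the last one before the failure.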