Hi David, thanks for the answer. One follow-up question: will the watermark
be reset to Long.MIN_VALUE every time I restart a job from a savepoint?

On Tue, Mar 14, 2023 at 05:08, David Anderson <
[email protected]> wrote:

> Watermarks always follow the corresponding event(s). I'm not sure why
> they were designed that way, but that is how they are implemented.
> Windows maintain this contract by emitting all of their results before
> forwarding the watermark that triggered the results.
>
> David
>
> On Mon, Mar 13, 2023 at 5:28 PM Shammon FY <[email protected]> wrote:
> >
> > Hi Alexis
> >
> > Do you use both an event-time watermark generator and the TimerService
> > for processing time in your job? Maybe you can try using event-time
> > watermarks first.
> >
> > Best,
> > Shammon.FY
> >
> > On Sat, Mar 11, 2023 at 7:47 AM Alexis Sarda-Espinosa <
> > [email protected]> wrote:
> >>
> >> Hello,
> >>
> >> I recently ran into a weird issue with a streaming job in Flink 1.16.1.
> >> One of my functions (a KeyedProcessFunction) has been using processing-time
> >> timers. I now want to execute the same job on a historical data dump, so I
> >> had to adjust the logic to use event-time timers in that case (without
> >> switching to BATCH execution mode). Since my data has a timestamp field, I
> >> implemented a custom WatermarkGenerator that always emits a watermark with
> >> that timestamp in the onEvent callback and does nothing in the
> >> onPeriodicEmit callback.
> >>
> >> My problem is that, sometimes, the very first time my function calls
> >> TimerService.currentWatermark, the value is Long.MIN_VALUE, which causes
> >> some false triggers when the first watermark actually arrives.
> >>
> >> I would have expected that, if WatermarkGenerator.onEvent emits a
> >> watermark, it would be sent before the corresponding event, but maybe this
> >> is not always the case?
> >>
> >> In case it's relevant, a brief overview of my job's topology:
> >>
> >> Source1 -> Broadcast
> >>
> >> Source2 ->
> >>   keyBy ->
> >>   connect(Broadcast) ->
> >>   process ->
> >>   filter ->
> >>   assignTimestampsAndWatermarks -> // newly added for historical data
> >>   keyBy ->
> >>   process // function that uses timers
> >>
> >> Regards,
> >> Alexis.
>

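A toy Java model of the ordering David describes may help explain the Long.MIN_VALUE reading. It is not Flink's API: the classes Assigner and Downstream are hypothetical stand-ins for the assignTimestampsAndWatermarks step and the timer-using process function in the topology above, and the model assumes a single parallel instance with no other inputs. The assigner forwards each record first and only then runs a generator that, like the custom one in the thread, emits the event's timestamp in onEvent. The watermark therefore follows its event, so the first event downstream sees no watermark yet.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, NOT Flink's API: illustrates "watermarks follow their events"
// under the assumption of one upstream instance and one downstream instance.
public class WatermarkOrderingDemo {

    // Hypothetical stand-in for the KeyedProcessFunction: records the
    // watermark it observes each time an event arrives.
    static final class Downstream {
        long currentWatermark = Long.MIN_VALUE; // value before any watermark arrives
        final List<Long> seenWatermarks = new ArrayList<>();

        void processElement(long timestamp) {
            seenWatermarks.add(currentWatermark);
        }

        void processWatermark(long watermark) {
            currentWatermark = Math.max(currentWatermark, watermark);
        }
    }

    // Hypothetical stand-in for the timestamp/watermark operator whose
    // generator emits the event timestamp in onEvent and nothing periodically.
    static final class Assigner {
        final Downstream out;

        Assigner(Downstream out) {
            this.out = out;
        }

        void processElement(long timestamp) {
            out.processElement(timestamp); // 1. forward the record first
            onEvent(timestamp);            // 2. then let the generator emit
        }

        void onEvent(long timestamp) {
            out.processWatermark(timestamp);
        }
    }

    public static void main(String[] args) {
        Downstream downstream = new Downstream();
        Assigner assigner = new Assigner(downstream);
        for (long ts : new long[] {100L, 200L, 300L}) {
            assigner.processElement(ts);
        }
        System.out.println(downstream.seenWatermarks);
    }
}
```

Running main prints [-9223372036854775808, 100, 200]: each event observes the watermark produced by the previous event, never its own, and the first one observes Long.MIN_VALUE. The model leaves out parallelism; in a real job a downstream task's watermark is the minimum across its input channels, so it can remain at Long.MIN_VALUE until every upstream instance has emitted one.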