Chesnay,
Thanks for this - I've made the change you suggested
(setAutoWatermarkInterval) but it hasn't changed the behaviour - timers
still get processed only on stream end.
I have pushed a new version with this change, which also emits some
diagnostic information in a .log field.
If you search for "!!!" in Ingest.java and DPTimeoutFunction.java you'll
see the relevant changes.

In DPTimeoutFunction you'll see that if I add code to say "cancel the timer
only if it wouldn't have gone off" then the output is now correct -
individual devices do timeout. However, this output only appears at the end
of the stream (i.e. time jumps backwards as all the timers are processed)
so I still appear not to be seeing timer processing at the correct event
time. If there was no end of stream, I would never get any timeouts.

Below is the output I get when I run. This output is correct but:
a) only because I am manually cancelling timers in DPTimeoutFunction
(search for "!!!")
b) the timer events are timestamped correctly, but are not emitted into the
stream at the right time - and if the stream didn't end then no timeouts
would ever occur (which in particular means that devices that never come
back online will never get marked as offline).
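
To make the symptom concrete, here is a toy model of what I believe is going on. It is a sketch in plain Java, not Flink code: the class name, the 1000 ms timeout and the two-device stream are all made up for illustration, and it uses the original "always cancel the previous timer" logic. The only point it demonstrates is that a timer fires when a watermark passes it, so if the only watermark arrives at end of stream, the timer for a device that falls silent mid-stream is cancelled before it ever gets a chance to fire.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of keyed event-time timers. Hypothetical names; this is not Flink code. */
class ToyTimeoutModel {
    static final long TIMEOUT_MS = 1000; // assumed timeout, for illustration only

    // One pending timer per device id: id -> event time at which it should fire.
    private final Map<String, Long> pending = new TreeMap<>();
    final List<String> output = new ArrayList<>();

    /** A message arrives: replace (i.e. cancel) the device's pending timer with a new one. */
    void onMessage(String id, long ts) {
        pending.put(id, ts + TIMEOUT_MS);
        output.add("msg " + id + "@" + ts);
    }

    /** Timers fire only when a watermark reaches or passes their timestamp. */
    void onWatermark(long watermark) {
        List<String> due = new ArrayList<>();
        for (Map.Entry<String, Long> e : pending.entrySet()) {
            if (e.getValue() <= watermark) {
                due.add(e.getKey());
            }
        }
        due.sort(Comparator.comparingLong(id -> pending.get(id))); // earliest timer first
        for (String id : due) {
            output.add("offline " + id + "@" + pending.remove(id));
        }
    }

    /** Replays a fixed stream in which device "b" is silent between ts 0 and ts 3000. */
    static List<String> run(boolean watermarkPerMessage) {
        String[] ids = {"a", "b", "a", "a", "a", "b"};
        long[] ts = {0, 0, 1000, 2000, 3000, 3000};
        ToyTimeoutModel model = new ToyTimeoutModel();
        for (int i = 0; i < ids.length; i++) {
            model.onMessage(ids[i], ts[i]);
            if (watermarkPerMessage) {
                model.onWatermark(ts[i] - 1); // watermark trails the latest timestamp by 1 ms
            }
        }
        model.onWatermark(Long.MAX_VALUE); // end of stream: final watermark flushes all timers
        return model.output;
    }
}
```

With `run(false)` the model never reports device "b" going offline at 1000, because that timer is replaced when "b" speaks again at 3000. With `run(true)` the "offline b@1000" line appears in the middle of the stream, which is the behaviour I am after.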

Perhaps I do need to implement an onPeriodicEmit function? Does that
require a custom watermark strategy? I can see how to define a custom
watermark generator at the link below, but it is unclear to me how to
install it:
https://stackoverflow.com/questions/64369613/how-to-add-a-custom-watermarkgenerator-to-a-watermarkstrategy
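
For reference, this is roughly the shape of generator I have in mind: one that emits a watermark on every event instead of waiting for the periodic callback. It is only a sketch. The two interfaces below are stand-ins I wrote so that it compiles without Flink on the classpath; they mirror, in simplified form, Flink's `WatermarkGenerator<T>` and `WatermarkOutput` (whose real `emitWatermark` takes a `Watermark` object rather than a long). The class name `PerEventWatermarks` is hypothetical. As far as I can tell, a real generator would be installed with `WatermarkStrategy.forGenerator(...)`, followed by `.withTimestampAssigner(...)`, and the resulting strategy passed to `assignTimestampsAndWatermarks`.

```java
/** Stand-in for Flink's WatermarkOutput (simplified: takes a raw timestamp). */
interface WatermarkOutputSketch {
    void emitWatermark(long timestamp);
}

/** Stand-in mirroring the two callbacks of Flink's WatermarkGenerator<T>. */
interface WatermarkGeneratorSketch<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutputSketch output);

    void onPeriodicEmit(WatermarkOutputSketch output);
}

/** Bounded-out-of-orderness watermarks, emitted on every event as well as periodically. */
class PerEventWatermarks<T> implements WatermarkGeneratorSketch<T> {
    private final long outOfOrdernessMs;
    private long maxTimestamp;

    PerEventWatermarks(long outOfOrdernessMs) {
        this.outOfOrdernessMs = outOfOrdernessMs;
        // Start high enough that the subtraction in onPeriodicEmit cannot underflow.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMs + 1;
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutputSketch output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        onPeriodicEmit(output); // emit immediately instead of waiting for the interval
    }

    @Override
    public void onPeriodicEmit(WatermarkOutputSketch output) {
        // The watermark trails the highest timestamp seen by the allowed lateness plus 1 ms.
        output.emitWatermark(maxTimestamp - outOfOrdernessMs - 1);
    }
}
```

The trade-off, as I understand it, is more watermark traffic in exchange for timers that fire as soon as event time passes them, which seems acceptable for a test job like mine.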

{"ts":0,"id":"0","value":0.01,"is_online":true,"log":"new state. "}
{"ts":0,"id":"1","value":0.02,"is_online":true,"log":"new state. "}
{"ts":0,"id":"2","value":0.03,"is_online":true,"log":"new state. "}
{"ts":1000,"id":"0","value":0.04,"is_online":true,"log":"prevMsg.ts 0
msg_in.ts 1000 Cancelling previous timer. "}
{"ts":1000,"id":"1","value":0.05,"is_online":true,"log":"prevMsg.ts 0
msg_in.ts 1000 Cancelling previous timer. "}
{"ts":2000,"id":"0","value":0.06,"is_online":true,"log":"prevMsg.ts 1000
msg_in.ts 2000 Cancelling previous timer. "}
{"ts":2000,"id":"1","value":0.07,"is_online":true,"log":"prevMsg.ts 1000
msg_in.ts 2000 Cancelling previous timer. "}
{"ts":3000,"id":"0","value":0.08,"is_online":true,"log":"prevMsg.ts 2000
msg_in.ts 3000 Cancelling previous timer. "}
{"ts":3000,"id":"1","value":0.09,"is_online":true,"log":"prevMsg.ts 2000
msg_in.ts 3000 Cancelling previous timer. "}
{"ts":4000,"id":"0","value":0.1,"is_online":true,"log":"prevMsg.ts 3000
msg_in.ts 4000 Cancelling previous timer. "}
{"ts":4000,"id":"1","value":0.11,"is_online":true,"log":"prevMsg.ts 3000
msg_in.ts 4000 Cancelling previous timer. "}
{"ts":5000,"id":"0","value":0.12,"is_online":true,"log":"prevMsg.ts 4000
msg_in.ts 5000 Cancelling previous timer. "}
{"ts":5000,"id":"2","value":0.13,"is_online":true,"log":"prevMsg.ts 0
msg_in.ts 5000 "}
{"ts":6000,"id":"0","value":0.14,"is_online":true,"log":"prevMsg.ts 5000
msg_in.ts 6000 Cancelling previous timer. "}
{"ts":6000,"id":"2","value":0.15,"is_online":true,"log":"prevMsg.ts 5000
msg_in.ts 6000 Cancelling previous timer. "}
{"ts":7000,"id":"0","value":0.16,"is_online":true,"log":"prevMsg.ts 6000
msg_in.ts 7000 Cancelling previous timer. "}
{"ts":7000,"id":"1","value":0.17,"is_online":true,"log":"prevMsg.ts 4000
msg_in.ts 7000 "}
{"ts":7000,"id":"2","value":0.18,"is_online":true,"log":"prevMsg.ts 6000
msg_in.ts 7000 Cancelling previous timer. "}
{"ts":8000,"id":"0","value":0.19,"is_online":true,"log":"prevMsg.ts 7000
msg_in.ts 8000 Cancelling previous timer. "}
{"ts":8000,"id":"1","value":0.2,"is_online":true,"log":"prevMsg.ts 7000
msg_in.ts 8000 Cancelling previous timer. "}
{"ts":8000,"id":"2","value":0.21,"is_online":true,"log":"prevMsg.ts 7000
msg_in.ts 8000 Cancelling previous timer. "}
{"ts":9000,"id":"0","value":0.22,"is_online":true,"log":"prevMsg.ts 8000
msg_in.ts 9000 Cancelling previous timer. "}
{"ts":9000,"id":"1","value":0.23,"is_online":true,"log":"prevMsg.ts 8000
msg_in.ts 9000 Cancelling previous timer. "}
{"ts":9000,"id":"2","value":0.24,"is_online":true,"log":"prevMsg.ts 8000
msg_in.ts 9000 Cancelling previous timer. "}
{"ts":10000,"id":"0","value":0.25,"is_online":true,"log":"prevMsg.ts 9000
msg_in.ts 10000 Cancelling previous timer. "}
{"ts":10000,"id":"1","value":0.26,"is_online":true,"log":"prevMsg.ts 9000
msg_in.ts 10000 Cancelling previous timer. "}
{"ts":10000,"id":"2","value":0.27,"is_online":true,"log":"prevMsg.ts 9000
msg_in.ts 10000 Cancelling previous timer. "}
// These are the "going offline" events that we want to see, but they are
// emitted only once the stream has ended:
{"ts":1001,"id":"2","is_online":false}
{"ts":5001,"id":"1","is_online":false}
{"ts":11001,"id":"1","is_online":false}
{"ts":11001,"id":"0","is_online":false}
{"ts":11001,"id":"2","is_online":false}

Thanks,

-Pilgrim
--
Learn more at https://devicepilot.com @devicepilot



On Wed, 27 Jan 2021 at 14:09, Chesnay Schepler <ches...@apache.org> wrote:

> You were right that it is an issue with the watermarks; except when the
> job was stopped, they were never emitted downstream, so no timer
> was ever triggered.
>
> It appears that you need to set the setAutoWatermarkInterval in the
> ExecutionConfig via
>
>
> env.getConfig().setAutoWatermarkInterval(Duration.ofMillis(500).toMillis());
>
>
> to have them periodically emitted. Alternatively you could override
> BoundedOutOfOrdernessWatermarks#onEvent to also emit a watermark for
> each event (for example, by calling #onPeriodicEmit).
>
> Put another way, if you use any of the built-in WatermarkGenerators and
> use event-time, then it appears that you *must* set this interval.
>
> This behavior is...less than ideal I must admit, and it does not appear to
> be properly documented.
>
> On 1/27/2021 1:56 PM, Chesnay Schepler wrote:
>
> Based on your description you aren't doing anything obviously wrong.
> Would it be possible for you to share the code with us?
>
> On 1/27/2021 1:02 PM, Pilgrim Beart wrote:
>
> A newbie question:
>
> I've created a basic Flink DataStream job for an IoT use-case, with file
> source and sink for testing.
> I key by device ID, then in a ProcessFunction set an EventTime Timer to
> fire if a device falls silent, i.e. a timeout, which I cancel if another
> message arrives from that device within the timeout.
>
> My test source generates 3 devices, one of which falls silent for more
> than the timeout period during the stream, then resumes again. So I expect
> the Timer to fire for that device during the stream, and then for all the
> Timers to fire after the end of the stream.
>
> The timers do indeed fire at the end of the stream (e.g. with a timeout of
> 1000, the timers all fire 1000 after the end of the stream, which is
> correct). But no timer fires for the device which falls silent during the
> stream (even though other devices are still talking, advancing event time).
> I've verified that I am keying correctly by ID.
>
> I suspect this is something to do with Watermarks. I'm using
> forBoundedOutOfOrderness watermarking with a duration of 0.
>
> All suggestions welcome, thanks.
>
> -Pilgrim