Chesnay,

Thanks for this - I've made the change you suggested (setAutoWatermarkInterval) but it hasn't changed the behaviour - timers still get processed only on stream end. I have pushed a new version, with this change, and also emitting some information in a .log field. If you search for "!!!" in Ingest.java and DPTimeoutFunction.java you'll see the relevant changes.

In DPTimeoutFunction you'll see that if I add code to say "cancel the timer only if it wouldn't have gone off" then the output is now correct - individual devices do time out. However, this output only appears at the end of the stream (i.e. time jumps backwards as all the timers are processed) so I still appear not to be seeing timer processing at the correct event time. If there was no end of stream, I would never get any timeouts.

Below is the output I get when I run. This output is correct but:
a) only because I am manually cancelling timers in DPTimeoutFunction (search for "!!!")
b) the timer events are timestamped correctly, but are not emitted into the stream at the right time - and if the stream didn't end then no timeouts would ever occur (which in particular means that devices that never come back online will never get marked as offline).

Perhaps I do need to implement an onPeriodicEmit function? Does that require a custom watermark strategy? I can see how to define a custom watermark generator at the link below, but it's unclear how to install it.
https://stackoverflow.com/questions/64369613/how-to-add-a-custom-watermarkgenerator-to-a-watermarkstrategy

{"ts":0,"id":"0","value":0.01,"is_online":true,"log":"new state. "}
{"ts":0,"id":"1","value":0.02,"is_online":true,"log":"new state. "}
{"ts":0,"id":"2","value":0.03,"is_online":true,"log":"new state. "}
{"ts":1000,"id":"0","value":0.04,"is_online":true,"log":"prevMsg.ts 0 msg_in.ts 1000 Cancelling previous timer. "}
{"ts":1000,"id":"1","value":0.05,"is_online":true,"log":"prevMsg.ts 0 msg_in.ts 1000 Cancelling previous timer. "}
{"ts":2000,"id":"0","value":0.06,"is_online":true,"log":"prevMsg.ts 1000 msg_in.ts 2000 Cancelling previous timer. "}
{"ts":2000,"id":"1","value":0.07,"is_online":true,"log":"prevMsg.ts 1000 msg_in.ts 2000 Cancelling previous timer. "}
{"ts":3000,"id":"0","value":0.08,"is_online":true,"log":"prevMsg.ts 2000 msg_in.ts 3000 Cancelling previous timer. "}
{"ts":3000,"id":"1","value":0.09,"is_online":true,"log":"prevMsg.ts 2000 msg_in.ts 3000 Cancelling previous timer. "}
{"ts":4000,"id":"0","value":0.1,"is_online":true,"log":"prevMsg.ts 3000 msg_in.ts 4000 Cancelling previous timer. "}
{"ts":4000,"id":"1","value":0.11,"is_online":true,"log":"prevMsg.ts 3000 msg_in.ts 4000 Cancelling previous timer. "}
{"ts":5000,"id":"0","value":0.12,"is_online":true,"log":"prevMsg.ts 4000 msg_in.ts 5000 Cancelling previous timer. "}
{"ts":5000,"id":"2","value":0.13,"is_online":true,"log":"prevMsg.ts 0 msg_in.ts 5000 "}
{"ts":6000,"id":"0","value":0.14,"is_online":true,"log":"prevMsg.ts 5000 msg_in.ts 6000 Cancelling previous timer. "}
{"ts":6000,"id":"2","value":0.15,"is_online":true,"log":"prevMsg.ts 5000 msg_in.ts 6000 Cancelling previous timer. "}
{"ts":7000,"id":"0","value":0.16,"is_online":true,"log":"prevMsg.ts 6000 msg_in.ts 7000 Cancelling previous timer. "}
{"ts":7000,"id":"1","value":0.17,"is_online":true,"log":"prevMsg.ts 4000 msg_in.ts 7000 "}
{"ts":7000,"id":"2","value":0.18,"is_online":true,"log":"prevMsg.ts 6000 msg_in.ts 7000 Cancelling previous timer. "}
{"ts":8000,"id":"0","value":0.19,"is_online":true,"log":"prevMsg.ts 7000 msg_in.ts 8000 Cancelling previous timer. "}
{"ts":8000,"id":"1","value":0.2,"is_online":true,"log":"prevMsg.ts 7000 msg_in.ts 8000 Cancelling previous timer. "}
{"ts":8000,"id":"2","value":0.21,"is_online":true,"log":"prevMsg.ts 7000 msg_in.ts 8000 Cancelling previous timer. "}
{"ts":9000,"id":"0","value":0.22,"is_online":true,"log":"prevMsg.ts 8000 msg_in.ts 9000 Cancelling previous timer. "}
{"ts":9000,"id":"1","value":0.23,"is_online":true,"log":"prevMsg.ts 8000 msg_in.ts 9000 Cancelling previous timer. "}
{"ts":9000,"id":"2","value":0.24,"is_online":true,"log":"prevMsg.ts 8000 msg_in.ts 9000 Cancelling previous timer. "}
{"ts":10000,"id":"0","value":0.25,"is_online":true,"log":"prevMsg.ts 9000 msg_in.ts 10000 Cancelling previous timer. "}
{"ts":10000,"id":"1","value":0.26,"is_online":true,"log":"prevMsg.ts 9000 msg_in.ts 10000 Cancelling previous timer. "}
{"ts":10000,"id":"2","value":0.27,"is_online":true,"log":"prevMsg.ts 9000 msg_in.ts 10000 Cancelling previous timer. "}
{"ts":1001,"id":"2","is_online":false} // These are the "going offline" events that we want to see. But they are emitted only once the stream has ended.
{"ts":5001,"id":"1","is_online":false}
{"ts":11001,"id":"1","is_online":false}
{"ts":11001,"id":"0","is_online":false}
{"ts":11001,"id":"2","is_online":false}

Thanks,

-Pilgrim
--
Learn more at https://devicepilot.com @devicepilot
+44 7961 125282

On Wed, 27 Jan 2021 at 14:09, Chesnay Schepler <ches...@apache.org> wrote:

> You were right that it is an issue with the watermarks; outside of
> when the job was stopped they were never emitted downstream, so no timer
> was ever triggered.
>
> It appears that you need to set the setAutoWatermarkInterval in the
> ExecutionConfig via
>
> env.getConfig().setAutoWatermarkInterval(Duration.ofMillis(500).toMillis());
>
> to have them periodically emitted.
> Alternatively you could override
> BoundedOutOfOrdernessWatermarks#onEvent to also emit a watermark for
> each event (for example, by calling #onPeriodicEmit).
>
> Put another way, if you use any of the built-in WatermarkGenerators and
> use event-time, then it appears that you *must* set this interval.
>
> This behavior is...less than ideal I must admit, and it does not appear to
> be properly documented.
>
> On 1/27/2021 1:56 PM, Chesnay Schepler wrote:
>
> Based on your description you aren't doing anything obviously wrong.
> Would it be possible for you to share the code with us?
>
> On 1/27/2021 1:02 PM, Pilgrim Beart wrote:
>
> A newbie question:
>
> I've created a basic Flink DataStream job for an IoT use-case, with file
> source and sink for testing.
> I key by device ID, then in a ProcessFunction set an EventTime Timer to
> fire if a device falls silent, i.e. a timeout, which I cancel if another
> message arrives from that device within the timeout.
>
> My test source generates 3 devices, one of which falls silent for more
> than the timeout period during the stream, then resumes again. So I expect
> the Timer to fire for that device during the stream, and then for all the
> Timers to fire after the end of the stream.
>
> The timers do indeed fire at the end of the stream (e.g. with a timeout of
> 1000, the timers all fire 1000 after the end of the stream, which is
> correct). But no timer fires for the device which falls silent during the
> stream (even though other devices are still talking, advancing event time).
> I've verified that I am keying correctly by ID.
>
> I suspect this is something to do with Watermarks. I'm using
> forBoundedOutOfOrderness watermarking with a duration of 0.
>
> All suggestions welcome, thanks.
>
> -Pilgrim
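
For reference, here is a rough, self-contained sketch of the idea suggested in the quoted reply: a bounded-out-of-orderness generator whose onEvent also calls onPeriodicEmit, so a watermark follows every event. It is plain Java and does not use Flink. The two interfaces are simplified stand-ins modelled on my understanding of Flink's WatermarkGenerator and WatermarkOutput (a bare long replaces the Watermark object), and PerEventWatermarks, Event, run, demoEvents and the 1000 ms timeout are hypothetical names and values chosen only for illustration. The timer handling is a toy model of an event-time timeout, not the logic in DPTimeoutFunction.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PerEventWatermarkSketch {

    /** Simplified stand-in for Flink's WatermarkOutput (a bare long instead of a Watermark object). */
    interface WatermarkOutput {
        void emitWatermark(long watermark);
    }

    /** Stand-in with the same two callbacks as Flink's WatermarkGenerator. */
    interface WatermarkGenerator<T> {
        void onEvent(T event, long eventTimestamp, WatermarkOutput output);
        void onPeriodicEmit(WatermarkOutput output);
    }

    /** Bounded-out-of-orderness generator that also emits a watermark on every event. */
    static class PerEventWatermarks<T> implements WatermarkGenerator<T> {
        private final long outOfOrdernessMillis;
        private long maxTimestamp;

        PerEventWatermarks(long outOfOrdernessMillis) {
            this.outOfOrdernessMillis = outOfOrdernessMillis;
            // Start low enough that the subtraction in onPeriodicEmit cannot underflow.
            this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
        }

        @Override
        public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
            maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
            onPeriodicEmit(output); // emit immediately instead of waiting for the periodic tick
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            output.emitWatermark(maxTimestamp - outOfOrdernessMillis - 1);
        }
    }

    /** Hypothetical device message: id plus event-time timestamp. */
    public static final class Event {
        final String id;
        final long ts;

        public Event(String id, long ts) {
            this.id = id;
            this.ts = ts;
        }
    }

    /** Toy keyed timeout: one pending timer per device, fired when a watermark reaches it. */
    public static List<String> run(List<Event> events, long timeoutMillis) {
        List<String> out = new ArrayList<>();
        Map<String, Long> timers = new TreeMap<>(); // device id -> pending timer timestamp
        WatermarkOutput fireDueTimers = watermark -> {
            Iterator<Map.Entry<String, Long>> it = timers.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, Long> timer = it.next();
                if (timer.getValue() <= watermark) {
                    out.add("offline " + timer.getKey() + " @" + timer.getValue());
                    it.remove();
                }
            }
        };
        WatermarkGenerator<Event> generator = new PerEventWatermarks<>(0);
        for (Event e : events) {
            out.add("online " + e.id + " @" + e.ts);
            timers.put(e.id, e.ts + timeoutMillis); // replaces (cancels) the previous timer
            generator.onEvent(e, e.ts, fireDueTimers);
        }
        fireDueTimers.emitWatermark(Long.MAX_VALUE); // end of bounded input flushes what is left
        return out;
    }

    /** Device "2" falls silent after ts 0 and resumes at ts 3000; device "0" keeps talking. */
    public static List<Event> demoEvents() {
        return Arrays.asList(
                new Event("0", 0), new Event("2", 0), new Event("0", 1000),
                new Event("0", 2000), new Event("0", 3000), new Event("2", 3000));
    }

    public static void main(String[] args) {
        run(demoEvents(), 1000).forEach(System.out::println);
    }
}
```

With per-event emission, the toy run reports device "2" as offline right after device "0"'s message at ts 2000, in the middle of the stream, rather than only when the input ends. In a real job, as far as I know, a generator like this is installed by passing a supplier to WatermarkStrategy.forGenerator(...) and chaining withTimestampAssigner(...); treat that as a pointer to check against the Flink docs, not tested advice.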