Hi Manu, Aljoscha,

I had been interested in implementing FLIP-2, but I haven't been able to make time for it. There is no implementation yet that I'm aware of, and I'll gladly step aside (or help out however I can) if you or anyone else is interested in taking charge of it.
That said, I'm also not sure whether discussions are ongoing. I had hoped to prototype the proposal as is, to have something more concrete to discuss.

Cheers,
aj

On Nov 1, 2016 3:24 PM, "Manu Zhang" <owenzhang1...@gmail.com> wrote:

> Thanks. The ideal case is to fire after the watermark passes each element from
> the window, but that requires a custom trigger and FLIP-2 as well. The
> enhanced window evictor will help to avoid the last firing.
>
> Are the discussions on FLIP-2 still going on?
> Are there any open JIRAs or PRs? (The proposed `ProcessWindowFunction`
> will be sufficient for my case.)
> Is there a workaround now for my case?
>
> Thanks again for following through on this.
> Manu
>
> On Wed, Nov 2, 2016 at 1:07 AM Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Ah, I finally understand it. You would like a way to query the current
>> watermark in the window function to only emit those elements where the
>> timestamp is lower than the watermark.
>>
>> When the window fires again, do you want to emit elements that you
>> emitted during the last firing again? If not, I think you also need to use
>> an evictor to evict the elements from the window where the timestamp is
>> lower than the watermark. With this FLIP
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata
>> we should be able to extend the WindowFunction Context to also provide the
>> current watermark. With this recent PR
>> https://github.com/apache/flink/pull/2736 you would be able to evict
>> elements from the window state after the window function was called.
>>
>> Cheers,
>> Aljoscha
>>
>> On Tue, 1 Nov 2016 at 02:27 Manu Zhang <owenzhang1...@gmail.com> wrote:
>>
>> Yes, here's the example
>> https://github.com/manuzhang/flink/blob/pv/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/session/PageViewSessionWindowing.scala
>>
>> If you print and compare the timestamp of the timer with that of "PageView"
>> in the outputs, you can see what I mean.
>>
>> I think the recently introduced TimelyFlatMapFunction is close to what I
>> want to achieve. It would be great if we could query time information in the
>> window function, so I filed
>> https://issues.apache.org/jira/browse/FLINK-4953
>>
>> Thanks for your time.
>>
>> Manu
>>
>> On Mon, Oct 31, 2016 at 10:29 PM Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>> Hmm, I don't completely understand what's going on. Could you maybe post
>> an example, with the trigger code that shows this behaviour?
>>
>> Cheers,
>> Aljoscha
>>
>> On Thu, 27 Oct 2016 at 17:12 Manu Zhang <owenzhang1...@gmail.com> wrote:
>>
>> Hi,
>>
>> It's what I'm seeing. If timers are not fired at the end of the window, a
>> state (in the window) whose timestamp is *after* the timer will also be
>> emitted. That's a problem for an event-time trigger.
>>
>> Thanks,
>> Manu
>>
>>
>> On Thu, Oct 27, 2016 at 10:29 PM Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>> Hi,
>> is that example input/output what you would like to achieve or what you
>> are currently seeing with Flink? I think for your use case a custom Trigger
>> would be required that works like the event-time trigger but additionally
>> registers timers for each element where you want to emit.
>>
>> Cheers,
>> Aljoscha
>>
>> On Wed, 26 Oct 2016 at 04:04 Manu Zhang <owenzhang1...@gmail.com> wrote:
>>
>> Hi Aljoscha,
>>
>> Thanks for your response. My use case is to track user trajectories based
>> on page view events when they visit a website.
>> The input would be a
>> list of PageView(userId, url, eventTimestamp) with watermarks (=
>> eventTimestamp - duration). I'm trying SessionWindows with some event time
>> trigger. Note that we can't wait for the end of the session window due to
>> latency. Instead, we want to emit the user trajectories whenever a buffered
>> PageView's event time is passed by the watermark. I tried
>> ContinuousEventTimeTrigger and a custom trigger which sets a timer on each
>> element's timestamp. For both triggers I've witnessed a problem like the
>> following (e.g. with a session gap of 5):
>>
>> PageView("user1", "http://foo", 1)
>> PageView("user1", "http://foo/bar", 2)
>> Watermark(1) => emit UserTrajectory("user1", "http://foo -> *http://foo/bar*", [1, 6])
>> PageView("user1", "http://foo/bar/foobar", 5)
>> Watermark(4) => emit UserTrajectory("user1", "http://foo -> http://foo/bar -> *http://foo/bar/foobar*", [1, 10])
>>
>> The urls in bold should not be included since there could be events before
>> them that have not arrived yet.
>>
>>
>> Thanks,
>> Manu
>>
>>
>> On Tue, Oct 25, 2016 at 1:36 AM Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>> Hi,
>> with some additional information we might be able to figure this out
>> together. What specific combination of WindowAssigner/Trigger are you using
>> for your example and what is the input stream (including watermarks)?
>>
>> Cheers,
>> Aljoscha
>>
>> On Mon, 24 Oct 2016 at 06:30 Manu Zhang <owenzhang1...@gmail.com> wrote:
>>
>> Hi,
>>
>> Say I have a window state of List(("a", 1:00), ("b", 1:03), ("c", 1:06))
>> which is triggered to emit when the watermark passes the timestamp of an
>> element.
>> For example,
>>
>> on watermark(1:01), List(("a", 1:00)) is emitted
>> on watermark(1:04), List(("a", 1:00), ("b", 1:03)) is emitted
>> on watermark(1:07), List(("a", 1:00), ("b", 1:03), ("c", 1:06)) is emitted
>>
>> It seems that if *("c", 1:06) is processed before watermark(1:04)*,
>> List(("a", 1:00), ("b", 1:03), ("c", 1:06)) will be emitted on
>> watermark(1:04). This is incorrect since there could be elements with
>> timestamps between 1:04 and 1:06 that have not arrived yet.
>>
>> I guess this is because the watermark trigger doesn't check whether an
>> element's timestamp has been passed.
>>
>> Please correct me if any of the above is not right.
>>
>> Thanks,
>> Manu Zhang
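
The behaviour described in the original question, and the fix suggested later in the thread (filter by the current watermark, then evict what was emitted), can be illustrated outside Flink with a small simulation. This is only a sketch in plain Python, not Flink API: `WindowBuffer`, `fire_all`, and `fire_up_to` are hypothetical names, timestamps are minutes after 1:00, and treating an element as ready when its timestamp is less than or equal to the watermark is an assumption.

```python
from dataclasses import dataclass, field
from typing import List, Tuple

# (value, event timestamp in minutes after 1:00); an illustrative encoding.
Element = Tuple[str, int]


@dataclass
class WindowBuffer:
    """Hypothetical stand-in for buffered window state; not a Flink class."""

    elements: List[Element] = field(default_factory=list)

    def add(self, value: str, timestamp: int) -> None:
        """Buffer an element in arrival order."""
        self.elements.append((value, timestamp))

    def fire_all(self, watermark: int) -> List[Element]:
        """Reported behaviour: a firing emits every buffered element,
        ignoring the watermark that caused the firing."""
        return sorted(self.elements, key=lambda e: e[1])

    def fire_up_to(self, watermark: int, evict: bool = False) -> List[Element]:
        """Desired behaviour: emit only elements whose timestamp the
        watermark has passed (assumed here to mean timestamp <= watermark).
        With evict=True, emitted elements are dropped from the buffer so a
        later firing does not emit them again."""
        ready = sorted(
            (e for e in self.elements if e[1] <= watermark),
            key=lambda e: e[1],
        )
        if evict:
            self.elements = [e for e in self.elements if e[1] > watermark]
        return ready


if __name__ == "__main__":
    buf = WindowBuffer()
    buf.add("a", 0)  # ("a", 1:00)
    buf.add("b", 3)  # ("b", 1:03)
    buf.add("c", 6)  # ("c", 1:06) arrives before watermark(1:04)

    print(buf.fire_all(4))                # includes "c" too early
    print(buf.fire_up_to(4, evict=True))  # only "a" and "b"
    print(buf.fire_up_to(7, evict=True))  # "c", emitted once
```

Passing `evict=True` mirrors the evictor suggestion in the thread; leaving it off re-emits earlier elements on each firing, which matches the cumulative output listed in the original question.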