Hi Manu, Aljoscha,

I had been interested in implementing FLIP-2, but I haven't been able to
make time for it. There is no implementation yet that I'm aware of, and
I'll gladly step aside (or help out however I can) if you or anyone else is
interested in taking charge of it.

That said, I'm also not sure if discussions are ongoing. I had hoped to
prototype the proposal as is, to have something more concrete to discuss.

Cheers,
aj
On Nov 1, 2016 3:24 PM, "Manu Zhang" <owenzhang1...@gmail.com> wrote:

> Thanks. The ideal case is to fire after the watermark passes each element
> in the window, but that requires a custom trigger and FLIP-2 as well. The
> enhanced window evictor will help to avoid the last firing.
>
> Are the discussions on FLIP-2 still going on?
> Are there any open JIRAs or PRs? (The proposed `ProcessWindowFunction`
> will be sufficient for my case.)
> Is there a workaround now for my case?
>
> Thanks again for following up on this.
> Manu
>
> On Wed, Nov 2, 2016 at 1:07 AM Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Ah, I finally understand it. You would like a way to query the current
>> watermark in the window function so that you only emit those elements
>> whose timestamp is lower than the watermark.
>>
>> When the window fires again, do you want to emit elements that you
>> emitted during the last firing again? If not, I think you also need to use
>> an evictor to evict the elements from the window where the timestamp is
>> lower than the watermark. With this FLIP
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata
>> we should be able to extend the WindowFunction Context to also provide the
>> current watermark. With this recent PR
>> https://github.com/apache/flink/pull/2736 you would be able to evict
>> elements from the window state after the window function was called.
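
A minimal plain-Python sketch of the "emit, then evict" idea described above. It is a simulation only, not Flink code: `fire_and_evict` is a hypothetical helper, the (url, timestamp) pairs are borrowed from the PageView example quoted later in this thread, and treating an element as ready once its timestamp is less than or equal to the watermark is an assumption.

```python
# Plain-Python simulation; none of these names are Flink APIs.

def fire_and_evict(buffer, watermark):
    """Emit elements the watermark has reached and keep the rest buffered.

    Assumption: an element is ready when timestamp <= watermark.
    Returns a pair (emitted, remaining).
    """
    emitted = [e for e in buffer if e[1] <= watermark]
    remaining = [e for e in buffer if e[1] > watermark]
    return emitted, remaining

# (url, event timestamp) pairs buffered in one session window.
window = [("http://foo", 1), ("http://foo/bar", 2)]

# First firing at Watermark(1): only the first page view is ready.
emitted1, window = fire_and_evict(window, watermark=1)

# A later page view arrives before the next watermark.
window.append(("http://foo/bar/foobar", 5))

# Second firing at Watermark(4): the evicted element is not emitted again.
emitted2, window = fire_and_evict(window, watermark=4)

print(emitted1)
print(emitted2)
print(window)
```

Because emitted elements are dropped from the buffer, a later firing only sees elements that have not been emitted yet.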
>>
>> Cheers,
>> Aljoscha
>>
>> On Tue, 1 Nov 2016 at 02:27 Manu Zhang <owenzhang1...@gmail.com> wrote:
>>
>> Yes, here's the example:
>> https://github.com/manuzhang/flink/blob/pv/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/session/PageViewSessionWindowing.scala
>>
>> If you print and compare the timestamp of the timer with that of "PageView"
>> in the outputs, you can see what I mean.
>>
>> I think the recently introduced TimelyFlatMapFunction is close to what I
>> want to achieve. It would be great if we could query time information in
>> the window function, so I filed
>> https://issues.apache.org/jira/browse/FLINK-4953
>>
>> Thanks for your time.
>>
>> Manu
>>
>> On Mon, Oct 31, 2016 at 10:29 PM Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>> Hmm, I don't completely understand what's going on. Could you maybe post
>> an example, with the trigger code that shows this behaviour?
>>
>> Cheers,
>> Aljoscha
>>
>> On Thu, 27 Oct 2016 at 17:12 Manu Zhang <owenzhang1...@gmail.com> wrote:
>>
>> Hi,
>>
>> It's what I'm seeing. If timers are not fired at the end of the window, a
>> state (in the window) whose timestamp is *after* the timer will also be
>> emitted. That's a problem for an event-time trigger.
>>
>> Thanks,
>> Manu
>>
>>
>> On Thu, Oct 27, 2016 at 10:29 PM Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>> Hi,
>> is that example input/output what you would like to achieve or what you
>> are currently seeing with Flink? I think for your use case a custom Trigger
>> would be required that works like the event-time trigger but additionally
>> registers timers for each element where you want to emit.
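
A rough plain-Python sketch of the "one timer per element" part of such a trigger. It is a simulation only, not Flink code: `PerElementTimers` and its methods are hypothetical names, and it leaves out the end-of-window timer that an event-time trigger would also register.

```python
import heapq

class PerElementTimers:
    """Hypothetical stand-in for a trigger's event-time timers; not a Flink class."""

    def __init__(self):
        self._heap = []            # pending timer timestamps, smallest first
        self._registered = set()   # avoids duplicate timers for equal timestamps

    def on_element(self, timestamp):
        # Register one timer at the element's own timestamp.
        if timestamp not in self._registered:
            self._registered.add(timestamp)
            heapq.heappush(self._heap, timestamp)

    def on_watermark(self, watermark):
        # Fire every pending timer whose timestamp the watermark has reached.
        fired = []
        while self._heap and self._heap[0] <= watermark:
            ts = heapq.heappop(self._heap)
            self._registered.discard(ts)
            fired.append(ts)
        return fired

timers = PerElementTimers()
for ts in (1, 2, 5):
    timers.on_element(ts)

first = timers.on_watermark(1)    # [1]
second = timers.on_watermark(4)   # [2]
third = timers.on_watermark(10)   # [5]
print(first, second, third)
```

Each firing tells you which timestamps the watermark has just passed; what the window function emits at that point is a separate question.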
>>
>> Cheers,
>> Aljoscha
>>
>> On Wed, 26 Oct 2016 at 04:04 Manu Zhang <owenzhang1...@gmail.com> wrote:
>>
>> Hi Aljoscha,
>>
>> Thanks for your response. My use case is to track user trajectories based
>> on page view events when users visit a website. The input would be a
>> list of PageView(userId, url, eventTimestamp) with watermarks (=
>> eventTimestamp - duration). I'm trying SessionWindows with some event-time
>> trigger. Note that we can't wait for the end of the session window due to
>> latency. Instead, we want to emit the user trajectories whenever a buffered
>> PageView's event time is passed by the watermark. I tried
>> ContinuousEventTimeTrigger and a custom trigger which sets a timer on each
>> element's timestamp. For both triggers I've witnessed a problem like the
>> following (e.g. with a session gap of 5):
>>
>> PageView("user1", "http://foo", 1)
>> PageView("user1", "http://foo/bar", 2)
>> Watermark(1) => emit UserTrajectory("user1", "http://foo -> *http://foo/bar*", [1,6])
>> PageView("user1", "http://foo/bar/foobar", 5)
>> Watermark(4) => emit UserTrajectory("user1", "http://foo ->
>> http://foo/bar -> *http://foo/bar/foobar*", [1, 10])
>>
>> The URLs in bold should not be included, since there could be events
>> before them that have not arrived yet.
>>
>>
>> Thanks,
>> Manu
>>
>>
>> On Tue, Oct 25, 2016 at 1:36 AM Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>> Hi,
>> with some additional information we might be able to figure this out
>> together. What specific combination of WindowAssigner/Trigger are you using
>> for your example and what is the input stream (including watermarks)?
>>
>> Cheers,
>> Aljoscha
>>
>> On Mon, 24 Oct 2016 at 06:30 Manu Zhang <owenzhang1...@gmail.com> wrote:
>>
>> Hi,
>>
>> Say I have a window state of List(("a", 1:00), ("b", 1:03), ("c", 1:06))
>> which is triggered to emit when the watermark passes the timestamp of an
>> element. For example,
>>
>> on watermark(1:01), List(("a", 1:00)) is emitted
>> on watermark(1:04), List(("a", 1:00), ("b", 1:03)) is emitted
>> on watermark(1:07), List(("a", 1:00), ("b", 1:03), ("c", 1:06)) is emitted
>>
>> It seems that if *("c", 1:06) is processed before watermark(1:04)*,
>> List(("a", 1:00), ("b", 1:03), ("c", 1:06)) will be emitted on
>> watermark(1:04). This is incorrect since there could be elements with
>> timestamps between 1:04 and 1:06 that have not arrived yet.
>>
>> I guess this is because the watermark trigger doesn't check whether an
>> element's timestamp has been passed.
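
To make the scenario concrete, here is a small plain-Python simulation (not Flink code). The names `fire_all` and `fire_up_to_watermark` are hypothetical, timestamps are written as minutes past 1:00, and treating an element as passed once its timestamp is less than or equal to the watermark is an assumption. It contrasts a firing that emits the whole buffer with one that emits only the elements the watermark has passed.

```python
# Plain-Python simulation of the scenario; this is not Flink API code.
# Timestamps are minutes past 1:00, so ("c", 6) stands for ("c", 1:06).

def fire_all(buffer, watermark):
    """Emit the whole window buffer on every firing (the behaviour described above)."""
    return list(buffer)

def fire_up_to_watermark(buffer, watermark):
    """Emit only elements whose timestamp the watermark has passed.

    Assumption: "passed" means timestamp <= watermark.
    """
    return [(value, ts) for value, ts in buffer if ts <= watermark]

# ("c", 6) has already been buffered when watermark 4 (1:04) arrives.
buffer = [("a", 0), ("b", 3), ("c", 6)]

naive = fire_all(buffer, watermark=4)
filtered = fire_up_to_watermark(buffer, watermark=4)

print(naive)     # includes ("c", 6) even though the watermark is only at 4
print(filtered)  # stops at ("b", 3)
```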
>>
>> Please correct me if any of the above is not right.
>>
>> Thanks,
>> Manu Zhang
>>
>>
>>
>>
