Hi,

why are you keying by the source ID and not by the user ID?

Cheers,
Aljoscha
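To make the keying question concrete: keying by user rather than by source gives each user their own windows. In Flink this would presumably be `.keyBy("user")` (assuming the parsed event type exposes a `user` field) followed by a session window assigner such as `EventTimeSessionWindows.withGap(Time.seconds(30))`, along the lines of the session-window documentation Till links. The sketch below is a plain-Java simulation of that grouping, not Flink code. The 30-second gap, the seconds-since-midnight timestamps, and the class and method names (`UserSessions`, `sessionsByUser`, `sampleData`) are hypothetical choices for illustration. It uses the nine events from Samir's message.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation, NOT Flink code: groups events by user (like keyBy("user"))
// and splits each user's events into gap-based sessions (in the spirit of
// EventTimeSessionWindows). The gap and the timestamp encoding are assumptions.
class UserSessions {

    static final class Event {
        final long id;
        final int user;
        final long ts; // seconds since 2016-10-15 00:00:00 (simplified timestamp)

        Event(long id, int user, long ts) {
            this.id = id;
            this.user = user;
            this.ts = ts;
        }
    }

    // Assumes events arrive ordered by timestamp. A user's current session is closed
    // when that user's next event is more than gapSeconds later.
    static Map<Integer, List<List<Long>>> sessionsByUser(List<Event> events, long gapSeconds) {
        Map<Integer, List<List<Long>>> sessions = new LinkedHashMap<>();
        Map<Integer, Long> lastSeen = new LinkedHashMap<>();
        for (Event e : events) {
            List<List<Long>> userSessions = sessions.computeIfAbsent(e.user, k -> new ArrayList<>());
            Long previous = lastSeen.get(e.user);
            if (previous == null || e.ts - previous > gapSeconds) {
                userSessions.add(new ArrayList<>()); // first event, or gap too large: new session
            }
            userSessions.get(userSessions.size() - 1).add(e.id);
            lastSeen.put(e.user, e.ts);
        }
        return sessions;
    }

    // The nine events from Samir's message (sourceId and value omitted).
    static List<Event> sampleData() {
        List<Event> events = new ArrayList<>();
        events.add(new Event(25398102L, 14, 56));
        events.add(new Event(25398185L, 14, 66));
        events.add(new Event(25398210L, 14, 76));
        events.add(new Event(25398235L, 3149, 84));
        events.add(new Event(25398236L, 71, 85));
        events.add(new Event(25398239L, 71, 86));
        events.add(new Event(25398265L, 71, 96));
        events.add(new Event(25398310L, 14, 136));
        events.add(new Event(25398320L, 14, 146));
        return events;
    }

    public static void main(String[] args) {
        for (Map.Entry<Integer, List<List<Long>>> entry : sessionsByUser(sampleData(), 30).entrySet()) {
            System.out.println("user=" + entry.getKey() + " sessions=" + entry.getValue());
        }
    }
}
```

With these assumptions, user 14 ends up with two sessions ([25398102, 25398185, 25398210] and [25398310, 25398320]) because there is a 60-second pause between its third and fourth event, longer than the assumed gap. Users 3149 and 71 each get a single session, and no session picks up an event from a different user.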
On Mon, 7 Nov 2016 at 15:42 Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Samir,
>
> the windowing API in Flink works as follows: first, an incoming element
> is assigned to a window. This is defined in the window clause, where you
> create a GlobalWindow. Thus, all elements with the same sourceId will be
> assigned to the same window. Next, the element is given to a Trigger,
> which decides whether the window shall be evaluated or not. But at this
> point the element is already part of the window. That's why the last
> element of each of your windows has a different user ID.
>
> What you could try instead is a MergingWindowAssigner, to create windows
> whose elements all have the same user ID. There you assign all elements
> with the same ID to the same session window. The session windows are then
> triggered by event time, for example. That's the recommended way to
> create session windows with Flink. Here is some documentation for session
> windows [1].
>
> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/windows.html#session-windows
>
> Cheers,
> Till
>
> On Sun, Nov 6, 2016 at 12:11 PM, Samir Abdou <abdou.samir.mail...@gmail.com> wrote:
>
> > I am using Flink 1.2-Snapshot.
> > My data looks like the following:
> >
> > - id=25398102, sourceId=1, ts=2016-10-15 00:00:56, user=14, value=919
> > - id=25398185, sourceId=1, ts=2016-10-15 00:01:06, user=14, value=920
> > - id=25398210, sourceId=1, ts=2016-10-15 00:01:16, user=14, value=944
> > - id=25398235, sourceId=1, ts=2016-10-15 00:01:24, user=3149, value=944
> > - id=25398236, sourceId=1, ts=2016-10-15 00:01:25, user=71, value=955
> > - id=25398239, sourceId=1, ts=2016-10-15 00:01:26, user=71, value=955
> > - id=25398265, sourceId=1, ts=2016-10-15 00:01:36, user=71, value=955
> > - id=25398310, sourceId=1, ts=2016-10-15 00:02:16, user=14, value=960
> > - id=25398320, sourceId=1, ts=2016-10-15 00:02:26, user=14, value=1000
> >
> > I am running the following code to create windows based on user IDs:
> >
> >     stream.flatMap(new LogsParser())
> >         .assignTimestampsAndWatermarks(new MessageTimestampExtractor())
> >         .keyBy("sourceId")
> >         .window(GlobalWindows.create())
> >         .trigger(PurgingTrigger.of(new MySessionTrigger()))
> >         .apply(new SessionWindowFunction())
> >         .print();
> >
> > MySessionTrigger looks at each received event and checks its user ID, so
> > that the window is triggered whenever the user ID changes. The
> > SessionWindowFunction just creates a session out of the window.
> >
> > Here are the sessions created:
> >
> > 1. Session:
> >    - id=25398102, sourceId=1, ts=2016-10-15 00:00:56, user=14, value=919
> >    - id=25398185, sourceId=1, ts=2016-10-15 00:01:06, user=14, value=920
> >    - id=25398210, sourceId=1, ts=2016-10-15 00:01:16, user=14, value=944
> >    - id=25398235, sourceId=1, ts=2016-10-15 00:01:24, user=3149, value=944
> > 2. Session:
> >    - id=25398236, sourceId=1, ts=2016-10-15 00:01:25, user=71, value=955
> >    - id=25398239, sourceId=1, ts=2016-10-15 00:01:26, user=71, value=955
> >    - id=25398265, sourceId=1, ts=2016-10-15 00:01:36, user=71, value=955
> >    - id=25398310, sourceId=1, ts=2016-10-15 00:02:16, user=14, value=960
> > 3. Session:
> >    - id=25398320, sourceId=1, ts=2016-10-15 00:02:26, user=14, value=1000
> >
> > The problem, as you can see, is that in every session the last event
> > actually belongs to the next window. The decision to trigger the window
> > comes too late, as the last event is already in the window.
> >
> > How can I trigger the window without the last event being part of that
> > window?
> >
> > Thanks for your help.
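The order of operations Till describes (assign the element to the window first, then consult the trigger) can be reproduced without Flink. The following plain-Java sketch is not Flink code. `addThenTrigger` models GlobalWindows plus a purging trigger. Because MySessionTrigger was not posted, it assumes a trigger that remembers the user of the first element after each firing and fires (and purges) on the first element with a different user; this assumption happens to reproduce the three sessions Samir reports. `checkThenAdd` is a contrast that decides on the user change before buffering the element. The class and method names are hypothetical, and flushing the remainder at end of input is a simplification.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model, NOT Flink code. Events are reduced to their user IDs.
class TriggerOrderDemo {

    // Models GlobalWindows + PurgingTrigger: each element is added to the window
    // FIRST, and only then does the (hypothetical) trigger look at it.
    static List<List<Integer>> addThenTrigger(int[] users) {
        List<List<Integer>> emitted = new ArrayList<>();
        List<Integer> window = new ArrayList<>();
        Integer sessionUser = null; // trigger state: user of the first element since the last firing
        for (int user : users) {
            window.add(user); // 1. window assignment happens before the trigger runs
            if (sessionUser == null) {
                sessionUser = user;
            } else if (sessionUser.intValue() != user) {
                emitted.add(window); // 2. fire and purge: the triggering element is already inside
                window = new ArrayList<>();
                sessionUser = null;
            }
        }
        if (!window.isEmpty()) {
            emitted.add(window); // whatever is left when the input ends
        }
        return emitted;
    }

    // Contrast: decide on the user change BEFORE buffering the element.
    static List<List<Integer>> checkThenAdd(int[] users) {
        List<List<Integer>> emitted = new ArrayList<>();
        List<Integer> buffer = new ArrayList<>();
        Integer sessionUser = null;
        for (int user : users) {
            if (sessionUser != null && sessionUser.intValue() != user) {
                emitted.add(buffer); // close the session without the new element
                buffer = new ArrayList<>();
            }
            buffer.add(user);
            sessionUser = user;
        }
        if (!buffer.isEmpty()) {
            emitted.add(buffer);
        }
        return emitted;
    }

    public static void main(String[] args) {
        // User IDs of Samir's nine events, in arrival order.
        int[] users = {14, 14, 14, 3149, 71, 71, 71, 14, 14};
        System.out.println(addThenTrigger(users)); // [[14, 14, 14, 3149], [71, 71, 71, 14], [14]]
        System.out.println(checkThenAdd(users));   // [[14, 14, 14], [3149], [71, 71, 71], [14, 14]]
    }
}
```

The second ordering is not something a Trigger can provide, because, as Till explains, the element is already part of the window by the time the trigger sees it. This is why the replies point to keying by user and to session windows instead.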