Operators with multiple inputs emit the minimum of their inputs' watermarks
downstream. In the case of a keyBy, this means that each watermark is sent to
all downstream consumers.

Cheers,
Till
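The rule Till describes can be modelled in a few lines of plain Java. This is a minimal sketch, not Flink's runtime code: the class `MinWatermarkTracker` and its methods are hypothetical. A downstream subtask remembers the latest watermark seen on each input channel, and its own event time is the minimum across channels. Because a keyBy forwards every upstream watermark to every downstream subtask, each channel advances regardless of which keys were routed over it.

```java
import java.util.Arrays;

/**
 * Hypothetical, simplified model of how a subtask with several input
 * channels derives its event time (not Flink's actual implementation).
 */
final class MinWatermarkTracker {
    private final long[] channelWatermarks;
    private long current = Long.MIN_VALUE;

    /** Assumes numChannels >= 1. */
    MinWatermarkTracker(int numChannels) {
        channelWatermarks = new long[numChannels];
        // No watermark has been seen yet on any channel.
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /**
     * Records a watermark arriving on one channel and returns the operator's
     * watermark: the minimum over all channels, never moving backwards.
     */
    long onWatermark(int channel, long watermark) {
        // Watermarks on a single channel are monotonic; ignore regressions.
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        current = Math.max(current, min);
        return current;
    }

    long current() {
        return current;
    }
}
```

For example, with two channels, a watermark of 1000 on channel 0 alone leaves the operator at `Long.MIN_VALUE`; once channel 1 reports 500 the operator moves to 500, and when channel 1 reaches 2000 the operator moves to 1000.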

On Tue, Feb 26, 2019 at 1:47 PM Padarn Wilson <pad...@gmail.com> wrote:

> Just to add: by printing intermediate results I see that I definitely have
> more than five minutes of data, and by windowing without the session
> windows I see that event time watermarks do seem to be generated as
> expected.
>
> Thanks for your help and time.
>
> Padarn
>
> On Tue, 26 Feb 2019 at 8:43 PM, Padarn Wilson <pad...@gmail.com> wrote:
>
>> Hi Till,
>>
>> I will work on an example, but I'm a little confused about how keyBy and
>> watermarks work in this case. This documentation says
>> (https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html#watermarks-in-parallel-streams):
>>
>>
>> Some operators consume multiple input streams; a union, for example, or
>> operators following a *keyBy(…)* or *partition(…)* function. Such an
>> operator’s current event time is the minimum of its input streams’ event
>> times. As its input streams update their event times, so does the operator.
>>
>>
>> To me this implies that keyBy splits the watermark. Is that right?
>>
>> On Tue, 26 Feb 2019 at 6:40 PM, Till Rohrmann <trohrm...@apache.org> wrote:
>>
>>> Hi Padarn,
>>>
>>> Flink does not generate watermarks per key. At the moment, watermarks are
>>> always global. Therefore, I would suspect that the problem is rather with
>>> generating watermarks at all. Could it be that your input data does not
>>> span a period longer than 5 minutes and also does not terminate? Another
>>> problem could be the CountTrigger, which does not react to the window's
>>> end time: its onEventTime method simply returns TriggerResult.CONTINUE, and
>>> I think this will cause the window to not fire. Maybe a working example
>>> program with example input could be helpful for further debugging.
>>>
>>> Cheers,
>>> Till
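Till's first question can be checked with a little arithmetic. The sketch below is a hedged illustration, not Flink API: the class `EventTimeFiring` is hypothetical, and it assumes that the watermark trails the largest timestamp seen so far by a fixed out-of-orderness bound, and that a window [start, end) fires once the watermark reaches end - 1, as with an event-time trigger. Under those assumptions, a 5 minute window can only fire after an event roughly 5 minutes plus the bound past the window start has been read.

```java
/**
 * Hypothetical helper showing when an event-time window becomes due.
 * Assumes watermark = max timestamp seen - fixed bound, and that a window
 * [start, end) fires once the watermark reaches end - 1. Times are in ms.
 */
final class EventTimeFiring {
    private EventTimeFiring() {}

    /** Watermark after observing events up to maxTimestamp. */
    static long watermark(long maxTimestamp, long outOfOrdernessMs) {
        return maxTimestamp - outOfOrdernessMs;
    }

    /** True once the watermark has reached the window's last timestamp. */
    static boolean isDue(long windowEndExclusive, long currentWatermark) {
        return currentWatermark >= windowEndExclusive - 1;
    }

    /** Smallest event timestamp that must be seen before the window can fire. */
    static long requiredTimestamp(long windowEndExclusive, long outOfOrdernessMs) {
        return windowEndExclusive - 1 + outOfOrdernessMs;
    }
}
```

With a window [0, 300000) and a 10 s bound, four minutes of data (max timestamp 240000) give a watermark of 230000, so the window is not due; an event at 309999 or later is needed. If the input terminates, the source emits a final `Long.MAX_VALUE` watermark that flushes all remaining event-time windows, which is why Till also asks whether the data terminates.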
>>>
>>> On Tue, Feb 26, 2019 at 2:53 AM Padarn Wilson <pad...@gmail.com> wrote:
>>>
>>>> Hi Flink Mailing List,
>>>>
>>>> Long story short: I want to somehow collapse watermarks at an operator
>>>> across keys, so that keys with lagging watermarks do not hold the others back.
>>>> Details below:
>>>>
>>>> ---
>>>>
>>>> I have an application in which I want to perform the following sequence of
>>>> steps. Assume each record has the form (time, user, location, action):
>>>>
>>>> -> Read source
>>>> -> KeyBy (UserId, Location)
>>>> -> EventTimeSessionWindow (5 min gap), resulting in (User, Location, Session)
>>>> -> Trigger on first event
>>>> -> KeyBy (Location)
>>>> -> SlidingEventTimeWindow (5 min length, 5 second slide)
>>>> -> Count
>>>>
>>>> The end intention is to count the number of unique users in a given
>>>> location - the EventTimeSessionWindow is used to make sure users are only
>>>> counted once.
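The intended result can be sketched as a batch computation over an in-memory list. This is an illustration only, under stated assumptions: the class `UniqueUsersPerLocation` and its `Event` type are hypothetical, and it deduplicates users with a per-window set instead of the session-window step, so it shows what the counts should be rather than how the streaming job computes them. Window assignment follows the usual sliding-window rule: a timestamp t belongs to every window [start, start + size) whose start is a multiple of the slide and lies in (t - size, t].

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical batch illustration: distinct users per location per sliding window. */
final class UniqueUsersPerLocation {
    static final long SIZE_MS = 5 * 60 * 1000L; // 5 min window length
    static final long SLIDE_MS = 5 * 1000L;     // 5 s slide

    static final class Event {
        final long time;
        final String user;
        final String location;

        Event(long time, String user, String location) {
            this.time = time;
            this.user = user;
            this.location = location;
        }
    }

    /** Starts of all windows [start, start + size) containing t; assumes t >= 0. */
    static List<Long> windowStarts(long t, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = t - (t % slide);
        for (long start = lastStart; start > t - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    /** location -> (window start -> number of distinct users). */
    static Map<String, TreeMap<Long, Integer>> count(List<Event> events) {
        Map<String, Map<Long, Set<String>>> users = new HashMap<>();
        for (Event e : events) {
            for (long start : windowStarts(e.time, SIZE_MS, SLIDE_MS)) {
                users.computeIfAbsent(e.location, k -> new HashMap<>())
                     .computeIfAbsent(start, k -> new HashSet<>())
                     .add(e.user);
            }
        }
        Map<String, TreeMap<Long, Integer>> counts = new HashMap<>();
        users.forEach((loc, byWindow) -> {
            TreeMap<Long, Integer> perWindow = new TreeMap<>();
            byWindow.forEach((start, set) -> perWindow.put(start, set.size()));
            counts.put(loc, perWindow);
        });
        return counts;
    }
}
```

With a 5 minute length and a 5 second slide, every event falls into 60 overlapping windows (300 s / 5 s), and a user seen several times at one location is still counted once per window.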
>>>>
>>>> So I created a custom Trigger, which is the same as CountTrigger but has
>>>> the following `onElement` method:
>>>>
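>>>> @Override
>>>> public TriggerResult onElement(Object element, long timestamp, W window,
>>>>                                TriggerContext ctx) throws Exception {
>>>>   // Per-window counter held in the trigger's partitioned state
>>>>   ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
>>>>   count.add(1L);
>>>>   long current = count.get();
>>>>   if (current == maxCount) {
>>>>     // Exactly maxCount elements seen: emit the window once, drop its contents
>>>>     return TriggerResult.FIRE_AND_PURGE;
>>>>   } else if (current > maxCount) {
>>>>     // Later elements of the same window: discard them without firing again
>>>>     return TriggerResult.PURGE;
>>>>   }
>>>>   return TriggerResult.CONTINUE;
>>>> }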
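Stripped of Flink types, the trigger above is a small per-window state machine. The following hypothetical model (`FireOnFirstModel` is not a Flink class) replays its `onElement` decisions for a single session window; with `maxCount = 1` the window is emitted once, on its first element, and every later element is purged.

```java
/**
 * Hypothetical plain-Java model of the custom trigger's onElement decisions
 * for one (user, location) session window. No Flink dependency.
 */
final class FireOnFirstModel {
    enum Decision { CONTINUE, FIRE_AND_PURGE, PURGE }

    private final long maxCount;
    private long count; // stands in for the per-window ReducingState<Long>

    FireOnFirstModel(long maxCount) {
        this.maxCount = maxCount;
    }

    Decision onElement() {
        count += 1;
        if (count == maxCount) {
            return Decision.FIRE_AND_PURGE; // emit exactly once
        } else if (count > maxCount) {
            return Decision.PURGE;          // drop later elements silently
        }
        return Decision.CONTINUE;
    }
}
```

The model ignores two things Flink handles: timers (the original trigger's `onEventTime` returns `CONTINUE`, so it never fires at the window's end) and session merging, for which a trigger must be able to merge its state, as `CountTrigger` does through `canMerge`/`onMerge`.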
>>>>
>>>> But my final SlidingEventTimeWindow does not fire properly. I assume this is
>>>> because some users have session windows that are not closed, so the
>>>> watermark for those keys is running behind and holds back the watermark of
>>>> the SlidingEventTimeWindow too.
>>>>
>>>> What I want to achieve is essentially to set the watermark of the
>>>> SlidingEventTimeWindow operator to the maximum (minus some lateness) of the
>>>> input keys' watermarks, rather than the minimum. I cannot tell whether this
>>>> is possible, and if not, what another approach could be.
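The difference between the two combination rules can be illustrated with a hypothetical helper. `WatermarkCombine` is not a Flink API: Flink combines an operator's inputs with the minimum, and the max-minus-lateness rule below is only the behaviour requested above, not a Flink setting. Note that in Flink the inputs being combined are upstream channels, not keys. Under the requested rule, any record from a slower input whose timestamp is behind the resulting watermark would be treated as late.

```java
import java.util.Arrays;

/** Hypothetical comparison of two ways to combine per-input watermarks (ms). */
final class WatermarkCombine {
    private WatermarkCombine() {}

    /** Minimum rule: the slowest input determines the operator's event time. */
    static long minRule(long... inputWatermarks) {
        return Arrays.stream(inputWatermarks).min().orElse(Long.MIN_VALUE);
    }

    /** Requested rule: follow the fastest input, held back by a lateness bound. */
    static long maxMinusLateness(long latenessMs, long... inputWatermarks) {
        long max = Arrays.stream(inputWatermarks).max().orElse(Long.MIN_VALUE);
        // Avoid underflow when no watermark has been seen yet.
        return max == Long.MIN_VALUE ? Long.MIN_VALUE : max - latenessMs;
    }
}
```

For instance, with input watermarks of 1 s and 10 min and a 1 minute lateness bound, the minimum rule yields 1 s while the requested rule yields 9 min.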
>>>>
>>>> Thanks,
>>>> Padarn
>>>>
>>>
