Hi Fabian,
Thanks a lot for your reply.

> Do you need the number of active session windows as a DataStream or would
> you like to have it as a metric that you can expose?
> If possible, I would recommend exposing it as a metric because they are
> usually easier to collect.

I want to have it as a metric and it doesn't look difficult thanks to the
metric system exposed by TriggerContext.

> In order to track how many session windows exist, we would need to
> increment a counter by one when a new window is created (or an element is
> assigned to a window, which is equivalent for session windows)

I agree that we need to increment a counter whenever Trigger.onElement() is
called, because in session windows every element is first assigned to a window
of its own.
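To make the "one window per element" behaviour concrete, here is a minimal plain-Java model of event-time session windows for a single key. It is a hypothetical sketch, not Flink code: SessionWindowModel, Window, and add() are illustrative names, windows are half-open [start, end) ranges that merge only when they strictly overlap (Flink's exact boundary rule may differ), and add() returns how many windows were merged into the result.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical single-key model of session windows; not Flink's implementation. */
final class SessionWindowModel {

    /** A half-open time range [start, end). */
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        boolean overlaps(Window other) {
            return start < other.end && other.start < end;
        }

        Window cover(Window other) {
            return new Window(Math.min(start, other.start), Math.max(end, other.end));
        }
    }

    private final long gap;
    private final List<Window> windows = new ArrayList<>();

    SessionWindowModel(long gap) {
        this.gap = gap;
    }

    /**
     * Puts the record into a fresh window [ts, ts + gap), then merges every
     * overlapping window into it. Returns the number of windows that were
     * merged (1 means the fresh window stayed on its own).
     */
    int add(long ts) {
        Window merged = new Window(ts, ts + gap);
        int mergedCount = 1;
        boolean changed = true;
        while (changed) { // repeat: a grown window may now overlap further windows
            changed = false;
            for (int i = 0; i < windows.size(); i++) {
                if (windows.get(i).overlaps(merged)) {
                    merged = merged.cover(windows.remove(i));
                    mergedCount++;
                    changed = true;
                    break;
                }
            }
        }
        windows.add(merged);
        return mergedCount;
    }

    List<Window> windows() {
        return new ArrayList<>(windows);
    }
}
```

For example, with a gap of 10 and records at 0, 5, 30, 20, and 25 (in that order), the first two records end up in one window [0, 15), and the record at 25 bridges the sessions [20, 30) and [30, 40), so add(25) returns 3 and two windows remain.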

> and decrement the counter when windows are merged by the number of merged
> windows minus one.

You suggest decrementing the counter when windows are merged, but I think we
also need to decrement it when a window expires.
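The bookkeeping with both kinds of decrement can be checked with a small plain-Java simulation of one key. This is a hypothetical model, not Flink code: ActiveSessionCounter, onRecord(), and onWatermark() are illustrative names, and expiry is simplified to "the watermark has reached the window end" with no allowed lateness. The counter gets +1 per new window, -(n - 1) when n windows merge, and -1 per expired window, so it can be compared against the number of windows that actually exist.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical single-key model of counting active session windows. */
final class ActiveSessionCounter {
    private final long gap;
    private final List<long[]> windows = new ArrayList<>(); // each entry: {start, end}
    private long counter = 0; // the value a Counter metric would report

    ActiveSessionCounter(long gap) {
        this.gap = gap;
    }

    void onRecord(long ts) {
        long start = ts;
        long end = ts + gap;
        counter += 1; // every record first gets a new window
        int merged = 1;
        boolean changed = true;
        while (changed) { // repeat until the merged range stops growing
            changed = false;
            for (Iterator<long[]> it = windows.iterator(); it.hasNext(); ) {
                long[] w = it.next();
                if (w[0] < end && start < w[1]) { // ranges overlap
                    start = Math.min(start, w[0]);
                    end = Math.max(end, w[1]);
                    it.remove();
                    merged++;
                    changed = true;
                }
            }
        }
        counter -= merged - 1; // n windows became one
        windows.add(new long[] {start, end});
    }

    /** Simplified expiry: a window is purged once the watermark reaches its end. */
    void onWatermark(long watermark) {
        for (Iterator<long[]> it = windows.iterator(); it.hasNext(); ) {
            if (it.next()[1] <= watermark) {
                it.remove();
                counter -= 1; // expired window
            }
        }
    }

    long counter() {
        return counter;
    }

    int actualWindows() {
        return windows.size();
    }
}
```

With a gap of 10, feeding records at 0, 5, 30, 20, and 25 leaves two windows ([0, 15) and [20, 40)) and a counter of 2; a watermark of 15 expires the first window and brings both numbers down to 1. Without the expiry decrement, the counter would stay at 2.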

> However, decrementing the counter is difficult. Although the
> Trigger.onMerge() method is called, it does not know how many windows were
> merged (which is done by the WindowAssigner) and only sees the merged
> window.

We assume that the timestamps of records from a user are in ascending order,
so only one window is closed at a time, which simplifies the problem of how to
decrement the counter.
Nevertheless, I think the counter should be decremented in Trigger.onClose()
rather than in Trigger.onMerge().
Doing it in Trigger.onClose() covers both cases: when a window is merged and
when a window expires.
What do you think?
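Under the ascending-timestamp assumption, a single close hook is enough, because a new record can only overlap the most recent window of its key. The following plain-Java sketch is a hypothetical model, not Flink's Trigger API: CloseBasedSessionCounter is an illustrative name, its onClose() merely stands in for the close callback discussed above, and expiry is simplified to "the watermark has reached the window end". Every record adds 1, and the only decrement is onClose(), called both when a record's fresh window is absorbed by a merge and when a window expires.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of "+1 per element, -1 per closed window" with ascending timestamps per key. */
final class CloseBasedSessionCounter {
    private final long gap;
    // Per key, open windows in ascending order; each entry is {start, end}.
    private final Map<String, Deque<long[]>> windowsByKey = new HashMap<>();
    private long active = 0;

    CloseBasedSessionCounter(long gap) {
        this.gap = gap;
    }

    private void onClose() {
        active -= 1; // the single place that decrements
    }

    void onElement(String key, long ts) {
        active += 1; // every record first gets its own window [ts, ts + gap)
        Deque<long[]> windows = windowsByKey.computeIfAbsent(key, k -> new ArrayDeque<>());
        long[] last = windows.peekLast();
        // With ascending timestamps only the most recent window can overlap,
        // so at most two windows merge and exactly one of them is closed.
        if (last != null && last[1] > ts) {
            last[1] = Math.max(last[1], ts + gap);
            onClose(); // the record's fresh window was absorbed by the merge
        } else {
            windows.addLast(new long[] {ts, ts + gap});
        }
    }

    void onWatermark(long watermark) {
        for (Deque<long[]> windows : windowsByKey.values()) {
            // Windows of a key are disjoint and ordered, so expired ones are at the front.
            while (!windows.isEmpty() && windows.peekFirst()[1] <= watermark) {
                windows.pollFirst();
                onClose(); // expired window
            }
        }
    }

    long active() {
        return active;
    }
}
```

With a gap of 10, records ("u1", 0), ("u1", 5), ("u2", 3), and ("u1", 30) give 3 active sessions; a watermark of 15 closes [0, 15) for u1 and [3, 13) for u2, leaving 1.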

The reason I mention state is that I want the exact number of active
sessions even after my Flink application is restarted from a checkpoint or
savepoint.
If we restore from a savepoint and the counter is reinitialized to 0, the
dashboard will show an incorrect value.
This is my biggest concern at this point.
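The restore problem can be reproduced without Flink in a few lines. This is a hypothetical plain-Java model (RestoreModel, Snapshot, open(), expire(), and restore() are illustrative names, not Flink's checkpointing API): window state always survives a restore, while the counter survives only if it is part of the snapshot. When it is not, the counter restarts at 0 and the restored windows later decrement it below zero.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of restoring windows with and without the counter in the snapshot. */
final class RestoreModel {

    /** What a checkpoint or savepoint contains in this model. */
    static final class Snapshot {
        final List<Long> windowEnds;
        final long counter;

        Snapshot(List<Long> windowEnds, long counter) {
            this.windowEnds = windowEnds;
            this.counter = counter;
        }
    }

    final List<Long> windowEnds = new ArrayList<>(); // open windows, by end timestamp
    long counter = 0;                                // value shown on the dashboard

    void open(long end) {
        windowEnds.add(end);
        counter++;
    }

    /** Expires every window whose end the watermark has reached. */
    void expire(long watermark) {
        int before = windowEnds.size();
        windowEnds.removeIf(end -> end <= watermark);
        counter -= before - windowEnds.size();
    }

    Snapshot snapshot() {
        return new Snapshot(new ArrayList<>(windowEnds), counter);
    }

    /** Window state is always restored; the counter only if it was part of the snapshot. */
    static RestoreModel restore(Snapshot snapshot, boolean counterInSnapshot) {
        RestoreModel restored = new RestoreModel();
        restored.windowEnds.addAll(snapshot.windowEnds);
        restored.counter = counterInSnapshot ? snapshot.counter : 0;
        return restored;
    }
}
```

With two windows open at the savepoint, the naive restore reports 0 right away and -2 once both windows expire, while the variant that snapshots the counter reports 2 and then 0. The sketch only shows the property the dashboard needs; it does not show how to keep such state inside a Trigger.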

Best,

- Dongwon


On Tue, Jun 19, 2018 at 7:14 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Dongwon,
>
> Do you need the number of active session windows as a DataStream or would
> you like to have it as a metric that you can expose?
> If possible, I would recommend exposing it as a metric because they are
> usually easier to collect.
>
> SessionWindows work internally as follows:
> - every new record is added to a new window that starts at the timestamp
> of the record and ends at timestamp + gap size. When a record is added to a
> window, Trigger.onElement() is called.
> - after a window has been created, the session window assigner tries to
> merge windows with overlapping ranges. When windows are merged,
> Trigger.onMerge() is called.
>
> In order to track how many session windows exist, we would need to
> increment a counter by one when a new window is created (or an element is
> assigned to a window, which is equivalent for session windows) and
> decrement the counter when windows are merged by the number of merged
> windows minus one.
>
> Incrementing the counter is rather easy and can be done in
> Trigger.onElement(), either by using state or a Counter metric (Triggers
> have access to the metric system).
> However, decrementing the counter is difficult. Although the
> Trigger.onMerge() method is called, it does not know how many windows were
> merged (which is done by the WindowAssigner) and only sees the merged
> window. There might be a way to maintain state in a Trigger that allows
> inferring how many windows were merged.
>
> Best, Fabian
>
> 2018-06-16 16:39 GMT+02:00 Dongwon Kim <eastcirc...@gmail.com>:
>
>> Hi Fabian,
>>
>> I'm still eager to expose the number of active sessions as a key metric of
>> our service, but I haven’t figured out how yet.
>>
>> First of all, I want to ask you some questions regarding your suggestion.
>>
>> You could implement a Trigger that fires when a new window is created and
>> when the window is closed. A ProcessWindowFunction would emit a +1 if the
>> window was created and a -1 when the window is closed.
>> Session windows are a bit special, because you also need to handle the
>> case of merging windows, i.e., two opened windows can be merged and only
>> one (the merged) window is closed. So you would need to emit a -2 if a
>> merged window was closed (assuming only two windows were merged).
>>
>> Q1)
>> How can a trigger fire when a new window is created and when the window is
>> closed? AFAIK, a TriggerResult can only be returned from three methods:
>> onElement, onEventTime, and onProcessingTime.
>> Q2)
>> Firing emits the elements of a window to the window function; it does not
>> emit values like +1, -1, and -2, which are not in the window.
>> Or am I missing what you meant?
>>
>> In order to do that, you'd need to carry the merging information forward.
>> The Trigger.onMerge method cannot trigger the window function, but it could
>> store the merging information in state that is later accessed.
>>
>> Q3)
>> I didn't understand this part at all. What do you mean by carrying the
>> merging information forward?
>>
>> Apart from your suggestion, I implemented a custom trigger that is almost
>> the same as EventTimeTrigger except for the following:
>> - it maintains a variable that counts sessions in an instance of a window
>> operator
>> - it increments the variable by 1 when onElement is invoked
>> - it decrements the variable by 1 when onClose is invoked
>> Given how Flink’s session windows work, this correctly counts the sessions
>> in one instance of a window operator.
>>
>> As you might have already noticed, this approach has a critical problem:
>> there's no way to maintain operator state inside a trigger.
>> TriggerContext only allows interacting with state that is scoped to the
>> window and the key of the current trigger invocation (as shown in
>> Trigger#TriggerContext).
>>
>> Now I've come to the conclusion that it might not be possible with the
>> DataStream API.
>> Or do I need to think about this in a totally different way to achieve
>> the goal?
>>
>> Best,
>>
>> - Dongwon
>>
>>
>>
>> On Feb 20, 2018, at 6:53 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>> Hi Dongwon Kim,
>>
>> That's an interesting question.
>>
>> I don't have a solution blueprint for you, but a few ideas that should
>> help to solve the problem.
>>
>> I would start with a separate job first and later try to integrate it
>> with the other job.
>> You could implement a Trigger that fires when a new window is created and
>> when the window is closed. A ProcessWindowFunction would emit a +1 if the
>> window was created and a -1 when the window is closed.
>> Session windows are a bit special, because you also need to handle the
>> case of merging windows, i.e., two opened windows can be merged and only
>> one (the merged) window is closed. So you would need to emit a -2 if a
>> merged window was closed (assuming only two windows were merged).
>> In order to do that, you'd need to carry the merging information forward.
>> The Trigger.onMerge method cannot trigger the window function, but it could
>> store the merging information in state that is later accessed.
>>
>> Hope this helps,
>> Fabian
>>
>> 2018-02-20 9:54 GMT+01:00 Dongwon Kim <eastcirc...@gmail.com>:
>>
>>> Hi,
>>>
>>> It could be a totally stupid question, but I currently have no idea how
>>> to get the number of active session windows from a running job.
>>>
>>> Our traffic trajectory application (which handles up to 10,000 tps) uses
>>> event-time session windows on a KeyedStream (keyed by userID).
>>>
>>> Should I write another Flink job for this purpose?
>>>
>>> Cheers,
>>>
>>> Dongwon Kim
>>
>>
>>
>>
>
