Hi Dongwon,

Yes, the counter state should be stored in operator state, which is not
available to Triggers.
Chesnay: Can a window function (like ProcessWindowFunction) access (read,
write) the counter of its associated Trigger to checkpoint and restore it?

Best, Fabian

2018-06-20 16:59 GMT+02:00 Dongwon Kim <eastcirc...@gmail.com>:

> Hi Fabian and Chesnay,
>
> As Chesnay pointed out, it seems that I need to write the current counter
> (which is defined inside the Trigger) into state, which I think should be the
> operator state of the window operator.
> However, as I said previously, TriggerContext only allows users to access
> partitioned state that is scoped to *the key* and *the window* of
> the current Trigger invocation.
> There's no way for me to access the operator state of the window
> operator through TriggerContext.
> The partitioned state doesn't seem suitable, as we have more than *ten
> million keys*.
> This number of keys could break the metric system and
> external metric systems such as Ganglia and Prometheus.
>
> What I want most is to achieve the goal using the current API (I'm
> using Flink 1.4.2) without modification.
> But a change in TriggerContext seems unavoidable, because it would have to expose
> an additional method for users like me to access the operator state of
> the window operator.
>
> Thank you guys for the useful discussion.
>
> p.s. Fabian, yes you're right. It is Trigger.clear(), not
> Trigger.onClose().
>
> Best,
> - Dongwon
>
>
> On Wed, Jun 20, 2018 at 7:30 PM, Chesnay Schepler <ches...@apache.org>
> wrote:
>
>> Checkpointing of metrics is a manual process.
>> The operator must write the current value into state, retrieve it on
>> restore, and set the counter back to the retrieved value.
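A minimal sketch of the manual cycle described above, written as a dependency-free Java model rather than against Flink's API. `SimpleCounter`, `CheckpointedSessionCounter`, `snapshot()` and `restore()` are hypothetical names; in a real job they would roughly correspond to a metric `Counter` and to the `snapshotState()`/`initializeState()` methods of `CheckpointedFunction` in an operator that can access operator state (which, as this thread notes, a Trigger cannot).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a metric counter; a real job would use Flink's Counter.
final class SimpleCounter {
    private long count;

    void inc() { count++; }
    void inc(long n) { count += n; }
    void dec() { count--; }
    long getCount() { return count; }
}

// Hypothetical model of an operator that checkpoints its counter manually.
final class CheckpointedSessionCounter {
    final SimpleCounter activeSessions = new SimpleCounter();

    // On checkpoint: write the current value into (simulated) operator list state.
    List<Long> snapshot() {
        List<Long> state = new ArrayList<>();
        state.add(activeSessions.getCount());
        return state;
    }

    // On restore: a fresh counter starts at 0, so add back every saved value.
    void restore(List<Long> savedState) {
        for (long saved : savedState) {
            activeSessions.inc(saved);
        }
    }

    public static void main(String[] args) {
        CheckpointedSessionCounter running = new CheckpointedSessionCounter();
        running.activeSessions.inc(); // two sessions opened
        running.activeSessions.inc();
        List<Long> checkpoint = running.snapshot();

        // After a restart the counter would otherwise report 0.
        CheckpointedSessionCounter restarted = new CheckpointedSessionCounter();
        restarted.restore(checkpoint);
        System.out.println(restarted.activeSessions.getCount()); // prints 2
    }
}
```

Summing all saved entries on restore is a design choice of this sketch: if operator list state is redistributed on rescaling, one instance may receive several entries or none, so only the total across all instances stays meaningful.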
>>
>>
>> On 20.06.2018 12:10, Fabian Hueske wrote:
>>
>> Hi Dongwon,
>>
>> You are of course right! We need to decrement the counter when the window
>> is closed.
>>
>> The idea of using the Trigger.clear() method (the clean-up method is called
>> clear(), not onClose()) is great!
>> It will be called when the window is closed but also when it is merged.
>> So, I think you are right and we only need to increment the counter in
>> Trigger.onElement() and decrement in Trigger.clear().
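The increment-in-onElement()/decrement-in-clear() rule can be checked with a small simulation. This is a simplified, dependency-free model, not Flink code: it assumes strictly ascending timestamps per key and no late data (as Dongwon assumes elsewhere in this thread), treats windows that merely touch as overlapping, and clears a window once the watermark reaches its last timestamp (end - 1). `SessionCountModel` is a hypothetical class whose private `onElement()`/`clear()` methods only stand in for the Trigger callbacks.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Simplified model of one window-operator instance. Not Flink code.
final class SessionCountModel {
    private final long gap;
    // key -> that key's open session windows, oldest first; each is {start, end}
    private final Map<String, Deque<long[]>> open = new HashMap<>();
    private long activeSessions = 0; // the value a gauge would report

    SessionCountModel(long gap) { this.gap = gap; }

    long activeSessions() { return activeSessions; }

    // Stand-in for Trigger.onElement(): +1 for the window the record lands in.
    private void onElement() { activeSessions++; }

    // Stand-in for Trigger.clear(): -1 for a window that is merged away or purged.
    private void clear() { activeSessions--; }

    void processElement(String key, long timestamp) {
        Deque<long[]> windows = open.computeIfAbsent(key, k -> new ArrayDeque<>());
        long start = timestamp;
        long end = timestamp + gap;
        long[] latest = windows.peekLast();
        // With ascending timestamps only the latest window can overlap the new one.
        if (latest != null && start <= latest[1]) {
            windows.pollLast();
            clear();            // the absorbed window is cleared...
            start = latest[0];  // ...and the merged window covers both ranges
        }
        windows.addLast(new long[] {start, end});
        onElement();            // the merged (or brand-new) window gets its +1
    }

    // Stand-in for the watermark passing window ends: fired windows are cleared.
    void advanceWatermark(long watermark) {
        for (Deque<long[]> windows : open.values()) {
            Iterator<long[]> it = windows.iterator();
            while (it.hasNext()) {
                long[] w = it.next();
                if (w[1] - 1 <= watermark) { // window's last timestamp reached
                    it.remove();
                    clear();
                }
            }
        }
    }

    public static void main(String[] args) {
        SessionCountModel model = new SessionCountModel(10);
        model.processElement("user-1", 0);
        model.processElement("user-1", 5); // extends user-1's session
        model.processElement("user-2", 3);
        System.out.println(model.activeSessions()); // prints 2
        model.advanceWatermark(20);                 // both sessions expire
        System.out.println(model.activeSessions()); // prints 0
    }
}
```

In this model every record contributes +1 and every absorbed or expired window contributes -1, so the value always equals the number of open windows.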
>>
>> I'm not 100% sure, but I doubt that metrics can be checkpointed. Chesnay (in
>> CC) would know that.
>> I'm not sure what the best approach would be if you need a fault-tolerant
>> solution.
>>
>> Best, Fabian
>>
>>
>>
>>
>> 2018-06-19 16:38 GMT+02:00 Dongwon Kim <eastcirc...@gmail.com>:
>>
>>> Hi Fabian,
>>> Thanks a lot for your reply.
>>>
>>> Do you need the number of active session windows as a DataStream or would
>>>> you like to have it as a metric that you can expose?
>>>> If possible, I would recommend exposing it as a metric because metrics are
>>>> usually easier to collect.
>>>
>>> I want to have it as a metric and it doesn't look difficult thanks to
>>> the metric system exposed by TriggerContext.
>>>
>>> In order to track how many session windows exist, we would need to
>>>> increment a counter by one when a new window is created (or an element is
>>>> assigned to a window, which is equivalent for session windows)
>>>
>>> I agree with you that we need to increment a counter when
>>> Trigger.onElement() is called, given the characteristics of session windows.
>>>
>>> and decrement the counter when windows are merged by the number of
>>>> merged windows minus one.
>>>
>>> You decrement the counter when windows are merged, but I think we also need
>>> to decrement it when a window expires.
>>>
>>> However, decrementing the counter is difficult. Although the
>>>> Trigger.onMerge() method is called, it does not know how many windows were
>>>> merged (which is done by the WindowAssigner) and only sees the merged
>>>> window.
>>>
>>> We assume that timestamps of records from a user are in ascending order,
>>> so only one window is closed at a time, which simplifies the problem of how
>>> to decrement the counter.
>>> Nevertheless, I think I need to decrement the counter in
>>> Trigger.onClose(), not Trigger.onMerge().
>>> By doing that in Trigger.onClose(), we can take care of both cases: when
>>> a window is merged and when a window expires.
>>> What do you think?
>>>
>>> The reason I mention state is to calculate the exact number of active
>>> sessions even after my Flink application is restarted from checkpoints or
>>> savepoints.
>>> If we restore from a savepoint and the counter is initialized to 0,
>>> we'll see an incorrect value on the dashboard.
>>> This is my biggest concern at this point.
>>>
>>> Best,
>>>
>>> - Dongwon
>>>
>>>
>>> On Tue, Jun 19, 2018 at 7:14 PM, Fabian Hueske <fhue...@gmail.com>
>>> wrote:
>>>
>>>> Hi Dongwon,
>>>>
>>>> Do you need the number of active session windows as a DataStream or
>>>> would you like to have it as a metric that you can expose?
>>>> If possible, I would recommend exposing it as a metric because metrics are
>>>> usually easier to collect.
>>>>
>>>> SessionWindows work internally as follows:
>>>> - every new record is added to a new window that starts at the
>>>> timestamp of the record and ends at timestamp + gap size. When a record is
>>>> added to a window, Trigger.onElement() is called.
>>>> - after a window is created, the session window assigner tries to
>>>> merge windows with overlapping ranges. When windows are merged,
>>>> Trigger.onMerge() is called.
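The assign-then-merge behaviour described above can be sketched as a batch computation. This is a dependency-free model, not Flink's implementation: `SessionWindow` and `SessionMerge.assignAndMerge` are hypothetical names, and merging windows that merely touch is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical window type: [start, end) in event time.
final class SessionWindow {
    final long start;
    final long end;

    SessionWindow(long start, long end) { this.start = start; this.end = end; }

    @Override
    public String toString() { return "[" + start + ", " + end + ")"; }
}

final class SessionMerge {
    // Every record opens [timestamp, timestamp + gap); overlapping windows are
    // then merged into one window covering all of them.
    static List<SessionWindow> assignAndMerge(List<Long> timestamps, long gap) {
        List<SessionWindow> windows = new ArrayList<>();
        for (long ts : timestamps) {
            windows.add(new SessionWindow(ts, ts + gap));
        }
        windows.sort(Comparator.comparingLong(w -> w.start));

        List<SessionWindow> merged = new ArrayList<>();
        for (SessionWindow w : windows) {
            int last = merged.size() - 1;
            // Touching windows (start == previous end) are merged too (sketch assumption).
            if (last >= 0 && w.start <= merged.get(last).end) {
                SessionWindow prev = merged.get(last);
                merged.set(last, new SessionWindow(prev.start, Math.max(prev.end, w.end)));
            } else {
                merged.add(w);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        List<SessionWindow> sessions = assignAndMerge(Arrays.asList(1L, 4L, 20L, 22L), 5);
        System.out.println(sessions); // prints [[1, 9), [20, 27)]
    }
}
```

Here four windows are opened and two sessions remain, which matches the counting rule in the next paragraph: each merge of two windows subtracts 2 - 1 = 1, so 4 - 1 - 1 = 2.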
>>>>
>>>> In order to track how many session windows exist, we would need to
>>>> increment a counter by one when a new window is created (or an element is
>>>> assigned to a window, which is equivalent for session windows) and
>>>> decrement the counter when windows are merged by the number of merged
>>>> windows minus one.
>>>>
>>>> Incrementing the counter is rather easy and can be done in
>>>> Trigger.onElement(), either by using state or a Counter metric (Triggers
>>>> have access to the metric system).
>>>> However, decrementing the counter is difficult. Although the
>>>> Trigger.onMerge() method is called, it does not know how many windows were
>>>> merged (which is done by the WindowAssigner) and only sees the merged
>>>> window. There might be a way to maintain state in a Trigger that allows it to
>>>> infer how many windows were merged.
>>>>
>>>> Best, Fabian
>>>>
>>>> 2018-06-16 16:39 GMT+02:00 Dongwon Kim <eastcirc...@gmail.com>:
>>>>
>>>>> Hi Fabian,
>>>>>
>>>>> I'm still eager to expose the number of active sessions as a key metric of our
>>>>> service, but I haven’t figured it out yet.
>>>>>
>>>>> First of all, I want to ask you some questions regarding your
>>>>> suggestion.
>>>>>
>>>>> You could implement a Trigger that fires when a new window is created
>>>>> and when the window is closed. A ProcessWindowFunction would emit a +1 if
>>>>> the window was created and a -1 when the window is closed.
>>>>> Session windows are a bit special, because you also need to handle the
>>>>> case of merging windows, i.e., two opened windows can be merged and only
>>>>> one (the merged) window is closed. So you would need to emit a -2 if a merged
>>>>> window was closed (assuming only two windows were merged).
>>>>>
>>>>> Q1)
>>>>> How do we fire when a new window is created and when the window is closed?
>>>>> AFAIK, we can return a TriggerResult only from three methods:
>>>>> onElement, onEventTime, and onProcessingTime.
>>>>> Q2)
>>>>> Firing emits the elements of a window to the window function; it does not
>>>>> emit values like +1, -1 and -2, which are not in the window.
>>>>> Or am I missing something that you meant?
>>>>>
>>>>> In order to do that, you'd need to carry the merging information
>>>>> forward. The Trigger.onMerge method cannot trigger the window function, but
>>>>> it could store the merging information in state that is later accessed.
>>>>>
>>>>> Q3)
>>>>> I didn't understand this at all. What do you mean by carrying
>>>>> the merging information forward?
>>>>>
>>>>> Besides your suggestion, I implemented a custom trigger which is
>>>>> almost the same as EventTimeTrigger except for the following:
>>>>> - it maintains a variable to count sessions in an instance of a window
>>>>> operator
>>>>> - it increases the variable by 1 when onElement is invoked
>>>>> - it decreases the variable by 1 when onClose is invoked
>>>>> Considering the logic of Flink’s session window, it correctly counts
>>>>> sessions in an instance of a window operator.
>>>>>
>>>>> As you might have already noticed, this approach has a critical
>>>>> problem: there's no way to maintain operator state inside a
>>>>> trigger.
>>>>> TriggerContext only allows interacting with state that is scoped to
>>>>> the window and the key of the current trigger invocation (as shown in
>>>>> Trigger#TriggerContext).
>>>>>
>>>>> Now I've come to the conclusion that it might not be possible using the
>>>>> DataStream API.
>>>>> Or do I need to think in a totally different way to achieve
>>>>> the goal?
>>>>>
>>>>> Best,
>>>>>
>>>>> - Dongwon
>>>>>
>>>>>
>>>>>
>>>>> On Feb 20, 2018, at 6:53 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>
>>>>> Hi Dongwon Kim,
>>>>>
>>>>> That's an interesting question.
>>>>>
>>>>> I don't have a solution blueprint for you, but a few ideas that should
>>>>> help to solve the problem.
>>>>>
>>>>> I would start with a separate job first and later try to integrate it
>>>>> with the other job.
>>>>> You could implement a Trigger that fires when a new window is created
>>>>> and when the window is closed. A ProcessWindowFunction would emit a +1 if
>>>>> the window was created and a -1 when the window is closed.
>>>>> Session windows are a bit special, because you also need to handle the
>>>>> case of merging windows, i.e., two opened windows can be merged and only
>>>>> one (the merged) window is closed. So you would need to emit a -2 if a merged
>>>>> window was closed (assuming only two windows were merged).
>>>>> In order to do that, you'd need to carry the merging information
>>>>> forward. The Trigger.onMerge method cannot trigger the window function, but
>>>>> it could store the merging information in state that is later accessed.
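A minimal sketch of the +1/-1/-2 idea above, assuming a hypothetical window function that emits +1 when a session window is created and -n when a window that absorbed n windows is closed, with n carried forward from onMerge in state. The dependency-free model below only shows how a single downstream running sum turns those deltas into the number of active sessions; `DeltaCounter` and its methods are hypothetical names, not Flink API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the delta stream and its downstream sum. Not Flink code.
final class DeltaCounter {
    // Delta for a newly created session window.
    static int onCreated() { return 1; }

    // Delta for a closed window that absorbed mergedCount windows (1 if never merged).
    static int onClosed(int mergedCount) { return -mergedCount; }

    // Downstream running sum: one "active sessions" value per incoming delta.
    static List<Long> runningSum(List<Integer> deltas) {
        List<Long> out = new ArrayList<>();
        long active = 0;
        for (int d : deltas) {
            active += d;
            out.add(active);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> deltas = Arrays.asList(
                onCreated(),   // user A, first window
                onCreated(),   // user A, second window (later merged with the first)
                onCreated(),   // user B
                onClosed(1),   // user B's window closes
                onClosed(2));  // user A's merged window closes
        System.out.println(runningSum(deltas)); // prints [1, 2, 3, 2, 0]
    }
}
```

The sum only returns to zero if the -n for a merged window accounts for every window it absorbed, which is why the merge count would have to be kept in state until that window closes.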
>>>>>
>>>>> Hope this helps,
>>>>> Fabian
>>>>>
>>>>> 2018-02-20 9:54 GMT+01:00 Dongwon Kim <eastcirc...@gmail.com>:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> It could be a totally stupid question, but I currently have no idea
>>>>>> how to get the number of active session windows from a running job.
>>>>>>
>>>>>> Our traffic trajectory application (which handles up to 10,000 tps)
>>>>>> uses event-time session windows on a KeyedStream (keyed by userID).
>>>>>>
>>>>>> Should I write another Flink job for the purpose?
>>>>>>
>>>>>> Cheers,
>>>>>>
>>>>>> Dongwon Kim
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>>
>
