Hi Fabian and Chesnay,

Thank you guys.

Fabian: Unfortunately, as Chesnay said, MetricGroup doesn't allow a
ProcessWindowFunction to access a counter defined in a Trigger.
Chesnay: I'm going to follow your advice on how to modify Flink. Thank you
very much!

Best,

- Dongwon

On Thu, Jun 21, 2018 at 10:26 PM, Chesnay Schepler <ches...@apache.org>
wrote:

> Without modifications to Flink? No. By design nothing can intercept or
> retrieve metrics with the metrics API.
> For this pattern the usual recommendation is to explicitly pass the metric
> to components that require it.
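
For readers following the thread, the "pass the metric explicitly" pattern
can be illustrated in plain Java. This is only a sketch of the pattern, not
Flink code: PassedCounter, TriggerSketch, WindowFunctionSketch and
PassMetricDemo are hypothetical names, and the sketch does not address how
Flink instantiates or serializes user functions. The point is that nothing
looks the counter up by name; whoever creates it hands the same instance to
every component that needs it.

```java
/** Hypothetical counter with an inc/dec/getCount shape, standing in for a metric. */
class PassedCounter {
    private long count;
    void inc() { count++; }
    void dec() { count--; }
    long getCount() { return count; }
}

/** Hypothetical trigger-like component: it only updates the counter it was given. */
class TriggerSketch {
    private final PassedCounter sessions;
    TriggerSketch(PassedCounter sessions) { this.sessions = sessions; }
    void onElement() { sessions.inc(); }
    void clear() { sessions.dec(); }
}

/** Hypothetical function-like component: it reads the very same instance. */
class WindowFunctionSketch {
    private final PassedCounter sessions;
    WindowFunctionSketch(PassedCounter sessions) { this.sessions = sessions; }
    long currentSessions() { return sessions.getCount(); }
}

class PassMetricDemo {
    public static void main(String[] args) {
        PassedCounter shared = new PassedCounter(); // created once by the owner
        TriggerSketch trigger = new TriggerSketch(shared);
        WindowFunctionSketch function = new WindowFunctionSketch(shared);

        trigger.onElement();
        trigger.onElement();
        trigger.clear();
        System.out.println(function.currentSessions()); // prints 1
    }
}
```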
>
> If modifications are an option, what you could do is
> * define a counter in the OperatorIOMetricGroup
> * have the operator checkpoint/restore the counter,
> * access it in the trigger by casting your way through the MetricGroups to
> an OperatorMetricGroup from which you can retrieve the
> OperatorIOMetricGroup.
>
>
>
> On 21.06.2018 11:16, Fabian Hueske wrote:
>
> Hi Dongwon,
>
> Yes, the counter state should be stored in operator state which is not
> available on Triggers.
> Chesnay: Can a window function (like ProcessWindowFunction) access (read,
> write) the counter of its associated Trigger to checkpoint and restore it?
>
> Best, Fabian
>
> 2018-06-20 16:59 GMT+02:00 Dongwon Kim <eastcirc...@gmail.com>:
>
>> Hi Fabian and Chesnay,
>>
>> As Chesnay pointed out, it seems that I need to write the current counter
>> (which is defined inside the Trigger) into state, which I think should be
>> the operator state of the window operator.
>> However, as I said previously, TriggerContext only lets users access
>> the partitioned state that is scoped to *the key and* *the window* of
>> the current Trigger invocation.
>> There's no way for me to access the operator state of the window
>> operator through TriggerContext.
>> The partitioned state doesn't seem suitable as we have more than *ten
>> million keys*.
>> That many keys could overwhelm the metric system and
>> external metric systems like Ganglia and Prometheus.
>>
>> What I want most is to achieve the goal using the current API (I'm
>> using Flink 1.4.2) without modification.
>> But a change to TriggerContext seems unavoidable, because it would have to
>> expose an additional method for users like me to access the operator state
>> of the window operator.
>>
>> Thank you guys for the useful discussion.
>>
>> p.s. Fabian, yes you're right. It is Trigger.clear(), not
>> Trigger.onClose().
>>
>> Best,
>> - Dongwon
>>
>>
>> On Wed, Jun 20, 2018 at 7:30 PM, Chesnay Schepler <ches...@apache.org>
>> wrote:
>>
>>> Checkpointing of metrics is a manual process.
>>> The operator must write the current value into state, retrieve it on
>>> restore and restore the counter's count.
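
The manual process described above can be modeled outside Flink as follows.
This is a minimal sketch under stated assumptions, not Flink code:
SketchCounter and CountingOperatorSketch are hypothetical stand-ins, and a
plain List<Long> plays the role of operator state. The sketch assumes the
counter offers only increment, decrement and read operations (no setter), so
the restored value is re-applied to a fresh counter with inc(n).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a metric counter: inc/dec/getCount only, no setter. */
class SketchCounter {
    private long count;
    void inc() { count++; }
    void inc(long n) { count += n; }
    void dec() { count--; }
    long getCount() { return count; }
}

/** Hypothetical operator that owns the counter and checkpoints it by hand. */
class CountingOperatorSketch {
    final SketchCounter activeSessions = new SketchCounter();

    /** On checkpoint: write the current value into (simulated) operator state. */
    List<Long> snapshotState() {
        List<Long> state = new ArrayList<>();
        state.add(activeSessions.getCount());
        return state;
    }

    /** On restore: read the stored values back and re-apply them to the fresh counter. */
    void restoreState(List<Long> state) {
        for (long stored : state) {
            activeSessions.inc(stored);
        }
    }

    public static void main(String[] args) {
        CountingOperatorSketch before = new CountingOperatorSketch();
        before.activeSessions.inc();
        before.activeSessions.inc();
        before.activeSessions.dec();
        List<Long> checkpoint = before.snapshotState();

        // Simulate a restart: a new operator instance starts with a counter at 0.
        CountingOperatorSketch after = new CountingOperatorSketch();
        after.restoreState(checkpoint);
        System.out.println(after.activeSessions.getCount()); // prints 1
    }
}
```

Without the restoreState step the new instance would report 0 after a
restart, which is the dashboard problem raised earlier in the thread.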
>>>
>>>
>>> On 20.06.2018 12:10, Fabian Hueske wrote:
>>>
>>> Hi Dongwon,
>>>
>>> You are of course right! We need to decrement the counter when the
>>> window is closed.
>>>
>>> The idea of using the Trigger.clear() method is great! (The cleanup
>>> method is called clear(), not onClose().)
>>> It will be called when the window is closed but also when it is merged.
>>> So, I think you are right and we only need to increment the counter in
>>> Trigger.onElement() and decrement in Trigger.clear().
>>>
>>> I'm not 100% sure, but I doubt that metrics can be checkpointed. Chesnay
>>> (in CC) would know that.
>>> I'm not sure what the best approach would be if you need a fault-tolerant
>>> solution.
>>>
>>> Best, Fabian
>>>
>>>
>>>
>>>
>>> 2018-06-19 16:38 GMT+02:00 Dongwon Kim <eastcirc...@gmail.com>:
>>>
>>>> Hi Fabian,
>>>> Thanks a lot for your reply.
>>>>
>>>>> Do you need the number of active session windows as a DataStream or
>>>>> would you like to have it as a metric that you can expose?
>>>>> If possible, I would recommend exposing it as a metric because metrics
>>>>> are usually easier to collect.
>>>>
>>>> I want to have it as a metric and it doesn't look difficult thanks to
>>>> the metric system exposed by TriggerContext.
>>>>
>>>>> In order to track how many session windows exist, we would need to
>>>>> increment a counter by one when a new window is created (or an element is
>>>>> assigned to a window, which is equivalent for session windows)
>>>>
>>>> I agree with you that we need to increment a counter when
>>>> Trigger.onElement() is called due to the characteristic of session windows.
>>>>
>>>>> and decrement the counter when windows are merged by the number of
>>>>> merged windows minus one.
>>>>
>>>> You decrement the counter when windows are merged, but I think we need
>>>> to decrement the counter when a window expires as well.
>>>>
>>>>> However, decrementing the counter is difficult. Although the
>>>>> Trigger.onMerge() method is called, it does not know how many windows were
>>>>> merged (which is done by the WindowAssigner) and only sees the merged
>>>>> window.
>>>>
>>>> We assume that timestamps of records from a user are in ascending
>>>> order, so only one window is closed at a time which simplifies the problem
>>>> of how to decrement the counter.
>>>> Nevertheless, I think I need to decrement the counter in
>>>> Trigger.onClose(), not Trigger.onMerge().
>>>> By doing that in Trigger.onClose(), we can take care of both cases:
>>>> when a window is merged and when a window is expired.
>>>> What do you think about it?
>>>>
>>>> The reason I mention state is to calculate the exact number of active
>>>> sessions even after my Flink application is restarted from checkpoints or
>>>> savepoints.
>>>> If we restore from a savepoint and the counter is initialized to 0,
>>>> we'll see an incorrect value from a dashboard.
>>>> This is my biggest concern at this point.
>>>>
>>>> Best,
>>>>
>>>> - Dongwon
>>>>
>>>>
>>>> On Tue, Jun 19, 2018 at 7:14 PM, Fabian Hueske <fhue...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Dongwon,
>>>>>
>>>>> Do you need the number of active session windows as a DataStream or
>>>>> would you like to have it as a metric that you can expose?
>>>>> If possible, I would recommend exposing it as a metric because metrics
>>>>> are usually easier to collect.
>>>>>
>>>>> SessionWindows work internally as follows:
>>>>> - every new record is added to a new window that starts at the
>>>>> timestamp of the record and ends at timestamp + gap size. When a record is
>>>>> added to a window, Trigger.onElement() is called.
>>>>> - after a window is created, the session window assigner tries to
>>>>> merge windows with overlapping ranges. When windows are merged,
>>>>> Trigger.onMerge() is called.
>>>>>
>>>>> In order to track how many session windows exist, we would need to
>>>>> increment a counter by one when a new window is created (or an element is
>>>>> assigned to a window, which is equivalent for session windows) and
>>>>> decrement the counter when windows are merged by the number of merged
>>>>> windows minus one.
>>>>>
>>>>> Incrementing the counter is rather easy and can be done in
>>>>> Trigger.onElement(), either by using state or a Counter metric (Triggers
>>>>> have access to the metric system).
>>>>> However, decrementing the counter is difficult. Although the
>>>>> Trigger.onMerge() method is called, it does not know how many windows were
>>>>> merged (which is done by the WindowAssigner) and only sees the merged
>>>>> window. There might be a way to maintain state in a Trigger that makes it
>>>>> possible to infer how many windows were merged.
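
The counting arithmetic described above (+1 per new window, minus the number
of merged windows less one per merge), together with the decrement on expiry
raised later in the thread, can be checked with a small plain-Java model.
This is a sketch under stated assumptions, not Flink code and not a Trigger
implementation: SessionCountSketch and its methods are hypothetical, windows
are modeled as [start, end) pairs, and the model does its own merging, so
unlike Trigger.onMerge() it knows how many windows were merged.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical bookkeeping model of session windows; not Flink code. */
class SessionCountSketch {
    private final long gap;
    private final Map<String, List<long[]>> windowsByKey = new HashMap<>();
    private long active; // the value that would be exposed as a metric

    SessionCountSketch(long gap) { this.gap = gap; }

    /** A record opens [ts, ts + gap) and merges it with overlapping windows of the key. */
    void onRecord(String key, long ts) {
        List<long[]> windows = windowsByKey.computeIfAbsent(key, k -> new ArrayList<>());
        long start = ts;
        long end = ts + gap;
        active += 1;    // every record creates a new window
        int merged = 1; // the new window itself

        // Repeat until no remaining window overlaps the growing merged range.
        boolean changed = true;
        while (changed) {
            changed = false;
            Iterator<long[]> it = windows.iterator();
            while (it.hasNext()) {
                long[] w = it.next();
                if (w[0] < end && start < w[1]) { // ranges overlap
                    start = Math.min(start, w[0]);
                    end = Math.max(end, w[1]);
                    it.remove();
                    merged++;
                    changed = true;
                }
            }
        }
        windows.add(new long[] {start, end});
        active -= merged - 1; // n windows collapsed into one
    }

    /** Closes every window whose end is at or before the watermark. */
    void onWatermark(long watermark) {
        for (List<long[]> windows : windowsByKey.values()) {
            Iterator<long[]> it = windows.iterator();
            while (it.hasNext()) {
                if (it.next()[1] <= watermark) {
                    it.remove();
                    active -= 1; // an expired window is no longer active
                }
            }
        }
    }

    long active() { return active; }

    public static void main(String[] args) {
        SessionCountSketch s = new SessionCountSketch(10);
        s.onRecord("alice", 0); // opens [0, 10)
        s.onRecord("alice", 5); // [5, 15) merges with [0, 10) into [0, 15)
        s.onRecord("bob", 3);   // opens [3, 13)
        System.out.println(s.active()); // prints 2
        s.onWatermark(14);      // bob's window closes, alice's stays open
        System.out.println(s.active()); // prints 1
    }
}
```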
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> 2018-06-16 16:39 GMT+02:00 Dongwon Kim <eastcirc...@gmail.com>:
>>>>>
>>>>>> Hi Fabian,
>>>>>>
>>>>>> I'm still eager to expose # of active sessions as a key metric of our
>>>>>> service but I haven’t figured it out yet.
>>>>>>
>>>>>> First of all, I want to ask you some questions regarding your
>>>>>> suggestion.
>>>>>>
>>>>>> You could implement a Trigger that fires when a new window is created
>>>>>> and when the window is closed. A ProcessWindowFunction would emit a +1 if
>>>>>> the window was created and a -1 when the window is closed.
>>>>>> Session windows are a bit special, because you also need to handle
>>>>>> the case of merging windows, i.e., two opened windows can be merged and
>>>>>> only one (the merged) window is closed. So you would need to emit a -2 if a
>>>>>> merged window was closed (assuming only two windows were merged).
>>>>>>
>>>>>> Q1)
>>>>>> How to fire when a new window is created and when the window is
>>>>>> closed?
>>>>>> AFAIK, we can return TriggerResult only through the three functions:
>>>>>> onElement, onEventTime, and onProcessingTime.
>>>>>> Q2)
>>>>>> Firing emits the elements in a window to the window function; it does
>>>>>> not emit values like +1, -1 and -2, which are not in the window.
>>>>>> Or am I missing something that you meant?
>>>>>>
>>>>>> In order to do that, you'd need to carry the merging information
>>>>>> forward. The Trigger.onMerge method cannot trigger the window function,
>>>>>> but it could store the merging information in state that is later
>>>>>> accessed.
>>>>>>
>>>>>> Q3)
>>>>>> I didn't understand what you mean at all. What do you mean by
>>>>>> carrying the merging information?
>>>>>>
>>>>>> Besides your suggestion, I implemented a custom trigger which is
>>>>>> almost the same as EventTimeTrigger except for the following:
>>>>>> - it maintains a variable to count sessions in an instance of a
>>>>>> window operator
>>>>>> - it increases the variable by 1 when onElement is invoked
>>>>>> - it decreases the variable by 1 when onClose is invoked
>>>>>> Considering the logic of Flink’s session window, it correctly counts
>>>>>> sessions in an instance of a window operator.
>>>>>>
>>>>>> As you might have already noticed, this approach has a critical
>>>>>> problem: there's no way to maintain an operator state inside a
>>>>>> trigger.
>>>>>> TriggerContext only allows interaction with state that is scoped to
>>>>>> the window and the key of the current trigger invocation (as shown in
>>>>>> Trigger#TriggerContext).
>>>>>>
>>>>>> I've now come to the conclusion that it might not be possible using
>>>>>> the DataStream API.
>>>>>> Or do I need to think in a totally different way to achieve
>>>>>> the goal?
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> - Dongwon
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Feb 20, 2018, at 6:53 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>
>>>>>> Hi Dongwon Kim,
>>>>>>
>>>>>> That's an interesting question.
>>>>>>
>>>>>> I don't have a solution blueprint for you, but a few ideas that
>>>>>> should help to solve the problem.
>>>>>>
>>>>>> I would start with a separate job first and later try to integrate it
>>>>>> with the other job.
>>>>>> You could implement a Trigger that fires when a new window is created
>>>>>> and when the window is closed. A ProcessWindowFunction would emit a +1 if
>>>>>> the window was created and a -1 when the window is closed.
>>>>>> Session windows are a bit special, because you also need to handle
>>>>>> the case of merging windows, i.e., two opened windows can be merged and
>>>>>> only one (the merged) window is closed. So you would need to emit a -2 if a
>>>>>> merged window was closed (assuming only two windows were merged).
>>>>>> In order to do that, you'd need to carry the merging information
>>>>>> forward. The Trigger.onMerge method cannot trigger the window function,
>>>>>> but it could store the merging information in state that is later
>>>>>> accessed.
>>>>>>
>>>>>> Hope this helps,
>>>>>> Fabian
>>>>>>
>>>>>> 2018-02-20 9:54 GMT+01:00 Dongwon Kim <eastcirc...@gmail.com>:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> It could be a totally stupid question but I currently have no idea
>>>>>>> how to get the number of active session windows from a running job.
>>>>>>>
>>>>>>> Our traffic trajectory application (which handles up to 10,000 tps)
>>>>>>> uses event-time session windows on a KeyedStream (keyed by userID).
>>>>>>>
>>>>>>> Should I write another Flink job for the purpose?
>>>>>>>
>>>>>>> Cheers,
>>>>>>>
>>>>>>> Dongwon Kim
