Hi Dongwon,

Yes, the counter state should be stored in operator state, which is not available on Triggers.

Chesnay: Can a window function (like ProcessWindowFunction) access (read, write) the counter of its associated Trigger to checkpoint and restore it?
Best, Fabian

2018-06-20 16:59 GMT+02:00 Dongwon Kim <eastcirc...@gmail.com>:

> Hi Fabian and Chesnay,
>
> As Chesnay pointed out, it seems that I need to write the current counter (which is defined inside the Trigger) into state, which I think should be the operator state of the window operator.
> However, as I previously said, TriggerContext allows users to access only the partitioned state that is scoped to *the key and* *the window* of the current Trigger invocation.
> There's no way for me to access the operator state of the window operator through TriggerContext.
> The partitioned state doesn't seem suitable as we have more than *ten million keys*.
> This number of keys could possibly break down the metric system and external metric systems like Ganglia and Prometheus.
>
> What I want the most is to achieve the goal using the current API (I'm using Flink-1.4.2) without modification.
> But a change in TriggerContext seems unavoidable because it has to expose an additional method for users like me to access the operator state of the window operator.
>
> Thank you guys for the useful discussion.
>
> p.s. Fabian, yes you're right. It is Trigger.clear(), not Trigger.onClose().
>
> Best,
>
> - Dongwon
>
> On Wed, Jun 20, 2018 at 7:30 PM, Chesnay Schepler <ches...@apache.org> wrote:
>
>> Checkpointing of metrics is a manual process.
>> The operator must write the current value into state, retrieve it on restore and restore the counter's count.
>>
>> On 20.06.2018 12:10, Fabian Hueske wrote:
>>
>> Hi Dongwon,
>>
>> You are of course right! We need to decrement the counter when the window is closed.
>>
>> The idea of using the Trigger.clear() method (the clean-up method is called clear() instead of onClose()) is great!
>> It will be called when the window is closed but also when it is merged.
>> So, I think you are right and we only need to increment the counter in Trigger.onElement() and decrement in Trigger.clear().
>>
>> I'm not 100% sure, but I doubt that metrics can be checkpointed. Chesnay (in CC) would know that.
>> Not sure what would be the best approach if you need a fault-tolerant solution.
>>
>> Best, Fabian
>>
>> 2018-06-19 16:38 GMT+02:00 Dongwon Kim <eastcirc...@gmail.com>:
>>
>>> Hi Fabian,
>>> Thanks a lot for your reply.
>>>
>>>> Do you need the number of active session windows as a DataStream or would you like to have it as a metric that you can expose?
>>>> If possible, I would recommend exposing it as a metric because they are usually easier to collect.
>>>
>>> I want to have it as a metric and it doesn't look difficult thanks to the metric system exposed by TriggerContext.
>>>
>>>> In order to track how many session windows exist, we would need to increment a counter by one when a new window is created (or an element is assigned to a window, which is equivalent for session windows)
>>>
>>> I agree with you that we need to increment a counter when Trigger.onElement() is called due to the characteristic of session windows.
>>>
>>>> and decrement the counter when windows are merged by the number of merged windows minus one.
>>>
>>> You decrement the counter when windows are merged, but I think we need to decrement the counter when a window is expired as well.
>>>
>>>> However, decrementing the counter is difficult. Although the Trigger.onMerge() method is called, it does not know how many windows were merged (which is done by the WindowAssigner) and only sees the merged window.
>>>
>>> We assume that timestamps of records from a user are in ascending order, so only one window is closed at a time, which simplifies the problem of how to decrement the counter.
>>> Nevertheless, I think I need to decrement the counter in Trigger.onClose(), not Trigger.onMerge().
>>> By doing that in Trigger.onClose(), we can take care of both cases: when a window is merged and when a window is expired.
>>> What do you think about it?
>>>
>>> The reason I mention state is to calculate the exact number of active sessions even after my Flink application is restarted from checkpoints or savepoints.
>>> If we restore from a savepoint and the counter is initialized to 0, we'll see an incorrect value on a dashboard.
>>> This is my biggest concern at this point.
>>>
>>> Best,
>>>
>>> - Dongwon
>>>
>>> On Tue, Jun 19, 2018 at 7:14 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Hi Dongwon,
>>>>
>>>> Do you need the number of active session windows as a DataStream or would you like to have it as a metric that you can expose?
>>>> If possible, I would recommend exposing it as a metric because they are usually easier to collect.
>>>>
>>>> SessionWindows work internally as follows:
>>>> - every new record is added to a new window that starts at the timestamp of the record and ends at timestamp + gap size. When a record is added to a window, Trigger.onElement() is called.
>>>> - after a window was created, the session window assigner tries to merge windows with overlapping ranges. When windows are merged, Trigger.onMerge() is called.
>>>>
>>>> In order to track how many session windows exist, we would need to increment a counter by one when a new window is created (or an element is assigned to a window, which is equivalent for session windows) and decrement the counter when windows are merged by the number of merged windows minus one.
>>>>
>>>> Incrementing the counter is rather easy and can be done in Trigger.onElement(), either by using state or a Counter metric (Triggers have access to the metric system).
>>>> However, decrementing the counter is difficult. Although the Trigger.onMerge() method is called, it does not know how many windows were merged (which is done by the WindowAssigner) and only sees the merged window.
>>>> There might be a way to maintain state in a Trigger that allows inferring how many windows were merged.
>>>>
>>>> Best, Fabian
>>>>
>>>> 2018-06-16 16:39 GMT+02:00 Dongwon Kim <eastcirc...@gmail.com>:
>>>>
>>>>> Hi Fabian,
>>>>>
>>>>> I'm still eager to expose the number of active sessions as a key metric of our service but I haven’t figured it out yet.
>>>>>
>>>>> First of all, I want to ask you some questions regarding your suggestion.
>>>>>
>>>>>> You could implement a Trigger that fires when a new window is created and when the window is closed. A ProcessWindowFunction would emit a +1 if the window was created and a -1 when the window is closed.
>>>>>> Session windows are a bit special, because you also need to handle the case of merging windows, i.e., two opened windows can be merged and only one (the merged) window is closed. So you would need to emit a -2 if a merged window was closed (assuming only two windows were merged).
>>>>>
>>>>> Q1)
>>>>> How to fire when a new window is created and when the window is closed?
>>>>> AFAIK, we can return TriggerResult only through the three functions: onElement, onEventTime, and onProcessingTime.
>>>>> Q2)
>>>>> Firing is to emit elements in windows down to the window function, not emitting values like +1, -1 and -2 which are not in windows.
>>>>> Or am I missing something that you meant?
>>>>>
>>>>>> In order to do that, you'd need to carry the merging information forward. The Trigger.onMerge method cannot trigger the window function, but it could store the merging information in state that is later accessed.
>>>>>
>>>>> Q3)
>>>>> I didn't understand what you mean at all. What do you mean by carrying the merging information?
>>>>>
>>>>> Besides your suggestion, I implemented a custom trigger which is almost the same as EventTimeTrigger except for the following:
>>>>> - it maintains a variable to count sessions in an instance of a window operator
>>>>> - it increases the variable by 1 when onElement is invoked
>>>>> - it decreases the variable by 1 when onClose is invoked
>>>>> Considering the logic of Flink’s session window, it correctly counts sessions in an instance of a window operator.
>>>>>
>>>>> As you might have already noticed, this approach has a critical problem: there's no way to maintain an operator state inside a trigger.
>>>>> TriggerContext only allows interacting with state that is scoped to the window and the key of the current trigger invocation (as shown in Trigger#TriggerContext).
>>>>>
>>>>> Now I've come to the conclusion that it might not be possible using the DataStream API.
>>>>> Otherwise, do I need to think in a totally different way to achieve the goal?
>>>>>
>>>>> Best,
>>>>>
>>>>> - Dongwon
>>>>>
>>>>> On Feb 20, 2018, at 6:53 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>
>>>>> Hi Dongwon Kim,
>>>>>
>>>>> That's an interesting question.
>>>>>
>>>>> I don't have a solution blueprint for you, but a few ideas that should help to solve the problem.
>>>>>
>>>>> I would start with a separate job first and later try to integrate it with the other job.
>>>>> You could implement a Trigger that fires when a new window is created and when the window is closed. A ProcessWindowFunction would emit a +1 if the window was created and a -1 when the window is closed.
>>>>> Session windows are a bit special, because you also need to handle the case of merging windows, i.e., two opened windows can be merged and only one (the merged) window is closed. So you would need to emit a -2 if a merged window was closed (assuming only two windows were merged).
>>>>> In order to do that, you'd need to carry the merging information forward. The Trigger.onMerge method cannot trigger the window function, but it could store the merging information in state that is later accessed.
>>>>>
>>>>> Hope this helps,
>>>>> Fabian
>>>>>
>>>>> 2018-02-20 9:54 GMT+01:00 Dongwon Kim <eastcirc...@gmail.com>:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> It could be a totally stupid question but I currently have no idea how to get the number of active session windows from a running job.
>>>>>>
>>>>>> Our traffic trajectory application (which handles up to 10,000 tps) uses event-time session window on KeyedStream (keyed by userID).
>>>>>>
>>>>>> Should I write another Flink job for the purpose?
>>>>>>
>>>>>> Cheers,
>>>>>>
>>>>>> Dongwon Kim
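
For illustration only, the counting scheme discussed in this thread (+1 whenever an element creates a session window, -1 whenever a window disappears because it was merged away or expired) can be modelled in plain Java without any Flink classes. This is a rough sketch under stated assumptions, not Flink's window operator: SessionCounterSketch, Window and onWatermark are hypothetical names, windows that touch are treated as overlapping, a window is considered expired once the watermark reaches its end, and the exact order in which Flink invokes onElement(), onMerge() and clear() may differ from this model. It also says nothing about checkpointing the counter.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Toy model of the counting scheme from this thread; not a Flink Trigger. */
class SessionCounterSketch {

    /** Hypothetical stand-in for one session window covering [start, end). */
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        /** Assumption of this sketch: windows that merely touch also count as overlapping. */
        boolean overlaps(Window other) {
            return start <= other.end && other.start <= end;
        }
    }

    private final long gap;
    private final Map<String, List<Window>> openByKey = new HashMap<>();
    private long activeSessions;

    SessionCounterSketch(long gap) {
        this.gap = gap;
    }

    long activeSessions() {
        return activeSessions;
    }

    /** Models "a record arrives": a new window is created (+1), then overlapping windows are merged. */
    void onElement(String key, long timestamp) {
        List<Window> open = openByKey.computeIfAbsent(key, k -> new ArrayList<>());
        Window merged = new Window(timestamp, timestamp + gap);
        activeSessions++; // corresponds to the increment in onElement
        Iterator<Window> it = open.iterator();
        while (it.hasNext()) {
            Window w = it.next();
            if (w.overlaps(merged)) {
                merged = new Window(Math.min(w.start, merged.start), Math.max(w.end, merged.end));
                it.remove();
                activeSessions--; // corresponds to the decrement for a window that was merged away
            }
        }
        open.add(merged);
    }

    /** Models "time advances": every window whose end has been reached is closed (-1 each). */
    void onWatermark(long watermark) {
        for (List<Window> open : openByKey.values()) {
            Iterator<Window> it = open.iterator();
            while (it.hasNext()) {
                if (it.next().end <= watermark) {
                    it.remove();
                    activeSessions--; // corresponds to the decrement for an expired window
                }
            }
        }
    }

    public static void main(String[] args) {
        SessionCounterSketch counter = new SessionCounterSketch(10);
        counter.onElement("a", 0);   // one session for key a
        counter.onElement("a", 5);   // merges into the same session
        counter.onElement("b", 3);   // a second key opens its own session
        System.out.println(counter.activeSessions());
    }
}
```

In this model a merge of n windows lowers the count by n - 1, matching the "number of merged windows minus one" rule quoted above, and an expiry lowers it by one.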