Hi Fabian,

Thanks a lot for your reply.

> Do you need the number of active session windows as a DataStream, or would
> you like to have it as a metric that you can expose?
> If possible, I would recommend exposing it as a metric because metrics are
> usually easier to collect.

I want to have it as a metric, and that doesn't look difficult thanks to the metric system exposed by TriggerContext.

> In order to track how many session windows exist, we would need to
> increment a counter by one when a new window is created (or an element is
> assigned to a window, which is equivalent for session windows)

I agree that we need to increment a counter when Trigger.onElement() is called, given how session windows work.

> and decrement the counter when windows are merged by the number of merged
> windows minus one.

You decrement the counter when windows are merged, but I think we need to decrement it when a window expires as well.

> However, decrementing the counter is difficult. Although the
> Trigger.onMerge() method is called, it does not know how many windows were
> merged (which is done by the WindowAssigner) and only sees the merged
> window.

We assume that the timestamps of records from a user are in ascending order, so only one window is closed at a time, which simplifies the problem of how to decrement the counter.
Nevertheless, I think I need to decrement the counter in Trigger.onClose(), not Trigger.onMerge(). Doing it in Trigger.onClose() takes care of both cases: when a window is merged and when a window expires.
What do you think?

The reason I mention state is that I want the exact number of active sessions even after my Flink application is restarted from a checkpoint or savepoint. If we restore from a savepoint and the counter is initialized to 0, the dashboard will show an incorrect value. This is my biggest concern at this point.

Best,

- Dongwon

On Tue, Jun 19, 2018 at 7:14 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Dongwon,
>
> Do you need the number of active session windows as a DataStream, or would
> you like to have it as a metric that you can expose?
> If possible, I would recommend exposing it as a metric because metrics are
> usually easier to collect.
>
> SessionWindows work internally as follows:
> - every new record is added to a new window that starts at the timestamp
> of the record and ends at timestamp + gap size. When a record is added to a
> window, Trigger.onElement() is called.
> - after a window is created, the session window assigner tries to merge
> windows with overlapping ranges. When windows are merged, Trigger.onMerge()
> is called.
>
> In order to track how many session windows exist, we would need to
> increment a counter by one when a new window is created (or an element is
> assigned to a window, which is equivalent for session windows) and
> decrement the counter when windows are merged by the number of merged
> windows minus one.
>
> Incrementing the counter is rather easy and can be done in
> Trigger.onElement(), either by using state or a Counter metric (Triggers
> have access to the metric system).
> However, decrementing the counter is difficult. Although the
> Trigger.onMerge() method is called, it does not know how many windows were
> merged (which is done by the WindowAssigner) and only sees the merged
> window. There might be a way to maintain state in a Trigger that allows us to
> infer how many windows were merged.
>
> Best, Fabian
>
> 2018-06-16 16:39 GMT+02:00 Dongwon Kim <eastcirc...@gmail.com>:
>
>> Hi Fabian,
>>
>> I'm still eager to expose the number of active sessions as a key metric of our
>> service, but I haven’t figured it out yet.
>>
>> First of all, I want to ask you some questions regarding your suggestion.
>>
>> > You could implement a Trigger that fires when a new window is created and
>> > when the window is closed. A ProcessWindowFunction would emit a +1 if the
>> > window was created and a -1 when the window is closed.
>> > Session windows are a bit special, because you also need to handle the
>> > case of merging windows, i.e., two opened windows can be merged and only
>> > one (the merged) window is closed.
>> > So you would need to emit a -2 if a merged
>> > window was closed (assuming only two windows were merged).
>>
>> Q1)
>> How do we fire when a new window is created and when the window is closed?
>> AFAIK, we can return a TriggerResult only from three methods:
>> onElement, onEventTime, and onProcessingTime.
>> Q2)
>> Firing emits the elements in a window down to the window function; it does not
>> emit values like +1, -1 and -2, which are not in windows.
>> Or am I missing something that you meant?
>>
>> > In order to do that, you'd need to carry the merging information forward.
>> > The Trigger.onMerge method cannot trigger the window function, but it could
>> > store the merging information in state that is later accessed.
>>
>> Q3)
>> I didn't understand this at all. What do you mean by carrying
>> the merging information forward?
>>
>> Besides your suggestion, I implemented a custom trigger which is almost
>> the same as EventTimeTrigger except for the following:
>> - it maintains a variable to count sessions in an instance of a window
>> operator
>> - it increases the variable by 1 when onElement is invoked
>> - it decreases the variable by 1 when onClose is invoked
>> Considering the logic of Flink’s session window, it correctly counts
>> sessions in an instance of a window operator.
>>
>> As you might have already noticed, this approach has a critical problem:
>> there's no way to maintain operator state inside a trigger.
>> TriggerContext only allows interaction with state that is scoped to the
>> window and the key of the current trigger invocation (as shown in
>> Trigger#TriggerContext).
>>
>> I've now come to the conclusion that it might not be possible using
>> the DataStream API.
>> Otherwise, do I need to think in a totally different way to achieve the
>> goal?
>>
>> Best,
>>
>> - Dongwon
>>
>>
>> On Feb 20, 2018, at 6:53 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>> Hi Dongwon Kim,
>>
>> That's an interesting question.
>>
>> I don't have a solution blueprint for you, but a few ideas that should
>> help to solve the problem.
>>
>> I would start with a separate job first and later try to integrate it
>> with the other job.
>> You could implement a Trigger that fires when a new window is created and
>> when the window is closed. A ProcessWindowFunction would emit a +1 if the
>> window was created and a -1 when the window is closed.
>> Session windows are a bit special, because you also need to handle the
>> case of merging windows, i.e., two opened windows can be merged and only
>> one (the merged) window is closed. So you would need to emit a -2 if a merged
>> window was closed (assuming only two windows were merged).
>> In order to do that, you'd need to carry the merging information forward.
>> The Trigger.onMerge method cannot trigger the window function, but it could
>> store the merging information in state that is later accessed.
>>
>> Hope this helps,
>> Fabian
>>
>> 2018-02-20 9:54 GMT+01:00 Dongwon Kim <eastcirc...@gmail.com>:
>>
>>> Hi,
>>>
>>> It could be a totally stupid question, but I currently have no idea how
>>> to get the number of active session windows from a running job.
>>>
>>> Our traffic trajectory application (which handles up to 10,000 tps) uses
>>> an event-time session window on a KeyedStream (keyed by userID).
>>>
>>> Should I write another Flink job for this purpose?
>>>
>>> Cheers,
>>>
>>> Dongwon Kim
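
The counting rule discussed in this thread (+1 for every new window, minus the number of merged windows minus one on a merge, and -1 when a window expires) can be checked with a small standalone model. This is only a sketch and uses no Flink classes: the class name SessionCountSketch and its methods onElement and onWatermark are hypothetical, the rule that touching windows merge is an assumption of the model, and the expiry rule (a window is dropped once the watermark reaches its end) is a simplification rather than Flink's exact cleanup behaviour.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical standalone model; none of these names are Flink APIs.
class SessionCountSketch {
    private final long gap;
    // Open windows per key, each stored as {start, end}.
    private final Map<String, List<long[]>> windows = new HashMap<>();
    private long activeSessions = 0;

    SessionCountSketch(long gap) {
        this.gap = gap;
    }

    // A record opens a window [ts, ts + gap], which is then merged with every
    // open window of the same key that it overlaps or touches (model assumption).
    void onElement(String key, long ts) {
        List<long[]> open = windows.computeIfAbsent(key, k -> new ArrayList<>());
        long start = ts;
        long end = ts + gap;
        activeSessions += 1; // +1 for the newly created window
        int merged = 1;      // windows folded into the result, including the new one
        Iterator<long[]> it = open.iterator();
        while (it.hasNext()) {
            long[] w = it.next();
            if (w[0] <= ts + gap && w[1] >= ts) {
                start = Math.min(start, w[0]);
                end = Math.max(end, w[1]);
                it.remove();
                merged++;
            }
        }
        open.add(new long[] {start, end});
        activeSessions -= (merged - 1); // number of merged windows minus one
    }

    // Simplified expiry: a window is dropped once the watermark reaches its end.
    void onWatermark(long watermark) {
        for (List<long[]> open : windows.values()) {
            int before = open.size();
            open.removeIf(w -> w[1] <= watermark);
            activeSessions -= before - open.size(); // -1 per expired window
        }
        windows.values().removeIf(List::isEmpty);
    }

    long activeSessions() {
        return activeSessions;
    }

    public static void main(String[] args) {
        SessionCountSketch s = new SessionCountSketch(10);
        s.onElement("a", 0);
        s.onElement("a", 15);
        s.onElement("b", 3);
        s.onElement("a", 8); // out-of-order record bridges both windows of key "a"
        System.out.println(s.activeSessions());
        s.onWatermark(13);
        System.out.println(s.activeSessions());
    }
}
```

In this model, a record that arrives in timestamp order can merge with at most one existing window, so the merge decrement is always 0 or 1; only an out-of-order record, like the one at timestamp 8 above, folds more windows together. The model keeps its count in a plain field, so it says nothing about restoring the value from a checkpoint or savepoint.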