[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15942231#comment-15942231 ]

ASF GitHub Bot commented on FLINK-5929:
---

Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/3479

    Hehe, thanks!

> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
>                 Key: FLINK-5929
>                 URL: https://issues.apache.org/jira/browse/FLINK-5929
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>            Reporter: Aljoscha Krettek
>            Assignee: Seth Wiesman
>             Fix For: 1.3.0
>
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}}
> can access is scoped to the key of the window but not the window itself. That
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For
> example, if you expect to have several {{Trigger}} firings (due to early and
> late firings) a user can keep state per window to keep some information
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two
> options:
> - Keep track of all state that a user uses and clean up when we reach the
> window GC horizon.
> - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called
> when we reach the window GC horizon that users can/should use to clean up
> their state.
> On the API side, we can add a method {{windowState()}} on
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and
> {{globalState()}} that would allow access to the (already available) global
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
>     /**
>      * @return The window that is being evaluated.
>      */
>     public abstract W window();
>     /**
>      * State accessor for per-key and per-window state.
>      */
>     KeyedStateStore windowState();
>     /**
>      * State accessor for per-key global state.
>      */
>     KeyedStateStore globalState();
> }
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
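
The difference between the two scopes in the quoted description can be illustrated with a small, self-contained toy model. This is only a sketch under stated assumptions: `WindowStateScopes`, its nested `Window` class, and the methods `fire`, `globalFirings`, `hasWindowState` and `clear` are hypothetical names made up for illustration and are not Flink classes or the API added by the pull request. It keeps one counter per key (global state) and one counter per key and window (per-window state), and drops the per-window counter in a cleanup hook comparable to the one discussed for the window GC horizon.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Toy model of the two state scopes; none of these types are Flink classes. */
class WindowStateScopes {

    /** Hypothetical window, identified only by its start and end bounds. */
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Window)) {
                return false;
            }
            Window other = (Window) o;
            return start == other.start && end == other.end;
        }

        @Override
        public int hashCode() {
            return Objects.hash(start, end);
        }
    }

    // Global state: one firing counter per key, shared by all windows of that key.
    private final Map<String, Integer> globalState = new HashMap<>();

    // Per-window state: one firing counter per (key, window) pair.
    private final Map<String, Map<Window, Integer>> windowState = new HashMap<>();

    /** Simulates one trigger firing; returns the firing count for this window only. */
    int fire(String key, Window window) {
        globalState.merge(key, 1, Integer::sum);
        return windowState
                .computeIfAbsent(key, k -> new HashMap<>())
                .merge(window, 1, Integer::sum);
    }

    /** Number of firings seen for the key across all of its windows. */
    int globalFirings(String key) {
        return globalState.getOrDefault(key, 0);
    }

    /** True if per-window state is still held for the given key and window. */
    boolean hasWindowState(String key, Window window) {
        Map<Window, Integer> perWindow = windowState.get(key);
        return perWindow != null && perWindow.containsKey(window);
    }

    /** Cleanup hook for one window; the global state of the key is left untouched. */
    void clear(String key, Window window) {
        Map<Window, Integer> perWindow = windowState.get(key);
        if (perWindow != null) {
            perWindow.remove(window);
            if (perWindow.isEmpty()) {
                windowState.remove(key);
            }
        }
    }

    public static void main(String[] args) {
        WindowStateScopes state = new WindowStateScopes();
        Window first = new Window(0, 10);
        Window second = new Window(10, 20);

        state.fire("user-1", first);                  // early firing
        int lateCount = state.fire("user-1", first);  // late firing, same window
        int otherCount = state.fire("user-1", second);

        System.out.println("firings in first window: " + lateCount);
        System.out.println("firings in second window: " + otherCount);
        System.out.println("firings for key overall: " + state.globalFirings("user-1"));

        state.clear("user-1", first);
        System.out.println("first window state kept: " + state.hasWindowState("user-1", first));
    }
}
```

The nested map is just one way to model the "(key, window)" namespace. The point of the sketch is that a second firing for the same window sees the first firing's value, while a different window of the same key starts from scratch.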
[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941791#comment-15941791 ]

ASF GitHub Bot commented on FLINK-5929:
---

Github user sjwiesman commented on the issue:

    https://github.com/apache/flink/pull/3479

    Done! Thank you for helping me get this feature merged in. This has to be one of the most painless commits I've ever made to an open source project of this size.
[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941792#comment-15941792 ]

ASF GitHub Bot commented on FLINK-5929:
---

Github user sjwiesman closed the pull request at:

    https://github.com/apache/flink/pull/3479
[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941773#comment-15941773 ]

Aljoscha Krettek commented on FLINK-5929:
-

[~sjwiesman] I created FLINK-6163 and FLINK-6164 as follow-up issues. Just letting you know in case you're interested. :-)
[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941771#comment-15941771 ]

ASF GitHub Bot commented on FLINK-5929:
---

Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/3479

    Thanks for implementing this! I just merged, could you please close this PR?
[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15934894#comment-15934894 ]

ASF GitHub Bot commented on FLINK-5929:
---

Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/3479

    Thanks!
[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15934879#comment-15934879 ]

ASF GitHub Bot commented on FLINK-5929:
---

Github user sjwiesman commented on the issue:

    https://github.com/apache/flink/pull/3479

    Pushed the fix. I had to update SideOutputsITCase so that the ProcessAllWindowFunctions have a no-op clear method. All tests passed locally, please take a look.
[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15934726#comment-15934726 ]

ASF GitHub Bot commented on FLINK-5929:
---

Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/3479

    OK, please ping me when you have pushed the fix.
[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15934691#comment-15934691 ]

ASF GitHub Bot commented on FLINK-5929:
---

Github user sjwiesman commented on the issue:

    https://github.com/apache/flink/pull/3479

    It looks like I broke one of the Scala side-output tests when I rebased on master. I'm going to push a fix right now, but it won't change any of the code related to this PR.
[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15933393#comment-15933393 ]

ASF GitHub Bot commented on FLINK-5929:
---

Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/3479

    Don't worry. Is it ready for another review pass now?
[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15933371#comment-15933371 ]

ASF GitHub Bot commented on FLINK-5929:
---

Github user sjwiesman commented on the issue:

    https://github.com/apache/flink/pull/3479

    Sorry for the delay, things got crazy at work. Let me know if there are any issues.
[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15922878#comment-15922878 ]

ASF GitHub Bot commented on FLINK-5929:
---

Github user sjwiesman commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3479#discussion_r105763162

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java ---
    @@ -53,22 +57,47 @@ public InternalAggregateProcessAllWindowFunction(
         }

         @Override
    -    public void apply(Byte key, final W window, Iterable input, Collector out) throws Exception {
    +    public void open(Configuration parameters) throws Exception {
    +        super.open(parameters);
             ProcessAllWindowFunction wrappedFunction = this.wrappedFunction;
    -        ProcessAllWindowFunction .Context context = wrappedFunction.new Context() {
    -            @Override
    -            public W window() {
    -                return window;
    -            }
    -        };
    +        this.ctx = new InternalProcessAllWindowContext<>(wrappedFunction);
    +    }
    +    @Override
    +    public void process(Byte aByte, final W window, final InternalWindowContext context, Iterable input, Collector out) throws Exception {
             final ACC acc = aggFunction.createAccumulator();
             for (T val : input) {
                 aggFunction.add(val, acc);
             }
    -        wrappedFunction.process(context, Collections.singletonList(aggFunction.getResult(acc)), out);
    +        this.ctx.window = window;
    +        this.ctx.internalContext = context;
    +        ProcessAllWindowFunction wrappedFunction = this.wrappedFunction;
    +        wrappedFunction.process(ctx, Collections.singletonList(aggFunction.getResult(acc)), out);
    +    }
    +
    +    @Override
    +    public void clear(final W window, final InternalWindowContext context) throws Exception {
    +        ProcessAllWindowFunction wrappedFunction = this.wrappedFunction;
    +        final ProcessAllWindowFunction .Context ctx = wrappedFunction.new Context() {
    --- End diff --

    whoops
[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15907482#comment-15907482 ]

ASF GitHub Bot commented on FLINK-5929:
---

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3479#discussion_r105660519

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
    @@ -153,6 +161,8 @@
         protected transient Context context = new Context(null, null);

    +    protected transient WindowContext windowContext = new WindowContext(null);
    --- End diff --

    To make it clearer what they do, I think we should rename these two contexts to `triggerContext` and `processContext`.
[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15907484#comment-15907484 ]

ASF GitHub Bot commented on FLINK-5929:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3479#discussion_r105661210

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
    @@ -628,6 +644,123 @@ protected final boolean isCleanupTime(W window, long time) {
             return time == cleanupTime(window);
         }

    +    public abstract class KeyedStateStoreWithWindow implements KeyedStateStore {
    --- End diff --

    Maybe comment that we have a base class where we can set the window so
    that we can once create a `MergingKeyStore` or `WindowPaneKeyStore`
    (depending on the window assigner) and then not care about the
    distinction anymore. I remember I suggested this but now struggled to
    see why there is the base class.
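The reasoning in the review comment above can be illustrated with a toy model. This is only a sketch under stated assumptions: `ToyStateStore`, `ToyStoreWithWindow`, `ToyMergingStore`, `ToyPaneStore` and `ToyOperator` are hypothetical stand-ins invented for illustration, not the classes from the pull request, and a simple counter replaces real Flink state. The point it tries to show is that the operator picks the concrete store once and afterwards only touches the `window` field of the base class.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy stand-in for a state store; NOT Flink's KeyedStateStore. */
interface ToyStateStore {
    long increment(String name);
}

/** Base class whose only job is to expose a settable window. */
abstract class ToyStoreWithWindow<W> implements ToyStateStore {
    protected W window;
}

/** Variant for merging windows: per-window state is rejected. */
class ToyMergingStore<W> extends ToyStoreWithWindow<W> {
    @Override
    public long increment(String name) {
        throw new UnsupportedOperationException(
                "per-window state is not allowed in merging windows");
    }
}

/** Variant for non-merging windows: state is namespaced by the current window. */
class ToyPaneStore<W> extends ToyStoreWithWindow<W> {
    private final Map<List<Object>, Long> counters = new HashMap<>();

    @Override
    public long increment(String name) {
        // The (window, name) pair acts as the namespace of the counter.
        return counters.merge(Arrays.<Object>asList(window, name), 1L, Long::sum);
    }
}

/** Hypothetical operator: decides on the store type once, in the constructor. */
class ToyOperator<W> {
    private final ToyStoreWithWindow<W> store;

    ToyOperator(boolean mergingAssigner) {
        if (mergingAssigner) {
            this.store = new ToyMergingStore<W>();
        } else {
            this.store = new ToyPaneStore<W>();
        }
    }

    long fire(W window, String name) {
        store.window = window; // no distinction between the two variants here
        return store.increment(name);
    }

    public static void main(String[] args) {
        ToyOperator<String> op = new ToyOperator<>(false);
        System.out.println(op.fire("w1", "count"));
        System.out.println(op.fire("w1", "count"));
        System.out.println(op.fire("w2", "count"));
    }
}
```

Without the shared base class, `fire` would need to know which concrete store it holds in order to set the window, which is the distinction the comment wants to avoid.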
[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15907485#comment-15907485 ]

ASF GitHub Bot commented on FLINK-5929:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3479#discussion_r105661470

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java ---
    @@ -53,22 +57,47 @@ public InternalAggregateProcessAllWindowFunction(
         }

         @Override
    -    public void apply(Byte key, final W window, Iterable input, Collector out) throws Exception {
    +    public void open(Configuration parameters) throws Exception {
    +        super.open(parameters);
             ProcessAllWindowFunction wrappedFunction = this.wrappedFunction;
    -        ProcessAllWindowFunction.Context context = wrappedFunction.new Context() {
    -            @Override
    -            public W window() {
    -                return window;
    -            }
    -        };
    +        this.ctx = new InternalProcessAllWindowContext<>(wrappedFunction);
    +    }

    +    @Override
    +    public void process(Byte aByte, final W window, final InternalWindowContext context, Iterable input, Collector out) throws Exception {
             final ACC acc = aggFunction.createAccumulator();

             for (T val : input) {
                 aggFunction.add(val, acc);
             }

    -        wrappedFunction.process(context, Collections.singletonList(aggFunction.getResult(acc)), out);
    +        this.ctx.window = window;
    +        this.ctx.internalContext = context;
    +        ProcessAllWindowFunction wrappedFunction = this.wrappedFunction;
    +        wrappedFunction.process(ctx, Collections.singletonList(aggFunction.getResult(acc)), out);
    +    }
    +
    +    @Override
    +    public void clear(final W window, final InternalWindowContext context) throws Exception {
    +        ProcessAllWindowFunction wrappedFunction = this.wrappedFunction;
    +        final ProcessAllWindowFunction.Context ctx = wrappedFunction.new Context() {
    --- End diff --

    leftover anonymous inner `Context`. This should also use `this.ctx`
    like in `process()`.
[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15907483#comment-15907483 ]

ASF GitHub Bot commented on FLINK-5929:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3479#discussion_r105662139

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java ---
    @@ -92,12 +93,45 @@ public void process(K key, final Context context, Iterable values, Collector<
                 result = foldFunction.fold(result, val);
             }

    -        windowFunction.process(key, windowFunction.new Context() {
    +        ProcessWindowFunction.Context ctx = windowFunction.new Context() {
    --- End diff --

    This can benefit from a similar refactoring as the internal window
    functions, i.e. creating an internal context class instead of the
    anonymous inner classes. This also holds for
    `FoldApplyProcessAllWindowFunction`,
    `ReduceApplyProcessAllWindowFunction` and
    `ReduceApplyProcessWindowFunction`.
[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901887#comment-15901887 ]

ASF GitHub Bot commented on FLINK-5929:
---------------------------------------

Github user sjwiesman commented on the issue:

    https://github.com/apache/flink/pull/3479

    @aljoscha I made the changes you asked for. Just a heads up, there are
    a number of files that were superficially changed when migrating from
    apply -> process but are otherwise untouched.
[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897915#comment-15897915 ]

ASF GitHub Bot commented on FLINK-5929:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3479#discussion_r104504635

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java ---
    @@ -39,5 +40,31 @@
          * @param out A collector for emitting elements.
          * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
          */
    +    @Deprecated
         void apply(KEY key, W window, IN input, Collector out) throws Exception;
    --- End diff --

    I think for now it's OK to throw an Exception here.
[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897829#comment-15897829 ]

ASF GitHub Bot commented on FLINK-5929:
---------------------------------------

Github user sjwiesman commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3479#discussion_r104492162

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java ---
    @@ -39,5 +40,31 @@
          * @param out A collector for emitting elements.
          * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
          */
    +    @Deprecated
         void apply(KEY key, W window, IN input, Collector out) throws Exception;
    --- End diff --

    I noticed an issue when removing apply. The method is used inside of
    AccumulatingKeyedTimePanes, which takes in an AbstractStreamOperator as
    an argument to its evaluateWindow method. When creating the context I
    can get the global keyed state backend from the operator, but not the
    partitioned state, because those methods are protected. Now the only
    two uses of this class are its subclasses, which have both been
    deprecated.

    My question is, do you think I should modify the evaluateWindow method
    to accept a keyed state store which wraps the operator partitioned
    state, or just throw an exception on context.windowState() because all
    valid uses of this method have been deprecated?
[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897683#comment-15897683 ]

ASF GitHub Bot commented on FLINK-5929:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3479#discussion_r104470521

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java ---
    @@ -39,5 +40,31 @@
          * @param out A collector for emitting elements.
          * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
          */
    +    @Deprecated
         void apply(KEY key, W window, IN input, Collector out) throws Exception;
    --- End diff --

    Ah, I meant to actually write that earlier. Yes: please remove.
[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897597#comment-15897597 ]

ASF GitHub Bot commented on FLINK-5929:
---------------------------------------

Github user sjwiesman commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3479#discussion_r104458722

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java ---
    @@ -39,5 +40,31 @@
          * @param out A collector for emitting elements.
          * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
          */
    +    @Deprecated
         void apply(KEY key, W window, IN input, Collector out) throws Exception;
    --- End diff --

    @aljoscha I meant to ask, should I leave this method or remove it?
[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897448#comment-15897448 ]

ASF GitHub Bot commented on FLINK-5929:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3479#discussion_r104432787

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
    @@ -629,6 +645,135 @@ protected final boolean isCleanupTime(W window, long time) {
         }

         /**
    +     * For now keyed state is not allowed in ProcessWindowFunctions
    +     */
    +    public class MergingKeyStore implements KeyedStateStore {
    +
    +        protected W window;
    +
    +        @Override
    +        public ValueState getState(ValueStateDescriptor stateProperties) {
    +            throw new RuntimeException("keyedState is not allowed in merging windows");
    +        }
    +
    +        @Override
    +        public ListState getListState(ListStateDescriptor stateProperties) {
    +            throw new RuntimeException("keyedState is not allowed in merging windows");
    +        }
    +
    +        @Override
    +        public ReducingState getReducingState(ReducingStateDescriptor stateProperties) {
    +            throw new RuntimeException("keyedState is not allowed in merging windows");
    +        }
    +
    +        @Override
    +        public FoldingState getFoldingState(FoldingStateDescriptor stateProperties) {
    +            throw new RuntimeException("keyedState is not allowed in merging windows");
    +        }
    +
    +        @Override
    +        public MapState getMapState(MapStateDescriptor stateProperties) {
    +            throw new RuntimeException("keyedState is not allowed in merging windows");
    +        }
    +    }
    +
    +    public class WindowPaneKeyStore implements KeyedStateStore {
    +
    +        protected W window;
    +
    +        @Override
    +        public ValueState getState(ValueStateDescriptor stateProperties) {
    +            try {
    +                return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
    +            } catch (Exception e) {
    +                throw new RuntimeException("Could not retrieve state", e);
    +            }
    +        }
    +
    +        @Override
    +        public ListState getListState(ListStateDescriptor stateProperties) {
    +            try {
    +                return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
    +            } catch (Exception e) {
    +                throw new RuntimeException("Could not retrieve state", e);
    +            }
    +        }
    +
    +        @Override
    +        public ReducingState getReducingState(ReducingStateDescriptor stateProperties) {
    +            try {
    +                return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
    +            } catch (Exception e) {
    +                throw new RuntimeException("Could not retrieve state", e);
    +            }
    +        }
    +
    +        @Override
    +        public FoldingState getFoldingState(FoldingStateDescriptor stateProperties) {
    +            try {
    +                return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
    +            } catch (Exception e) {
    +                throw new RuntimeException("Could not retrieve state", e);
    +            }
    +        }
    +
    +        @Override
    +        public MapState getMapState(MapStateDescriptor stateProperties) {
    +            try {
    +                return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
    +            } catch (Exception e) {
    +                throw new RuntimeException("Could not retrieve state", e);
    +            }
    +        }
    +    }
    +
    +    /**
    +     * {@code WindowContext} is a utility for handling {@code ProcessWindowFunction} invocations. It can be reused
    +     * by setting the {@code key} and {@code window} fields. No internal state must be kept in
    +     * the {@code WindowContext}
    +     */
    +    public class WindowContext implements InternalWindowFunction.InternalWindowContext {
    +        protected W window;
    +
    +        protected
[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897446#comment-15897446 ]

ASF GitHub Bot commented on FLINK-5929:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3479#discussion_r104432699

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
    @@ -629,6 +645,135 @@ protected final boolean isCleanupTime(W window, long time) {
         }

         /**
    +     * For now keyed state is not allowed in ProcessWindowFunctions
    +     */
    +    public class MergingKeyStore implements KeyedStateStore {
    +
    +        protected W window;
    +
    +        @Override
    +        public ValueState getState(ValueStateDescriptor stateProperties) {
    +            throw new RuntimeException("keyedState is not allowed in merging windows");
    +        }
    +
    +        @Override
    +        public ListState getListState(ListStateDescriptor stateProperties) {
    +            throw new RuntimeException("keyedState is not allowed in merging windows");
    +        }
    +
    +        @Override
    +        public ReducingState getReducingState(ReducingStateDescriptor stateProperties) {
    +            throw new RuntimeException("keyedState is not allowed in merging windows");
    +        }
    +
    +        @Override
    +        public FoldingState getFoldingState(FoldingStateDescriptor stateProperties) {
    +            throw new RuntimeException("keyedState is not allowed in merging windows");
    +        }
    +
    +        @Override
    +        public MapState getMapState(MapStateDescriptor stateProperties) {
    +            throw new RuntimeException("keyedState is not allowed in merging windows");
    +        }
    +    }
    +
    +    public class WindowPaneKeyStore implements KeyedStateStore {
    +
    +        protected W window;
    +
    +        @Override
    +        public ValueState getState(ValueStateDescriptor stateProperties) {
    +            try {
    +                return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
    +            } catch (Exception e) {
    +                throw new RuntimeException("Could not retrieve state", e);
    +            }
    +        }
    +
    +        @Override
    +        public ListState getListState(ListStateDescriptor stateProperties) {
    +            try {
    +                return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
    +            } catch (Exception e) {
    +                throw new RuntimeException("Could not retrieve state", e);
    +            }
    +        }
    +
    +        @Override
    +        public ReducingState getReducingState(ReducingStateDescriptor stateProperties) {
    +            try {
    +                return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
    +            } catch (Exception e) {
    +                throw new RuntimeException("Could not retrieve state", e);
    +            }
    +        }
    +
    +        @Override
    +        public FoldingState getFoldingState(FoldingStateDescriptor stateProperties) {
    +            try {
    +                return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
    +            } catch (Exception e) {
    +                throw new RuntimeException("Could not retrieve state", e);
    +            }
    +        }
    +
    +        @Override
    +        public MapState getMapState(MapStateDescriptor stateProperties) {
    +            try {
    +                return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
    +            } catch (Exception e) {
    +                throw new RuntimeException("Could not retrieve state", e);
    +            }
    +        }
    +    }
    +
    +    /**
    +     * {@code WindowContext} is a utility for handling {@code ProcessWindowFunction} invocations. It can be reused
    +     * by setting the {@code key} and {@code window} fields. No internal state must be kept in
    +     * the {@code WindowContext}
    +     */
    +    public class WindowContext implements InternalWindowFunction.InternalWindowContext {
    +        protected W window;
    +
    +        protected
[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897444#comment-15897444 ]

ASF GitHub Bot commented on FLINK-5929:
---------------------------------------

Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/3479

    One quick initial remark: instead of each time having an anonymous
    inner class for the `Context` you can create a reusable class for that
    like this:

    ```
    class InternalProcessWindowContext<IN, OUT, KEY, W extends Window>
            extends ProcessWindowFunction<IN, OUT, KEY, W>.Context {

        W window;
        InternalWindowFunction.InternalWindowContext internalContext;

        InternalProcessWindowContext(ProcessWindowFunction<IN, OUT, KEY, W> function) {
            function.super();
        }

        @Override
        public W window() {
            return window;
        }

        @Override
        public KeyedStateStore windowState() {
            return internalContext.windowState();
        }

        @Override
        public KeyedStateStore globalState() {
            return internalContext.globalState();
        }
    }
    ```

    The `function.super()` call in there makes it work even though
    `Context` is itself defined as an inner abstract class of
    `ProcessWindowFunction`. It's a bit of black magic and not really too
    well known, I think.
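The `function.super()` idiom mentioned above is Java's qualified superclass constructor invocation: a top-level class may extend an inner (non-static) class if its constructor names the enclosing instance explicitly. A minimal standalone sketch follows; `ToyWindowFunction`, `ReusableContext` and `QualifiedSuperDemo` are hypothetical names made up for illustration and are not Flink classes.

```java
/** Toy stand-in for a function with an inner abstract Context; NOT a Flink class. */
abstract class ToyWindowFunction<W> {
    private final String name;

    ToyWindowFunction(String name) {
        this.name = name;
    }

    /** Inner (non-static) class: every instance is bound to an enclosing function. */
    abstract class Context {
        abstract W window();

        String functionName() {
            return name; // reads a field of the enclosing instance
        }
    }
}

/** Top-level, reusable context; the enclosing instance is supplied explicitly. */
class ReusableContext<W> extends ToyWindowFunction<W>.Context {
    W window; // mutable so that one object can serve many invocations

    ReusableContext(ToyWindowFunction<W> function) {
        function.super(); // binds this context to `function` as its enclosing instance
    }

    @Override
    W window() {
        return window;
    }
}

class QualifiedSuperDemo {
    public static void main(String[] args) {
        ToyWindowFunction<String> fn = new ToyWindowFunction<String>("sum") {};
        ReusableContext<String> ctx = new ReusableContext<>(fn);

        // The same context object is reused by overwriting its window field.
        ctx.window = "window-1";
        System.out.println(ctx.functionName() + " " + ctx.window());
        ctx.window = "window-2";
        System.out.println(ctx.functionName() + " " + ctx.window());
    }
}
```

Compared with creating an anonymous `Context` on every invocation, this allocates the context once and only mutates its fields, which is the reuse the remark suggests.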
[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897409#comment-15897409 ]

ASF GitHub Bot commented on FLINK-5929:
---------------------------------------

Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/3479

    Thanks @sjwiesman! I'll have a look.

> Allow Access to Per-Window State in ProcessWindowFunction
> ---------------------------------------------------------
>
>                 Key: FLINK-5929
>                 URL: https://issues.apache.org/jira/browse/FLINK-5929
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>            Reporter: Aljoscha Krettek
>
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}}
> can access is scoped to the key of the window but not the window itself. That
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For
> example, if you expect to have several {{Trigger}} firings (due to early and
> late firings) a user can keep state per window to keep some information
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two
> options:
> - Keep track of all state that a user uses and clean up when we reach the
> window GC horizon.
> - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called
> when we reach the window GC horizon that users can/should use to clean up
> their state.
> On the API side, we can add a method {{windowState()}} on
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and
> {{globalState()}} that would allow access to the (already available) global
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
>     /**
>      * @return The window that is being evaluated.
>      */
>     public abstract W window();
>     /**
>      * State accessor for per-key and per-window state.
>      */
>     KeyedStateStore windowState();
>     /**
>      * State accessor for per-key global state.
>      */
>     KeyedStateStore globalState();
> }
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
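The difference between the two accessors in the proposed {{Context}} can be illustrated with a small plain-Java model. This is only a sketch under stated assumptions: `StateBackend`, `StateStore`, the `"<global>"` namespace and the string-encoded windows are hypothetical stand-ins invented for the example, not Flink classes, and the real implementation may look quite different. The idea shown is that per-window state is addressed by (key, window) while global state is addressed by the key alone.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model only: none of these types are Flink classes.
class PerWindowStateSketch {

    /** Hypothetical backend: every value is addressed by (key, namespace, name). */
    static final class StateBackend {
        private final Map<String, Long> values = new HashMap<>();

        long get(String key, String namespace, String name) {
            return values.getOrDefault(key + "|" + namespace + "|" + name, 0L);
        }

        void put(String key, String namespace, String name, long value) {
            values.put(key + "|" + namespace + "|" + name, value);
        }
    }

    /** Hypothetical accessor bound to one key and one namespace. */
    static final class StateStore {
        private final StateBackend backend;
        private final String key;
        private final String namespace;

        StateStore(StateBackend backend, String key, String namespace) {
            this.backend = backend;
            this.key = key;
            this.namespace = namespace;
        }

        long get(String name) {
            return backend.get(key, namespace, name);
        }

        void put(String name, long value) {
            backend.put(key, namespace, name, value);
        }
    }

    /** Mirrors the shape of the proposed Context: one accessor per scope. */
    static final class Context {
        private static final String GLOBAL_NAMESPACE = "<global>";
        private final StateBackend backend;
        private final String key;
        private final String window;

        Context(StateBackend backend, String key, String window) {
            this.backend = backend;
            this.key = key;
            this.window = window;
        }

        String window() {
            return window;
        }

        // Per-key and per-window: the window itself is the namespace.
        StateStore windowState() {
            return new StateStore(backend, key, window);
        }

        // Per-key only: all windows of the key share one namespace.
        StateStore globalState() {
            return new StateStore(backend, key, GLOBAL_NAMESPACE);
        }
    }

    /** One trigger firing: bump a counter in both scopes, return {windowCount, globalCount}. */
    static long[] fire(Context ctx) {
        long windowCount = ctx.windowState().get("firings") + 1;
        long globalCount = ctx.globalState().get("firings") + 1;
        ctx.windowState().put("firings", windowCount);
        ctx.globalState().put("firings", globalCount);
        return new long[] {windowCount, globalCount};
    }

    public static void main(String[] args) {
        StateBackend backend = new StateBackend();
        Context first = new Context(backend, "user-1", "[0, 10)");
        Context second = new Context(backend, "user-1", "[10, 20)");

        fire(first);                  // early firing of the first window
        long[] late = fire(first);    // late firing of the same window
        long[] other = fire(second);  // first firing of the next window

        System.out.println("first window:  window=" + late[0] + " global=" + late[1]);
        System.out.println("second window: window=" + other[0] + " global=" + other[1]);
    }
}
```

In this model the second window starts its own counter at 1, while the global counter keeps growing across both windows of the same key.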
[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897369#comment-15897369 ]

ASF GitHub Bot commented on FLINK-5929:
---------------------------------------

GitHub user sjwiesman opened a pull request:

    https://github.com/apache/flink/pull/3479

    [FLINK-5929] Allow Access to Per-Window State in ProcessWindowFunction

    Right now, the state that a WindowFunction or ProcessWindowFunction can access is scoped to the key of the window but not the window itself. That is, state is global across all windows for a given key.

    For some use cases it is beneficial to keep state scoped to a window. For example, if you expect to have several Trigger firings (due to early and late firings) a user can keep state per window to keep some information between those firings.

    @aljoscha

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sjwiesman/flink FLINK-5929

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3479.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3479

commit 623edd1fb107e8dd0aae755a7b252df1f91713bd
Author: Seth Wiesman
Date:   2017-03-06T04:07:18Z

    [FLINK-5929] Allow Access to Per-Window State in ProcessWindowFunction

    Right now, the state that a WindowFunction or ProcessWindowFunction can access is scoped to the key of the window but not the window itself. That is, state is global across all windows for a given key.

    For some use cases it is beneficial to keep state scoped to a window. For example, if you expect to have several Trigger firings (due to early and late firings) a user can keep state per window to keep some information between those firings.
[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15893830#comment-15893830 ]

Aljoscha Krettek commented on FLINK-5929:
-----------------------------------------

Sorry, overlooked the other comment. Yes, I think throwing an Exception is good for now. For the tests, I think we're good if we have solid tests in {{WindowOperatorContractTest}}.
[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15892413#comment-15892413 ]

Aljoscha Krettek commented on FLINK-5929:
-----------------------------------------

[~sjwiesman] The best place to put new tests for the {{WindowOperator}} is {{WindowOperatorContractTest}}, which has low-level tests for specific behaviours of the {{WindowOperator}}. Ideally, we would not have {{WindowOperatorTest}} anymore because it is a cruder way of testing: it simply throws things at the operator and checks whether the result is correct, whereas the new tests in the contract test are more specific.
[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15892398#comment-15892398 ]

Seth Wiesman commented on FLINK-5929:
-------------------------------------

[~aljoscha] I have a version of this that I feel pretty good about. I'm working on my test coverage, but I am not sure where the appropriate place to put the tests for this would be. I have some in the WindowOperatorTest and am working on some integration tests. What else do you think makes sense for this type of feature?
[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15892362#comment-15892362 ]

Seth Wiesman commented on FLINK-5929:
-------------------------------------

[~aljoscha] That seems like a reasonable first step. For now, would it be acceptable to throw an exception if users try to access state in a merging window?
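The proposal to throw an exception when per-window state is accessed in a merging window could be sketched roughly as follows. This is a plain-Java illustration, not the code from the pull request: `WindowStateStore`, `MapBackedStore`, `RejectingStore`, `windowStateFor` and the exception message are all hypothetical names and text chosen for the example. The point is only that the operator can hand out a store that fails fast instead of silently keeping state that would be lost or duplicated when windows merge.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model only: none of these types are Flink classes.
class MergingWindowGuardSketch {

    /** Hypothetical minimal per-window state accessor. */
    interface WindowStateStore {
        long get(String name);

        void put(String name, long value);
    }

    /** Store handed out for non-merging windows (e.g. tumbling or sliding). */
    static final class MapBackedStore implements WindowStateStore {
        private final Map<String, Long> values = new HashMap<>();

        @Override
        public long get(String name) {
            return values.getOrDefault(name, 0L);
        }

        @Override
        public void put(String name, long value) {
            values.put(name, value);
        }
    }

    /** Store handed out for merging windows (e.g. session windows): every access fails. */
    static final class RejectingStore implements WindowStateStore {
        private static final String MESSAGE =
                "Per-window state is not supported for merging windows (illustrative message)";

        @Override
        public long get(String name) {
            throw new UnsupportedOperationException(MESSAGE);
        }

        @Override
        public void put(String name, long value) {
            throw new UnsupportedOperationException(MESSAGE);
        }
    }

    /** The operator would pick the store once, based on its window assigner. */
    static WindowStateStore windowStateFor(boolean mergingAssigner) {
        return mergingAssigner ? new RejectingStore() : new MapBackedStore();
    }

    public static void main(String[] args) {
        WindowStateStore tumbling = windowStateFor(false);
        tumbling.put("firings", 1L);
        System.out.println("non-merging window: firings=" + tumbling.get("firings"));

        try {
            windowStateFor(true).get("firings");
        } catch (UnsupportedOperationException e) {
            System.out.println("merging window: " + e.getMessage());
        }
    }
}
```

Failing on first access keeps the restriction visible to users until a way to merge per-window state is offered.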
[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15892018#comment-15892018 ]

Aljoscha Krettek commented on FLINK-5929:
-----------------------------------------

[~sjwiesman] One more thing: we can only allow access to per-window state for non-merging windows, for now. For merging windows, the window function would need a method for merging state, similar to {{Trigger.merge()}} and I don't want to give that to users just yet because it might seem a bit overwhelming. What do you think?
[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15890334#comment-15890334 ]

Aljoscha Krettek commented on FLINK-5929:
-----------------------------------------

[~sjwiesman] AFAIK, it's wrapped in an {{InternalWindowFunction}}. It has this name for historical reasons but we can easily change the name and the method signature because it's not public API. Does this help?
[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15890328#comment-15890328 ]

Aljoscha Krettek commented on FLINK-5929:
-----------------------------------------

[~rehevkor5] Ahh, I didn't see that you're accessing the state that the trigger is keeping (and cleaning up).
[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15889297#comment-15889297 ]

Shannon Carey commented on FLINK-5929:
--------------------------------------

[~aljoscha] as far as I am aware, the state does get cleared out by our Trigger. In Trigger#clear() we have:

    ctx.getPartitionedState(fireTimestampStateDescriptor).clear();

We could have done it in the Window Function instead, if we wanted to, given our hack.
[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15888357#comment-15888357 ]

Seth Wiesman commented on FLINK-5929:
-------------------------------------

[~aljoscha] Curious: why is ProcessWindowFunction wrapped in a WindowFunction before being passed to the WindowOperator, as opposed to the other way around, if ProcessWindowFunction is the _next gen_ window function?
[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15887815#comment-15887815 ]

Aljoscha Krettek commented on FLINK-5929:
-----------------------------------------

[~rehevkor5] Yes, as Seth mentioned the global refers to "not-window-scoped" as opposed to "global for the whole job".

[~sjwiesman] I'd be very happy if you contributed your code, yes. I'm also in favour of the {{clear()}}/{{cleanup()}} method because Flink, so far, has erred on the side of being explicit. And, as you said, this is apparent in the way {{Triggers}} work.
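The explicit {{clear()}}/{{cleanup()}} option favoured here can be pictured with the following plain-Java model. It is a sketch under assumptions, not Flink code: `WindowFn`, `Operator`, `CountingFn` and the rule "a window expires once the watermark reaches window end plus allowed lateness" are simplifications made up for the example. It shows the operator invoking the user's `clear` hook at the GC horizon and leaving it to the user to remove whatever per-window state they created.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Simplified model only: none of these types are Flink classes.
class ExplicitCleanupSketch {

    /** Hypothetical window function with an explicit cleanup hook. */
    interface WindowFn {
        void process(long windowEnd, Map<String, Long> windowState);

        void clear(long windowEnd, Map<String, Long> windowState);
    }

    /** Counts firings per window and removes the counter again in clear(). */
    static final class CountingFn implements WindowFn {
        @Override
        public void process(long windowEnd, Map<String, Long> windowState) {
            windowState.merge("firings", 1L, Long::sum);
        }

        @Override
        public void clear(long windowEnd, Map<String, Long> windowState) {
            windowState.remove("firings");
        }
    }

    /** Hypothetical operator that tracks per-window state by window end timestamp. */
    static final class Operator {
        private final Map<Long, Map<String, Long>> stateByWindowEnd = new HashMap<>();
        private final WindowFn fn;
        private final long allowedLateness;

        Operator(WindowFn fn, long allowedLateness) {
            this.fn = fn;
            this.allowedLateness = allowedLateness;
        }

        /** A trigger firing (early, on-time or late) for the given window. */
        void fire(long windowEnd) {
            fn.process(windowEnd, stateByWindowEnd.computeIfAbsent(windowEnd, end -> new HashMap<>()));
        }

        /**
         * Calls clear() for every window past the GC horizon and forgets the window.
         * Returns how many state entries the user function left behind.
         */
        int advanceWatermark(long watermark) {
            int leaked = 0;
            Iterator<Map.Entry<Long, Map<String, Long>>> it = stateByWindowEnd.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, Map<String, Long>> entry = it.next();
                if (entry.getKey() + allowedLateness <= watermark) {
                    fn.clear(entry.getKey(), entry.getValue());
                    leaked += entry.getValue().size();
                    it.remove();
                }
            }
            return leaked;
        }

        int liveWindows() {
            return stateByWindowEnd.size();
        }
    }

    public static void main(String[] args) {
        Operator operator = new Operator(new CountingFn(), 5L);
        operator.fire(10L);  // on-time firing of the window ending at 10
        operator.fire(10L);  // late firing of the same window
        operator.fire(20L);  // firing of the window ending at 20

        int leaked = operator.advanceWatermark(15L);  // only the first window is past its horizon
        System.out.println("leaked entries: " + leaked + ", live windows: " + operator.liveWindows());
    }
}
```

The alternative from the issue description, where the framework tracks all state a user touches and drops it automatically, would replace the `clear` call with bookkeeping inside the operator; the explicit hook keeps the behaviour consistent with how triggers clean up their own state.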
[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15887818#comment-15887818 ]

Aljoscha Krettek commented on FLINK-5929:
-----------------------------------------

[~rehevkor5] Ah forgot to mention, in your workaround you could never clean up your state, right? Because of the lack of a {{clear()}} method for the {{WindowFunction}}.
[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15887811#comment-15887811 ]

ASF GitHub Bot commented on FLINK-5929:
---------------------------------------

Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/3427

    Wrong Jira Issue ID.
[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15887731#comment-15887731 ] ASF GitHub Bot commented on FLINK-5929: --- GitHub user uce opened a pull request: https://github.com/apache/flink/pull/3427 [FLINK-5929] [tests] Fix SavepointITCase instability When shutting down the testing cluster it could happen that checkpoint files lingered around (for checkpoints independent of the savepoint). This commit deactives checkpointing for the test and uses count down latches to track progress, which also reduces the test time. I've triggered multiple Travis builds. I will merge this if they build without the `SavepointITCase` failing. You can merge this pull request into a Git repository by running: $ git pull https://github.com/uce/flink 5923-savepoint_it_case Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3427.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3427 commit e0c46f34f55bfee743236c4042be4b2501436811 Author: Ufuk CelebiDate: 2017-02-28T10:13:28Z [FLINK-5929] [tests] Fix SavepointITCase instability When shutting down the testing cluster it can happen that checkpoint files lingered around (checkpoints independent of the savepoint). This commit deactives checkpointing for the test and uses count down latches to track progress, which also reduces the test time. > Allow Access to Per-Window State in ProcessWindowFunction > - > > Key: FLINK-5929 > URL: https://issues.apache.org/jira/browse/FLINK-5929 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Aljoscha Krettek > > Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} > can access is scoped to the key of the window but not the window itself. That > is, state is global across all windows for a given key. 
> For some use cases it is beneficial to keep state scoped to a window. For > example, if you expect to have several {{Trigger}} firings (due to early and > late firings) a user can keep state per window to keep some information > between those firings. > The per-window state has to be cleaned up in some way. For this I see two > options: > - Keep track of all state that a user uses and clean up when we reach the > window GC horizon. > - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called > when we reach the window GC horizon that users can/should use to clean up > their state. > On the API side, we can add a method {{windowState()}} on > {{ProcessWindowFunction.Context}} that retrieves the per-window state and > {{globalState()}} that would allow access to the (already available) global > state. The {{Context}} would then look like this: > {code} > /** > * The context holding window metadata > */ > public abstract class Context { > /** > * @return The window that is being evaluated. > */ > public abstract W window(); > /** > * State accessor for per-key and per-window state. > */ > KeyedStateStore windowState(); > /** > * State accessor for per-key global state. > */ > KeyedStateStore globalState(); > } > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
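The difference between key-scoped ("global") state and window-scoped state described in the issue can be illustrated with a small, self-contained Java sketch. It is a toy model under stated assumptions, not Flink code: `NamespacedStore`, `GLOBAL_NAMESPACE` and `onFiring` are hypothetical names, windows are represented as plain strings, and the only idea taken from the discussion is that a value is addressed by a key plus a namespace, where the namespace is either the window or a shared global one.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of key-scoped versus window-scoped state; no Flink classes are used. */
final class WindowScopedStateSketch {

    /** Hypothetical namespace for state shared by all windows of one key. */
    static final String GLOBAL_NAMESPACE = "global";

    /** Hypothetical store in which every value is addressed by (key, namespace). */
    static final class NamespacedStore {
        private final Map<String, Map<String, Long>> valuesByKey = new HashMap<>();

        /** Returns the stored value, or 0 if nothing was stored yet. */
        long get(String key, String namespace) {
            Map<String, Long> perKey = valuesByKey.get(key);
            Long value = (perKey == null) ? null : perKey.get(namespace);
            return (value == null) ? 0L : value;
        }

        void put(String key, String namespace, long value) {
            valuesByKey.computeIfAbsent(key, k -> new HashMap<>()).put(namespace, value);
        }
    }

    /**
     * Simulates one trigger firing for (key, window).
     * Returns {firings seen by this window, firings seen by the key across all windows}.
     */
    static long[] onFiring(NamespacedStore store, String key, String window) {
        long perWindow = store.get(key, window) + 1;          // window-scoped counter
        long perKey = store.get(key, GLOBAL_NAMESPACE) + 1;   // key-scoped ("global") counter
        store.put(key, window, perWindow);
        store.put(key, GLOBAL_NAMESPACE, perKey);
        return new long[] {perWindow, perKey};
    }

    public static void main(String[] args) {
        NamespacedStore store = new NamespacedStore();
        // Two firings of the first window (for example early and on-time), then one of the next.
        String[] firings = {"[0,10)", "[0,10)", "[10,20)"};
        for (String window : firings) {
            long[] counts = onFiring(store, "user-1", window);
            System.out.println(window + " perWindow=" + counts[0] + " perKey=" + counts[1]);
        }
    }
}
```

In this sketch the per-window counter starts again at 1 for the second window, while the per-key counter keeps growing across windows, which is the distinction the proposed `windowState()` and `globalState()` accessors are meant to expose.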
[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886446#comment-15886446 ] Seth Wiesman commented on FLINK-5929: - [~aljoscha] Yes this does look like what we were discussing. Regarding cleanup vs using gc horizon, as a user I think I would expect there to be a cleanup method to be consistent with the trigger context. I already have a rudimentary version of this implemented for my own use so I would be happy to take this ticket, clean up my code, and submit a pr.
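The explicit cleanup option discussed in this thread can be sketched as follows. This is a toy model under stated assumptions, not the Flink implementation: `WindowFunctionWithCleanup`, `FiringCounter` and `ToyOperator` are hypothetical names, state is kept for a single key as a map from window end timestamp to a counter, and the GC horizon is assumed to be the window end plus the allowed lateness.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

/** Toy model of a user-facing cleanup hook that runs at the window GC horizon. */
final class WindowCleanupSketch {

    /** Hypothetical user function with an explicit cleanup method. */
    interface WindowFunctionWithCleanup {
        void process(long windowEnd, Map<Long, Long> windowState);
        void cleanup(long windowEnd, Map<Long, Long> windowState);
    }

    /** Counts firings per window and removes its counter when asked to clean up. */
    static final class FiringCounter implements WindowFunctionWithCleanup {
        @Override
        public void process(long windowEnd, Map<Long, Long> windowState) {
            windowState.merge(windowEnd, 1L, Long::sum);
        }

        @Override
        public void cleanup(long windowEnd, Map<Long, Long> windowState) {
            windowState.remove(windowEnd);
        }
    }

    /** Hypothetical operator for one key; per-window state is keyed by window end. */
    static final class ToyOperator {
        final Map<Long, Long> windowState = new HashMap<>();
        private final WindowFunctionWithCleanup function;
        private final long allowedLateness;

        ToyOperator(WindowFunctionWithCleanup function, long allowedLateness) {
            this.function = function;
            this.allowedLateness = allowedLateness;
        }

        /** Simulates a trigger firing for the window ending at windowEnd. */
        void fire(long windowEnd) {
            function.process(windowEnd, windowState);
        }

        /** Calls cleanup() for each window whose assumed GC horizon has been reached. */
        void advanceWatermark(long watermark) {
            // Iterate over a copy because cleanup() may remove entries from the map.
            for (Long windowEnd : new ArrayList<>(windowState.keySet())) {
                if (windowEnd + allowedLateness <= watermark) {
                    function.cleanup(windowEnd, windowState);
                }
            }
        }
    }

    public static void main(String[] args) {
        ToyOperator operator = new ToyOperator(new FiringCounter(), 5L);
        operator.fire(10L);
        operator.fire(10L);
        operator.fire(20L);
        operator.advanceWatermark(15L); // horizon of the window ending at 10 is reached
        System.out.println(operator.windowState);
    }
}
```

The trade-off between the two options in the issue is visible here: with a hook, the user decides what to remove in `cleanup()`, whereas the tracking option would require the operator itself to remember every piece of state the function touched.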
[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886450#comment-15886450 ] Seth Wiesman commented on FLINK-5929: - [~rehevkor5] Yes, state for individual window panes.
[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886419#comment-15886419 ] Shannon Carey commented on FLINK-5929: -- If I understand correctly, I agree this would be useful. Currently we are working around this limitation in order to achieve communication between the Trigger (per-pane state) and the WindowFunction (per-operator state) by a hack within the WindowFunction that looks like this (we're not on 1.2 yet so we haven't looked at new ways to do this yet):
{code}
def apply(key: String, window: TimeWindow, input, out): Unit = {
  val fireTimestampState: ValueState[java.lang.Long] =
    getRuntimeContext.getState[java.lang.Long](fireTimestampStateDescriptor)
  if (fireTimestampState.isInstanceOf[MemValueState[String, TimeWindow, java.lang.Long]]) {
    fireTimestampState.asInstanceOf[MemValueState[String, TimeWindow, java.lang.Long]].setCurrentNamespace(window)
  } else if (fireTimestampState.isInstanceOf[RocksDBValueState[String, TimeWindow, java.lang.Long]]) {
    fireTimestampState.asInstanceOf[RocksDBValueState[String, TimeWindow, java.lang.Long]].setCurrentNamespace(window)
  } else if (fireTimestampState.isInstanceOf[FsValueState[String, TimeWindow, java.lang.Long]]) {
    fireTimestampState.asInstanceOf[FsValueState[String, TimeWindow, java.lang.Long]].setCurrentNamespace(window)
  }
  fireTimestampState.value()
  ...
{code}
[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886378#comment-15886378 ] Shannon Carey commented on FLINK-5929: -- Just to clarify, you're referring to the state from AbstractRichFunction#getRuntimeContext.getState(), right? That's the state keyed by the combination of event key and window operator. What you're suggesting is adding state for the individual panes within the window, right?