[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-03-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15942231#comment-15942231
 ] 

ASF GitHub Bot commented on FLINK-5929:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/3479
  
Hehe, thanks!  


> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>Assignee: Seth Wiesman
> Fix For: 1.3.0
>
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-03-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941791#comment-15941791
 ] 

ASF GitHub Bot commented on FLINK-5929:
---

Github user sjwiesman commented on the issue:

https://github.com/apache/flink/pull/3479
  
Done! Thank you for for helping me get this feature merged in. This has to 
be one of the most painless commits I've ever made to an open source project of 
this size. 


> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>Assignee: Seth Wiesman
> Fix For: 1.3.0
>
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-03-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941792#comment-15941792
 ] 

ASF GitHub Bot commented on FLINK-5929:
---

Github user sjwiesman closed the pull request at:

https://github.com/apache/flink/pull/3479


> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>Assignee: Seth Wiesman
> Fix For: 1.3.0
>
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-03-25 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941773#comment-15941773
 ] 

Aljoscha Krettek commented on FLINK-5929:
-

[~sjwiesman] I created FLINK-6163 and FLINK-6164 as follow-up issues. Just 
letting you know in case you're interested. :-)

> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>Assignee: Seth Wiesman
> Fix For: 1.3.0
>
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-03-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941771#comment-15941771
 ] 

ASF GitHub Bot commented on FLINK-5929:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/3479
  
Thanks for implementing this!  

I just merged, could you please close this PR?


> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15934894#comment-15934894
 ] 

ASF GitHub Bot commented on FLINK-5929:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/3479
  
Thanks!


> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15934879#comment-15934879
 ] 

ASF GitHub Bot commented on FLINK-5929:
---

Github user sjwiesman commented on the issue:

https://github.com/apache/flink/pull/3479
  
Pushed the fix, I had to update SideOutputsITCase so the 
ProcessAllWindowFunctions had a noop clear method. All tests passed locally, 
take a look


> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15934726#comment-15934726
 ] 

ASF GitHub Bot commented on FLINK-5929:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/3479
  
Ok, please ping me when you pushed the fix.  


> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15934691#comment-15934691
 ] 

ASF GitHub Bot commented on FLINK-5929:
---

Github user sjwiesman commented on the issue:

https://github.com/apache/flink/pull/3479
  
It looks like when I rebased on master I broke one of the scala side 
outputs test. I'm going to push a fix right now but it won't change any of the 
code surrounding this pr. 


> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15933393#comment-15933393
 ] 

ASF GitHub Bot commented on FLINK-5929:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/3479
  
Don't worry.  Is it ready for another review pass now?


> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15933371#comment-15933371
 ] 

ASF GitHub Bot commented on FLINK-5929:
---

Github user sjwiesman commented on the issue:

https://github.com/apache/flink/pull/3479
  
sorry for the delay, things got crazy at work. let me know if there are any 
issues. 


> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15922878#comment-15922878
 ] 

ASF GitHub Bot commented on FLINK-5929:
---

Github user sjwiesman commented on a diff in the pull request:

https://github.com/apache/flink/pull/3479#discussion_r105763162
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java
 ---
@@ -53,22 +57,47 @@ public InternalAggregateProcessAllWindowFunction(
}
 
@Override
-   public void apply(Byte key, final W window, Iterable input, 
Collector out) throws Exception {
+   public void open(Configuration parameters) throws Exception {
+   super.open(parameters);
ProcessAllWindowFunction wrappedFunction = 
this.wrappedFunction;
-   ProcessAllWindowFunction.Context context = 
wrappedFunction.new Context() {
-   @Override
-   public W window() {
-   return window;
-   }
-   };
+   this.ctx = new 
InternalProcessAllWindowContext<>(wrappedFunction);
+   }
 
+   @Override
+   public void process(Byte aByte, final W window, final 
InternalWindowContext context, Iterable input, Collector out) throws 
Exception {
final ACC acc = aggFunction.createAccumulator();
 
for (T val : input) {
aggFunction.add(val, acc);
}
 
-   wrappedFunction.process(context, 
Collections.singletonList(aggFunction.getResult(acc)), out);
+   this.ctx.window = window;
+   this.ctx.internalContext = context;
+   ProcessAllWindowFunction wrappedFunction = 
this.wrappedFunction;
+   wrappedFunction.process(ctx, 
Collections.singletonList(aggFunction.getResult(acc)), out);
+   }
+
+   @Override
+   public void clear(final W window, final InternalWindowContext context) 
throws Exception {
+   ProcessAllWindowFunction wrappedFunction = 
this.wrappedFunction;
+   final ProcessAllWindowFunction.Context ctx = 
wrappedFunction.new Context() {
--- End diff --

whoops  


> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15907482#comment-15907482
 ] 

ASF GitHub Bot commented on FLINK-5929:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3479#discussion_r105660519
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 ---
@@ -153,6 +161,8 @@
 
protected transient Context context = new Context(null, null);
 
+   protected transient WindowContext windowContext = new 
WindowContext(null);
--- End diff --

I think to make it more clear what they do we should rename these two 
contexts to `triggerContext` and `processContext`.


> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15907484#comment-15907484
 ] 

ASF GitHub Bot commented on FLINK-5929:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3479#discussion_r105661210
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 ---
@@ -628,6 +644,123 @@ protected final boolean isCleanupTime(W window, long 
time) {
return time == cleanupTime(window);
}
 
+   public abstract class KeyedStateStoreWithWindow implements 
KeyedStateStore {
--- End diff --

Maybe comment that we have a base class where we can set the window so that 
we can once create a `MergingKeyStore` or `WindowPaneKeyStore` (depending on 
the window assigner) and then not care about the distinction anymore. 

I remember I suggested this but now struggled to see why there is the base 
class.  


> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15907485#comment-15907485
 ] 

ASF GitHub Bot commented on FLINK-5929:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3479#discussion_r105661470
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java
 ---
@@ -53,22 +57,47 @@ public InternalAggregateProcessAllWindowFunction(
}
 
@Override
-   public void apply(Byte key, final W window, Iterable input, 
Collector out) throws Exception {
+   public void open(Configuration parameters) throws Exception {
+   super.open(parameters);
ProcessAllWindowFunction wrappedFunction = 
this.wrappedFunction;
-   ProcessAllWindowFunction.Context context = 
wrappedFunction.new Context() {
-   @Override
-   public W window() {
-   return window;
-   }
-   };
+   this.ctx = new 
InternalProcessAllWindowContext<>(wrappedFunction);
+   }
 
+   @Override
+   public void process(Byte aByte, final W window, final 
InternalWindowContext context, Iterable input, Collector out) throws 
Exception {
final ACC acc = aggFunction.createAccumulator();
 
for (T val : input) {
aggFunction.add(val, acc);
}
 
-   wrappedFunction.process(context, 
Collections.singletonList(aggFunction.getResult(acc)), out);
+   this.ctx.window = window;
+   this.ctx.internalContext = context;
+   ProcessAllWindowFunction wrappedFunction = 
this.wrappedFunction;
+   wrappedFunction.process(ctx, 
Collections.singletonList(aggFunction.getResult(acc)), out);
+   }
+
+   @Override
+   public void clear(final W window, final InternalWindowContext context) 
throws Exception {
+   ProcessAllWindowFunction wrappedFunction = 
this.wrappedFunction;
+   final ProcessAllWindowFunction.Context ctx = 
wrappedFunction.new Context() {
--- End diff --

leftover anonymous inner `Context`. This should also use `this.ctx` like in 
`process()`.


> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15907483#comment-15907483
 ] 

ASF GitHub Bot commented on FLINK-5929:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3479#discussion_r105662139
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
 ---
@@ -92,12 +93,45 @@ public void process(K key, final Context context, 
Iterable values, Collector<
result = foldFunction.fold(result, val);
}
 
-   windowFunction.process(key, windowFunction.new Context() {
+   ProcessWindowFunction.Context ctx = 
windowFunction.new Context() {
--- End diff --

This can benefit from a similar refactoring as the internal window 
functions, i.e. creating an internal context class instead of the anonymous 
inner classes.

This also holds for `FoldApplyProcessAllWindowFunction`, 
`ReduceApplyProcessAllWindowFunction` and `ReduceApplyProcessWindowFunction`.


> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901887#comment-15901887
 ] 

ASF GitHub Bot commented on FLINK-5929:
---

Github user sjwiesman commented on the issue:

https://github.com/apache/flink/pull/3479
  
@aljoscha I made the changes you asked for. Just a heads up, there are a 
number of files that were superficially changed when migrating from apply -> 
process but are otherwise untouched. 


> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897915#comment-15897915
 ] 

ASF GitHub Bot commented on FLINK-5929:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3479#discussion_r104504635
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
 ---
@@ -39,5 +40,31 @@
 * @param outA collector for emitting elements.
 * @throws Exception The function may throw exceptions to fail the 
program and trigger recovery.
 */
+   @Deprecated
void apply(KEY key, W window, IN input, Collector out) throws 
Exception;
--- End diff --

I think for now it's OK to throw an Exception here.


> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897829#comment-15897829
 ] 

ASF GitHub Bot commented on FLINK-5929:
---

Github user sjwiesman commented on a diff in the pull request:

https://github.com/apache/flink/pull/3479#discussion_r104492162
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
 ---
@@ -39,5 +40,31 @@
 * @param outA collector for emitting elements.
 * @throws Exception The function may throw exceptions to fail the 
program and trigger recovery.
 */
+   @Deprecated
void apply(KEY key, W window, IN input, Collector out) throws 
Exception;
--- End diff --

I noticed an issue when removing apply. The method is used inside of 
AccumulatingKeyedTimePanes which takes in an AbstractStreamOperator as an 
argument to its evaluateWindow method. When creating the context I can get the 
global keyed state backend from the operator, but not the partitioned state 
because those methods are protected. Now the only two uses of this class are 
its subclasses which have both been deprecated. My question is, do you think I 
should modify the evaluateWindow method to accept a keyed state store which 
wraps the operator partitioned state or just throw an exception on 
context.windowState() because all valid uses of this method have been 
deprecated? 


> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897683#comment-15897683
 ] 

ASF GitHub Bot commented on FLINK-5929:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3479#discussion_r104470521
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
 ---
@@ -39,5 +40,31 @@
 * @param outA collector for emitting elements.
 * @throws Exception The function may throw exceptions to fail the 
program and trigger recovery.
 */
+   @Deprecated
void apply(KEY key, W window, IN input, Collector out) throws 
Exception;
--- End diff --

Ah, I meant to actually write that earlier. Yes: please remove.  


> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897597#comment-15897597
 ] 

ASF GitHub Bot commented on FLINK-5929:
---

Github user sjwiesman commented on a diff in the pull request:

https://github.com/apache/flink/pull/3479#discussion_r104458722
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
 ---
@@ -39,5 +40,31 @@
 * @param outA collector for emitting elements.
 * @throws Exception The function may throw exceptions to fail the 
program and trigger recovery.
 */
+   @Deprecated
void apply(KEY key, W window, IN input, Collector out) throws 
Exception;
--- End diff --

@aljoscha I meant to ask, should I leave this method or remove it? 


> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897448#comment-15897448
 ] 

ASF GitHub Bot commented on FLINK-5929:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3479#discussion_r104432787
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 ---
@@ -629,6 +645,135 @@ protected final boolean isCleanupTime(W window, long 
time) {
}
 
/**
+* For now keyed state is not allowed in ProcessWindowFunctions
+*/
+   public class MergingKeyStore implements KeyedStateStore {
+
+   protected W window;
+
+   @Override
+   public  ValueState getState(ValueStateDescriptor 
stateProperties) {
+   throw new RuntimeException("keyedState is not allowed 
in merging windows");
+   }
+
+   @Override
+   public  ListState getListState(ListStateDescriptor 
stateProperties) {
+   throw new RuntimeException("keyedState is not allowed 
in merging windows");
+   }
+
+   @Override
+   public  ReducingState 
getReducingState(ReducingStateDescriptor stateProperties) {
+   throw new RuntimeException("keyedState is not allowed 
in merging windows");
+   }
+
+   @Override
+   public  FoldingState 
getFoldingState(FoldingStateDescriptor stateProperties) {
+   throw new RuntimeException("keyedState is not allowed 
in merging windows");
+   }
+
+   @Override
+   public  MapState 
getMapState(MapStateDescriptor stateProperties) {
+   throw new RuntimeException("keyedState is not allowed 
in merging windows");
+   }
+   }
+
+   public class WindowPaneKeyStore implements KeyedStateStore {
+
+   protected W window;
+
+   @Override
+   public  ValueState getState(ValueStateDescriptor 
stateProperties) {
+   try {
+   return 
WindowOperator.this.getPartitionedState(window, windowSerializer, 
stateProperties);
+   } catch (Exception e) {
+   throw new RuntimeException("Could not retrieve 
state", e);
+   }
+   }
+
+   @Override
+   public  ListState getListState(ListStateDescriptor 
stateProperties) {
+   try {
+   return 
WindowOperator.this.getPartitionedState(window, windowSerializer, 
stateProperties);
+   } catch (Exception e) {
+   throw new RuntimeException("Could not retrieve 
state", e);
+   }
+   }
+
+   @Override
+   public  ReducingState 
getReducingState(ReducingStateDescriptor stateProperties) {
+   try {
+   return 
WindowOperator.this.getPartitionedState(window, windowSerializer, 
stateProperties);
+   } catch (Exception e) {
+   throw new RuntimeException("Could not retrieve 
state", e);
+   }
+   }
+
+   @Override
+   public  FoldingState 
getFoldingState(FoldingStateDescriptor stateProperties) {
+   try {
+   return 
WindowOperator.this.getPartitionedState(window, windowSerializer, 
stateProperties);
+   } catch (Exception e) {
+   throw new RuntimeException("Could not retrieve 
state", e);
+   }
+   }
+
+   @Override
+   public  MapState 
getMapState(MapStateDescriptor stateProperties) {
+   try {
+   return 
WindowOperator.this.getPartitionedState(window, windowSerializer, 
stateProperties);
+   } catch (Exception e) {
+   throw new RuntimeException("Could not retrieve 
state", e);
+   }
+   }
+   }
+
+   /**
+* {@code WindowContext} is a utility for handling {@code 
ProcessWindowFunction} invocations. It can be reused
+* by setting the {@code key} and {@code window} fields. No internal 
state must be kept in
+* the {@code WindowContext}
+*/
+   public class WindowContext implements 
InternalWindowFunction.InternalWindowContext {
+   protected W window;
+
+   protected 

[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897446#comment-15897446
 ] 

ASF GitHub Bot commented on FLINK-5929:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3479#discussion_r104432699
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 ---
@@ -629,6 +645,135 @@ protected final boolean isCleanupTime(W window, long 
time) {
}
 
/**
+* For now keyed state is not allowed in ProcessWindowFunctions
+*/
+   public class MergingKeyStore implements KeyedStateStore {
+
+   protected W window;
+
+   @Override
+   public  ValueState getState(ValueStateDescriptor 
stateProperties) {
+   throw new RuntimeException("keyedState is not allowed 
in merging windows");
+   }
+
+   @Override
+   public  ListState getListState(ListStateDescriptor 
stateProperties) {
+   throw new RuntimeException("keyedState is not allowed 
in merging windows");
+   }
+
+   @Override
+   public  ReducingState 
getReducingState(ReducingStateDescriptor stateProperties) {
+   throw new RuntimeException("keyedState is not allowed 
in merging windows");
+   }
+
+   @Override
+   public  FoldingState 
getFoldingState(FoldingStateDescriptor stateProperties) {
+   throw new RuntimeException("keyedState is not allowed 
in merging windows");
+   }
+
+   @Override
+   public  MapState 
getMapState(MapStateDescriptor stateProperties) {
+   throw new RuntimeException("keyedState is not allowed 
in merging windows");
+   }
+   }
+
+   public class WindowPaneKeyStore implements KeyedStateStore {
+
+   protected W window;
+
+   @Override
+   public  ValueState getState(ValueStateDescriptor 
stateProperties) {
+   try {
+   return 
WindowOperator.this.getPartitionedState(window, windowSerializer, 
stateProperties);
+   } catch (Exception e) {
+   throw new RuntimeException("Could not retrieve 
state", e);
+   }
+   }
+
+   @Override
+   public  ListState getListState(ListStateDescriptor 
stateProperties) {
+   try {
+   return 
WindowOperator.this.getPartitionedState(window, windowSerializer, 
stateProperties);
+   } catch (Exception e) {
+   throw new RuntimeException("Could not retrieve 
state", e);
+   }
+   }
+
+   @Override
+   public  ReducingState 
getReducingState(ReducingStateDescriptor stateProperties) {
+   try {
+   return 
WindowOperator.this.getPartitionedState(window, windowSerializer, 
stateProperties);
+   } catch (Exception e) {
+   throw new RuntimeException("Could not retrieve 
state", e);
+   }
+   }
+
+   @Override
+   public  FoldingState 
getFoldingState(FoldingStateDescriptor stateProperties) {
+   try {
+   return 
WindowOperator.this.getPartitionedState(window, windowSerializer, 
stateProperties);
+   } catch (Exception e) {
+   throw new RuntimeException("Could not retrieve 
state", e);
+   }
+   }
+
+   @Override
+   public  MapState 
getMapState(MapStateDescriptor stateProperties) {
+   try {
+   return 
WindowOperator.this.getPartitionedState(window, windowSerializer, 
stateProperties);
+   } catch (Exception e) {
+   throw new RuntimeException("Could not retrieve 
state", e);
+   }
+   }
+   }
+
+   /**
+* {@code WindowContext} is a utility for handling {@code 
ProcessWindowFunction} invocations. It can be reused
+* by setting the {@code key} and {@code window} fields. No internal 
state must be kept in
+* the {@code WindowContext}
+*/
+   public class WindowContext implements 
InternalWindowFunction.InternalWindowContext {
+   protected W window;
+
+   protected 

[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897444#comment-15897444
 ] 

ASF GitHub Bot commented on FLINK-5929:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/3479
  
One quick initial remark: instead of each time having an anonymous inner 
class for the `Context` you can create a reusable class for that like this:
```
class InternalProcessWindowContext
extends ProcessWindowFunction.Context {

W window;
InternalWindowFunction.InternalWindowContext internalContext;

InternalProcessWindowContext(ProcessWindowFunction 
function) {
function.super();
}

@Override
public W window() {
return window;
}

@Override
public KeyedStateStore windowState() {
return internalContext.windowState();
}

@Override
public KeyedStateStore globalState() {
return internalContext.globalState();
}
}
```

The `function.super()` call in there makes it work even though `Context` is 
itself defined as an inner abstract class of `ProcessWindowFunction`. It's a 
bit of black magic and not really too well known, I think. 


> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897409#comment-15897409
 ] 

ASF GitHub Bot commented on FLINK-5929:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/3479
  
Thanks @sjwiesman! I'll have a look.


> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897369#comment-15897369
 ] 

ASF GitHub Bot commented on FLINK-5929:
---

GitHub user sjwiesman opened a pull request:

https://github.com/apache/flink/pull/3479

[FLINK-5929] Allow Access to Per-Window State in ProcessWindowFunction

Right now, the state that a WindowFunction or ProcessWindowFunction can
access is scoped to the key of the window but not the window itself.
That is, state is global across all windows for a given key.
For some use cases it is beneficial to keep state scoped to a window.
For example, if you expect to have several Trigger firings (due to early
and late firings) a user can keep state per window to keep some
information between those firings.


@aljoscha 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sjwiesman/flink FLINK-5929

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3479.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3479


commit 623edd1fb107e8dd0aae755a7b252df1f91713bd
Author: Seth Wiesman 
Date:   2017-03-06T04:07:18Z

[FLINK-5929] Allow Access to Per-Window State in ProcessWindowFunction

Right now, the state that a WindowFunction or ProcessWindowFunction can
access is scoped to the key of the window but not the window itself.
That is, state is global across all windows for a given key.
For some use cases it is beneficial to keep state scoped to a window.
For example, if you expect to have several Trigger firings (due to early
and late firings) a user can keep state per window to keep some
information between those firings.




> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-03-02 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15893830#comment-15893830
 ] 

Aljoscha Krettek commented on FLINK-5929:
-

Sorry, overlooked the other comment. Yes, I think throwing an Exception is good 
for now. For the tests, I think we're good if we have solid tests in 
{{WindowOperatorContractTest}}.

> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-03-02 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15892413#comment-15892413
 ] 

Aljoscha Krettek commented on FLINK-5929:
-

[~sjwiesman] The best place to put new tests for the {{WindowOperator}} is 
{{WindowOperatorContractTest}}, this has low-level tests for specific 
behaviours of the {{WindowOperator}}. Ideally, we would not have 
{{WindowOperator}} test anymore because it is a more crude way of testing. It's 
simply throwing things at the operator and checking whether the result is 
correct whereas the new tests in the contract test are more specific.

> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-03-02 Thread Seth Wiesman (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15892398#comment-15892398
 ] 

Seth Wiesman commented on FLINK-5929:
-

[~aljoscha] I have a version of this that I feel pretty good, I'm working on my 
test coverage but I am not sure where the appropriate place to put the tests 
for this would be. I have some in the WindowOperatorTest and am working on some 
IntergrationTests. What else do you think makes sense for this type of feature? 

> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-03-02 Thread Seth Wiesman (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15892362#comment-15892362
 ] 

Seth Wiesman commented on FLINK-5929:
-

[~aljoscha] That seems like a reasonable first step. For now would it be 
acceptable to through an exception if they try to access state in a merging 
window? 

> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-03-02 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15892018#comment-15892018
 ] 

Aljoscha Krettek commented on FLINK-5929:
-

[~sjwiesman] One more thing: we can only allow access to per-window state for 
non-merging windows, for now. For merging windows, the window function would 
need a method for merging state, similar to {{Trigger.merge()}} and I don't 
want to give that to users just yet because it might seem a bit overwhelming. 
What do you think?

> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-03-01 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15890334#comment-15890334
 ] 

Aljoscha Krettek commented on FLINK-5929:
-

[~sjwiesman] AFAIK, it's wrapped in an {{InternalWindowFunction}}. It has this 
name for historical reasons but we can easily change the name and the method 
signature because it's not public API.

Does this help?

> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-03-01 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15890328#comment-15890328
 ] 

Aljoscha Krettek commented on FLINK-5929:
-

[~rehevkor5] Ahh, I didn't see that you're accessing the state that the trigger 
is keeping (and cleaning up).

> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-02-28 Thread Shannon Carey (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15889297#comment-15889297
 ] 

Shannon Carey commented on FLINK-5929:
--

[~aljoscha] as far as I am aware, the state does get cleared out by our 
Trigger. In Trigger#clear() we have: 
ctx.getPartitionedState(fireTimestampStateDescriptor).clear(); We could have 
done it in the Window Function instead, if we wanted to, given our hack.

> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-02-28 Thread Seth Wiesman (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15888357#comment-15888357
 ] 

Seth Wiesman commented on FLINK-5929:
-

[~aljoscha] Curioius, why is ProcessWindowFunction wrapped in a WindowFunction 
before being passed to the WindowOperator as opposed to the other way around if 
ProcessWindowFunction the _next gen_ window function? 

> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-02-28 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15887815#comment-15887815
 ] 

Aljoscha Krettek commented on FLINK-5929:
-

[~rehevkor5] Yes, as Seth mentioned the global refers to "not-window-scoped" as 
opposed to "global for the whole job".

[~sjwiesman] I'd be very happy if you contributed your code, yes.

I'm also in favour of the {{clear()}}/{{cleanup()}} method because Flink, so 
far, has erred on the side of being explicit. And, as you said, this is 
apparent in the way {{Triggers}} work.

> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-02-28 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15887818#comment-15887818
 ] 

Aljoscha Krettek commented on FLINK-5929:
-

[~rehevkor5] Ah forgot to mention, in your workaround you could never clean up 
your state, right? Because of the lack of a {{clear()}} method for the 
{{WindowFunction}}.

> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-02-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15887811#comment-15887811
 ] 

ASF GitHub Bot commented on FLINK-5929:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/3427
  
Wrong Jira Issue ID.  


> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-02-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15887731#comment-15887731
 ] 

ASF GitHub Bot commented on FLINK-5929:
---

GitHub user uce opened a pull request:

https://github.com/apache/flink/pull/3427

[FLINK-5929] [tests] Fix SavepointITCase instability

When shutting down the testing cluster it could happen that checkpoint 
files lingered around (for checkpoints independent of the savepoint).

This commit deactives checkpointing for the test and uses count down 
latches to track progress, which also reduces the test time.

I've triggered multiple Travis builds. I will merge this if they build 
without the `SavepointITCase` failing.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/uce/flink 5923-savepoint_it_case

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3427.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3427


commit e0c46f34f55bfee743236c4042be4b2501436811
Author: Ufuk Celebi 
Date:   2017-02-28T10:13:28Z

[FLINK-5929] [tests] Fix SavepointITCase instability

When shutting down the testing cluster it can happen that checkpoint
files lingered around (checkpoints independent of the savepoint).

This commit deactives checkpointing for the test and uses count down
latches to track progress, which also reduces the test time.




> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-02-27 Thread Seth Wiesman (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886446#comment-15886446
 ] 

Seth Wiesman commented on FLINK-5929:
-

[~aljoscha] Yes this does look like what we were discussing. Regarding cleanup 
vs using gc horizon, as a user I think I would expect there to be a cleanup 
method to be consistent with the trigger context. I already have a rudimentary 
version of this implemented for my own use so I would be happy to take this 
ticket, clean up my code, and submit a pr. 

> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-02-27 Thread Seth Wiesman (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886450#comment-15886450
 ] 

Seth Wiesman commented on FLINK-5929:
-

[~rehevkor5] Yes, state for individual window panes. 

> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-02-27 Thread Shannon Carey (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886419#comment-15886419
 ] 

Shannon Carey commented on FLINK-5929:
--

If I understand correctly, I agree this would be useful. Currently we are 
working around this limitation in order to achieve communication between the 
Trigger (per-pane state) and the WindowFunction (per-operator state) by a hack 
within the WindowFunction that looks like this (we're not on 1.2 yet so we 
haven't looked at new ways to do this yet):

{code}
  def apply(key: String, window: TimeWindow, input, out): Unit = {
val fireTimestampState: ValueState[java.lang.Long] =
  getRuntimeContext.getState[java.lang.Long](fireTimestampStateDescriptor)

if (fireTimestampState.isInstanceOf[MemValueState[String, TimeWindow, 
java.lang.Long]]) {
  fireTimestampState.asInstanceOf[MemValueState[String, TimeWindow, 
java.lang.Long]].setCurrentNamespace(window)
} else if (fireTimestampState.isInstanceOf[RocksDBValueState[String, 
TimeWindow, java.lang.Long]]) {
  fireTimestampState.asInstanceOf[RocksDBValueState[String, TimeWindow, 
java.lang.Long]].setCurrentNamespace(window)
} else if (fireTimestampState.isInstanceOf[FsValueState[String, TimeWindow, 
java.lang.Long]]) {
  fireTimestampState.asInstanceOf[FsValueState[String, TimeWindow, 
java.lang.Long]].setCurrentNamespace(window)
}
fireTimestampState.value()
...
{code}

> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-02-27 Thread Shannon Carey (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886378#comment-15886378
 ] 

Shannon Carey commented on FLINK-5929:
--

Just to clarify, you're referring to the state from 
AbstractRichFunction#getRuntimeContext.getState(), right? That's the state 
keyed by the combination of event key and window operator. What you're 
suggesting is adding state for the individual panes within the window, right?

> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)