[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask
[ https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319532#comment-16319532 ] ASF GitHub Bot commented on FLINK-5982: --- Github user tony810430 commented on the issue: https://github.com/apache/flink/pull/3633 Thanks a lot for doing these work. Nice to see this PR be merged. =) > Refactor AbstractInvokable and StatefulTask > --- > > Key: FLINK-5982 > URL: https://issues.apache.org/jira/browse/FLINK-5982 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Wei-Che Wei >Assignee: Wei-Che Wei > > Currently, running a invokable in {{Task}} needs to call > {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a > {{StatefulTask}}, it need to call {{setInitialState(state)}}. That makes the > difficulty in doing the eager initialization on invokable during > {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate > {{invoke()}} into {{open()}} and {{invoke()}}, but that makes the complexity > for running it in {{Task}}. > This task wants to refactor {{AbstractInvokable}} and {{StatefulTask}} to > make it easier to construct and run an invokable. > # Refactor abstract class to have one default constructor. > #* Drop {{StatefulTask}} and assume all subclasses of {{AbstractInvokable}} > are stateful. > #* Remove {{setEnvironment(env)}} and {{setInitialState(state)}}. Make > {{AbstractInvokable}} have a two argument constructor with {{Environment}} > and {{TaskStateHandles}}. > # Update all subclass > #* Make all subclass of {{AbstractInvokable}} have a two argument constructor > and call the constructor in {{AbstractInvokable}}. > #* Throw an error in {{BatchTask}} if the initial state is not null. (This > will be removed after {{BatchTask}} have been stateful.) > # Change the creation of the invokable to call that constructor, update all > the tests. > Then, we can simplify the logic to run an invokable by using constructor and > {{invoke()}}. The eager initialization can easily be placed in the > constructor to fulfill the requirement such as FLINK-4714. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask
[ https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16318879#comment-16318879 ] ASF GitHub Bot commented on FLINK-5982: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3633 > Refactor AbstractInvokable and StatefulTask > --- > > Key: FLINK-5982 > URL: https://issues.apache.org/jira/browse/FLINK-5982 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Wei-Che Wei >Assignee: Wei-Che Wei > > Currently, running a invokable in {{Task}} needs to call > {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a > {{StatefulTask}}, it need to call {{setInitialState(state)}}. That makes the > difficulty in doing the eager initialization on invokable during > {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate > {{invoke()}} into {{open()}} and {{invoke()}}, but that makes the complexity > for running it in {{Task}}. > This task wants to refactor {{AbstractInvokable}} and {{StatefulTask}} to > make it easier to construct and run an invokable. > # Refactor abstract class to have one default constructor. > #* Drop {{StatefulTask}} and assume all subclasses of {{AbstractInvokable}} > are stateful. > #* Remove {{setEnvironment(env)}} and {{setInitialState(state)}}. Make > {{AbstractInvokable}} have a two argument constructor with {{Environment}} > and {{TaskStateHandles}}. > # Update all subclass > #* Make all subclass of {{AbstractInvokable}} have a two argument constructor > and call the constructor in {{AbstractInvokable}}. > #* Throw an error in {{BatchTask}} if the initial state is not null. (This > will be removed after {{BatchTask}} have been stateful.) > # Change the creation of the invokable to call that constructor, update all > the tests. > Then, we can simplify the logic to run an invokable by using constructor and > {{invoke()}}. The eager initialization can easily be placed in the > constructor to fulfill the requirement such as FLINK-4714. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask
[ https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16318396#comment-16318396 ] ASF GitHub Bot commented on FLINK-5982: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3633 I finally found the time to look at this. Have rebased it and applied some simplifications: - convenience constructors for stateless tasks - test utils that provide mock environments directly - Task Test Harnesses create the tasks lazily via a factory. All in all that makes for a good patch in my opinion. Will merge this... > Refactor AbstractInvokable and StatefulTask > --- > > Key: FLINK-5982 > URL: https://issues.apache.org/jira/browse/FLINK-5982 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Wei-Che Wei >Assignee: Wei-Che Wei > > Currently, running a invokable in {{Task}} needs to call > {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a > {{StatefulTask}}, it need to call {{setInitialState(state)}}. That makes the > difficulty in doing the eager initialization on invokable during > {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate > {{invoke()}} into {{open()}} and {{invoke()}}, but that makes the complexity > for running it in {{Task}}. > This task wants to refactor {{AbstractInvokable}} and {{StatefulTask}} to > make it easier to construct and run an invokable. > # Refactor abstract class to have one default constructor. > #* Drop {{StatefulTask}} and assume all subclasses of {{AbstractInvokable}} > are stateful. > #* Remove {{setEnvironment(env)}} and {{setInitialState(state)}}. Make > {{AbstractInvokable}} have a two argument constructor with {{Environment}} > and {{TaskStateHandles}}. > # Update all subclass > #* Make all subclass of {{AbstractInvokable}} have a two argument constructor > and call the constructor in {{AbstractInvokable}}. > #* Throw an error in {{BatchTask}} if the initial state is not null. (This > will be removed after {{BatchTask}} have been stateful.) > # Change the creation of the invokable to call that constructor, update all > the tests. > Then, we can simplify the logic to run an invokable by using constructor and > {{invoke()}}. The eager initialization can easily be placed in the > constructor to fulfill the requirement such as FLINK-4714. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask
[ https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16196607#comment-16196607 ] ASF GitHub Bot commented on FLINK-5982: --- Github user tony810430 commented on the issue: https://github.com/apache/flink/pull/3633 Hi @StephanEwen Do you have time recently? Or can we find someone to review it if you are too busy to do this? Thank you. > Refactor AbstractInvokable and StatefulTask > --- > > Key: FLINK-5982 > URL: https://issues.apache.org/jira/browse/FLINK-5982 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Wei-Che Wei >Assignee: Wei-Che Wei > > Currently, running a invokable in {{Task}} needs to call > {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a > {{StatefulTask}}, it need to call {{setInitialState(state)}}. That makes the > difficulty in doing the eager initialization on invokable during > {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate > {{invoke()}} into {{open()}} and {{invoke()}}, but that makes the complexity > for running it in {{Task}}. > This task wants to refactor {{AbstractInvokable}} and {{StatefulTask}} to > make it easier to construct and run an invokable. > # Refactor abstract class to have one default constructor. > #* Drop {{StatefulTask}} and assume all subclasses of {{AbstractInvokable}} > are stateful. > #* Remove {{setEnvironment(env)}} and {{setInitialState(state)}}. Make > {{AbstractInvokable}} have a two argument constructor with {{Environment}} > and {{TaskStateHandles}}. > # Update all subclass > #* Make all subclass of {{AbstractInvokable}} have a two argument constructor > and call the constructor in {{AbstractInvokable}}. > #* Throw an error in {{BatchTask}} if the initial state is not null. (This > will be removed after {{BatchTask}} have been stateful.) > # Change the creation of the invokable to call that constructor, update all > the tests. > Then, we can simplify the logic to run an invokable by using constructor and > {{invoke()}}. The eager initialization can easily be placed in the > constructor to fulfill the requirement such as FLINK-4714. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask
[ https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16087333#comment-16087333 ] ASF GitHub Bot commented on FLINK-5982: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3633 Sorry about the delay. I still have this on my list... > Refactor AbstractInvokable and StatefulTask > --- > > Key: FLINK-5982 > URL: https://issues.apache.org/jira/browse/FLINK-5982 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Wei-Che Wei >Assignee: Wei-Che Wei > > Currently, running a invokable in {{Task}} needs to call > {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a > {{StatefulTask}}, it need to call {{setInitialState(state)}}. That makes the > difficulty in doing the eager initialization on invokable during > {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate > {{invoke()}} into {{open()}} and {{invoke()}}, but that makes the complexity > for running it in {{Task}}. > This task wants to refactor {{AbstractInvokable}} and {{StatefulTask}} to > make it easier to construct and run an invokable. > # Refactor abstract class to have one default constructor. > #* Drop {{StatefulTask}} and assume all subclasses of {{AbstractInvokable}} > are stateful. > #* Remove {{setEnvironment(env)}} and {{setInitialState(state)}}. Make > {{AbstractInvokable}} have a two argument constructor with {{Environment}} > and {{TaskStateHandles}}. > # Update all subclass > #* Make all subclass of {{AbstractInvokable}} have a two argument constructor > and call the constructor in {{AbstractInvokable}}. > #* Throw an error in {{BatchTask}} if the initial state is not null. (This > will be removed after {{BatchTask}} have been stateful.) > # Change the creation of the invokable to call that constructor, update all > the tests. > Then, we can simplify the logic to run an invokable by using constructor and > {{invoke()}}. The eager initialization can easily be placed in the > constructor to fulfill the requirement such as FLINK-4714. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask
[ https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16077956#comment-16077956 ] ASF GitHub Bot commented on FLINK-5982: --- Github user tony810430 commented on the issue: https://github.com/apache/flink/pull/3633 Hi @StephanEwen Do you have time to review this PR recently? > Refactor AbstractInvokable and StatefulTask > --- > > Key: FLINK-5982 > URL: https://issues.apache.org/jira/browse/FLINK-5982 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Wei-Che Wei >Assignee: Wei-Che Wei > > Currently, running a invokable in {{Task}} needs to call > {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a > {{StatefulTask}}, it need to call {{setInitialState(state)}}. That makes the > difficulty in doing the eager initialization on invokable during > {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate > {{invoke()}} into {{open()}} and {{invoke()}}, but that makes the complexity > for running it in {{Task}}. > This task wants to refactor {{AbstractInvokable}} and {{StatefulTask}} to > make it easier to construct and run an invokable. > # Refactor abstract class to have one default constructor. > #* Drop {{StatefulTask}} and assume all subclasses of {{AbstractInvokable}} > are stateful. > #* Remove {{setEnvironment(env)}} and {{setInitialState(state)}}. Make > {{AbstractInvokable}} have a two argument constructor with {{Environment}} > and {{TaskStateHandles}}. > # Update all subclass > #* Make all subclass of {{AbstractInvokable}} have a two argument constructor > and call the constructor in {{AbstractInvokable}}. > #* Throw an error in {{BatchTask}} if the initial state is not null. (This > will be removed after {{BatchTask}} have been stateful.) > # Change the creation of the invokable to call that constructor, update all > the tests. > Then, we can simplify the logic to run an invokable by using constructor and > {{invoke()}}. The eager initialization can easily be placed in the > constructor to fulfill the requirement such as FLINK-4714. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask
[ https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16013259#comment-16013259 ] ASF GitHub Bot commented on FLINK-5982: --- Github user tony810430 commented on the issue: https://github.com/apache/flink/pull/3633 I see. Thank you. =) > Refactor AbstractInvokable and StatefulTask > --- > > Key: FLINK-5982 > URL: https://issues.apache.org/jira/browse/FLINK-5982 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Wei-Che Wei >Assignee: Wei-Che Wei > > Currently, running a invokable in {{Task}} needs to call > {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a > {{StatefulTask}}, it need to call {{setInitialState(state)}}. That makes the > difficulty in doing the eager initialization on invokable during > {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate > {{invoke()}} into {{open()}} and {{invoke()}}, but that makes the complexity > for running it in {{Task}}. > This task wants to refactor {{AbstractInvokable}} and {{StatefulTask}} to > make it easier to construct and run an invokable. > # Refactor abstract class to have one default constructor. > #* Drop {{StatefulTask}} and assume all subclasses of {{AbstractInvokable}} > are stateful. > #* Remove {{setEnvironment(env)}} and {{setInitialState(state)}}. Make > {{AbstractInvokable}} have a two argument constructor with {{Environment}} > and {{TaskStateHandles}}. > # Update all subclass > #* Make all subclass of {{AbstractInvokable}} have a two argument constructor > and call the constructor in {{AbstractInvokable}}. > #* Throw an error in {{BatchTask}} if the initial state is not null. (This > will be removed after {{BatchTask}} have been stateful.) > # Change the creation of the invokable to call that constructor, update all > the tests. > Then, we can simplify the logic to run an invokable by using constructor and > {{invoke()}}. The eager initialization can easily be placed in the > constructor to fulfill the requirement such as FLINK-4714. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask
[ https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16012419#comment-16012419 ] ASF GitHub Bot commented on FLINK-5982: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3633 Sorry, I did not manage to merge this, yet. Will try to take it on after the 1.3 release. Thanks for the offer to rebase that. I can ping you when I would be getting close to merging this, so that you could rebase it once then (rather than now and potentially again in a few weeks). > Refactor AbstractInvokable and StatefulTask > --- > > Key: FLINK-5982 > URL: https://issues.apache.org/jira/browse/FLINK-5982 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Wei-Che Wei >Assignee: Wei-Che Wei > > Currently, running a invokable in {{Task}} needs to call > {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a > {{StatefulTask}}, it need to call {{setInitialState(state)}}. That makes the > difficulty in doing the eager initialization on invokable during > {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate > {{invoke()}} into {{open()}} and {{invoke()}}, but that makes the complexity > for running it in {{Task}}. > This task wants to refactor {{AbstractInvokable}} and {{StatefulTask}} to > make it easier to construct and run an invokable. > # Refactor abstract class to have one default constructor. > #* Drop {{StatefulTask}} and assume all subclasses of {{AbstractInvokable}} > are stateful. > #* Remove {{setEnvironment(env)}} and {{setInitialState(state)}}. Make > {{AbstractInvokable}} have a two argument constructor with {{Environment}} > and {{TaskStateHandles}}. > # Update all subclass > #* Make all subclass of {{AbstractInvokable}} have a two argument constructor > and call the constructor in {{AbstractInvokable}}. > #* Throw an error in {{BatchTask}} if the initial state is not null. (This > will be removed after {{BatchTask}} have been stateful.) > # Change the creation of the invokable to call that constructor, update all > the tests. > Then, we can simplify the logic to run an invokable by using constructor and > {{invoke()}}. The eager initialization can easily be placed in the > constructor to fulfill the requirement such as FLINK-4714. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask
[ https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16011984#comment-16011984 ] ASF GitHub Bot commented on FLINK-5982: --- Github user tony810430 commented on the issue: https://github.com/apache/flink/pull/3633 Hi @StephanEwen Have you been reviewing this PR? If not, I would like to rebase it and fix those conflicts. What do you think? > Refactor AbstractInvokable and StatefulTask > --- > > Key: FLINK-5982 > URL: https://issues.apache.org/jira/browse/FLINK-5982 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Wei-Che Wei >Assignee: Wei-Che Wei > > Currently, running a invokable in {{Task}} needs to call > {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a > {{StatefulTask}}, it need to call {{setInitialState(state)}}. That makes the > difficulty in doing the eager initialization on invokable during > {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate > {{invoke()}} into {{open()}} and {{invoke()}}, but that makes the complexity > for running it in {{Task}}. > This task wants to refactor {{AbstractInvokable}} and {{StatefulTask}} to > make it easier to construct and run an invokable. > # Refactor abstract class to have one default constructor. > #* Drop {{StatefulTask}} and assume all subclasses of {{AbstractInvokable}} > are stateful. > #* Remove {{setEnvironment(env)}} and {{setInitialState(state)}}. Make > {{AbstractInvokable}} have a two argument constructor with {{Environment}} > and {{TaskStateHandles}}. > # Update all subclass > #* Make all subclass of {{AbstractInvokable}} have a two argument constructor > and call the constructor in {{AbstractInvokable}}. > #* Throw an error in {{BatchTask}} if the initial state is not null. (This > will be removed after {{BatchTask}} have been stateful.) > # Change the creation of the invokable to call that constructor, update all > the tests. > Then, we can simplify the logic to run an invokable by using constructor and > {{invoke()}}. The eager initialization can easily be placed in the > constructor to fulfill the requirement such as FLINK-4714. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask
[ https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15947032#comment-15947032 ] ASF GitHub Bot commented on FLINK-5982: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3633 Thanks, I will try and look at this over the next week. It is quite a big change, so I might need a bit of time to review it properly... > Refactor AbstractInvokable and StatefulTask > --- > > Key: FLINK-5982 > URL: https://issues.apache.org/jira/browse/FLINK-5982 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Wei-Che Wei >Assignee: Wei-Che Wei > > Currently, running a invokable in {{Task}} needs to call > {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a > {{StatefulTask}}, it need to call {{setInitialState(state)}}. That makes the > difficulty in doing the eager initialization on invokable during > {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate > {{invoke()}} into {{open()}} and {{invoke()}}, but that makes the complexity > for running it in {{Task}}. > This task wants to refactor {{AbstractInvokable}} and {{StatefulTask}} to > make it easier to construct and run an invokable. > # Refactor abstract class to have one default constructor. > #* Drop {{StatefulTask}} and assume all subclasses of {{AbstractInvokable}} > are stateful. > #* Remove {{setEnvironment(env)}} and {{setInitialState(state)}}. Make > {{AbstractInvokable}} have a two argument constructor with {{Environment}} > and {{TaskStateHandles}}. > # Update all subclass > #* Make all subclass of {{AbstractInvokable}} have a two argument constructor > and call the constructor in {{AbstractInvokable}}. > #* Throw an error in {{BatchTask}} if the initial state is not null. (This > will be removed after {{BatchTask}} have been stateful.) > # Change the creation of the invokable to call that constructor, update all > the tests. > Then, we can simplify the logic to run an invokable by using constructor and > {{invoke()}}. The eager initialization can easily be placed in the > constructor to fulfill the requirement such as FLINK-4714. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask
[ https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15946503#comment-15946503 ] ASF GitHub Bot commented on FLINK-5982: --- Github user tony810430 commented on the issue: https://github.com/apache/flink/pull/3633 @StephanEwen Thanks for the suggestion. I have updated the patch. > Refactor AbstractInvokable and StatefulTask > --- > > Key: FLINK-5982 > URL: https://issues.apache.org/jira/browse/FLINK-5982 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Wei-Che Wei >Assignee: Wei-Che Wei > > Currently, running a invokable in {{Task}} needs to call > {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a > {{StatefulTask}}, it need to call {{setInitialState(state)}}. That makes the > difficulty in doing the eager initialization on invokable during > {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate > {{invoke()}} into {{open()}} and {{invoke()}}, but that makes the complexity > for running it in {{Task}}. > This task wants to refactor {{AbstractInvokable}} and {{StatefulTask}} to > make it easier to construct and run an invokable. > # Refactor abstract class to have one default constructor. > #* Drop {{StatefulTask}} and assume all subclasses of {{AbstractInvokable}} > are stateful. > #* Remove {{setEnvironment(env)}} and {{setInitialState(state)}}. Make > {{AbstractInvokable}} have a two argument constructor with {{Environment}} > and {{TaskStateHandles}}. > # Update all subclass > #* Make all subclass of {{AbstractInvokable}} have a two argument constructor > and call the constructor in {{AbstractInvokable}}. > #* Throw an error in {{BatchTask}} if the initial state is not null. (This > will be removed after {{BatchTask}} have been stateful.) > # Change the creation of the invokable to call that constructor, update all > the tests. > Then, we can simplify the logic to run an invokable by using constructor and > {{invoke()}}. The eager initialization can easily be placed in the > constructor to fulfill the requirement such as FLINK-4714. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask
[ https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945578#comment-15945578 ] ASF GitHub Bot commented on FLINK-5982: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3633 Thanks for this refactoring. I had only a quick look so far, but one thought was that this patch could probably be simplified by making `triggerCheckpoint()` (and restore / notifyComplete) not abstract in the `AbstractInvokable`. Instead let it throw the `UnsupportedOperationException`. That way, we avoid refactoring all the test classes to implement those methods (which would be nice). > Refactor AbstractInvokable and StatefulTask > --- > > Key: FLINK-5982 > URL: https://issues.apache.org/jira/browse/FLINK-5982 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Wei-Che Wei >Assignee: Wei-Che Wei > > Currently, running a invokable in {{Task}} needs to call > {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a > {{StatefulTask}}, it need to call {{setInitialState(state)}}. That makes the > difficulty in doing the eager initialization on invokable during > {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate > {{invoke()}} into {{open()}} and {{invoke()}}, but that makes the complexity > for running it in {{Task}}. > This task wants to refactor {{AbstractInvokable}} and {{StatefulTask}} to > make it easier to construct and run an invokable. > # Refactor abstract class to have one default constructor. > #* Drop {{StatefulTask}} and assume all subclasses of {{AbstractInvokable}} > are stateful. > #* Remove {{setEnvironment(env)}} and {{setInitialState(state)}}. Make > {{AbstractInvokable}} have a two argument constructor with {{Environment}} > and {{TaskStateHandles}}. > # Update all subclass > #* Make all subclass of {{AbstractInvokable}} have a two argument constructor > and call the constructor in {{AbstractInvokable}}. > #* Throw an error in {{BatchTask}} if the initial state is not null. (This > will be removed after {{BatchTask}} have been stateful.) > # Change the creation of the invokable to call that constructor, update all > the tests. > Then, we can simplify the logic to run an invokable by using constructor and > {{invoke()}}. The eager initialization can easily be placed in the > constructor to fulfill the requirement such as FLINK-4714. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask
[ https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945161#comment-15945161 ] ASF GitHub Bot commented on FLINK-5982: --- GitHub user tony810430 opened a pull request: https://github.com/apache/flink/pull/3633 [FLINK-5982] [runtime] Refactor AbstractInvokable and StatefulTask This PR wants to merge `AbstractInvokable` and `StatefulTask` to eliminate "lazy initialization" in `Task`. The following are what I did in this PR: 1. Merge `AbstractInvokable` and `StatefulTask` and use constructor to initialize `Environment` and `TaskStateHandles` in `AbstractInvokable`. 2. Update all subclasses and corresponding tests. For now, batch tasks are still not stateful, so I added `IllegalStateException()` in constructor and `UnsupportedOperationException()` in the methods for supporting stateful task. 3. Update the behavior of `AbstractInvokable` in `Task`, including `loadAndInstantiateInvokable()`, `triggerCheckpointBarrier()` and `notifyCheckpointComplete()`. 4. Add tests for modification in `Task` and tests the constructors' behavior for all concrete subclasses which inherit `AbstractInvokable`. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tony810430/flink FLINK-5982 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3633.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3633 commit 31748905eaa3b4897f0b02473526b7fa05c2304e Author: Tony WeiDate: 2017-03-15T03:13:41Z [FLINK-5982] Refactor AbstractInvokable and StatefulTask > Refactor AbstractInvokable and StatefulTask > --- > > Key: FLINK-5982 > URL: https://issues.apache.org/jira/browse/FLINK-5982 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Wei-Che Wei >Assignee: Wei-Che Wei > > Currently, running a invokable in {{Task}} needs to call > {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a > {{StatefulTask}}, it need to call {{setInitialState(state)}}. That makes the > difficulty in doing the eager initialization on invokable during > {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate > {{invoke()}} into {{open()}} and {{invoke()}}, but that makes the complexity > for running it in {{Task}}. > This task wants to refactor {{AbstractInvokable}} and {{StatefulTask}} to > make it easier to construct and run an invokable. > # Refactor abstract class to have one default constructor. > #* Drop {{StatefulTask}} and assume all subclasses of {{AbstractInvokable}} > are stateful. > #* Remove {{setEnvironment(env)}} and {{setInitialState(state)}}. Make > {{AbstractInvokable}} have a two argument constructor with {{Environment}} > and {{TaskStateHandles}}. > # Update all subclass > #* Make all subclass of {{AbstractInvokable}} have a two argument constructor > and call the constructor in {{AbstractInvokable}}. > #* Throw an error in {{BatchTask}} if the initial state is not null. (This > will be removed after {{BatchTask}} have been stateful.) > # Change the creation of the invokable to call that constructor, update all > the tests. > Then, we can simplify the logic to run an invokable by using constructor and > {{invoke()}}. The eager initialization can easily be placed in the > constructor to fulfill the requirement such as FLINK-4714. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask
[ https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945074#comment-15945074 ] Till Rohrmann commented on FLINK-5982: -- Alright, this sounds good to me :-) > Refactor AbstractInvokable and StatefulTask > --- > > Key: FLINK-5982 > URL: https://issues.apache.org/jira/browse/FLINK-5982 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Wei-Che Wei >Assignee: Wei-Che Wei > > Currently, running a invokable in {{Task}} needs to call > {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a > {{StatefulTask}}, it need to call {{setInitialState(state)}}. That makes the > difficulty in doing the eager initialization on invokable during > {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate > {{invoke()}} into {{open()}} and {{invoke()}}, but that makes the complexity > for running it in {{Task}}. > This task wants to refactor {{AbstractInvokable}} and {{StatefulTask}} to > make it easier to construct and run an invokable. > # Refactor abstract class to have one default constructor. > #* Drop {{StatefulTask}} and assume all subclasses of {{AbstractInvokable}} > are stateful. > #* Remove {{setEnvironment(env)}} and {{setInitialState(state)}}. Make > {{AbstractInvokable}} have a two argument constructor with {{Environment}} > and {{TaskStateHandles}}. > # Update all subclass > #* Make all subclass of {{AbstractInvokable}} have a two argument constructor > and call the constructor in {{AbstractInvokable}}. > #* Throw an error in {{BatchTask}} if the initial state is not null. (This > will be removed after {{BatchTask}} have been stateful.) > # Change the creation of the invokable to call that constructor, update all > the tests. > Then, we can simplify the logic to run an invokable by using constructor and > {{invoke()}}. The eager initialization can easily be placed in the > constructor to fulfill the requirement such as FLINK-4714. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask
[ https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943360#comment-15943360 ] Wei-Che Wei commented on FLINK-5982: Hi [~till.rohrmann], I prefer not to pass only {{Environment}} to {{BatchTask}}. Because this task tries to merge the {{AbstractInvokable}} and {{StatefulTask}}, that will make more complexity to distinguish {{BatchTask}} or {{StreamTask}} when calling {{loadAndInstantiateInvokable()}} in {{Task}}. Make subclasses all have the same constructor can simplify the behavior of {{AbstractInvokable}} in {{Task}}. In this way, I will prefer the suggestion from [~StephanEwen]. It temporarily forces {{BatchTask}} won't get a non-null state before the {{BatchTask}} actually handles recovery state. > Refactor AbstractInvokable and StatefulTask > --- > > Key: FLINK-5982 > URL: https://issues.apache.org/jira/browse/FLINK-5982 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Wei-Che Wei >Assignee: Wei-Che Wei > > Currently, running a invokable in {{Task}} needs to call > {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a > {{StatefulTask}}, it need to call {{setInitialState(state)}}. That makes the > difficulty in doing the eager initialization on invokable during > {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate > {{invoke()}} into {{open()}} and {{invoke()}}, but that makes the complexity > for running it in {{Task}}. > This task wants to refactor {{AbstractInvokable}} and {{StatefulTask}} to > make it easier to construct and run an invokable. > # Refactor abstract class to have one default constructor. > #* Drop {{StatefulTask}} and assume all subclasses of {{AbstractInvokable}} > are stateful. > #* Remove {{setEnvironment(env)}} and {{setInitialState(state)}}. Make > {{AbstractInvokable}} have a two argument constructor with {{Environment}} > and {{TaskStateHandles}}. > # Update all subclass > #* Make all subclass of {{AbstractInvokable}} have a two argument constructor > and call the constructor in {{AbstractInvokable}}. > #* Throw an error in {{BatchTask}} if the initial state is not null. (This > will be removed after {{BatchTask}} have been stateful.) > # Change the creation of the invokable to call that constructor, update all > the tests. > Then, we can simplify the logic to run an invokable by using constructor and > {{invoke()}}. The eager initialization can easily be placed in the > constructor to fulfill the requirement such as FLINK-4714. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask
[ https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943286#comment-15943286 ] Stephan Ewen commented on FLINK-5982: - I think that is actually simpler to shoot yourself in the foot with "special meaning" objects like {{TaskStateHandles.EMPTY}}. For example: The object also comes via RPC, so a simple mistake in the singleton resolving code means that the {{if (state == TaskStateHandles.EMPTY)}} actually breaks the code. I personally find good null handling quite effective. > Refactor AbstractInvokable and StatefulTask > --- > > Key: FLINK-5982 > URL: https://issues.apache.org/jira/browse/FLINK-5982 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Wei-Che Wei >Assignee: Wei-Che Wei > > Currently, running a invokable in {{Task}} needs to call > {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a > {{StatefulTask}}, it need to call {{setInitialState(state)}}. That makes the > difficulty in doing the eager initialization on invokable during > {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate > {{invoke()}} into {{open()}} and {{invoke()}}, but that makes the complexity > for running it in {{Task}}. > This task wants to refactor {{AbstractInvokable}} and {{StatefulTask}} to > make it easier to construct and run an invokable. > # Refactor abstract class to have one default constructor. > #* Drop {{StatefulTask}} and assume all subclasses of {{AbstractInvokable}} > are stateful. > #* Remove {{setEnvironment(env)}} and {{setInitialState(state)}}. Make > {{AbstractInvokable}} have a two argument constructor with {{Environment}} > and {{TaskStateHandles}}. > # Update all subclass > #* Make all subclass of {{AbstractInvokable}} have a two argument constructor > and call the constructor in {{AbstractInvokable}}. > #* Throw an error in {{BatchTask}} if the initial state is not null. (This > will be removed after {{BatchTask}} have been stateful.) > # Change the creation of the invokable to call that constructor, update all > the tests. > Then, we can simplify the logic to run an invokable by using constructor and > {{invoke()}}. The eager initialization can easily be placed in the > constructor to fulfill the requirement such as FLINK-4714. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask
[ https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943273#comment-15943273 ] Till Rohrmann commented on FLINK-5982: -- True, we can also throw an {{IllegalStateException}}. I was just wondering why giving the user/developer the possibility to shoot himself in the foot by allowing to pass a state handles object to a {{BatchTask}}. Imo, it's safer to simply pass the {{Environment}} to the {{BatchTask}} and the call the super class's constructor with the given {{Environment}} and {{TaskStateHandles.EMPTY}}. That way we could enforce that the task state handles field is never {{null}} by checking it in the constructor. > Refactor AbstractInvokable and StatefulTask > --- > > Key: FLINK-5982 > URL: https://issues.apache.org/jira/browse/FLINK-5982 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Wei-Che Wei >Assignee: Wei-Che Wei > > Currently, running a invokable in {{Task}} needs to call > {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a > {{StatefulTask}}, it need to call {{setInitialState(state)}}. That makes the > difficulty in doing the eager initialization on invokable during > {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate > {{invoke()}} into {{open()}} and {{invoke()}}, but that makes the complexity > for running it in {{Task}}. > This task wants to refactor {{AbstractInvokable}} and {{StatefulTask}} to > make it easier to construct and run an invokable. > # Refactor abstract class to have one default constructor. > #* Drop {{StatefulTask}} and assume all subclasses of {{AbstractInvokable}} > are stateful. > #* Remove {{setEnvironment(env)}} and {{setInitialState(state)}}. Make > {{AbstractInvokable}} have a two argument constructor with {{Environment}} > and {{TaskStateHandles}}. > # Update all subclass > #* Make all subclass of {{AbstractInvokable}} have a two argument constructor > and call the constructor in {{AbstractInvokable}}. > #* Throw an error in {{BatchTask}} if the initial state is not null. (This > will be removed after {{BatchTask}} have been stateful.) > # Change the creation of the invokable to call that constructor, update all > the tests. > Then, we can simplify the logic to run an invokable by using constructor and > {{invoke()}}. The eager initialization can easily be placed in the > constructor to fulfill the requirement such as FLINK-4714. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask
[ https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943028#comment-15943028 ] Stephan Ewen commented on FLINK-5982: - [~till.rohrmann] Why not throw an illegal state exception in the Batch Task? I think that would be good as a safety net. I also don't see harm in having the original state handles nullable - in the end, we don't need to eliminate all nulls, we only need to eliminate "lazy initialization". Nullable fields are okay, in my opinion, if this is part of a proper contract. The problem are fields that should not be null, but are sometimes "accidentally still null" because the lazy initialization has not been completed. > Refactor AbstractInvokable and StatefulTask > --- > > Key: FLINK-5982 > URL: https://issues.apache.org/jira/browse/FLINK-5982 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Wei-Che Wei >Assignee: Wei-Che Wei > > Currently, running a invokable in {{Task}} needs to call > {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a > {{StatefulTask}}, it need to call {{setInitialState(state)}}. That makes the > difficulty in doing the eager initialization on invokable during > {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate > {{invoke()}} into {{open()}} and {{invoke()}}, but that makes the complexity > for running it in {{Task}}. > This task wants to refactor {{AbstractInvokable}} and {{StatefulTask}} to > make it easier to construct and run an invokable. > # Refactor abstract class to have one default constructor. > #* Drop {{StatefulTask}} and assume all subclasses of {{AbstractInvokable}} > are stateful. > #* Remove {{setEnvironment(env)}} and {{setInitialState(state)}}. Make > {{AbstractInvokable}} have a two argument constructor with {{Environment}} > and {{TaskStateHandles}}. > # Update all subclass > #* Make all subclass of {{AbstractInvokable}} have a two argument constructor > and call the constructor in {{AbstractInvokable}}. > #* Throw an error in {{BatchTask}} if the initial state is not null. (This > will be removed after {{BatchTask}} have been stateful.) > # Change the creation of the invokable to call that constructor, update all > the tests. > Then, we can simplify the logic to run an invokable by using constructor and > {{invoke()}}. The eager initialization can easily be placed in the > constructor to fulfill the requirement such as FLINK-4714. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask
[ https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15942972#comment-15942972 ] Wei-Che Wei commented on FLINK-5982: Hi [~till.rohrmann] Ok, I see. Thanks for your suggestion. > Refactor AbstractInvokable and StatefulTask > --- > > Key: FLINK-5982 > URL: https://issues.apache.org/jira/browse/FLINK-5982 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Wei-Che Wei >Assignee: Wei-Che Wei > > Currently, running a invokable in {{Task}} needs to call > {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a > {{StatefulTask}}, it need to call {{setInitialState(state)}}. That makes the > difficulty in doing the eager initialization on invokable during > {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate > {{invoke()}} into {{open()}} and {{invoke()}}, but that makes the complexity > for running it in {{Task}}. > This task wants to refactor {{AbstractInvokable}} and {{StatefulTask}} to > make it easier to construct and run an invokable. > # Refactor abstract class to have one default constructor. > #* Drop {{StatefulTask}} and assume all subclasses of {{AbstractInvokable}} > are stateful. > #* Remove {{setEnvironment(env)}} and {{setInitialState(state)}}. Make > {{AbstractInvokable}} have a two argument constructor with {{Environment}} > and {{TaskStateHandles}}. > # Update all subclass > #* Make all subclass of {{AbstractInvokable}} have a two argument constructor > and call the constructor in {{AbstractInvokable}}. > #* Throw an error in {{BatchTask}} if the initial state is not null. (This > will be removed after {{BatchTask}} have been stateful.) > # Change the creation of the invokable to call that constructor, update all > the tests. > Then, we can simplify the logic to run an invokable by using constructor and > {{invoke()}}. The eager initialization can easily be placed in the > constructor to fulfill the requirement such as FLINK-4714. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask
[ https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15942957#comment-15942957 ] Till Rohrmann commented on FLINK-5982: -- Hi [~tonywei], one of the ideas for the refactoring is to avoid the situation where fields are {{null}}. Thus, we should also set a value for tasks which don't have state assigned. We could, for example, use the {{TaskStateHandles.EMPTY}} singleton for initialization. Then we can be sure that the {{BatchTask}}, for example, won't fail with a {{NullPointerException}} related to a not set state field. Additionally, you can get rid of the {{IllegalStateException}} in the {{BatchTask}}. > Refactor AbstractInvokable and StatefulTask > --- > > Key: FLINK-5982 > URL: https://issues.apache.org/jira/browse/FLINK-5982 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Wei-Che Wei >Assignee: Wei-Che Wei > > Currently, running a invokable in {{Task}} needs to call > {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a > {{StatefulTask}}, it need to call {{setInitialState(state)}}. That makes the > difficulty in doing the eager initialization on invokable during > {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate > {{invoke()}} into {{open()}} and {{invoke()}}, but that makes the complexity > for running it in {{Task}}. > This task wants to refactor {{AbstractInvokable}} and {{StatefulTask}} to > make it easier to construct and run an invokable. > # Refactor abstract class to have one default constructor. > #* Drop {{StatefulTask}} and assume all subclasses of {{AbstractInvokable}} > are stateful. > #* Remove {{setEnvironment(env)}} and {{setInitialState(state)}}. Make > {{AbstractInvokable}} have a two argument constructor with {{Environment}} > and {{TaskStateHandles}}. > # Update all subclass > #* Make all subclass of {{AbstractInvokable}} have a two argument constructor > and call the constructor in {{AbstractInvokable}}. > #* Throw an error in {{BatchTask}} if the initial state is not null. (This > will be removed after {{BatchTask}} have been stateful.) > # Change the creation of the invokable to call that constructor, update all > the tests. > Then, we can simplify the logic to run an invokable by using constructor and > {{invoke()}}. The eager initialization can easily be placed in the > constructor to fulfill the requirement such as FLINK-4714. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask
[ https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15939521#comment-15939521 ] Wei-Che Wei commented on FLINK-5982: Hi [~till.rohrmann] Thanks for your replying. For the first question, I sill feel a little confused and I would like to getting your suggestion. Thank you. # It's a good choice to check passed environment in {{AbstractInvokable}} and pass a {{DummyEnvironment}} instead of {{null}}. I am not sure if it is good to state instances as well, since sometimes the state instances will be {{null}} in {{Task}}. If checking state instances are not {{null}}, it might need to add one more constructor or pass an empty state object in {{Task}}. What do you think? # The exception is {{IllegalStateException("Found operator state for a non-stateful task invokable")}}. Now, I only put it in {{BatchTask}} not it the testing classes. I think it would not change their behavior. I will check it once again before I finish this task. # What I want to do is leave it empty and let those classes who need to support the state methods to override them, but I think your way is better. I will implement like that and let the state methods in {{AbstractInvokable}} be abstract methods. > Refactor AbstractInvokable and StatefulTask > --- > > Key: FLINK-5982 > URL: https://issues.apache.org/jira/browse/FLINK-5982 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Wei-Che Wei >Assignee: Wei-Che Wei > > Currently, running a invokable in {{Task}} needs to call > {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a > {{StatefulTask}}, it need to call {{setInitialState(state)}}. That makes the > difficulty in doing the eager initialization on invokable during > {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate > {{invoke()}} into {{open()}} and {{invoke()}}, but that makes the complexity > for running it in {{Task}}. > This task wants to refactor {{AbstractInvokable}} and {{StatefulTask}} to > make it easier to construct and run an invokable. > # Refactor abstract class to have one default constructor. > #* Drop {{StatefulTask}} and assume all subclasses of {{AbstractInvokable}} > are stateful. > #* Remove {{setEnvironment(env)}} and {{setInitialState(state)}}. Make > {{AbstractInvokable}} have a two argument constructor with {{Environment}} > and {{TaskStateHandles}}. > # Update all subclass > #* Make all subclass of {{AbstractInvokable}} have a two argument constructor > and call the constructor in {{AbstractInvokable}}. > #* Throw an error in {{BatchTask}} if the initial state is not null. (This > will be removed after {{BatchTask}} have been stateful.) > # Change the creation of the invokable to call that constructor, update all > the tests. > Then, we can simplify the logic to run an invokable by using constructor and > {{invoke()}}. The eager initialization can easily be placed in the > constructor to fulfill the requirement such as FLINK-4714. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask
[ https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15938422#comment-15938422 ] Till Rohrmann commented on FLINK-5982: -- Hi [~tonywei], I think you should check in the {{AbstractInvokable}} that the passed environment and state instances are not null. 1. If nowhere {{DummyInvokable.setEnvironment}} is called, then you can simply pass a {{DummyEnvironment}} which does nothing in the {{DummyInvokable}} constructor to the super class. For the state you can pass an empty state object. 2. What exceptions are you referring to? If you don't change the behaviour of the testing classes, then it should be ok. 3. What would the default behaviour be? Alternatively, we could think about throwing an {{UnsupportedOperationException}} in the classes which don't support the state methods. That way we are sure that they are not mistakenly used with state. > Refactor AbstractInvokable and StatefulTask > --- > > Key: FLINK-5982 > URL: https://issues.apache.org/jira/browse/FLINK-5982 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Wei-Che Wei >Assignee: Wei-Che Wei > > Currently, running a invokable in {{Task}} needs to call > {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a > {{StatefulTask}}, it need to call {{setInitialState(state)}}. That makes the > difficulty in doing the eager initialization on invokable during > {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate > {{invoke()}} into {{open()}} and {{invoke()}}, but that makes the complexity > for running it in {{Task}}. > This task wants to refactor {{AbstractInvokable}} and {{StatefulTask}} to > make it easier to construct and run an invokable. > # Refactor abstract class to have one default constructor. > #* Drop {{StatefulTask}} and assume all subclasses of {{AbstractInvokable}} > are stateful. > #* Remove {{setEnvironment(env)}} and {{setInitialState(state)}}. Make > {{AbstractInvokable}} have a two argument constructor with {{Environment}} > and {{TaskStateHandles}}. > # Update all subclass > #* Make all subclass of {{AbstractInvokable}} have a two argument constructor > and call the constructor in {{AbstractInvokable}}. > #* Throw an error in {{BatchTask}} if the initial state is not null. (This > will be removed after {{BatchTask}} have been stateful.) > # Change the creation of the invokable to call that constructor, update all > the tests. > Then, we can simplify the logic to run an invokable by using constructor and > {{invoke()}}. The eager initialization can easily be placed in the > constructor to fulfill the requirement such as FLINK-4714. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask
[ https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15906135#comment-15906135 ] Wei-Che Wei commented on FLINK-5982: I see. I will just focus on refactoring {{setEnvironment(env)}}, {{setInitialState(state)}} and the new constructor. Thanks for your friendly reminder. > Refactor AbstractInvokable and StatefulTask > --- > > Key: FLINK-5982 > URL: https://issues.apache.org/jira/browse/FLINK-5982 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Wei-Che Wei >Assignee: Wei-Che Wei > > Currently, running a invokable in {{Task}} needs to call > {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a > {{StatefulTask}}, it need to call {{setInitialState(state)}}. That makes the > difficulty in doing the eager initialization on invokable during > {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate > {{invoke()}} into {{open()}} and {{invoke()}}, but that makes the complexity > for running it in {{Task}}. > This task wants to refactor {{AbstractInvokable}} and {{StatefulTask}} to > make it easier to construct and run an invokable. > # Refactor abstract class to have one default constructor. > #* Drop {{StatefulTask}} and assume all subclasses of {{AbstractInvokable}} > are stateful. > #* Remove {{setEnvironment(env)}} and {{setInitialState(state)}}. Make > {{AbstractInvokable}} have a two argument constructor with {{Environment}} > and {{TaskStateHandles}}. > # Update all subclass > #* Make all subclass of {{AbstractInvokable}} have a two argument constructor > and call the constructor in {{AbstractInvokable}}. > #* Throw an error in {{BatchTask}} if the initial state is not null. (This > will be removed after {{BatchTask}} have been stateful.) > # Change the creation of the invokable to call that constructor, update all > the tests. > Then, we can simplify the logic to run an invokable by using constructor and > {{invoke()}}. The eager initialization can easily be placed in the > constructor to fulfill the requirement such as FLINK-4714. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask
[ https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902789#comment-15902789 ] Stephan Ewen commented on FLINK-5982: - The description is good, please go ahead with these changes. I would suggest to not move code from {{invoke()}} into the constructor. We should do that in a separate pull request. > Refactor AbstractInvokable and StatefulTask > --- > > Key: FLINK-5982 > URL: https://issues.apache.org/jira/browse/FLINK-5982 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Wei-Che Wei >Assignee: Wei-Che Wei > > Currently, running a invokable in {{Task}} needs to call > {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a > {{StatefulTask}}, it need to call {{setInitialState(state)}}. That makes the > difficulty in doing the eager initialization on invokable during > {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate > {{invoke()}} into {{open()}} and {{invoke()}}, but that makes the complexity > for running it in {{Task}}. > This task wants to refactor {{AbstractInvokable}} and {{StatefulTask}} to > make it easier to construct and run an invokable. > # Refactor abstract class to have one default constructor. > #* Drop {{StatefulTask}} and assume all subclasses of {{AbstractInvokable}} > are stateful. > #* Remove {{setEnvironment(env)}} and {{setInitialState(state)}}. Make > {{AbstractInvokable}} have a two argument constructor with {{Environment}} > and {{TaskStateHandles}}. > # Update all subclass > #* Make all subclass of {{AbstractInvokable}} have a two argument constructor > and call the constructor in {{AbstractInvokable}}. > #* Throw an error in {{BatchTask}} if the initial state is not null. (This > will be removed after {{BatchTask}} have been stateful.) > # Change the creation of the invokable to call that constructor, update all > the tests. > Then, we can simplify the logic to run an invokable by using constructor and > {{invoke()}}. The eager initialization can easily be placed in the > constructor to fulfill the requirement such as FLINK-4714. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask
[ https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901331#comment-15901331 ] Wei-Che Wei commented on FLINK-5982: [~StephanEwen], [~till.rohrmann] Have updated the description. Is there any suggestion? If not, I will start to work on it based on the approach described above. > Refactor AbstractInvokable and StatefulTask > --- > > Key: FLINK-5982 > URL: https://issues.apache.org/jira/browse/FLINK-5982 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Wei-Che Wei >Assignee: Wei-Che Wei > > Currently, running a invokable in {{Task}} needs to call > {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a > {{StatefulTask}}, it need to call {{setInitialState(state)}}. That makes the > difficulty in doing the eager initialization on invokable during > {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate > {{invoke()}} into {{open()}} and {{invoke()}}, but that makes the complexity > for running it in {{Task}}. > This task wants to refactor {{AbstractInvokable}} and {{StatefulTask}} to > make it easier to construct and run an invokable. > # Refactor abstract class to have one default constructor. > #* Drop {{StatefulTask}} and assume all subclasses of {{AbstractInvokable}} > are stateful. > #* Remove {{setEnvironment(env)}} and {{setInitialState(state)}}. Make > {{AbstractInvokable}} have a two argument constructor with {{Environment}} > and {{TaskStateHandles}}. > # Update all subclass > #* Make all subclass of {{AbstractInvokable}} have a two argument constructor > and call the constructor in {{AbstractInvokable}}. > #* Throw an error in {{BatchTask}} if the initial state is not null. (This > will be removed after {{BatchTask}} have been stateful.) > # Change the creation of the invokable to call that constructor, update all > the tests. > Then, we can simplify the logic to run an invokable by using constructor and > {{invoke()}}. The eager initialization can easily be placed in the > constructor to fulfill the requirement such as FLINK-4714. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask
[ https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901309#comment-15901309 ] Wei-Che Wei commented on FLINK-5982: If handling recovery state on {{BatchTask}} is needed in the future, it makes sense for me to assume all tasks in Flink are stateful. I will update the description. Thank you. > Refactor AbstractInvokable and StatefulTask > --- > > Key: FLINK-5982 > URL: https://issues.apache.org/jira/browse/FLINK-5982 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Wei-Che Wei >Assignee: Wei-Che Wei > > Currently, running a invokable in {{Task}} needs to call > {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a > {{StatefulTask}}, it need to call {{setInitialState(state)}}. That makes the > difficulty in doing the eager initialization on invokable during > {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate > {{invoke()}} into {{open()}} and {{invoke()}}, but that makes the complexity > for running it in {{Task}}. > This task wants to refactor {{AbstractInvokable}} and {{StatefulTask}} to > make it easier to construct and run an invokable. > # Refactor abstract class to have one default constructor. > #* Remove {{setEnvironment(env)}} and change {{AbstractInvokable}} to have > one argument constructor with {{Environment}}. > #* Remove {{setInitialState}} and create a new abstract class > {{AbstractStatefulInvokable}} to inherit {{AbstractInvokable}} and implement > {{StatefulTask}}. Make {{AbstractStatefulInvokable}} have a two argument > constructor with {{Environment}} and {{TaskStateHandles}}. > # Update all subclass > #* Make all subclass of only {{AbstractInvokable}} have an one argument > constructor and call the constructor in {{AbstractInvokable}}. > #* Make all subclass of {{AbstractInvokable}} and {{StatefulTask}} to inherit > {{AbstractStatefulInvokable}} and have two two argument constructor. > # Change the creation of the invokables to call that constructor, update all > the tests. > Then, we can simplify the logic to run an invokable by using constructor and > {{invoke()}}. The eager initialization can easily be placed in the > constructor to fulfill the requirement such as FLINK-4714. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask
[ https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901289#comment-15901289 ] Stephan Ewen commented on FLINK-5982: - I am personally for dropping the distinction between {{StateFulTask}} and {{AbstractInvokable}} and move to the assumption that all tasks in Flink are stateful (which I think will be the future). That {{BatchTask}} does not handle recovery state is actually just the current status. > Refactor AbstractInvokable and StatefulTask > --- > > Key: FLINK-5982 > URL: https://issues.apache.org/jira/browse/FLINK-5982 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Wei-Che Wei >Assignee: Wei-Che Wei > > Currently, running a invokable in {{Task}} needs to call > {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a > {{StatefulTask}}, it need to call {{setInitialState(state)}}. That makes the > difficulty in doing the eager initialization on invokable during > {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate > {{invoke()}} into {{open()}} and {{invoke()}}, but that makes the complexity > for running it in {{Task}}. > This task wants to refactor {{AbstractInvokable}} and {{StatefulTask}} to > make it easier to construct and run an invokable. > # Refactor abstract class to have one default constructor. > #* Remove {{setEnvironment(env)}} and change {{AbstractInvokable}} to have > one argument constructor with {{Environment}}. > #* Remove {{setInitialState}} and create a new abstract class > {{AbstractStatefulInvokable}} to inherit {{AbstractInvokable}} and implement > {{StatefulTask}}. Make {{AbstractStatefulInvokable}} have a two argument > constructor with {{Environment}} and {{TaskStateHandles}}. > # Update all subclass > #* Make all subclass of only {{AbstractInvokable}} have an one argument > constructor and call the constructor in {{AbstractInvokable}}. > #* Make all subclass of {{AbstractInvokable}} and {{StatefulTask}} to inherit > {{AbstractStatefulInvokable}} and have two two argument constructor. > # Change the creation of the invokables to call that constructor, update all > the tests. > Then, we can simplify the logic to run an invokable by using constructor and > {{invoke()}}. The eager initialization can easily be placed in the > constructor to fulfill the requirement such as FLINK-4714. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask
[ https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15899574#comment-15899574 ] Till Rohrmann commented on FLINK-5982: -- +1 for the proposed changes :-) > Refactor AbstractInvokable and StatefulTask > --- > > Key: FLINK-5982 > URL: https://issues.apache.org/jira/browse/FLINK-5982 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Wei-Che Wei >Assignee: Wei-Che Wei > > Currently, running a invokable in {{Task}} needs to call > {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a > {{StatefulTask}}, it need to call {{setInitialState(state)}}. That makes the > difficulty in doing the eager initialization on invokable during > {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate > {{invoke()}} into {{open()}} and {{invoke()}}, but that makes the complexity > for running it in {{Task}}. > This task wants to refactor {{AbstractInvokable}} and {{StatefulTask}} to > make it easier to construct and run an invokable. > # Refactor abstract class to have one default constructor. > #* Remove {{setEnvironment(env)}} and change {{AbstractInvokable}} to have > one argument constructor with {{Environment}}. > #* Remove {{setInitialState}} and create a new abstract class > {{AbstractStatefulInvokable}} to inherit {{AbstractInvokable}} and implement > {{StatefulTask}}. Make {{AbstractStatefulInvokable}} have a two argument > constructor with {{Environment}} and {{TaskStateHandles}}. > # Update all subclass > #* Make all subclass of only {{AbstractInvokable}} have an one argument > constructor and call the constructor in {{AbstractInvokable}}. > #* Make all subclass of {{AbstractInvokable}} and {{StatefulTask}} to inherit > {{AbstractStatefulInvokable}} and have two two argument constructor. > # Change the creation of the invokables to call that constructor, update all > the tests. > Then, we can simplify the logic to run an invokable by using constructor and > {{invoke()}}. The eager initialization can easily be placed in the > constructor to fulfill the requirement such as FLINK-4714. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask
[ https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15899368#comment-15899368 ] Wei-Che Wei commented on FLINK-5982: FYI [~StephanEwen], [~till.rohrmann] Please let me know if you have any suggestions. > Refactor AbstractInvokable and StatefulTask > --- > > Key: FLINK-5982 > URL: https://issues.apache.org/jira/browse/FLINK-5982 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Wei-Che Wei >Assignee: Wei-Che Wei > > Currently, running a invokable in {{Task}} needs to call > {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a > {{StatefulTask}}, it need to call {{setInitialState(state)}}. That makes the > difficulty in doing the eager initialization on invokable during > {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate > {{invoke()}} into {{open()}} and {{invoke()}}, but that makes the complexity > for running it in {{Task}}. > This task wants to refactor {{AbstractInvokable}} and {{StatefulTask}} to > make it easier to construct and run an invokable. > # Refactor abstract class to have one default constructor. > #* Remove {{setEnvironment(env)}} and change {{AbstractInvokable}} to have > one argument constructor with {{Environment}}. > #* Remove {{setInitialState}} and create a new abstract class > {{AbstractStatefulInvokable}} to inherit {{AbstractInvokable}} and implement > {{StatefulTask}}. Make {{AbstractStatefulInvokable}} have a two argument > constructor with {{Environment}} and {{TaskStateHandles}}. > # Update all subclass > #* Make all subclass of only {{AbstractInvokable}} have an one argument > constructor and call the constructor in {{AbstractInvokable}}. > #* Make all subclass of {{AbstractInvokable}} and {{StatefulTask}} to inherit > {{AbstractStatefulInvokable}} and have two two argument constructor. > # Change the creation of the invokables to call that constructor, update all > the tests -- This message was sent by Atlassian JIRA (v6.3.15#6346)