[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319532#comment-16319532
 ] 

ASF GitHub Bot commented on FLINK-5982:
---

Github user tony810430 commented on the issue:

https://github.com/apache/flink/pull/3633
  
Thanks a lot for doing this work. Nice to see this PR merged. =)


> Refactor AbstractInvokable and StatefulTask
> ---
>
> Key: FLINK-5982
> URL: https://issues.apache.org/jira/browse/FLINK-5982
> Project: Flink
> Issue Type: Improvement
> Components: Distributed Coordination, State Backends, Checkpointing
> Affects Versions: 1.2.0
> Reporter: Wei-Che Wei
> Assignee: Wei-Che Wei
>
> Currently, running an invokable in {{Task}} requires calling
> {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a
> {{StatefulTask}}, {{setInitialState(state)}} must be called as well. That
> makes eager initialization of the invokable during the {{DEPLOYING}} state
> difficult. One solution discussed in FLINK-4714 is to split
> {{invoke()}} into {{open()}} and {{invoke()}}, but that complicates running
> the invokable in {{Task}}.
> This task refactors {{AbstractInvokable}} and {{StatefulTask}} to
> make it easier to construct and run an invokable.
> # Refactor the abstract class to have one default constructor.
> #* Drop {{StatefulTask}} and assume all subclasses of {{AbstractInvokable}}
> are stateful.
> #* Remove {{setEnvironment(env)}} and {{setInitialState(state)}}. Give
> {{AbstractInvokable}} a two-argument constructor taking {{Environment}}
> and {{TaskStateHandles}}.
> # Update all subclasses.
> #* Give every subclass of {{AbstractInvokable}} a two-argument constructor
> that calls the constructor in {{AbstractInvokable}}.
> #* Throw an error in {{BatchTask}} if the initial state is not null. (This
> check will be removed once {{BatchTask}} becomes stateful.)
> # Change the creation of the invokable to call that constructor, and update
> all the tests.
> Then the logic to run an invokable reduces to calling the constructor and
> {{invoke()}}. Eager initialization can then be placed in the
> constructor to meet requirements such as FLINK-4714.
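
The constructor-based design described in the issue can be sketched roughly as follows. This is a minimal illustration, not the code from the PR: `Environment` and `TaskStateHandles` here are empty stand-ins for the real Flink types, and the accessor names and the exception message are assumptions made for the example.

```java
import java.util.Objects;

// Simplified stand-ins for illustration only; these are NOT the real Flink classes.
final class ConstructorInitSketch {

    /** Stand-in for Flink's Environment (hypothetical, reduced to a name). */
    static final class Environment {
        private final String taskName;
        Environment(String taskName) { this.taskName = taskName; }
        String getTaskName() { return taskName; }
    }

    /** Stand-in for Flink's TaskStateHandles (hypothetical, reduced to an id). */
    static final class TaskStateHandles {
        private final long checkpointId;
        TaskStateHandles(long checkpointId) { this.checkpointId = checkpointId; }
        long getCheckpointId() { return checkpointId; }
    }

    /** Environment and initial state are fixed at construction time; no setters remain. */
    abstract static class AbstractInvokable {
        private final Environment environment;
        private final TaskStateHandles initialState; // null means "nothing to restore"

        protected AbstractInvokable(Environment environment, TaskStateHandles initialState) {
            this.environment = Objects.requireNonNull(environment, "environment");
            this.initialState = initialState;
        }

        final Environment getEnvironment() { return environment; }
        final TaskStateHandles getInitialState() { return initialState; }

        abstract void invoke() throws Exception;
    }

    /** Batch tasks are not stateful yet, so a non-null initial state is rejected. */
    static class BatchTask extends AbstractInvokable {
        BatchTask(Environment environment, TaskStateHandles initialState) {
            super(environment, initialState);
            if (initialState != null) {
                throw new IllegalStateException("Batch tasks do not support initial state yet.");
            }
        }

        @Override
        void invoke() {
            // Eager initialization would already have happened in the constructor.
        }
    }
}
```

With this shape, the caller only needs `new BatchTask(env, null)` followed by `invoke()`, and any eager setup runs inside the constructor.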



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16318879#comment-16318879
 ] 

ASF GitHub Bot commented on FLINK-5982:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3633




[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16318396#comment-16318396
 ] 

ASF GitHub Bot commented on FLINK-5982:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3633
  
I finally found the time to look at this.

I have rebased it and applied some simplifications:
  - convenience constructors for stateless tasks
  - test utils that provide mock environments directly
  - Task Test Harnesses create the tasks lazily via a factory.
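
A rough sketch of what the first and third simplifications could look like. All names here (`NoOpTask`, `TaskTestHarness`, `isTaskCreated()`) and the stand-in `Environment` / `TaskStateHandles` types are hypothetical and chosen for illustration; the merged Flink code may differ.

```java
import java.util.Objects;
import java.util.function.Function;

// Illustrative stand-ins only; not the real Flink classes or test utilities.
final class HarnessSketch {

    static final class Environment {
        private final String taskName;
        Environment(String taskName) { this.taskName = taskName; }
        String getTaskName() { return taskName; }
    }

    static final class TaskStateHandles { }

    abstract static class AbstractInvokable {
        private final Environment environment;
        private final TaskStateHandles initialState;

        /** Convenience constructor for stateless tasks: no initial state. */
        protected AbstractInvokable(Environment environment) {
            this(environment, null);
        }

        protected AbstractInvokable(Environment environment, TaskStateHandles initialState) {
            this.environment = Objects.requireNonNull(environment, "environment");
            this.initialState = initialState;
        }

        Environment getEnvironment() { return environment; }
        TaskStateHandles getInitialState() { return initialState; }
        abstract void invoke();
    }

    static final class NoOpTask extends AbstractInvokable {
        NoOpTask(Environment environment) { super(environment); }
        @Override void invoke() { }
    }

    /** The harness stores a factory and builds the task only when it is first requested. */
    static final class TaskTestHarness<T extends AbstractInvokable> {
        private final Function<Environment, T> taskFactory;
        private final Environment mockEnvironment;
        private T task; // created lazily

        TaskTestHarness(Function<Environment, T> taskFactory, Environment mockEnvironment) {
            this.taskFactory = Objects.requireNonNull(taskFactory, "taskFactory");
            this.mockEnvironment = Objects.requireNonNull(mockEnvironment, "mockEnvironment");
        }

        T getTask() {
            if (task == null) {
                task = taskFactory.apply(mockEnvironment);
            }
            return task;
        }

        boolean isTaskCreated() { return task != null; }
    }
}
```

Because the environment is now a constructor argument, a harness cannot create the task before its mock environment is ready, which is one plausible reason for deferring creation to a factory.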

All in all that makes for a good patch in my opinion.

Will merge this...




[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask

2017-10-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16196607#comment-16196607
 ] 

ASF GitHub Bot commented on FLINK-5982:
---

Github user tony810430 commented on the issue:

https://github.com/apache/flink/pull/3633
  
Hi @StephanEwen
Have you had time to look at this recently? If you are too busy, could we
find someone else to review it? Thank you.




[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask

2017-07-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16087333#comment-16087333
 ] 

ASF GitHub Bot commented on FLINK-5982:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3633
  
Sorry about the delay. I still have this on my list...




[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask

2017-07-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16077956#comment-16077956
 ] 

ASF GitHub Bot commented on FLINK-5982:
---

Github user tony810430 commented on the issue:

https://github.com/apache/flink/pull/3633
  
Hi @StephanEwen 
Do you have time to review this PR soon?




[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask

2017-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16013259#comment-16013259
 ] 

ASF GitHub Bot commented on FLINK-5982:
---

Github user tony810430 commented on the issue:

https://github.com/apache/flink/pull/3633
  
I see. Thank you. =)




[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask

2017-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16012419#comment-16012419
 ] 

ASF GitHub Bot commented on FLINK-5982:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3633
  
Sorry, I have not managed to merge this yet. I will try to take it on after
the 1.3 release.
Thanks for the offer to rebase it. I can ping you when I am close to
merging, so that you only need to rebase once (rather than now and
potentially again in a few weeks).




[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask

2017-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16011984#comment-16011984
 ] 

ASF GitHub Bot commented on FLINK-5982:
---

Github user tony810430 commented on the issue:

https://github.com/apache/flink/pull/3633
  
Hi @StephanEwen 

Have you been reviewing this PR? If not, I would like to rebase it and fix 
those conflicts. What do you think?




[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask

2017-03-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15947032#comment-15947032
 ] 

ASF GitHub Bot commented on FLINK-5982:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3633
  
Thanks, I will try and look at this over the next week.
It is quite a big change, so I might need a bit of time to review it 
properly...




[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15946503#comment-15946503
 ] 

ASF GitHub Bot commented on FLINK-5982:
---

Github user tony810430 commented on the issue:

https://github.com/apache/flink/pull/3633
  
@StephanEwen Thanks for the suggestion. I have updated the patch.




[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945578#comment-15945578
 ] 

ASF GitHub Bot commented on FLINK-5982:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3633
  
Thanks for this refactoring. I had only a quick look so far, but one 
thought was that this patch could probably be simplified by making 
`triggerCheckpoint()` (and restore / notifyComplete) not abstract in the 
`AbstractInvokable`. Instead let it throw the `UnsupportedOperationException`.

That way, we avoid refactoring all the test classes to implement those 
methods (which would be nice).
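
A minimal sketch of that suggestion, under simplifying assumptions: the method signatures below are reduced to a single `long checkpointId` parameter for illustration and do not match the real Flink signatures, and the two subclasses are hypothetical.

```java
// Illustrative only; signatures are simplified and are not the real Flink API.
final class CheckpointDefaultsSketch {

    abstract static class AbstractInvokable {

        abstract void invoke();

        /** Non-abstract default: tasks that do not checkpoint need not override this. */
        boolean triggerCheckpoint(long checkpointId) {
            throw new UnsupportedOperationException(
                "triggerCheckpoint not supported by " + getClass().getName());
        }

        /** Same pattern for the completion notification. */
        void notifyCheckpointComplete(long checkpointId) {
            throw new UnsupportedOperationException(
                "notifyCheckpointComplete not supported by " + getClass().getName());
        }
    }

    /** A test task only has to implement invoke(); nothing else changes. */
    static final class StatelessTestTask extends AbstractInvokable {
        @Override void invoke() { }
    }

    /** A task that does checkpoint overrides the default behavior. */
    static final class CheckpointingTask extends AbstractInvokable {
        private long lastTriggeredCheckpointId = -1L;

        @Override void invoke() { }

        @Override
        boolean triggerCheckpoint(long checkpointId) {
            lastTriggeredCheckpointId = checkpointId;
            return true;
        }

        long getLastTriggeredCheckpointId() { return lastTriggeredCheckpointId; }
    }
}
```

The trade-off is that a missing override surfaces at runtime rather than at compile time, in exchange for leaving existing test classes untouched.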




[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945161#comment-15945161
 ] 

ASF GitHub Bot commented on FLINK-5982:
---

GitHub user tony810430 opened a pull request:

https://github.com/apache/flink/pull/3633

[FLINK-5982] [runtime] Refactor AbstractInvokable and StatefulTask

This PR merges `AbstractInvokable` and `StatefulTask` to eliminate
"lazy initialization" in `Task`.

Changes in this PR:

1. Merge `AbstractInvokable` and `StatefulTask`, and use the constructor to
initialize `Environment` and `TaskStateHandles` in `AbstractInvokable`.
2. Update all subclasses and the corresponding tests. For now, batch tasks are
still not stateful, so I added an `IllegalStateException` to their
constructors and an `UnsupportedOperationException` to the methods that
support stateful tasks.
3. Update how `Task` uses `AbstractInvokable`, including
`loadAndInstantiateInvokable()`, `triggerCheckpointBarrier()` and
`notifyCheckpointComplete()`.
4. Add tests for the changes in `Task`, and test the constructor behavior
of all concrete subclasses of `AbstractInvokable`.
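
For readers unfamiliar with how a task could instantiate an invokable through a two-argument constructor, here is a hedged sketch using standard Java reflection. The `instantiate` helper, the `BatchLikeTask` class and the stand-in types are hypothetical; this is not the body of `loadAndInstantiateInvokable()` from the PR.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.Objects;

// Illustrative stand-ins only; not the real Flink classes.
final class ReflectiveLoadSketch {

    public static final class Environment { }

    public static final class TaskStateHandles { }

    public abstract static class AbstractInvokable {
        private final Environment environment;
        private final TaskStateHandles initialState;

        protected AbstractInvokable(Environment environment, TaskStateHandles initialState) {
            this.environment = Objects.requireNonNull(environment, "environment");
            this.initialState = initialState;
        }

        Environment getEnvironment() { return environment; }
        TaskStateHandles getInitialState() { return initialState; }
        public abstract void invoke();
    }

    /** Hypothetical task that, like the batch tasks in the PR, rejects initial state. */
    public static final class BatchLikeTask extends AbstractInvokable {
        public BatchLikeTask(Environment environment, TaskStateHandles initialState) {
            super(environment, initialState);
            if (initialState != null) {
                throw new IllegalStateException("This task does not support initial state.");
            }
        }

        @Override public void invoke() { }
    }

    /** Looks up the public (Environment, TaskStateHandles) constructor and calls it. */
    static AbstractInvokable instantiate(
            Class<? extends AbstractInvokable> invokableClass,
            Environment environment,
            TaskStateHandles initialState) {
        try {
            Constructor<? extends AbstractInvokable> ctor =
                invokableClass.getConstructor(Environment.class, TaskStateHandles.class);
            return ctor.newInstance(environment, initialState);
        } catch (InvocationTargetException e) {
            // The constructor itself failed; surface its exception instead of the wrapper.
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            throw new IllegalStateException("Invokable constructor failed", cause);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(
                "Invokable needs a public (Environment, TaskStateHandles) constructor", e);
        }
    }
}
```

Unwrapping `InvocationTargetException` keeps constructor failures, such as the `IllegalStateException` above, visible to the caller as the original exception type.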

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tony810430/flink FLINK-5982

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3633.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3633


commit 31748905eaa3b4897f0b02473526b7fa05c2304e
Author: Tony Wei 
Date:   2017-03-15T03:13:41Z

[FLINK-5982] Refactor AbstractInvokable and StatefulTask






[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask

2017-03-28 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945074#comment-15945074
 ] 

Till Rohrmann commented on FLINK-5982:
--

Alright, this sounds good to me :-)



[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask

2017-03-27 Thread Wei-Che Wei (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943360#comment-15943360
 ] 

Wei-Che Wei commented on FLINK-5982:


Hi [~till.rohrmann],

I would prefer not to pass only {{Environment}} to {{BatchTask}}.
Because this task merges {{AbstractInvokable}} and {{StatefulTask}}, doing so
would force {{loadAndInstantiateInvokable()}} in {{Task}} to distinguish
{{BatchTask}} from {{StreamTask}}, which adds complexity.
Giving all subclasses the same constructor simplifies how {{Task}} handles
{{AbstractInvokable}}.

So I prefer the suggestion from [~StephanEwen]. It temporarily ensures that
{{BatchTask}} never receives a non-null state until {{BatchTask}} actually
handles recovery state.
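
To illustrate the point about a uniform constructor, here is a minimal sketch with simplified stand-in types, not the actual Flink classes. {{InvokableLoader}}, {{SketchStreamTask}} and {{SketchBatchTask}} are hypothetical names, and the reflective lookup is only an assumption about how such a loader could work, not a description of what {{Task}} really does.

```java
import java.lang.reflect.Constructor;

// Simplified stand-ins; not the actual Flink classes.
interface Environment {}

final class TaskStateHandles {}

abstract class AbstractInvokable {
    protected AbstractInvokable(Environment env, TaskStateHandles state) {}
}

class SketchStreamTask extends AbstractInvokable {
    public SketchStreamTask(Environment env, TaskStateHandles state) { super(env, state); }
}

class SketchBatchTask extends AbstractInvokable {
    public SketchBatchTask(Environment env, TaskStateHandles state) { super(env, state); }
}

final class InvokableLoader {
    // One code path for every subclass: look up the shared two-argument
    // constructor and call it. No batch/stream distinction is needed.
    static AbstractInvokable loadAndInstantiate(
            Class<? extends AbstractInvokable> type,
            Environment env,
            TaskStateHandles state) {
        try {
            Constructor<? extends AbstractInvokable> ctor =
                type.getDeclaredConstructor(Environment.class, TaskStateHandles.class);
            ctor.setAccessible(true);
            return ctor.newInstance(env, state);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not instantiate " + type.getName(), e);
        }
    }
}
```

If {{BatchTask}} took only an {{Environment}}, the loader would need a second lookup path and a way to decide which one applies to a given class.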



[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask

2017-03-27 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943286#comment-15943286
 ] 

Stephan Ewen commented on FLINK-5982:
-

I think it is actually easier to shoot yourself in the foot with "special
meaning" objects like {{TaskStateHandles.EMPTY}}.

For example: the object also comes in via RPC, so a simple mistake in the
singleton-resolving code means that the check {{if (state ==
TaskStateHandles.EMPTY)}} no longer matches and breaks the code. I personally find
good null handling quite effective.
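
The failure mode described above can be reproduced with plain Java serialization. The sketch below is an illustration only: {{NaiveEmptyState}} and {{ResolvedEmptyState}} are hypothetical sentinel classes, and an in-memory serialization round trip stands in for the RPC hop. How Flink actually ships state handles is not shown here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical sentinel WITHOUT singleton-resolving code.
final class NaiveEmptyState implements Serializable {
    private static final long serialVersionUID = 1L;
    static final NaiveEmptyState EMPTY = new NaiveEmptyState();
    private NaiveEmptyState() {}
}

// Hypothetical sentinel WITH readResolve(), which maps every deserialized
// copy back to the one canonical instance.
final class ResolvedEmptyState implements Serializable {
    private static final long serialVersionUID = 1L;
    static final ResolvedEmptyState EMPTY = new ResolvedEmptyState();
    private ResolvedEmptyState() {}
    private Object readResolve() { return EMPTY; }
}

final class SerializationRoundTrip {
    // Serializes and deserializes a value in memory, standing in for an RPC hop.
    static Object roundTrip(Serializable value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

After a round trip, {{roundTrip(NaiveEmptyState.EMPTY) == NaiveEmptyState.EMPTY}} is false because deserialization creates a fresh instance, while the same comparison for {{ResolvedEmptyState}} is true. Forgetting the single {{readResolve()}} method is the kind of simple mistake that silently breaks an identity check.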



[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask

2017-03-27 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943273#comment-15943273
 ] 

Till Rohrmann commented on FLINK-5982:
--

True, we can also throw an {{IllegalStateException}}. I was just wondering why
we would give the user/developer the possibility to shoot himself in the foot by
allowing a state handles object to be passed to a {{BatchTask}}. Imo, it's safer to
simply pass the {{Environment}} to the {{BatchTask}} and then call the super
class's constructor with the given {{Environment}} and
{{TaskStateHandles.EMPTY}}. That way we could enforce that the task state
handles field is never {{null}} by checking it in the constructor.
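
A minimal sketch of this alternative, using simplified stand-in types and not the actual Flink classes. The {{EMPTY}} constant is named after the one mentioned in this comment, {{SketchBatchTask}} is a hypothetical class, and the use of {{Objects.requireNonNull}} for the constructor check is an assumption.

```java
import java.util.Objects;

// Simplified stand-ins; not the actual Flink classes.
interface Environment {}

final class TaskStateHandles {
    // Sentinel meaning "no state"; named after the constant in the comment.
    static final TaskStateHandles EMPTY = new TaskStateHandles();
}

abstract class AbstractInvokable {
    private final Environment environment;
    private final TaskStateHandles state;

    protected AbstractInvokable(Environment environment, TaskStateHandles state) {
        // Enforced once here, so no subclass ever sees a null field.
        this.environment = Objects.requireNonNull(environment, "environment");
        this.state = Objects.requireNonNull(state, "state");
    }

    public Environment getEnvironment() { return environment; }

    public TaskStateHandles getState() { return state; }
}

// Hypothetical batch task: callers cannot hand it state at all.
class SketchBatchTask extends AbstractInvokable {
    SketchBatchTask(Environment environment) {
        super(environment, TaskStateHandles.EMPTY);
    }
}
```

The trade-off is that the subclasses no longer share one constructor signature, so whatever creates them has to know which kind of task it is building.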



[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask

2017-03-27 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943028#comment-15943028
 ] 

Stephan Ewen commented on FLINK-5982:
-

[~till.rohrmann] Why not throw an illegal state exception in the Batch Task? I 
think that would be good as a safety net. I also don't see harm in having the 
original state handles nullable - in the end, we don't need to eliminate all 
nulls, we only need to eliminate "lazy initialization".

Nullable fields are okay, in my opinion, if this is part of a proper contract.
The problem is fields that should not be null but are sometimes "accidentally
still null" because the lazy initialization has not been completed.
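
To make the distinction concrete, here is a small sketch with made-up classes ({{LazyInvokable}} and {{EagerInvokable}}) in which plain strings stand in for the real environment and state types. It illustrates the two kinds of null only and is not taken from the Flink code base.

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical classes; plain strings stand in for the real environment and state.

// Lazy initialization: the field should never be null, but it stays null
// until someone remembers to call the setter.
class LazyInvokable {
    private String environment;

    void setEnvironment(String environment) {
        this.environment = environment;
    }

    int invoke() {
        return environment.length(); // NullPointerException if the setter was skipped
    }
}

// Eager initialization: everything is fixed in the constructor. The state may
// still be null, but only because the contract says null means "nothing to restore".
class EagerInvokable {
    private final String environment;
    private final String initialState;

    EagerInvokable(String environment, String initialState) {
        this.environment = Objects.requireNonNull(environment, "environment");
        this.initialState = initialState;
    }

    int invoke() {
        return environment.length(); // environment is guaranteed to be set
    }

    Optional<String> getInitialState() {
        return Optional.ofNullable(initialState);
    }
}
```

In the first class a null is a bug that shows up only when {{invoke()}} runs. In the second, a null state is a documented value that callers can handle explicitly.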



[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask

2017-03-27 Thread Wei-Che Wei (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15942972#comment-15942972
 ] 

Wei-Che Wei commented on FLINK-5982:


Hi [~till.rohrmann]

Ok, I see. Thanks for your suggestion.



[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask

2017-03-27 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15942957#comment-15942957
 ] 

Till Rohrmann commented on FLINK-5982:
--

Hi [~tonywei],

One of the ideas behind the refactoring is to avoid situations where fields are
{{null}}. Thus, we should also set a value for tasks which don't have state
assigned. We could, for example, use the {{TaskStateHandles.EMPTY}} singleton
for initialization. Then we can be sure that the {{BatchTask}}, for example,
won't fail with a {{NullPointerException}} caused by an unset state field.
Additionally, you can get rid of the {{IllegalStateException}} in the
{{BatchTask}}.



[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask

2017-03-23 Thread Wei-Che Wei (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15939521#comment-15939521
 ] 

Wei-Che Wei commented on FLINK-5982:


Hi [~till.rohrmann]

Thanks for your reply. I still feel a little confused about the first point
and would like your suggestion. Thank you.

# It's a good choice to check the passed environment in {{AbstractInvokable}} and
to pass a {{DummyEnvironment}} instead of {{null}}. I am not sure whether it is good to
do the same for state instances, since the state instances will sometimes be {{null}}
in {{Task}}. Checking that state instances are not {{null}} might require adding
one more constructor or passing an empty state object in {{Task}}. What do you
think?
# The exception is {{IllegalStateException("Found operator state for a
non-stateful task invokable")}}. For now, I only put it in {{BatchTask}}, not in the
testing classes. I think it does not change their behavior. I will check it
once more before I finish this task.
# What I wanted to do was leave them empty and let the classes that need to support
the state methods override them, but I think your way is better. I will
implement it like that and make the state methods in {{AbstractInvokable}}
abstract.



[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask

2017-03-23 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15938422#comment-15938422
 ] 

Till Rohrmann commented on FLINK-5982:
--

Hi [~tonywei],

I think you should check in the {{AbstractInvokable}} that the passed 
environment and state instances are not null. 

1. If {{DummyInvokable.setEnvironment}} is not called anywhere, then you can simply
pass a {{DummyEnvironment}}, which does nothing, from the {{DummyInvokable}}
constructor to the super class. For the state you can pass an empty state
object.
2. What exceptions are you referring to? If you don't change the behaviour of 
the testing classes, then it should be ok.
3. What would the default behaviour be? Alternatively, we could think about 
throwing an {{UnsupportedOperationException}} in the classes which don't 
support the state methods. That way we are sure that they are not mistakenly 
used with state.



[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask

2017-03-11 Thread Wei-Che Wei (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15906135#comment-15906135
 ] 

Wei-Che Wei commented on FLINK-5982:


I see. I will just focus on refactoring {{setEnvironment(env)}}, 
{{setInitialState(state)}} and the new constructor. Thanks for your friendly 
reminder.



[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask

2017-03-09 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902789#comment-15902789
 ] 

Stephan Ewen commented on FLINK-5982:
-

The description is good, please go ahead with these changes.

I would suggest not moving code from {{invoke()}} into the constructor. We
should do that in a separate pull request.



[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask

2017-03-08 Thread Wei-Che Wei (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901331#comment-15901331
 ] 

Wei-Che Wei commented on FLINK-5982:


[~StephanEwen], [~till.rohrmann]
I have updated the description. Do you have any suggestions? If not, I will start
working on it based on the approach described above.






[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask

2017-03-08 Thread Wei-Che Wei (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901309#comment-15901309
 ] 

Wei-Che Wei commented on FLINK-5982:


If {{BatchTask}} will need to handle recovery state in the future, it makes 
sense to me to assume that all tasks in Flink are stateful. I will update the 
description. Thank you.

> Refactor AbstractInvokable and StatefulTask
> ---
>
> Key: FLINK-5982
> URL: https://issues.apache.org/jira/browse/FLINK-5982
> Project: Flink
> Issue Type: Improvement
> Components: Distributed Coordination, State Backends, Checkpointing
> Affects Versions: 1.2.0
> Reporter: Wei-Che Wei
> Assignee: Wei-Che Wei
>
> Currently, running an invokable in {{Task}} requires calling 
> {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a 
> {{StatefulTask}}, it also needs a call to {{setInitialState(state)}}. That 
> makes it difficult to eagerly initialize the invokable during the 
> {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to split 
> {{invoke()}} into {{open()}} and {{invoke()}}, but that complicates running 
> it in {{Task}}.
> This task refactors {{AbstractInvokable}} and {{StatefulTask}} to 
> make it easier to construct and run an invokable.
> # Refactor the abstract class to have one standard constructor.
> #* Remove {{setEnvironment(env)}} and give {{AbstractInvokable}} a 
> one-argument constructor taking {{Environment}}.
> #* Remove {{setInitialState}} and create a new abstract class 
> {{AbstractStatefulInvokable}} that extends {{AbstractInvokable}} and 
> implements {{StatefulTask}}. Give {{AbstractStatefulInvokable}} a two-argument 
> constructor taking {{Environment}} and {{TaskStateHandles}}.
> # Update all subclasses.
> #* Give every class that only extends {{AbstractInvokable}} a one-argument 
> constructor that calls the constructor in {{AbstractInvokable}}.
> #* Make every class that extends {{AbstractInvokable}} and implements 
> {{StatefulTask}} extend {{AbstractStatefulInvokable}} instead, with a 
> two-argument constructor.
> # Change the creation of the invokables to call that constructor, and update 
> all the tests.
> Then we can simplify the logic for running an invokable to just the 
> constructor and {{invoke()}}. Eager initialization can then be placed in the 
> constructor to fulfill requirements such as FLINK-4714.
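
The two-level hierarchy proposed in this version of the description could look roughly like the Java sketch below. It is only an illustration under assumptions: the nested types are simplified stand-ins rather than the actual Flink classes, and getInitialState(), StatelessExample and StatefulExample are hypothetical names introduced for the example.

```java
public class StatefulHierarchySketch {

    /** Stand-in for the task environment (no methods needed for this sketch). */
    public interface Environment {
    }

    /** Stand-in for the restored state. */
    public static final class TaskStateHandles {
    }

    /** Stand-in for StatefulTask; getInitialState() is a hypothetical accessor. */
    public interface StatefulTask {
        TaskStateHandles getInitialState();
    }

    /** Stateless base class: a one-argument constructor replaces setEnvironment(env). */
    public abstract static class AbstractInvokable {
        private final Environment environment;

        protected AbstractInvokable(Environment environment) {
            this.environment = environment;
        }

        public Environment getEnvironment() {
            return environment;
        }

        public abstract void invoke() throws Exception;
    }

    /** Stateful base class: a two-argument constructor replaces setInitialState(state). */
    public abstract static class AbstractStatefulInvokable
            extends AbstractInvokable implements StatefulTask {
        private final TaskStateHandles initialState;

        protected AbstractStatefulInvokable(Environment environment, TaskStateHandles initialState) {
            super(environment);
            this.initialState = initialState;
        }

        @Override
        public TaskStateHandles getInitialState() {
            return initialState;
        }
    }

    /** Hypothetical subclass of only AbstractInvokable. */
    public static class StatelessExample extends AbstractInvokable {
        public StatelessExample(Environment environment) {
            super(environment);
        }

        @Override
        public void invoke() {
            // no-op in this sketch
        }
    }

    /** Hypothetical subclass that was previously an AbstractInvokable and a StatefulTask. */
    public static class StatefulExample extends AbstractStatefulInvokable {
        public StatefulExample(Environment environment, TaskStateHandles initialState) {
            super(environment, initialState);
        }

        @Override
        public void invoke() {
            // no-op in this sketch
        }
    }

    public static void main(String[] args) throws Exception {
        Environment env = new Environment() { };
        new StatelessExample(env).invoke();
        new StatefulExample(env, new TaskStateHandles()).invoke();
    }
}
```

One consequence of this variant is that the code creating invokables has to know which of the two constructor shapes a given class offers.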





[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask

2017-03-08 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901289#comment-15901289
 ] 

Stephan Ewen commented on FLINK-5982:
-

I am personally in favor of dropping the distinction between {{StatefulTask}} 
and {{AbstractInvokable}} and moving to the assumption that all tasks in Flink 
are stateful (which I think will be the future).
That {{BatchTask}} does not handle recovery state is just the current state of 
affairs.






[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask

2017-03-07 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15899574#comment-15899574
 ] 

Till Rohrmann commented on FLINK-5982:
--

+1 for the proposed changes :-)






[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask

2017-03-07 Thread Wei-Che Wei (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15899368#comment-15899368
 ] 

Wei-Che Wei commented on FLINK-5982:


FYI [~StephanEwen], [~till.rohrmann]

Please let me know if you have any suggestions.



