[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator

2017-06-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16062341#comment-16062341
 ] 

ASF GitHub Bot commented on FLINK-5892:
---

Github user zentol closed the pull request at:

https://github.com/apache/flink/pull/3844


> Recover job state at the granularity of operator
> 
>
> Key: FLINK-5892
> URL: https://issues.apache.org/jira/browse/FLINK-5892
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Guowei Ma
>Assignee: Guowei Ma
> Fix For: 1.3.0
>
>
> JobGraph has no `Operator` info, so `ExecutionGraph` can only recover at the 
> granularity of tasks.
> As a result, an operator of the job may fail to recover its state from a 
> savepoint even if the savepoint contains that operator's state. 
>  
> https://docs.google.com/document/d/19suTRF0nh7pRgeMnIEIdJ2Fq-CcNVt5_hR3cAoICf7Q/edit#.
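
The difference between task-granularity and operator-granularity recovery can be sketched with a simplified model. It assumes that savepoint state can be keyed either by task ID or by operator ID. This is not Flink's `ExecutionGraph` code: the class `OperatorGranularityRestore`, its methods, and the use of plain `String` IDs and states are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical model contrasting task-level and operator-level state lookup; not Flink code. */
class OperatorGranularityRestore {

    /** Task granularity: the whole chain's state is found only if the task ID is unchanged. */
    static String restoreByTask(Map<String, String> stateByTaskId, String taskId) {
        return stateByTaskId.get(taskId); // null if re-chaining produced a different task ID
    }

    /** Operator granularity: each chained operator looks up its own state by operator ID. */
    static List<String> restoreByOperator(Map<String, String> stateByOperatorId,
                                          List<String> chainedOperatorIds) {
        List<String> restored = new ArrayList<>();
        for (String operatorId : chainedOperatorIds) {
            String state = stateByOperatorId.get(operatorId);
            if (state != null) { // operators without saved state start empty
                restored.add(operatorId + "=" + state);
            }
        }
        return restored;
    }
}
```

In this model, a task-level key only matches if the whole chain keeps its ID. Operator-level keys still match when the operators are reordered or chained differently.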



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator

2017-06-25 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16062305#comment-16062305
 ] 

Chesnay Schepler commented on FLINK-5892:
-

The KeyedComplexChainTest was updated to properly use a 1.2 snapshot (at the 
time of merging, restoration of keyed state was broken).

1.3: 0fd3683c3489d98c5c82d21b9a7a5ee93c0d6b2e
1.4: 7a0fff31d483633f88026650956b97cc451f19df



[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator

2017-06-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16055407#comment-16055407
 ] 

ASF GitHub Bot commented on FLINK-5892:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3844
  
That's unrelated. The changes made here can only affect the 
`KeyedComplexChainTest`.




[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator

2017-06-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16055278#comment-16055278
 ] 

ASF GitHub Bot commented on FLINK-5892:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/3844
  
Are you sure that the errors on Travis are intermittent or unrelated to your 
change? One is already reported here: 
https://issues.apache.org/jira/browse/FLINK-6843, but I'm not sure about the second one:

```
Tests run: 10, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 6.933 sec <<< FAILURE! - in org.apache.flink.runtime.state.OperatorStateBackendTest

testSnapshotAsyncCancel(org.apache.flink.runtime.state.OperatorStateBackendTest)  Time elapsed: 0.061 sec  <<< ERROR!
java.util.concurrent.ExecutionException: java.io.IOException: Stream closed.
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:206)
    at org.apache.flink.runtime.state.OperatorStateBackendTest.testSnapshotAsyncCancel(OperatorStateBackendTest.java:636)
Caused by: java.io.IOException: Stream closed.
    at org.apache.flink.runtime.util.BlockerCheckpointStreamFactory$1.write(BlockerCheckpointStreamFactory.java:95)
    at java.io.DataOutputStream.writeInt(DataOutputStream.java:197)
    at org.apache.flink.core.io.VersionedIOReadableWritable.write(VersionedIOReadableWritable.java:40)
    at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.write(OperatorBackendSerializationProxy.java:65)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:255)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:233)
    at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
```
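
The trace has the usual `FutureTask` shape. The `IOException` thrown inside the asynchronous snapshot surfaces from `get()` wrapped in an `ExecutionException`. The JDK-only sketch below reproduces just that wrapping. It does not reproduce Flink's `OperatorStateBackendTest`; the class `AsyncFailureUnwrapping` and the simulated "Stream closed." failure are hypothetical.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical demo of how an async task's IOException reaches the caller. */
class AsyncFailureUnwrapping {

    /** Runs a task that fails like a write to a closed stream; returns the cause the caller sees. */
    static Throwable causeSeenByCaller() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // Hypothetical stand-in for an async snapshot writing to an already closed stream.
        Callable<Void> snapshot = () -> {
            throw new IOException("Stream closed.");
        };
        try {
            Future<Void> future = executor.submit(snapshot);
            future.get(); // rethrows the task's failure wrapped in an ExecutionException
            return null;  // not reached in this sketch
        } catch (ExecutionException e) {
            return e.getCause(); // the original IOException
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        } finally {
            executor.shutdownNow();
        }
    }
}
```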




[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator

2017-06-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16055267#comment-16055267
 ] 

ASF GitHub Bot commented on FLINK-5892:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/3844#discussion_r122897990
  
--- Diff: flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java ---
@@ -98,10 +98,9 @@ public static void main(String[] args) throws Exception {
.map(new StatefulStringStoringMap(mode, "first"))
.setParallelism(4);
 
-   // TODO: re-enable this when generating the actual 1.2 savepoint
-   //if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) {
-   map.uid("first");
-   //}
+   if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) {
--- End diff --

Yes, thank you very much, now I get it :)

I think that since this code is already on master, it's a bit too late to change 
the commit message there :(




[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator

2017-06-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16055242#comment-16055242
 ] 

ASF GitHub Bot commented on FLINK-5892:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3844
  
Yes, this is present in master, but we should have it in the 1.3 branch as 
well.




[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator

2017-06-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16055241#comment-16055241
 ] 

ASF GitHub Bot commented on FLINK-5892:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3844#discussion_r122894136
  
--- Diff: flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java ---
@@ -98,10 +98,9 @@ public static void main(String[] args) throws Exception {
.map(new StatefulStringStoringMap(mode, "first"))
.setParallelism(4);
 
-   // TODO: re-enable this when generating the actual 1.2 savepoint
-   //if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) {
-   map.uid("first");
-   //}
+   if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) {
--- End diff --

You are completely right: the commit message/PR description isn't 
sufficient to explain what this PR changes. In fact, it took me a bit to 
remember that as well. I'll adjust the commit message later on.

So this PR is pretty subtle, since the interesting part isn't the change to 
the code but the change to the `complexKeyed-flink1.2/_metadata` file. This 
file is supposed to be a 1.2 savepoint used to verify restoring from 1.2 
savepoints in 1.3. But this file is not a 1.2 savepoint: at the time of 
merging, the restoration of keyed 1.2 state was broken, so in the meantime we 
used a 1.3 savepoint instead.

The main thing this PR does is replace this 1.3 savepoint with an actual 
1.2 savepoint.

The second change is related to the UIDs. In 1.2, it is not possible to 
assign UIDs to chained operators. As "first" and "second" are both chained to 
the window function, we are not allowed to call `map.uid("...")` when generating 
the 1.2 savepoint (!(MIGRATE || RESTORE)). However, in 1.3 it is possible, and 
in fact mandatory, to assign UIDs.

Does that clear things up?
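
The conditional UID assignment described above can be modeled without Flink. This is a sketch, not the `KeyedJob` code. The enum constant `GENERATE` (for producing the 1.2 savepoint) and the helpers `shouldAssignUid` and `uidsFor` are hypothetical. In the real job, the condition guards calls such as `map.uid("first")`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the UID logic in KeyedJob; not Flink code. */
class UidAssignmentSketch {

    /** GENERATE is an assumed name for the mode that writes the 1.2 savepoint. */
    enum ExecutionMode { GENERATE, MIGRATE, RESTORE }

    /** Mirrors the condition from the diff: only set UIDs when running on 1.3. */
    static boolean shouldAssignUid(ExecutionMode mode) {
        return mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE;
    }

    /** UIDs that would be set on the chained "first" and "second" maps in the given mode. */
    static List<String> uidsFor(ExecutionMode mode) {
        List<String> uids = new ArrayList<>();
        if (shouldAssignUid(mode)) {
            // In the real job this corresponds to calls like map.uid("first").
            uids.add("first");
            uids.add("second");
        }
        return uids;
    }
}
```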




[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator

2017-06-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16054070#comment-16054070
 ] 

ASF GitHub Bot commented on FLINK-5892:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/3844#discussion_r122718852
  
--- Diff: flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java ---
@@ -98,10 +98,9 @@ public static void main(String[] args) throws Exception {
.map(new StatefulStringStoringMap(mode, "first"))
.setParallelism(4);
 
-   // TODO: re-enable this when generating the actual 1.2 savepoint
-   //if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) {
-   map.uid("first");
-   //}
+   if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) {
--- End diff --

I don't like that the commit message does not explain what has actually 
changed or why this change was required at all. Especially since you have not 
changed anything else in the code, it is difficult to understand. If nothing 
else has changed, why do we need this `if (...)`? If something has changed, 
shouldn't it be covered by some test?




[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator

2017-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16025215#comment-16025215
 ] 

ASF GitHub Bot commented on FLINK-5892:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3842





--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator

2017-05-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16000542#comment-16000542
 ] 

ASF GitHub Bot commented on FLINK-5892:
---

GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/3844

[FLINK-5892] Enable 1.2 keyed state test

Backport of #3842 for the 1.3 branch.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 13_5892_enable_test

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3844.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3844


commit 7a0fff31d483633f88026650956b97cc451f19df
Author: zentol 
Date:   2017-05-08T09:56:57Z

[FLINK-5892] Enable 1.2 keyed state test






[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator

2017-05-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16000539#comment-16000539
 ] 

ASF GitHub Bot commented on FLINK-5892:
---

GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/3843

[FLINK-5892] Enable 1.2 keyed state test

Backport of #3842 for the 1.3 branch.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 13_5892_enable_test

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3843.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3843


commit 7a0fff31d483633f88026650956b97cc451f19df
Author: zentol 
Date:   2017-05-08T09:56:57Z

[FLINK-5892] Enable 1.2 keyed state test






[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator

2017-05-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16000538#comment-16000538
 ] 

ASF GitHub Bot commented on FLINK-5892:
---

GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/3842

[FLINK-5892] Enable 1.2 keyed state test

This PR enables the 1.2 keyed state restore test added in FLINK-5892. It 
was temporarily disabled due to a general issue with backwards compatibility.

First, the utility functions in `KeyedJob` are modified to only assign UIDs 
when migrating and restoring, but not when generating the job.

Second, we now actually use a 1.2 savepoint.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 5892_enable_test

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3842.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3842


commit 76edf6ae2f160527427833ae0aa94a16ab279fa0
Author: zentol 
Date:   2017-05-08T09:56:57Z

[FLINK-5892] Enable 1.2 keyed state test






[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator

2017-05-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16000540#comment-16000540
 ] 

ASF GitHub Bot commented on FLINK-5892:
---

Github user zentol closed the pull request at:

https://github.com/apache/flink/pull/3843




[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator

2017-04-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15989374#comment-15989374
 ] 

ASF GitHub Bot commented on FLINK-5892:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3770




[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator

2017-04-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15988897#comment-15988897
 ] 

ASF GitHub Bot commented on FLINK-5892:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113924290
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java ---
@@ -139,6 +158,10 @@ public ExecutionJobVertex(
this.serializedTaskInformation = null;
 
this.taskVertices = new ExecutionVertex[numTaskVertices];
+   List opIDs = jobVertex.getOperatorIDs();
+   this.operatorIDs = opIDs.toArray(new OperatorID[opIDs.size()]);
--- End diff --

How about making `operatorIDs` an immutable list instead of an array? I 
think all the operations you perform could also run on an array list, and we 
could enforce immutability so that nobody is tempted to modify the inner state 
of the original array (e.g. to reverse the element order for convenience in 
other parts of the code). The same goes for the alternative IDs.
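
Below is a minimal sketch of the suggestion. It uses the JDK's `Collections.unmodifiableList` over a defensive copy; Guava's `ImmutableList` would be an alternative. The class `OperatorIdHolder` is hypothetical, and `String` stands in for Flink's `OperatorID`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical holder; String stands in for Flink's OperatorID. */
class OperatorIdHolder {

    private final List<String> operatorIDs;

    OperatorIdHolder(List<String> opIDs) {
        // Copy first so later changes to opIDs are not visible, then wrap read-only.
        this.operatorIDs = Collections.unmodifiableList(new ArrayList<>(opIDs));
    }

    /** Callers get a read-only view; set/add/remove throw UnsupportedOperationException. */
    List<String> getOperatorIDs() {
        return operatorIDs;
    }
}
```

A call such as `Collections.reverse(holder.getOperatorIDs())` then fails with `UnsupportedOperationException` instead of silently reordering shared state.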




[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator

2017-04-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15988899#comment-15988899
 ] 

ASF GitHub Bot commented on FLINK-5892:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113929555
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java ---
@@ -62,252 +69,390 @@ public StateAssignmentOperation(
}
 
public boolean assignStates() throws Exception {
-
-   // this tracks if we find missing node hash ids and already use secondary mappings
-   boolean expandedToLegacyIds = false;
-
+   Map localStates = new HashMap<>(taskStates);
Map localTasks = this.tasks;
 
-   for (Map.Entry taskGroupStateEntry : taskStates.entrySet()) {
-
-   TaskState taskState = taskGroupStateEntry.getValue();
-
-   //find vertex for state-
-
-   ExecutionJobVertex executionJobVertex = localTasks.get(taskGroupStateEntry.getKey());
-
-   // on the first time we can not find the execution job vertex for an id, we also consider alternative ids,
-   // for example as generated from older flink versions, to provide backwards compatibility.
-   if (executionJobVertex == null && !expandedToLegacyIds) {
-   localTasks = ExecutionJobVertex.includeLegacyJobVertexIDs(localTasks);
-   executionJobVertex = localTasks.get(taskGroupStateEntry.getKey());
-   expandedToLegacyIds = true;
-   logger.info("Could not find ExecutionJobVertex. Including legacy JobVertexIDs in search.");
-   }
+   Set allOperatorIDs = new HashSet<>();
+   for (ExecutionJobVertex executionJobVertex : tasks.values()) {
+   allOperatorIDs.addAll(Lists.newArrayList(executionJobVertex.getOperatorIDs()));
+   }
+   for (Map.Entry taskGroupStateEntry : taskStates.entrySet()) {
--- End diff --

It looks like we could factor this loop out into a private precondition 
method such as `checkStateMappingCompleteness`. Even the previous loop and 
everything working on the hash set could go there.
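
One possible shape for the suggested precondition method is sketched below. The name `checkStateMappingCompleteness` comes from the comment. Its parameters, the `String` operator IDs, the exception type, and the message are assumptions, not the code that was eventually merged.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical precondition helper; operator IDs are modeled as Strings. */
class StateMappingCheck {

    /** Fails if some operator with state in the checkpoint belongs to no job vertex. */
    static void checkStateMappingCompleteness(Collection<List<String>> operatorIdsPerVertex,
                                              Set<String> operatorIdsWithState) {
        // Collect the operator IDs of all job vertices (the "previous loop" in the diff).
        Set<String> allOperatorIDs = new HashSet<>();
        for (List<String> vertexOperatorIds : operatorIdsPerVertex) {
            allOperatorIDs.addAll(vertexOperatorIds);
        }
        // Every piece of operator state must map to an operator of the new job.
        for (String operatorId : operatorIdsWithState) {
            if (!allOperatorIDs.contains(operatorId)) {
                throw new IllegalStateException("There is no operator for the state " + operatorId);
            }
        }
    }
}
```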




[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator

2017-04-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15988896#comment-15988896
 ] 

ASF GitHub Bot commented on FLINK-5892:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113922612
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java ---
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.CompositeStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Simple container class which contains the raw/managed/legacy operator state and key-group state handles for the sub
+ * tasks of an operator.
+ */
+public class OperatorState implements CompositeStateHandle {
+
+   private static final long serialVersionUID = -4845578005863201810L;
+
+   /** id of the operator */
+   private final OperatorID operatorID;
+
+   /** handles to non-partitioned states, subtaskindex -> subtaskstate */
+   private final Map subtaskStates;
--- End diff --

Here and in a few other places in this class, we could add the string 
`operator` to the variable names. This makes it clear to users that we are now 
dealing with state at the operator level, and avoids confusing Flink veterans 
who have a certain mental mapping for the word `(Sub)TaskState` that they would 
have to update.
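
To illustrate the naming suggestion, here is a simplified, hypothetical container in the spirit of `OperatorState`. It maps a subtask index to that subtask's state and puts `operator` into the field and method names. `String` stands in for the real state handles, and the bounds check against a parallelism is an assumption.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical, simplified OperatorState: subtask index -> that subtask's state. */
class OperatorStateSketch {

    private final String operatorID;
    private final int parallelism;
    // "operator" in the name signals state at operator level, not task level.
    private final Map<Integer, String> operatorSubtaskStates = new HashMap<>();

    OperatorStateSketch(String operatorID, int parallelism) {
        this.operatorID = Objects.requireNonNull(operatorID);
        this.parallelism = parallelism;
    }

    void putOperatorSubtaskState(int subtaskIndex, String state) {
        if (subtaskIndex < 0 || subtaskIndex >= parallelism) {
            throw new IndexOutOfBoundsException("subtask index " + subtaskIndex);
        }
        operatorSubtaskStates.put(subtaskIndex, Objects.requireNonNull(state));
    }

    /** Returns null if the subtask has no state. */
    String getOperatorSubtaskState(int subtaskIndex) {
        return operatorSubtaskStates.get(subtaskIndex);
    }

    Map<Integer, String> getOperatorSubtaskStates() {
        return Collections.unmodifiableMap(operatorSubtaskStates);
    }

    String getOperatorID() {
        return operatorID;
    }
}
```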




[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator

2017-04-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15988894#comment-15988894
 ] 

ASF GitHub Bot commented on FLINK-5892:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113928558
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java ---
@@ -62,252 +69,390 @@ public StateAssignmentOperation(
}
 
public boolean assignStates() throws Exception {
-
-   // this tracks if we find missing node hash ids and already use secondary mappings
-   boolean expandedToLegacyIds = false;
-
+   Map localStates = new HashMap<>(taskStates);
Map localTasks = this.tasks;
 
-   for (Map.Entry taskGroupStateEntry : taskStates.entrySet()) {
-
-   TaskState taskState = taskGroupStateEntry.getValue();
-
-   //find vertex for state-
-
-   ExecutionJobVertex executionJobVertex = localTasks.get(taskGroupStateEntry.getKey());
-
-   // on the first time we can not find the execution job vertex for an id, we also consider alternative ids,
-   // for example as generated from older flink versions, to provide backwards compatibility.
-   if (executionJobVertex == null && !expandedToLegacyIds) {
-   localTasks = ExecutionJobVertex.includeLegacyJobVertexIDs(localTasks);
-   executionJobVertex = localTasks.get(taskGroupStateEntry.getKey());
-   expandedToLegacyIds = true;
-   logger.info("Could not find ExecutionJobVertex. Including legacy JobVertexIDs in search.");
-   }
+   Set allOperatorIDs = new HashSet<>();
+   for (ExecutionJobVertex executionJobVertex : tasks.values()) {
+   allOperatorIDs.addAll(Lists.newArrayList(executionJobVertex.getOperatorIDs()));
+   }
+   for (Map.Entry taskGroupStateEntry : taskStates.entrySet()) {
--- End diff --

Renaming `taskStates` and `taskGroupStateEntry` to something with `operator` 
instead of `task` in it would make this more readable, maybe 
`operatorToStateMapping`. I guess this is just a leftover from the refactoring.
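With the suggested naming, an operator-level assignment loop might read roughly as follows. This is a minimal sketch, not the PR's code: IDs and states are plain `String`s, each vertex is reduced to the list of its operator IDs, the class and method names are hypothetical, and the leftover check assumes that `allowNonRestoredState` is what decides whether savepoint state without a matching operator is tolerated.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model: operator IDs and states are Strings, and each job
// vertex is represented only by the IDs of the operators it contains.
final class OperatorStateAssignmentSketch {

    static Map<String, String> assignStates(
            Map<String, List<String>> vertexToOperatorIds,
            Map<String, String> operatorToStateMapping,
            boolean allowNonRestoredState) {

        // Copy the savepoint mapping so matched entries can be removed; leftovers were not restored.
        Map<String, String> unmatchedOperatorStates = new HashMap<>(operatorToStateMapping);
        Map<String, String> assignedOperatorStates = new HashMap<>();

        for (List<String> operatorIds : vertexToOperatorIds.values()) {
            for (String operatorId : operatorIds) {
                String operatorState = unmatchedOperatorStates.remove(operatorId);
                if (operatorState != null) {
                    assignedOperatorStates.put(operatorId, operatorState);
                }
            }
        }

        // Assumption: state for operators that no longer exist is only tolerated if explicitly allowed.
        if (!unmatchedOperatorStates.isEmpty() && !allowNonRestoredState) {
            throw new IllegalStateException(
                "Savepoint contains state for unknown operators: " + unmatchedOperatorStates.keySet());
        }
        return assignedOperatorStates;
    }
}
```

Working on a copy keeps the caller's mapping untouched while still making the "what was not restored" set trivial to compute.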


> Recover job state at the granularity of operator
> 
>
> Key: FLINK-5892
> URL: https://issues.apache.org/jira/browse/FLINK-5892
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Guowei Ma
>Assignee: Guowei Ma
>
> JobGraph has no `Operator` info so `ExecutionGraph` can only recovery at the 
> granularity of task.
> This leads to the result that the operator of the job may not recover the 
> state from a save point even if the save point has the state of operator. 
>  
> https://docs.google.com/document/d/19suTRF0nh7pRgeMnIEIdJ2Fq-CcNVt5_hR3cAoICf7Q/edit#.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator

2017-04-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15988898#comment-15988898
 ] 

ASF GitHub Bot commented on FLINK-5892:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113920542
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---
@@ -1025,11 +1026,11 @@ public boolean restoreLatestCheckpointedState(
LOG.info("Restoring from latest valid checkpoint: {}.", latest);
 
// re-assign the task states
-
-   final Map taskStates = latest.getTaskStates();
+   final Map operatorStates = latest.getOperatorStates();
 
StateAssignmentOperation stateAssignmentOperation =
-   new StateAssignmentOperation(LOG, tasks, taskStates, allowNonRestoredState);
+   new StateAssignmentOperation(LOG, tasks, operatorStates, allowNonRestoredState);
--- End diff --

Not sure why this is implemented in a way that passes a logger to the 
`StateAssignmentOperation`; I guess the class should simply have its own 
logger. I think this could be changed. It seems this was introduced earlier 
and is unrelated to this PR, but I wouldn't mind having this refactored to 
the normal logger scheme before we merge.


> Recover job state at the granularity of operator
> 
>
> Key: FLINK-5892
> URL: https://issues.apache.org/jira/browse/FLINK-5892
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Guowei Ma
>Assignee: Guowei Ma
>
> JobGraph has no `Operator` info so `ExecutionGraph` can only recovery at the 
> granularity of task.
> This leads to the result that the operator of the job may not recover the 
> state from a save point even if the save point has the state of operator. 
>  
> https://docs.google.com/document/d/19suTRF0nh7pRgeMnIEIdJ2Fq-CcNVt5_hR3cAoICf7Q/edit#.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator

2017-04-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15988895#comment-15988895
 ] 

ASF GitHub Bot commented on FLINK-5892:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113928105
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java ---
@@ -62,252 +69,390 @@ public StateAssignmentOperation(
}
 
public boolean assignStates() throws Exception {
-
-   // this tracks if we find missing node hash ids and already use secondary mappings
-   boolean expandedToLegacyIds = false;
-
+   Map localStates = new HashMap<>(taskStates);
Map localTasks = this.tasks;
 
-   for (Map.Entry taskGroupStateEntry : taskStates.entrySet()) {
-
-   TaskState taskState = taskGroupStateEntry.getValue();
-
-   //find vertex for state-
-
-   ExecutionJobVertex executionJobVertex = localTasks.get(taskGroupStateEntry.getKey());
-
-   // on the first time we can not find the execution job vertex for an id, we also consider alternative ids,
-   // for example as generated from older flink versions, to provide backwards compatibility.
-   if (executionJobVertex == null && !expandedToLegacyIds) {
-   localTasks = ExecutionJobVertex.includeLegacyJobVertexIDs(localTasks);
-   executionJobVertex = localTasks.get(taskGroupStateEntry.getKey());
-   expandedToLegacyIds = true;
-   logger.info("Could not find ExecutionJobVertex. Including legacy JobVertexIDs in search.");
-   }
+   Set allOperatorIDs = new HashSet<>();
+   for (ExecutionJobVertex executionJobVertex : tasks.values()) {
+   allOperatorIDs.addAll(Lists.newArrayList(executionJobVertex.getOperatorIDs()));
--- End diff --

If we change this to an immutable list instead of an array, this code also 
saves one conversion to a list.
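A minimal sketch of that suggestion, with a hypothetical `VertexSketch` standing in for `ExecutionJobVertex` and `String` IDs in place of `OperatorID`: if the getter returns an unmodifiable `List` instead of an array, callers can hand it straight to `addAll`, without the extra `Lists.newArrayList(...)` copy, and still cannot mutate the vertex's internal list.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for ExecutionJobVertex; operator IDs are modelled as Strings.
final class VertexSketch {
    private final List<String> operatorIds;

    VertexSketch(List<String> operatorIds) {
        // Defensive copy wrapped in an unmodifiable view: callers can read but not mutate it.
        this.operatorIds = Collections.unmodifiableList(new ArrayList<>(operatorIds));
    }

    List<String> getOperatorIDs() {
        return operatorIds;
    }
}

final class OperatorIdCollector {
    static Set<String> collectAll(Collection<VertexSketch> vertices) {
        Set<String> allOperatorIds = new HashSet<>();
        for (VertexSketch vertex : vertices) {
            // A List can be passed to addAll directly; no array-to-list conversion needed.
            allOperatorIds.addAll(vertex.getOperatorIDs());
        }
        return allOperatorIds;
    }
}
```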


> Recover job state at the granularity of operator
> 
>
> Key: FLINK-5892
> URL: https://issues.apache.org/jira/browse/FLINK-5892
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Guowei Ma
>Assignee: Guowei Ma
>
> JobGraph has no `Operator` info so `ExecutionGraph` can only recovery at the 
> granularity of task.
> This leads to the result that the operator of the job may not recover the 
> state from a save point even if the save point has the state of operator. 
>  
> https://docs.google.com/document/d/19suTRF0nh7pRgeMnIEIdJ2Fq-CcNVt5_hR3cAoICf7Q/edit#.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator

2017-04-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15988742#comment-15988742
 ] 

ASF GitHub Bot commented on FLINK-5892:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113919931
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java ---
@@ -125,6 +132,8 @@ public JobVertex(String name) {
public JobVertex(String name, JobVertexID id) {
this.name = name == null ? DEFAULT_NAME : name;
this.id = id == null ? new JobVertexID() : id;
+   this.operatorIDs.add(new OperatorID(this.id.getLowerPart(), this.id.getUpperPart()));
+   this.operatorIdsAlternatives.add(null);
--- End diff --

Yes, that is what I also expected. Just wanted to be really sure.


> Recover job state at the granularity of operator
> 
>
> Key: FLINK-5892
> URL: https://issues.apache.org/jira/browse/FLINK-5892
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Guowei Ma
>Assignee: Guowei Ma
>
> JobGraph has no `Operator` info so `ExecutionGraph` can only recovery at the 
> granularity of task.
> This leads to the result that the operator of the job may not recover the 
> state from a save point even if the save point has the state of operator. 
>  
> https://docs.google.com/document/d/19suTRF0nh7pRgeMnIEIdJ2Fq-CcNVt5_hR3cAoICf7Q/edit#.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator

2017-04-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15988644#comment-15988644
 ] 

ASF GitHub Bot commented on FLINK-5892:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113903349
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java ---
@@ -125,6 +132,8 @@ public JobVertex(String name) {
public JobVertex(String name, JobVertexID id) {
this.name = name == null ? DEFAULT_NAME : name;
this.id = id == null ? new JobVertexID() : id;
+   this.operatorIDs.add(new OperatorID(this.id.getLowerPart(), this.id.getUpperPart()));
+   this.operatorIdsAlternatives.add(null);
--- End diff --

We only need a single alternative ID for each operatorID: the one set using 
`setUIDHash()`.

For savepoints created with 1.3, other alternatives aren't required, since 
1.3 doesn't use the old hasher.

1.0-1.2 savepoints are converted to the 1.3 format using the alternative 
job vertex IDs (see `SavepointV2#convertToOperatorStateSavepointV2`), which is 
why we can't remove them. After the conversion, however, they are identical to 
a 1.3 savepoint.
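With at most one alternative per operator, the state lookup reduces to picking one ID per position. The sketch below is illustrative only: IDs and states are `String`s, the class name is hypothetical, a `null` entry in the alternatives list means "no alternative", and it assumes that an alternative ID (such as one set via `setUIDHash()`), when present, takes precedence over the generated ID.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: operator IDs and states are modelled as Strings.
final class OperatorStateLookupSketch {

    /**
     * Resolves the savepoint state of each operator of one vertex. Position i of
     * alternativeIds holds the single alternative ID of operator i, or null if it has none.
     */
    static List<String> resolveStates(
            List<String> operatorIds,
            List<String> alternativeIds,
            Map<String, String> savepointStates) {
        if (operatorIds.size() != alternativeIds.size()) {
            throw new IllegalArgumentException("Expected exactly one (possibly null) alternative per operator.");
        }
        List<String> states = new ArrayList<>(operatorIds.size());
        for (int i = 0; i < operatorIds.size(); i++) {
            String alternativeId = alternativeIds.get(i);
            // Assumption: a user-provided ID, if present, wins over the generated one.
            String lookupId = alternativeId != null ? alternativeId : operatorIds.get(i);
            states.add(savepointStates.get(lookupId)); // null if the savepoint has no state for it
        }
        return states;
    }
}
```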


> Recover job state at the granularity of operator
> 
>
> Key: FLINK-5892
> URL: https://issues.apache.org/jira/browse/FLINK-5892
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Guowei Ma
>Assignee: Guowei Ma
>
> JobGraph has no `Operator` info so `ExecutionGraph` can only recovery at the 
> granularity of task.
> This leads to the result that the operator of the job may not recover the 
> state from a save point even if the save point has the state of operator. 
>  
> https://docs.google.com/document/d/19suTRF0nh7pRgeMnIEIdJ2Fq-CcNVt5_hR3cAoICf7Q/edit#.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator

2017-04-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15988600#comment-15988600
 ] 

ASF GitHub Bot commented on FLINK-5892:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113898244
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java ---
@@ -125,6 +132,8 @@ public JobVertex(String name) {
public JobVertex(String name, JobVertexID id) {
this.name = name == null ? DEFAULT_NAME : name;
this.id = id == null ? new JobVertexID() : id;
+   this.operatorIDs.add(new OperatorID(this.id.getLowerPart(), this.id.getUpperPart()));
+   this.operatorIdsAlternatives.add(null);
--- End diff --

OK, in this case it really seems better to make this explicit, as you 
suggested. I was also wondering whether `operatorIdsAlternatives` should be a 
List; I just want to make sure that at most one alternative ID has to be 
maintained per operator. But I think that we can always determine the 
savepoint version and only need compatibility with the hasher version that 
was valid for that savepoint version. 


> Recover job state at the granularity of operator
> 
>
> Key: FLINK-5892
> URL: https://issues.apache.org/jira/browse/FLINK-5892
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Guowei Ma
>Assignee: Guowei Ma
>
> JobGraph has no `Operator` info so `ExecutionGraph` can only recovery at the 
> granularity of task.
> This leads to the result that the operator of the job may not recover the 
> state from a save point even if the save point has the state of operator. 
>  
> https://docs.google.com/document/d/19suTRF0nh7pRgeMnIEIdJ2Fq-CcNVt5_hR3cAoICf7Q/edit#.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984991#comment-15984991
 ] 

ASF GitHub Bot commented on FLINK-5892:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113482975
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java ---
@@ -133,11 +142,15 @@ public JobVertex(String name, JobVertexID id) {
 * @param name The name of the new job vertex.
 * @param primaryId The id of the job vertex.
 * @param alternativeIds The alternative ids of the job vertex.
+* @param operatorIds The ids of all operators contained in this job vertex.
+* @param alternativeOperatorIds The alternative ids of all operators contained in this job vertex-
 */
-   public JobVertex(String name, JobVertexID primaryId, List alternativeIds) {
+   public JobVertex(String name, JobVertexID primaryId, List alternativeIds, List operatorIds, List alternativeOperatorIds) {
--- End diff --

We could add a simple OperatorIDs pojo that encapsulates both lists.
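A minimal sketch of such a POJO, under stated assumptions: the class name follows the suggestion above, but its fields and methods are hypothetical, and IDs are modelled as `String`s. The point is that the only mutator appends to both lists at once, so the equal-length invariant no longer depends on callers remembering to add a `null`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

// Hypothetical POJO in the spirit of the suggested "OperatorIDs"; IDs are modelled as Strings.
final class OperatorIDs {
    private final List<String> ids = new ArrayList<>();
    private final List<String> alternativeIds = new ArrayList<>();

    /** Registers an operator; alternativeId may be null if the operator has none. */
    void add(String id, String alternativeId) {
        ids.add(Objects.requireNonNull(id, "id"));
        // The only mutator appends to both lists, so they always have the same length.
        alternativeIds.add(alternativeId);
    }

    int size() {
        return ids.size();
    }

    List<String> getIds() {
        return Collections.unmodifiableList(ids);
    }

    List<String> getAlternativeIds() {
        return Collections.unmodifiableList(alternativeIds);
    }
}
```

The vertex constructor would then take a single such object instead of two separate lists.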


> Recover job state at the granularity of operator
> 
>
> Key: FLINK-5892
> URL: https://issues.apache.org/jira/browse/FLINK-5892
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Guowei Ma
>Assignee: Guowei Ma
>
> JobGraph has no `Operator` info so `ExecutionGraph` can only recovery at the 
> granularity of task.
> This leads to the result that the operator of the job may not recover the 
> state from a save point even if the save point has the state of operator. 
>  
> https://docs.google.com/document/d/19suTRF0nh7pRgeMnIEIdJ2Fq-CcNVt5_hR3cAoICf7Q/edit#.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984990#comment-15984990
 ] 

ASF GitHub Bot commented on FLINK-5892:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113482888
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java ---
@@ -133,11 +142,15 @@ public JobVertex(String name, JobVertexID id) {
 * @param name The name of the new job vertex.
 * @param primaryId The id of the job vertex.
 * @param alternativeIds The alternative ids of the job vertex.
+* @param operatorIds The ids of all operators contained in this job vertex.
+* @param alternativeOperatorIds The alternative ids of all operators contained in this job vertex-
 */
-   public JobVertex(String name, JobVertexID primaryId, List alternativeIds) {
+   public JobVertex(String name, JobVertexID primaryId, List alternativeIds, List operatorIds, List alternativeOperatorIds) {
--- End diff --

We don't know at the time of JobVertex generation whether we need them, so 
we have to provide them eagerly.


> Recover job state at the granularity of operator
> 
>
> Key: FLINK-5892
> URL: https://issues.apache.org/jira/browse/FLINK-5892
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Guowei Ma
>Assignee: Guowei Ma
>
> JobGraph has no `Operator` info so `ExecutionGraph` can only recovery at the 
> granularity of task.
> This leads to the result that the operator of the job may not recover the 
> state from a save point even if the save point has the state of operator. 
>  
> https://docs.google.com/document/d/19suTRF0nh7pRgeMnIEIdJ2Fq-CcNVt5_hR3cAoICf7Q/edit#.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984988#comment-15984988
 ] 

ASF GitHub Bot commented on FLINK-5892:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113482738
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java ---
@@ -125,6 +132,8 @@ public JobVertex(String name) {
public JobVertex(String name, JobVertexID id) {
this.name = name == null ? DEFAULT_NAME : name;
this.id = id == null ? new JobVertexID() : id;
+   this.operatorIDs.add(new OperatorID(this.id.getLowerPart(), this.id.getUpperPart()));
+   this.operatorIdsAlternatives.add(null);
--- End diff --

There's an implicit contract that `operatorIDs` and `operatorIdsAlternatives` 
must have the same length. We could store them as pairs instead.
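Storing pairs could look like the following minimal sketch. `OperatorIdPair` and its methods are hypothetical names chosen for illustration, and IDs are modelled as `String`s. The missing alternative becomes an explicit factory call and an `Optional` on read, instead of a `null` appended to a parallel list.

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical pair type; IDs are modelled as Strings instead of OperatorID.
final class OperatorIdPair {
    private final String generatedId;
    private final String userDefinedId; // null when no alternative ID exists

    private OperatorIdPair(String generatedId, String userDefinedId) {
        this.generatedId = Objects.requireNonNull(generatedId, "generatedId");
        this.userDefinedId = userDefinedId;
    }

    /** Operator without an alternative ID: replaces the implicit add(null) on a second list. */
    static OperatorIdPair generatedOnly(String generatedId) {
        return new OperatorIdPair(generatedId, null);
    }

    /** Operator with an alternative ID, e.g. one derived from a user-provided hash. */
    static OperatorIdPair of(String generatedId, String userDefinedId) {
        return new OperatorIdPair(generatedId, Objects.requireNonNull(userDefinedId, "userDefinedId"));
    }

    String getGeneratedId() {
        return generatedId;
    }

    Optional<String> getUserDefinedId() {
        return Optional.ofNullable(userDefinedId);
    }
}
```

A vertex would then hold one list of pairs, and the constructor in the diff would add `OperatorIdPair.generatedOnly(...)` rather than appending `null` to a separate list.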


> Recover job state at the granularity of operator
> 
>
> Key: FLINK-5892
> URL: https://issues.apache.org/jira/browse/FLINK-5892
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Guowei Ma
>Assignee: Guowei Ma
>
> JobGraph has no `Operator` info so `ExecutionGraph` can only recovery at the 
> granularity of task.
> This leads to the result that the operator of the job may not recover the 
> state from a save point even if the save point has the state of operator. 
>  
> https://docs.google.com/document/d/19suTRF0nh7pRgeMnIEIdJ2Fq-CcNVt5_hR3cAoICf7Q/edit#.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984848#comment-15984848
 ] 

ASF GitHub Bot commented on FLINK-5892:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113416026
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java ---
@@ -133,11 +142,15 @@ public JobVertex(String name, JobVertexID id) {
 * @param name The name of the new job vertex.
 * @param primaryId The id of the job vertex.
 * @param alternativeIds The alternative ids of the job vertex.
+* @param operatorIds The ids of all operators contained in this job vertex.
+* @param alternativeOperatorIds The alternative ids of all operators contained in this job vertex-
 */
-   public JobVertex(String name, JobVertexID primaryId, List alternativeIds) {
+   public JobVertex(String name, JobVertexID primaryId, List alternativeIds, List operatorIds, List alternativeOperatorIds) {
--- End diff --

Again, the generic type on the third parameter seems off. I also suggest 
introducing line breaks in the parameter list, as it is very long. On top of 
that, we have a lot of parameters with the same type, which callers can easily 
mix up. This and the number of arguments make me wonder: would it make sense 
to have just the actual IDs in the constructor, plus two methods to provide 
the alternative IDs for the cases that require them?
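The constructor-plus-methods variant could be sketched as below. `JobVertexSketch` and its setters are hypothetical, all IDs are `String`s, and this only illustrates the shape of the API being proposed, not how the PR resolved it. Since each setter takes one list and validates it, two same-typed lists can no longer be silently swapped at a single call site.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

// Hypothetical, trimmed-down vertex; all IDs are modelled as Strings.
final class JobVertexSketch {
    private final String name;
    private final String id;
    private final List<String> operatorIds;
    private final List<String> idAlternatives = new ArrayList<>();
    private final List<String> operatorIdAlternatives = new ArrayList<>();

    /** Only the primary IDs are constructor arguments. */
    JobVertexSketch(String name, String id, List<String> operatorIds) {
        this.name = Objects.requireNonNull(name, "name");
        this.id = Objects.requireNonNull(id, "id");
        this.operatorIds = new ArrayList<>(operatorIds);
    }

    /** Alternative vertex IDs, only for callers that need backwards compatibility. */
    void setAlternativeIds(List<String> alternativeIds) {
        idAlternatives.clear();
        idAlternatives.addAll(alternativeIds);
    }

    /** One (possibly null) alternative per operator; the size is checked instead of implied. */
    void setAlternativeOperatorIds(List<String> alternativeOperatorIds) {
        if (alternativeOperatorIds.size() != operatorIds.size()) {
            throw new IllegalArgumentException(
                "Expected " + operatorIds.size() + " alternative operator IDs for vertex " + name + " (" + id + ").");
        }
        operatorIdAlternatives.clear();
        operatorIdAlternatives.addAll(alternativeOperatorIds);
    }

    List<String> getIdAlternatives() {
        return Collections.unmodifiableList(idAlternatives);
    }

    List<String> getOperatorIdAlternatives() {
        return Collections.unmodifiableList(operatorIdAlternatives);
    }
}
```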


> Recover job state at the granularity of operator
> 
>
> Key: FLINK-5892
> URL: https://issues.apache.org/jira/browse/FLINK-5892
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Guowei Ma
>Assignee: Guowei Ma
>
> JobGraph has no `Operator` info so `ExecutionGraph` can only recovery at the 
> granularity of task.
> This leads to the result that the operator of the job may not recover the 
> state from a save point even if the save point has the state of operator. 
>  
> https://docs.google.com/document/d/19suTRF0nh7pRgeMnIEIdJ2Fq-CcNVt5_hR3cAoICf7Q/edit#.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984844#comment-15984844
 ] 

ASF GitHub Bot commented on FLINK-5892:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113415296
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java ---
@@ -125,6 +132,8 @@ public JobVertex(String name) {
public JobVertex(String name, JobVertexID id) {
this.name = name == null ? DEFAULT_NAME : name;
this.id = id == null ? new JobVertexID() : id;
+   this.operatorIDs.add(new OperatorID(this.id.getLowerPart(), this.id.getUpperPart()));
+   this.operatorIdsAlternatives.add(null);
--- End diff --

Why is it required to add `null` here? That seems strange. Either this is 
not required, or it indicates an implicit contract about 
`operatorIdsAlternatives` that would at least justify a comment or (even 
better) a change. As far as I can see, it is just not required. Or am I 
missing something?


> Recover job state at the granularity of operator
> 
>
> Key: FLINK-5892
> URL: https://issues.apache.org/jira/browse/FLINK-5892
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Guowei Ma
>Assignee: Guowei Ma
>
> JobGraph has no `Operator` info so `ExecutionGraph` can only recovery at the 
> granularity of task.
> This leads to the result that the operator of the job may not recover the 
> state from a save point even if the save point has the state of operator. 
>  
> https://docs.google.com/document/d/19suTRF0nh7pRgeMnIEIdJ2Fq-CcNVt5_hR3cAoICf7Q/edit#.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984846#comment-15984846
 ] 

ASF GitHub Bot commented on FLINK-5892:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113411306
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OperatorID.java ---
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph;
+
+import org.apache.flink.util.AbstractID;
+
+import javax.xml.bind.DatatypeConverter;
+
+/**
+ * A class for statistically unique operator IDs.
+ */
+public class OperatorID extends AbstractID {
+
+   private static final long serialVersionUID = 1L;
+
+   public OperatorID() {
+   super();
+   }
+   public OperatorID(byte[] bytes) {
+   super(bytes);
+   }
+
+   public OperatorID(long lowerPart, long upperPart) {
+   super(lowerPart, upperPart);
+   }
+
+   public OperatorID(AbstractID id) {
+   super(id);
+   }
+
+   public static OperatorID fromHexString(String hexString) {
--- End diff --

If my IDE is telling the truth, this method is never used and could be 
removed.


> Recover job state at the granularity of operator
> 
>
> Key: FLINK-5892
> URL: https://issues.apache.org/jira/browse/FLINK-5892
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Guowei Ma
>Assignee: Guowei Ma
>
> JobGraph has no `Operator` info so `ExecutionGraph` can only recovery at the 
> granularity of task.
> This leads to the result that the operator of the job may not recover the 
> state from a save point even if the save point has the state of operator. 
>  
> https://docs.google.com/document/d/19suTRF0nh7pRgeMnIEIdJ2Fq-CcNVt5_hR3cAoICf7Q/edit#.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984845#comment-15984845
 ] 

ASF GitHub Bot commented on FLINK-5892:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113412507
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertexID.java ---
@@ -40,6 +40,10 @@ public JobVertexID(long lowerPart, long upperPart) {
super(lowerPart, upperPart);
}
 
+   public JobVertexID(AbstractID id) {
--- End diff --

Different subclasses of AbstractID are intended to introduce some kind of 
type safety. With this in mind, I feel this is not a very transparent way 
of "casting" between IDs. Maybe some `convert` methods could make this more 
explicit than offering a public constructor for this purpose?
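Explicit conversion methods might look like the minimal sketch below. To stay self-contained it does not use Flink's `AbstractID`; `IdSketch`, `VertexIdSketch` and `OperatorIdSketch` are hypothetical stand-ins that, like the diff's `new OperatorID(this.id.getLowerPart(), this.id.getUpperPart())`, model an ID as two `long`s. With no constructor accepting the shared base type, a conversion can only happen through a named, searchable method.

```java
// Hypothetical, minimal stand-ins for AbstractID and two of its subclasses:
// an ID is just two longs here.
abstract class IdSketch {
    private final long lowerPart;
    private final long upperPart;

    protected IdSketch(long lowerPart, long upperPart) {
        this.lowerPart = lowerPart;
        this.upperPart = upperPart;
    }

    long getLowerPart() {
        return lowerPart;
    }

    long getUpperPart() {
        return upperPart;
    }
}

final class VertexIdSketch extends IdSketch {
    VertexIdSketch(long lowerPart, long upperPart) {
        super(lowerPart, upperPart);
    }
}

final class OperatorIdSketch extends IdSketch {
    OperatorIdSketch(long lowerPart, long upperPart) {
        super(lowerPart, upperPart);
    }

    /** Named conversion instead of a public constructor taking any ID type. */
    static OperatorIdSketch fromVertexId(VertexIdSketch vertexId) {
        return new OperatorIdSketch(vertexId.getLowerPart(), vertexId.getUpperPart());
    }

    /** The reverse conversion, equally explicit at the call site. */
    VertexIdSketch toVertexId() {
        return new VertexIdSketch(getLowerPart(), getUpperPart());
    }
}
```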


> Recover job state at the granularity of operator
> 
>
> Key: FLINK-5892
> URL: https://issues.apache.org/jira/browse/FLINK-5892
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Guowei Ma
>Assignee: Guowei Ma
>
> JobGraph has no `Operator` info so `ExecutionGraph` can only recovery at the 
> granularity of task.
> This leads to the result that the operator of the job may not recover the 
> state from a save point even if the save point has the state of operator. 
>  
> https://docs.google.com/document/d/19suTRF0nh7pRgeMnIEIdJ2Fq-CcNVt5_hR3cAoICf7Q/edit#.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984849#comment-15984849
 ] 

ASF GitHub Bot commented on FLINK-5892:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113416299
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java ---
@@ -39,8 +39,8 @@ public InputFormatVertex(String name, JobVertexID id) {
super(name, id);
}
 
-   public InputFormatVertex(String name, JobVertexID id, List alternativeIds) {
-   super(name, id, alternativeIds);
+   public InputFormatVertex(String name, JobVertexID id, List alternativeIds, List operatorIds, List alternativeOperatorIds) {
--- End diff --

See my comments on a similar constructor in `JobVertex`.


> Recover job state at the granularity of operator
> 
>
> Key: FLINK-5892
> URL: https://issues.apache.org/jira/browse/FLINK-5892
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Guowei Ma
>Assignee: Guowei Ma
>
> JobGraph has no `Operator` info so `ExecutionGraph` can only recovery at the 
> granularity of task.
> This leads to the result that the operator of the job may not recover the 
> state from a save point even if the save point has the state of operator. 
>  
> https://docs.google.com/document/d/19suTRF0nh7pRgeMnIEIdJ2Fq-CcNVt5_hR3cAoICf7Q/edit#.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984843#comment-15984843
 ] 

ASF GitHub Bot commented on FLINK-5892:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113413839
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java ---
@@ -50,7 +50,14 @@
/** The ID of the vertex. */
private final JobVertexID id;
 
-   private final ArrayList idAlternatives = new ArrayList<>();
+   /** The alternative IDs of the vertex. */
+   private final ArrayList idAlternatives = new ArrayList<>();
--- End diff --

Judging from the comment, this looks incorrect; I would expect a 
`List` here.


> Recover job state at the granularity of operator
> 
>
> Key: FLINK-5892
> URL: https://issues.apache.org/jira/browse/FLINK-5892
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Guowei Ma
>Assignee: Guowei Ma
>
> JobGraph has no `Operator` info so `ExecutionGraph` can only recovery at the 
> granularity of task.
> This leads to the result that the operator of the job may not recover the 
> state from a save point even if the save point has the state of operator. 
>  
> https://docs.google.com/document/d/19suTRF0nh7pRgeMnIEIdJ2Fq-CcNVt5_hR3cAoICf7Q/edit#.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984847#comment-15984847
 ] 

ASF GitHub Bot commented on FLINK-5892:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113411211
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OperatorID.java ---
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph;
+
+import org.apache.flink.util.AbstractID;
+
+import javax.xml.bind.DatatypeConverter;
+
+/**
+ * A class for statistically unique operator IDs.
+ */
+public class OperatorID extends AbstractID {
+
+   private static final long serialVersionUID = 1L;
+
+   public OperatorID() {
+   super();
+   }
--- End diff --

Code style: I would introduce an empty line in between. The call to 
`super()` is OK, but not required.


> Recover job state at the granularity of operator
> 
>
> Key: FLINK-5892
> URL: https://issues.apache.org/jira/browse/FLINK-5892
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Guowei Ma
>Assignee: Guowei Ma
>
> JobGraph has no `Operator` info, so the `ExecutionGraph` can only recover at
> the granularity of tasks.
> As a result, an operator of the job may not recover its state from a
> savepoint, even if the savepoint contains the state of that operator.
>  
> https://docs.google.com/document/d/19suTRF0nh7pRgeMnIEIdJ2Fq-CcNVt5_hR3cAoICf7Q/edit#.



[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983195#comment-15983195
 ] 

ASF GitHub Bot commented on FLINK-5892:
---

GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/3770

[FLINK-5892] Restore state on the operator level

## General
This PR is a collaboration between @guoweiM and myself, enabling Flink to
restore state on the operator level. This means that the topology of a job may
change with regard to chains when restoring from a 1.3 savepoint, allowing the
arbitrary addition, removal or modification of chains.

The cornerstone of this is a semantic change to savepoints; no structural
changes have been made to the `SavepointV0/1/2` classes or their serialized
format:

In 1.2, a savepoint contains the states of tasks. If a task consists of
multiple operators, then the stored `TaskState` internally contains a list of
states, one entry for each operator.

In 1.3, a savepoint contains the states of operators only; the notion of
tasks is eliminated. If a task consists of multiple operators, we instead store
one `TaskState` for each operator. Internally, each of them contains a list of
states with a length of 1.
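
The following is a heavily simplified sketch of this semantic change, not Flink's actual savepoint classes. It assumes that plain strings stand in for `JobVertexID`, `OperatorID` and the serialized operator states, and it ignores parallelism (one state per operator rather than one per subtask). A 1.2-style layout keyed by task is converted into a 1.3-style layout keyed by operator, where every entry holds a list of length 1; the class and method names are hypothetical:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified model of the savepoint semantics change; not Flink code. */
public class SavepointSemanticsSketch {

    /**
     * @param taskStates     task (vertex) ID -> states of its chained operators, in chain order
     * @param chainOperators task (vertex) ID -> operator IDs of the chain, in the same order
     * @return operator ID -> list containing exactly one state
     */
    public static Map<String, List<String>> toOperatorLevel(
            Map<String, List<String>> taskStates,
            Map<String, List<String>> chainOperators) {
        Map<String, List<String>> operatorStates = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> task : taskStates.entrySet()) {
            List<String> operatorIds = chainOperators.get(task.getKey());
            List<String> states = task.getValue();
            if (operatorIds == null || operatorIds.size() != states.size()) {
                throw new IllegalArgumentException(
                    "Cannot map chained states of task " + task.getKey());
            }
            // One task-level entry becomes one entry per chained operator.
            for (int i = 0; i < states.size(); i++) {
                List<String> single = new ArrayList<>();
                single.add(states.get(i));
                operatorStates.put(operatorIds.get(i), single);
            }
        }
        return operatorStates;
    }

    public static void main(String[] args) {
        Map<String, List<String>> taskStates = new LinkedHashMap<>();
        taskStates.put("vertex-1", List.of("source-state", "map-state"));
        Map<String, List<String>> chains = new LinkedHashMap<>();
        chains.put("vertex-1", List.of("op-source", "op-map"));
        // prints {op-source=[source-state], op-map=[map-state]}
        System.out.println(toOperatorLevel(taskStates, chains));
    }
}
```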

## Implementation

For this to work, a number of changes had to be made.

First and foremost, we needed a new `StateAssignmentOperation` that is
aware of operators.
(74881a2, 8be9c58, 4fa8bbd)

Since the SAO uses the `ExecutionGraph` classes to map the restored state,
it was necessary to forward the IDs of all contained operators from the
`StreamingJobGraphGenerator` to the `ExecutionJobVertex`.
(73427c3)
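
To illustrate why the operator IDs have to reach the `ExecutionJobVertex`, here is a hypothetical sketch of operator-aware state assignment; it is not the actual `StateAssignmentOperation`. Each vertex carries the IDs of its chained operators in chain order, and restored state is looked up per operator ID, so an operator can move to a different chain. Rescaling is deliberately left out (parallelism must be unchanged), and failing on restored state that matches no operator is a choice made for this sketch only:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of operator-aware state assignment; not Flink code. */
public class OperatorStateAssignmentSketch {

    /** A chain of operators (IDs in chain order) executed with a given parallelism. */
    static final class Vertex {
        final String name;
        final List<String> operatorIds;
        final int parallelism;

        Vertex(String name, List<String> operatorIds, int parallelism) {
            this.name = name;
            this.operatorIds = operatorIds;
            this.parallelism = parallelism;
        }
    }

    /**
     * @param restored operator ID -> one state per subtask index
     * @return vertex name -> per-subtask list of chained states (null = nothing to restore)
     */
    static Map<String, List<List<String>>> assign(
            Map<String, List<String>> restored, List<Vertex> vertices) {
        Set<String> unmatched = new HashSet<>(restored.keySet());
        Map<String, List<List<String>>> result = new LinkedHashMap<>();
        for (Vertex vertex : vertices) {
            List<List<String>> perSubtask = new ArrayList<>();
            for (int subtask = 0; subtask < vertex.parallelism; subtask++) {
                List<String> chained = new ArrayList<>();
                for (String operatorId : vertex.operatorIds) {
                    List<String> operatorState = restored.get(operatorId);
                    if (operatorState == null) {
                        chained.add(null); // new operator: starts without restored state
                    } else if (operatorState.size() != vertex.parallelism) {
                        throw new IllegalStateException("Rescaling is not modeled: " + operatorId);
                    } else {
                        chained.add(operatorState.get(subtask));
                    }
                }
                perSubtask.add(chained);
            }
            unmatched.removeAll(vertex.operatorIds);
            result.put(vertex.name, perSubtask);
        }
        if (!unmatched.isEmpty()) {
            throw new IllegalStateException("Restored state without matching operator: " + unmatched);
        }
        return result;
    }

    public static void main(String[] args) {
        // The old chain (source -> map) is split into two vertices, and a new filter is added.
        Map<String, List<String>> restored = new LinkedHashMap<>();
        restored.put("op-source", List.of("s0", "s1"));
        restored.put("op-map", List.of("m0", "m1"));
        List<Vertex> vertices = List.of(
            new Vertex("v1", List.of("op-source"), 2),
            new Vertex("v2", List.of("op-map", "op-filter"), 2));
        // prints {v1=[[s0], [s1]], v2=[[m0, null], [m1, null]]}
        System.out.println(assign(restored, vertices));
    }
}
```

Because lookups go through operator IDs rather than vertex IDs, the split chain in `main` still receives the source and map state, while the newly added filter simply starts empty.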

The `PendingCheckpoint` class had to be adjusted to conform to the new
semantics; each received `SubtaskState`, containing the state of a task, is
broken down into `SubtaskState`s for the individual operators.
(f7b8ef9)
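
A minimal sketch of that breakdown, again with hypothetical names and strings in place of real state handles: each acknowledgement carries the states of one subtask's chained operators in chain order, and the sketch files every entry under its operator ID and subtask index. The result therefore has one entry per operator rather than one per task:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of per-operator bookkeeping in a pending checkpoint; not Flink code. */
public class PendingCheckpointSketch {

    // operator ID -> (subtask index -> state of that operator in that subtask)
    private final Map<String, Map<Integer, String>> operatorStates = new LinkedHashMap<>();

    /** Breaks one subtask's chained (task-level) state down into per-operator entries. */
    public void acknowledge(List<String> chainedOperatorIds, int subtaskIndex, List<String> chainedStates) {
        if (chainedOperatorIds.size() != chainedStates.size()) {
            throw new IllegalArgumentException("Expected one state per chained operator");
        }
        // Validate first so that a rejected acknowledgement leaves no partial entries.
        for (String operatorId : chainedOperatorIds) {
            Map<Integer, String> perSubtask = operatorStates.get(operatorId);
            if (perSubtask != null && perSubtask.containsKey(subtaskIndex)) {
                throw new IllegalStateException(
                    "Duplicate acknowledgement: " + operatorId + "/" + subtaskIndex);
            }
        }
        for (int i = 0; i < chainedOperatorIds.size(); i++) {
            operatorStates
                .computeIfAbsent(chainedOperatorIds.get(i), id -> new TreeMap<>())
                .put(subtaskIndex, chainedStates.get(i));
        }
    }

    public Map<String, Map<Integer, String>> operatorStates() {
        return operatorStates;
    }

    public static void main(String[] args) {
        PendingCheckpointSketch pending = new PendingCheckpointSketch();
        List<String> chain = List.of("op-source", "op-map");
        pending.acknowledge(chain, 0, List.of("s0", "m0"));
        pending.acknowledge(chain, 1, List.of("s1", "m1"));
        // prints {op-source={0=s0, 1=s1}, op-map={0=m0, 1=m1}}
        System.out.println(pending.operatorStates());
    }
}
```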

## Tests

The majority of this PR (60% or so) consists of new tests.

A number of tests that cover the migration path from 1.2 to 1.3 were added
under `flink-tests`.
(d1efdb1)

These tests first restore a job from a 1.2 savepoint without changing the
topology, verify that the state was restored correctly, and finally create a
new savepoint. They then restore from this migrated 1.3 savepoint with
topology changes for a variety of scenarios, and again verify that the state
was restored correctly.

A new test was also added to the `CheckpointCoordinatorTest` that tests
the support for topology changes without executing a job.
(8b5430f9)

A number of existing tests had to be tweaked to run with the new changes, 
but these changes all boil down to extending existing mocks by a method or two.
(b5430f9)

## Other changes

To make it more obvious that we deal with operators and not tasks, a new
`OperatorID` class was introduced, and usages of `JobVertexID` in
savepoint-related parts were replaced where appropriate.
(fe74023)
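
A dedicated ID type mainly buys type safety: an operator ID can no longer be passed where a vertex ID is expected. The sketch below is loosely modeled on the `OperatorID extends AbstractID` class from the diff, but its base class, fields (two random `long`s) and equality rules are assumptions made for illustration. Two IDs with identical bits but different types are deliberately unequal:

```java
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical sketch of typed, statistically unique IDs; not Flink's AbstractID. */
public class TypedIdSketch {

    /** 128 random bits; subclasses only add a distinct type. */
    abstract static class AbstractId {
        private final long upper;
        private final long lower;

        AbstractId() {
            this(ThreadLocalRandom.current().nextLong(), ThreadLocalRandom.current().nextLong());
        }

        AbstractId(long upper, long lower) {
            this.upper = upper;
            this.lower = lower;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            // Same bits but a different ID type are NOT equal.
            if (o == null || o.getClass() != getClass()) {
                return false;
            }
            AbstractId other = (AbstractId) o;
            return upper == other.upper && lower == other.lower;
        }

        @Override
        public int hashCode() {
            return Objects.hash(getClass(), upper, lower);
        }

        @Override
        public String toString() {
            return String.format("%016x%016x", upper, lower);
        }
    }

    static final class VertexId extends AbstractId {
        VertexId() { super(); }
        VertexId(long upper, long lower) { super(upper, lower); }
    }

    static final class OperatorId extends AbstractId {
        OperatorId() { super(); }
        OperatorId(long upper, long lower) { super(upper, lower); }
    }

    public static void main(String[] args) {
        OperatorId op = new OperatorId(1L, 2L);
        VertexId vertex = new VertexId(1L, 2L);
        System.out.println(op.equals(new OperatorId(1L, 2L))); // true
        System.out.println(op.equals(vertex));                 // false: different ID types
    }
}
```

Because `equals` compares the runtime class, a `Map<OperatorId, ...>` will also not return an entry for a `VertexId` that happens to share the same bits.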



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 5982_operator_state

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3770.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3770


commit abe1bb9416b2a4159a3667d6845a4a9776abdc4f
Author: zentol 
Date:   2017-04-03T15:39:50Z

[prerequisite] Disable exception when assigning uid on chained operator

commit 74881a2788d034db67d99d6d32dbb2cf923aed53
Author: zentol 
Date:   2017-04-04T10:53:56Z

[internal] Adjust SavepointLoader to new Savepoint semantics

commit f7b8ef943097cd994a4ef3d5594fea4027720f5a
Author: zentol 
Date:   2017-04-04T13:02:55Z

[internal] adjust PendingCheckpoint to be in line with new semantics

commit 73427c3fc7d69f5d072d9d8a4809d449e0c5bdac
Author: zentol 
Date:   2017-04-04T11:33:54Z

[internal] Get operator ID's into ExecutionGraph

commit 465805792932cb888393d9257fdefd828fa59343
Author: zentol 
Date:   2017-04-25T16:07:16Z

[internals] Extract several utility methods from StateAssignmentOperation

commit 008e848715b7091c3deabc9251d9d673f5506e64
Author: guowei.mgw 
Date:   2017-04-24T09:47:47Z

[internal] Add new StateAssignmentOperation

commit ffb93298ce90956b9886b3526258f6a814b7e0af
Author: zentol 
Date:   2017-04-04T13:01:07Z

[internal] Integrate new StateAssignmentOperation version

commit d1efdb1c34d59f04147292b320528cd2bc838244
Author: zentol 
Date:   2017-04-03T15:40:21Z

[tests] Add tests for chain modifications

commit 

[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator

2017-03-27 Thread Guowei Ma (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15944327#comment-15944327
 ] 

Guowei Ma commented on FLINK-5892:
--

Hi [~Zentol],
I am discussing the proposal with [~srichter].
I think I will do it once the discussion is finished.


[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator

2017-03-27 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15942990#comment-15942990
 ] 

Chesnay Schepler commented on FLINK-5892:
-

[~maguowei] What's your progress on this issue?
