[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator
[ https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16062341#comment-16062341 ]

ASF GitHub Bot commented on FLINK-5892:

Github user zentol closed the pull request at:

    https://github.com/apache/flink/pull/3844

> Recover job state at the granularity of operator
>
> Key: FLINK-5892
> URL: https://issues.apache.org/jira/browse/FLINK-5892
> Project: Flink
> Issue Type: New Feature
> Components: State Backends, Checkpointing
> Reporter: Guowei Ma
> Assignee: Guowei Ma
> Fix For: 1.3.0
>
> JobGraph has no `Operator` info, so `ExecutionGraph` can only recover at the
> granularity of tasks.
> As a result, an operator of the job may not recover its state from a savepoint
> even if the savepoint contains the state of that operator.
>
> https://docs.google.com/document/d/19suTRF0nh7pRgeMnIEIdJ2Fq-CcNVt5_hR3cAoICf7Q/edit#.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator
[ https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16062305#comment-16062305 ]

Chesnay Schepler commented on FLINK-5892:

The KeyedComplexChainTest was updated to properly use a 1.2 snapshot (at the time of merging, restoration of keyed state was broken).

1.3: 0fd3683c3489d98c5c82d21b9a7a5ee93c0d6b2e
1.4: 7a0fff31d483633f88026650956b97cc451f19df
[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator
[ https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16055407#comment-16055407 ]

ASF GitHub Bot commented on FLINK-5892:

Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/3844

That's unrelated. The changes made here can only affect the `KeyedComplexChainTest`.
[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator
[ https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16055278#comment-16055278 ]

ASF GitHub Bot commented on FLINK-5892:

Github user pnowojski commented on the issue:

    https://github.com/apache/flink/pull/3844

Are you sure that the errors on Travis are intermittent or unrelated to your change? One of them is already reported here: https://issues.apache.org/jira/browse/FLINK-6843, but I'm not sure about the second one:

```
Tests run: 10, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 6.933 sec <<< FAILURE! - in org.apache.flink.runtime.state.OperatorStateBackendTest
testSnapshotAsyncCancel(org.apache.flink.runtime.state.OperatorStateBackendTest)  Time elapsed: 0.061 sec  <<< ERROR!
java.util.concurrent.ExecutionException: java.io.IOException: Stream closed.
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:206)
    at org.apache.flink.runtime.state.OperatorStateBackendTest.testSnapshotAsyncCancel(OperatorStateBackendTest.java:636)
Caused by: java.io.IOException: Stream closed.
    at org.apache.flink.runtime.util.BlockerCheckpointStreamFactory$1.write(BlockerCheckpointStreamFactory.java:95)
    at java.io.DataOutputStream.writeInt(DataOutputStream.java:197)
    at org.apache.flink.core.io.VersionedIOReadableWritable.write(VersionedIOReadableWritable.java:40)
    at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.write(OperatorBackendSerializationProxy.java:65)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:255)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:233)
    at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
```
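The trace has the typical shape of a failure inside an asynchronous task: the `IOException` thrown by the writer is stored by `FutureTask.run()` and rethrown from `FutureTask.get()` wrapped in an `ExecutionException`, which is why the test method appears above the `Caused by:` section. The following is a minimal JDK-only sketch of that wrapping; the failing task is a hypothetical stand-in for the blocked snapshot write, not the code of `OperatorStateBackendTest`.

```java
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class AsyncFailureDemo {

    /** Runs a failing task and returns the cause that Future.get() reports. */
    static Throwable causeReportedByGet() {
        // Hypothetical stand-in for an async snapshot whose output stream was
        // closed underneath it (for example, by a concurrent cancellation).
        FutureTask<Void> snapshot = new FutureTask<>(() -> {
            throw new IOException("Stream closed.");
        });

        // FutureTask.run() catches the exception and stores it as the task's outcome.
        snapshot.run();

        try {
            // get() rethrows the stored exception wrapped in an ExecutionException.
            snapshot.get();
            throw new IllegalStateException("task was expected to fail");
        } catch (ExecutionException e) {
            return e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Throwable cause = causeReportedByGet();
        System.out.println(cause.getClass().getName() + ": " + cause.getMessage());
    }
}
```

When reading such a trace, the frames under `Caused by:` belong to the worker thread that ran the task, while the frames above it belong to the thread that called `get()`.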
[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator
[ https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16055267#comment-16055267 ]

ASF GitHub Bot commented on FLINK-5892:

Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3844#discussion_r122897990

    --- Diff: flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java ---
    @@ -98,10 +98,9 @@ public static void main(String[] args) throws Exception {
                 .map(new StatefulStringStoringMap(mode, "first"))
                 .setParallelism(4);
     
    -        // TODO: re-enable this when generating the actual 1.2 savepoint
    -        //if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) {
    -        map.uid("first");
    -        //}
    +        if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) {
    --- End diff --

Yes, thank you very much, now I get it :) Since this code is already on master, I think it's a bit too late to change the commit message there :(
[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator
[ https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16055242#comment-16055242 ]

ASF GitHub Bot commented on FLINK-5892:

Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/3844

Yes, this is present in master, but we should have it in the 1.3 branch as well.
[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator
[ https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16055241#comment-16055241 ]

ASF GitHub Bot commented on FLINK-5892:

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3844#discussion_r122894136

    --- Diff: flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java ---
    @@ -98,10 +98,9 @@ public static void main(String[] args) throws Exception {
                 .map(new StatefulStringStoringMap(mode, "first"))
                 .setParallelism(4);
     
    -        // TODO: re-enable this when generating the actual 1.2 savepoint
    -        //if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) {
    -        map.uid("first");
    -        //}
    +        if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) {
    --- End diff --

You are completely right: the commit message and PR description aren't sufficient to explain what this PR changes. In fact, it took me a bit to remember that as well. I'll adjust the commit message later on.

This PR is pretty subtle, since the changes to the code aren't the interesting part; the change to the `complexKeyed-flink1.2/_metadata` file is. This file is supposed to be a 1.2 savepoint, used to verify that restoring from 1.2 savepoints works in 1.3. But it is not a 1.2 savepoint: at the time of merging, the restoration of keyed 1.2 state was broken, so in the meantime we used a 1.3 savepoint instead. The main thing this PR does is replace this 1.3 savepoint with an actual 1.2 savepoint.

The second change is related to the UIDs. In 1.2, it is not possible to assign UIDs to chained operators. As "first" and "second" are both chained to the window function, we are not allowed to call `map.uid("...")` when generating the 1.2 savepoint (`!(MIGRATE || RESTORE)`). However, in 1.3 it is possible, and in fact mandatory, to assign UIDs.

Does that clear things up?
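The UID rule described above (assign UIDs only in the runs that execute on 1.3) can be sketched in plain Java as follows. The `ExecutionMode` enum here is a stand-in for the test's enum: only `MIGRATE` and `RESTORE` appear in the diff, and the `GENERATE` constant for the run that writes the 1.2 savepoint is an assumption. In `KeyedJob` itself, the equivalent is the `if` guard from the diff around `map.uid("first")`.

```java
import java.util.Optional;

class UidAssignment {

    // Stand-in for the test's ExecutionMode; GENERATE is an assumed name for the
    // run that writes the 1.2 savepoint.
    enum ExecutionMode { GENERATE, MIGRATE, RESTORE }

    /**
     * Returns the UID to assign to an operator, or empty if none may be set.
     * Flink 1.2 cannot assign UIDs to chained operators, so the GENERATE run
     * (executed on 1.2) leaves them unset; the MIGRATE and RESTORE runs on 1.3
     * assign them.
     */
    static Optional<String> uidFor(ExecutionMode mode, String uid) {
        if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) {
            return Optional.of(uid);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        for (ExecutionMode mode : ExecutionMode.values()) {
            System.out.println(mode + " -> " + uidFor(mode, "first").orElse("(no uid)"));
        }
    }
}
```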
[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator
[ https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16054070#comment-16054070 ]

ASF GitHub Bot commented on FLINK-5892:

Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3844#discussion_r122718852

    --- Diff: flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java ---
    @@ -98,10 +98,9 @@ public static void main(String[] args) throws Exception {
                 .map(new StatefulStringStoringMap(mode, "first"))
                 .setParallelism(4);
     
    -        // TODO: re-enable this when generating the actual 1.2 savepoint
    -        //if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) {
    -        map.uid("first");
    -        //}
    +        if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) {
    --- End diff --

I somehow don't like that the commit message doesn't explain what has actually changed, or why this change was required at all. Especially since you have not changed anything else in the code, it is difficult to understand. If nothing else has changed, why do we need this `if (...)`? If something has changed, shouldn't it be covered by some test?
[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator
[ https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16025215#comment-16025215 ]

ASF GitHub Bot commented on FLINK-5892:

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/3842
[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator
[ https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16000542#comment-16000542 ]

ASF GitHub Bot commented on FLINK-5892:

GitHub user zentol opened a pull request:

    https://github.com/apache/flink/pull/3844

    [FLINK-5892] Enable 1.2 keyed state test

    Backport of #3842 for the 1.3 branch.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zentol/flink 13_5892_enable_test

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3844.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3844

commit 7a0fff31d483633f88026650956b97cc451f19df
Author: zentol
Date:   2017-05-08T09:56:57Z

    [FLINK-5892] Enable 1.2 keyed state test
[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator
[ https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16000539#comment-16000539 ]

ASF GitHub Bot commented on FLINK-5892:

GitHub user zentol opened a pull request:

    https://github.com/apache/flink/pull/3843

    [FLINK-5892] Enable 1.2 keyed state test

    Backport of #3842 for the 1.3 branch.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zentol/flink 13_5892_enable_test

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3843.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3843

commit 7a0fff31d483633f88026650956b97cc451f19df
Author: zentol
Date:   2017-05-08T09:56:57Z

    [FLINK-5892] Enable 1.2 keyed state test
[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator
[ https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16000538#comment-16000538 ]

ASF GitHub Bot commented on FLINK-5892:

GitHub user zentol opened a pull request:

    https://github.com/apache/flink/pull/3842

    [FLINK-5892] Enable 1.2 keyed state test

    This PR enables the 1.2 keyed state restore test added in FLINK-5892. It was temporarily disabled due to a general issue with backwards compatibility.

    First, the utility functions in `KeyedJob` are modified to only assign UIDs when migrating and restoring, but not when generating the job. Second, we now actually use a 1.2 savepoint.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zentol/flink 5892_enable_test

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3842.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3842

commit 76edf6ae2f160527427833ae0aa94a16ab279fa0
Author: zentol
Date:   2017-05-08T09:56:57Z

    [FLINK-5892] Enable 1.2 keyed state test
[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator
[ https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16000540#comment-16000540 ]

ASF GitHub Bot commented on FLINK-5892:

Github user zentol closed the pull request at:

    https://github.com/apache/flink/pull/3843
[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator
[ https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15989374#comment-15989374 ]

ASF GitHub Bot commented on FLINK-5892:

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/3770
[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator
[ https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15988897#comment-15988897 ]

ASF GitHub Bot commented on FLINK-5892:

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3770#discussion_r113924290

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java ---
    @@ -139,6 +158,10 @@ public ExecutionJobVertex(
             this.serializedTaskInformation = null;
             this.taskVertices = new ExecutionVertex[numTaskVertices];
    +        List opIDs = jobVertex.getOperatorIDs();
    +        this.operatorIDs = opIDs.toArray(new OperatorID[opIDs.size()]);
    --- End diff --

How about making `operatorIDs` an immutable list instead of an array? I think all the operations you perform could also run on an array list, and we could enforce immutability so that nobody is tempted to modify the inner state of the original array (e.g., to reverse the element order for convenience in other parts of the code). The same goes for the alternative IDs.
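A minimal sketch of the suggestion, assuming a simplified `OperatorId` class as a stand-in for Flink's `OperatorID`: copy the IDs once and expose them through `Collections.unmodifiableList`, so that an attempt to reorder them (for example with `Collections.reverse`) fails with `UnsupportedOperationException` instead of silently changing the vertex's state. An immutable collection type such as Guava's `ImmutableList` would serve the same purpose.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class VertexOperatorIds {

    // Simplified stand-in for org.apache.flink.runtime.jobgraph.OperatorID.
    static final class OperatorId {
        private final String hex;

        OperatorId(String hex) {
            this.hex = hex;
        }

        @Override
        public String toString() {
            return hex;
        }
    }

    private final List<OperatorId> operatorIDs;

    VertexOperatorIds(List<OperatorId> opIDs) {
        // Copy first so later changes to opIDs do not leak in, then wrap the copy
        // so that callers cannot add, remove, replace, or reorder elements.
        this.operatorIDs = Collections.unmodifiableList(new ArrayList<>(opIDs));
    }

    List<OperatorId> getOperatorIDs() {
        return operatorIDs;
    }

    public static void main(String[] args) {
        VertexOperatorIds vertex = new VertexOperatorIds(
                Arrays.asList(new OperatorId("a1"), new OperatorId("b2")));
        try {
            Collections.reverse(vertex.getOperatorIDs());
        } catch (UnsupportedOperationException e) {
            System.out.println("reverse rejected, order kept: " + vertex.getOperatorIDs());
        }
    }
}
```

Code that needs a reversed order can still iterate backwards or build its own reversed copy, without touching the vertex's list.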
[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator
[ https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15988899#comment-15988899 ]

ASF GitHub Bot commented on FLINK-5892:

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3770#discussion_r113929555

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java ---
    @@ -62,252 +69,390 @@ public StateAssignmentOperation(
         }
         public boolean assignStates() throws Exception {
    -
    -        // this tracks if we find missing node hash ids and already use secondary mappings
    -        boolean expandedToLegacyIds = false;
    -
    +        Map localStates = new HashMap<>(taskStates);
             Map localTasks = this.tasks;
    -        for (Map.Entry taskGroupStateEntry : taskStates.entrySet()) {
    -
    -            TaskState taskState = taskGroupStateEntry.getValue();
    -
    -            //find vertex for state-
    -
    -            ExecutionJobVertex executionJobVertex = localTasks.get(taskGroupStateEntry.getKey());
    -
    -            // on the first time we can not find the execution job vertex for an id, we also consider alternative ids,
    -            // for example as generated from older flink versions, to provide backwards compatibility.
    -            if (executionJobVertex == null && !expandedToLegacyIds) {
    -                localTasks = ExecutionJobVertex.includeLegacyJobVertexIDs(localTasks);
    -                executionJobVertex = localTasks.get(taskGroupStateEntry.getKey());
    -                expandedToLegacyIds = true;
    -                logger.info("Could not find ExecutionJobVertex. Including legacy JobVertexIDs in search.");
    -            }
    +        Set allOperatorIDs = new HashSet<>();
    +        for (ExecutionJobVertex executionJobVertex : tasks.values()) {
    +            allOperatorIDs.addAll(Lists.newArrayList(executionJobVertex.getOperatorIDs()));
    +        }
    +        for (Map.Entry taskGroupStateEntry : taskStates.entrySet()) {
    --- End diff --

This loop looks like something we could factor out into a private precondition method, like `checkStateMappingCompleteness` or something similar. Even the previous loop and everything working on the hash set could go there.
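One possible shape for the suggested `checkStateMappingCompleteness` precondition, sketched with operator IDs as plain strings and the job's vertices reduced to lists of IDs (both simplifications are assumptions, not Flink's types). It gathers every operator ID of the new job into a set, as the loop in the diff does, and then rejects checkpointed state that no operator can take over, unless `allowNonRestoredState` is set. The exception message is illustrative.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class StateMappingCheck {

    /**
     * Hypothetical precondition: every operator that has state in the checkpoint
     * must exist in the new job, unless non-restored state is explicitly allowed.
     *
     * @param operatorIdsPerVertex  operator IDs of each job vertex in the new job
     * @param operatorStates        checkpointed state, keyed by operator ID
     * @param allowNonRestoredState whether state without a matching operator may be skipped
     */
    static void checkStateMappingCompleteness(
            Collection<List<String>> operatorIdsPerVertex,
            Map<String, ?> operatorStates,
            boolean allowNonRestoredState) {

        // Collect all operator IDs of the new job once, as the loop in the diff does.
        Set<String> allOperatorIds = new HashSet<>();
        for (List<String> vertexOperatorIds : operatorIdsPerVertex) {
            allOperatorIds.addAll(vertexOperatorIds);
        }

        for (String operatorId : operatorStates.keySet()) {
            if (!allOperatorIds.contains(operatorId) && !allowNonRestoredState) {
                throw new IllegalStateException(
                        "Checkpoint contains state for operator " + operatorId
                                + ", which is not part of the new job.");
            }
        }
    }

    public static void main(String[] args) {
        List<List<String>> vertices = Arrays.asList(Arrays.asList("op-1", "op-2"));
        Map<String, String> operatorStates = new HashMap<>();
        operatorStates.put("op-1", "state of op-1");
        operatorStates.put("op-9", "state of a removed operator");

        checkStateMappingCompleteness(vertices, operatorStates, true); // tolerated
        try {
            checkStateMappingCompleteness(vertices, operatorStates, false);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Pulling the check into its own method keeps `assignStates()` focused on the actual assignment and makes the precondition testable on its own.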
[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator
[ https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15988896#comment-15988896 ]

ASF GitHub Bot commented on FLINK-5892:

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3770#discussion_r113922612

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java ---
    @@ -0,0 +1,183 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.checkpoint;
    +
    +import org.apache.flink.runtime.jobgraph.OperatorID;
    +import org.apache.flink.runtime.state.CompositeStateHandle;
    +import org.apache.flink.runtime.state.SharedStateRegistry;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Objects;
    +
    +/**
    + * Simple container class which contains the raw/managed/legacy operator state and key-group state handles for the sub
    + * tasks of an operator.
    + */
    +public class OperatorState implements CompositeStateHandle {
    +
    +    private static final long serialVersionUID = -4845578005863201810L;
    +
    +    /** id of the operator */
    +    private final OperatorID operatorID;
    +
    +    /** handles to non-partitioned states, subtaskindex -> subtaskstate */
    +    private final Map subtaskStates;
    --- End diff --

Here and in a few other places in this class, we could add `operator` to the variable names. That would make it clear to users that we are now dealing with state on the operator level, and avoid confusing Flink veterans who have a certain mental mapping for the word `(Sub)TaskState` that they would have to update.
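To illustrate the naming suggestion, here is a hypothetical, stripped-down container in the spirit of `OperatorState` (not Flink's class). The map from subtask index to that subtask's state carries `operator` in its name, and `OperatorSubtaskState` is only a placeholder for the per-subtask handles; the method names are illustrative assumptions.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class OperatorStateSketch {

    // Placeholder for the state handles of one subtask of the operator.
    static final class OperatorSubtaskState {
        final String handle;

        OperatorSubtaskState(String handle) {
            this.handle = Objects.requireNonNull(handle);
        }
    }

    /** ID of the operator whose state this object holds. */
    private final String operatorID;

    /** Number of parallel subtasks of the operator. */
    private final int parallelism;

    /** Handles to the operator's state: subtask index -> operator subtask state. */
    private final Map<Integer, OperatorSubtaskState> operatorSubtaskStates = new HashMap<>();

    OperatorStateSketch(String operatorID, int parallelism) {
        this.operatorID = Objects.requireNonNull(operatorID);
        this.parallelism = parallelism;
    }

    void putOperatorSubtaskState(int subtaskIndex, OperatorSubtaskState state) {
        if (subtaskIndex < 0 || subtaskIndex >= parallelism) {
            throw new IndexOutOfBoundsException("subtask index " + subtaskIndex
                    + " out of range for parallelism " + parallelism);
        }
        operatorSubtaskStates.put(subtaskIndex, Objects.requireNonNull(state));
    }

    /** Returns the state of the given subtask, or null if none was collected. */
    OperatorSubtaskState getOperatorSubtaskState(int subtaskIndex) {
        return operatorSubtaskStates.get(subtaskIndex);
    }

    Collection<OperatorSubtaskState> getOperatorSubtaskStates() {
        return Collections.unmodifiableCollection(operatorSubtaskStates.values());
    }

    String getOperatorID() {
        return operatorID;
    }

    public static void main(String[] args) {
        OperatorStateSketch state = new OperatorStateSketch("op-1", 2);
        state.putOperatorSubtaskState(0, new OperatorSubtaskState("handle-0"));
        System.out.println(state.getOperatorID() + " has state for "
                + state.getOperatorSubtaskStates().size() + " of 2 subtasks");
    }
}
```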
[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator
[ https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15988894#comment-15988894 ]

ASF GitHub Bot commented on FLINK-5892:

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3770#discussion_r113928558

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java ---
    @@ -62,252 +69,390 @@ public StateAssignmentOperation(
         }
         public boolean assignStates() throws Exception {
    -
    -        // this tracks if we find missing node hash ids and already use secondary mappings
    -        boolean expandedToLegacyIds = false;
    -
    +        Map localStates = new HashMap<>(taskStates);
             Map localTasks = this.tasks;
    -        for (Map.Entry taskGroupStateEntry : taskStates.entrySet()) {
    -
    -            TaskState taskState = taskGroupStateEntry.getValue();
    -
    -            //find vertex for state-
    -
    -            ExecutionJobVertex executionJobVertex = localTasks.get(taskGroupStateEntry.getKey());
    -
    -            // on the first time we can not find the execution job vertex for an id, we also consider alternative ids,
    -            // for example as generated from older flink versions, to provide backwards compatibility.
    -            if (executionJobVertex == null && !expandedToLegacyIds) {
    -                localTasks = ExecutionJobVertex.includeLegacyJobVertexIDs(localTasks);
    -                executionJobVertex = localTasks.get(taskGroupStateEntry.getKey());
    -                expandedToLegacyIds = true;
    -                logger.info("Could not find ExecutionJobVertex. Including legacy JobVertexIDs in search.");
    -            }
    +        Set allOperatorIDs = new HashSet<>();
    +        for (ExecutionJobVertex executionJobVertex : tasks.values()) {
    +            allOperatorIDs.addAll(Lists.newArrayList(executionJobVertex.getOperatorIDs()));
    +        }
    +        for (Map.Entry taskGroupStateEntry : taskStates.entrySet()) {
    --- End diff --

Renaming `taskStates` and `taskGroupStateEntry` to something with `operator` instead of `task` in it would make this more readable, maybe `operatorToStateMapping`. Just a leftover from the refactoring, I guess.
[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator
[ https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15988898#comment-15988898 ]

ASF GitHub Bot commented on FLINK-5892:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113920542

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---
@@ -1025,11 +1026,11 @@ public boolean restoreLatestCheckpointedState(
             LOG.info("Restoring from latest valid checkpoint: {}.", latest);
             // re-assign the task states
-
-            final Map taskStates = latest.getTaskStates();
+            final Map operatorStates = latest.getOperatorStates();
             StateAssignmentOperation stateAssignmentOperation =
-                    new StateAssignmentOperation(LOG, tasks, taskStates, allowNonRestoredState);
+                    new StateAssignmentOperation(LOG, tasks, operatorStates, allowNonRestoredState);
--- End diff --

Not sure why this is implemented in a way that a logger is passed to the `StateAssignmentOperation`. I guess the class should simply have its own logger, and I think this could be changed. It seems this was introduced earlier and is unrelated to this PR, but I wouldn't mind having this refactored to the normal logger scheme before we merge.
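A minimal sketch of the per-class logger suggested above. Flink classes normally declare an SLF4J logger via `LoggerFactory.getLogger(...)`; to keep the example dependency-free it uses `java.util.logging` with the same pattern. `StateAssignmentSketch` is a hypothetical, trimmed-down stand-in, not the actual `StateAssignmentOperation`.

```java
import java.util.logging.Logger;

// Hypothetical, trimmed-down stand-in for StateAssignmentOperation.
final class StateAssignmentSketch {

    // One static logger per class; callers no longer hand in their own logger.
    private static final Logger LOG = Logger.getLogger(StateAssignmentSketch.class.getName());

    private final boolean allowNonRestoredState;

    StateAssignmentSketch(boolean allowNonRestoredState) {
        this.allowNonRestoredState = allowNonRestoredState;
    }

    boolean assignStates() {
        LOG.info("Assigning states, allowNonRestoredState=" + allowNonRestoredState);
        // The actual assignment logic would go here.
        return true;
    }
}
```

With this pattern the call site in `CheckpointCoordinator` would simply drop the `LOG` argument, and log output would be attributed to the class that actually emits it.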
[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator
[ https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15988895#comment-15988895 ]

ASF GitHub Bot commented on FLINK-5892:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113928105

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java ---
@@ -62,252 +69,390 @@ public StateAssignmentOperation(
     }
     public boolean assignStates() throws Exception {
-
-        // this tracks if we find missing node hash ids and already use secondary mappings
-        boolean expandedToLegacyIds = false;
-
+        Map localStates = new HashMap<>(taskStates);
         Map localTasks = this.tasks;
-        for (Map.Entry taskGroupStateEntry : taskStates.entrySet()) {
-
-            TaskState taskState = taskGroupStateEntry.getValue();
-
-            //find vertex for state-
-
-            ExecutionJobVertex executionJobVertex = localTasks.get(taskGroupStateEntry.getKey());
-
-            // on the first time we can not find the execution job vertex for an id, we also consider alternative ids,
-            // for example as generated from older flink versions, to provide backwards compatibility.
-            if (executionJobVertex == null && !expandedToLegacyIds) {
-                localTasks = ExecutionJobVertex.includeLegacyJobVertexIDs(localTasks);
-                executionJobVertex = localTasks.get(taskGroupStateEntry.getKey());
-                expandedToLegacyIds = true;
-                logger.info("Could not find ExecutionJobVertex. Including legacy JobVertexIDs in search.");
-            }
+        Set allOperatorIDs = new HashSet<>();
+        for (ExecutionJobVertex executionJobVertex : tasks.values()) {
+            allOperatorIDs.addAll(Lists.newArrayList(executionJobVertex.getOperatorIDs()));
--- End diff --

If we change to an immutable list instead of an array, this code also saves one conversion to a list.
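Assuming `getOperatorIDs()` returned an immutable `List` instead of an array, the collection loop in the diff could add the IDs directly. The sketch below uses `Collections.unmodifiableList`; Guava's `ImmutableList` would serve the same purpose. `VertexSketch` is a hypothetical stand-in for `ExecutionJobVertex`, and Strings stand in for `OperatorID`.

```java
import java.util.*;

// Hypothetical stand-in for ExecutionJobVertex; Strings stand in for OperatorID.
final class VertexSketch {

    private final List<String> operatorIDs;

    VertexSketch(List<String> operatorIDs) {
        // Copy once and expose a read-only view instead of an array.
        this.operatorIDs = Collections.unmodifiableList(new ArrayList<>(operatorIDs));
    }

    List<String> getOperatorIDs() {
        return operatorIDs;
    }

    static Set<String> collectAllOperatorIDs(Collection<VertexSketch> vertices) {
        Set<String> allOperatorIDs = new HashSet<>();
        for (VertexSketch vertex : vertices) {
            // A List can be added directly; no Lists.newArrayList(...) conversion is needed.
            allOperatorIDs.addAll(vertex.getOperatorIDs());
        }
        return allOperatorIDs;
    }
}
```

Returning a read-only view also keeps callers from mutating the vertex's operator IDs, which a returned array would not prevent.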
[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator
[ https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15988742#comment-15988742 ]

ASF GitHub Bot commented on FLINK-5892:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113919931

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java ---
@@ -125,6 +132,8 @@ public JobVertex(String name) {
     public JobVertex(String name, JobVertexID id) {
         this.name = name == null ? DEFAULT_NAME : name;
         this.id = id == null ? new JobVertexID() : id;
+        this.operatorIDs.add(new OperatorID(this.id.getLowerPart(), this.id.getUpperPart()));
+        this.operatorIdsAlternatives.add(null);
--- End diff --

Yes, that is what I also expected. Just wanted to be really sure.
[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator
[ https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15988644#comment-15988644 ]

ASF GitHub Bot commented on FLINK-5892:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113903349

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java ---
@@ -125,6 +132,8 @@ public JobVertex(String name) {
     public JobVertex(String name, JobVertexID id) {
         this.name = name == null ? DEFAULT_NAME : name;
         this.id = id == null ? new JobVertexID() : id;
+        this.operatorIDs.add(new OperatorID(this.id.getLowerPart(), this.id.getUpperPart()));
+        this.operatorIdsAlternatives.add(null);
--- End diff --

We only need a single alternative ID for each operatorID: the one set using `setUIDHash()`. For savepoints created with 1.3, other alternatives aren't required since 1.3 doesn't use the old hasher. 1.0-1.2 savepoints are converted to the 1.3 format using the alternative job vertex IDs (see `SavepointV2#convertToOperatorStateSavepointV2`), which is why we can't remove them. After the conversion, however, they are identical to a 1.3 savepoint.
[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator
[ https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15988600#comment-15988600 ]

ASF GitHub Bot commented on FLINK-5892:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113898244

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java ---
@@ -125,6 +132,8 @@ public JobVertex(String name) {
     public JobVertex(String name, JobVertexID id) {
         this.name = name == null ? DEFAULT_NAME : name;
         this.id = id == null ? new JobVertexID() : id;
+        this.operatorIDs.add(new OperatorID(this.id.getLowerPart(), this.id.getUpperPart()));
+        this.operatorIdsAlternatives.add(null);
--- End diff --

OK, in this case it really seems better to make this explicit, as you suggested. Also, I was wondering if `operatorIdsAlternatives` should be a List-- I just want to make sure that at most one alternative ID must be maintained per operator. But I think that we can always determine the savepoint version and only need compatibility with the hasher version that was valid for that savepoint version.
[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator
[ https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984991#comment-15984991 ]

ASF GitHub Bot commented on FLINK-5892:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113482975

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java ---
@@ -133,11 +142,15 @@ public JobVertex(String name, JobVertexID id) {
      * @param name The name of the new job vertex.
      * @param primaryId The id of the job vertex.
      * @param alternativeIds The alternative ids of the job vertex.
+     * @param operatorIds The ids of all operators contained in this job vertex.
+     * @param alternativeOperatorIds The alternative ids of all operators contained in this job vertex-
      */
-    public JobVertex(String name, JobVertexID primaryId, List alternativeIds) {
+    public JobVertex(String name, JobVertexID primaryId, List alternativeIds, List operatorIds, List alternativeOperatorIds) {
--- End diff --

We could add a simple OperatorIDs POJO that encapsulates both lists.
[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator
[ https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984990#comment-15984990 ]

ASF GitHub Bot commented on FLINK-5892:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113482888

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java ---
@@ -133,11 +142,15 @@ public JobVertex(String name, JobVertexID id) {
      * @param name The name of the new job vertex.
      * @param primaryId The id of the job vertex.
      * @param alternativeIds The alternative ids of the job vertex.
+     * @param operatorIds The ids of all operators contained in this job vertex.
+     * @param alternativeOperatorIds The alternative ids of all operators contained in this job vertex-
      */
-    public JobVertex(String name, JobVertexID primaryId, List alternativeIds) {
+    public JobVertex(String name, JobVertexID primaryId, List alternativeIds, List operatorIds, List alternativeOperatorIds) {
--- End diff --

We don't know at the time of JobVertex generation whether we need them, so we have to provide them eagerly.
[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator
[ https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984988#comment-15984988 ]

ASF GitHub Bot commented on FLINK-5892:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113482738

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java ---
@@ -125,6 +132,8 @@ public JobVertex(String name) {
     public JobVertex(String name, JobVertexID id) {
         this.name = name == null ? DEFAULT_NAME : name;
         this.id = id == null ? new JobVertexID() : id;
+        this.operatorIDs.add(new OperatorID(this.id.getLowerPart(), this.id.getUpperPart()));
+        this.operatorIdsAlternatives.add(null);
--- End diff --

There's an implicit contract that the length of `operatorIDs` must be equal to that of `operatorIdsAlternatives`. We could store them as pairs instead.
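One way to store the IDs as pairs, sketched under two assumptions: each operator has at most one alternative ID (the hash set via `setUIDHash()`, as zentol explains in another comment in this thread), and a user-defined hash takes precedence over the generated ID when looking up state. `OperatorIDPair` and `idForStateLookup` are hypothetical names; Strings stand in for `OperatorID`.

```java
import java.util.*;

// Hypothetical pair type; Strings stand in for OperatorID.
final class OperatorIDPair {

    private final String generatedOperatorID;
    // Hash set via setUIDHash(), or null if none was set.
    private final String userDefinedOperatorID;

    OperatorIDPair(String generatedOperatorID, String userDefinedOperatorID) {
        this.generatedOperatorID = Objects.requireNonNull(generatedOperatorID);
        this.userDefinedOperatorID = userDefinedOperatorID;
    }

    String getGeneratedOperatorID() {
        return generatedOperatorID;
    }

    Optional<String> getUserDefinedOperatorID() {
        return Optional.ofNullable(userDefinedOperatorID);
    }

    // Assumption: a user-defined hash, when present, is the ID the state is stored under.
    String idForStateLookup() {
        return getUserDefinedOperatorID().orElse(generatedOperatorID);
    }
}
```

With a single list of pairs, the constructor shown in the diff would add one `OperatorIDPair` per operator instead of appending `null` to a second list, so the two lengths can no longer drift apart.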
[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator
[ https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984848#comment-15984848 ]

ASF GitHub Bot commented on FLINK-5892:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113416026

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java ---
@@ -133,11 +142,15 @@ public JobVertex(String name, JobVertexID id) {
      * @param name The name of the new job vertex.
      * @param primaryId The id of the job vertex.
      * @param alternativeIds The alternative ids of the job vertex.
+     * @param operatorIds The ids of all operators contained in this job vertex.
+     * @param alternativeOperatorIds The alternative ids of all operators contained in this job vertex-
      */
-    public JobVertex(String name, JobVertexID primaryId, List alternativeIds) {
+    public JobVertex(String name, JobVertexID primaryId, List alternativeIds, List operatorIds, List alternativeOperatorIds) {
--- End diff --

Again, the generic type on the third parameter seems off. I also suggest introducing line breaks in the parameter list, as it is very long. On top of that, we have a lot of parameters with the same type, which callers can easily mix up. This and the number of arguments make me wonder: would it make sense to have just the actual IDs in the constructor, plus two methods to provide the alternative IDs for the cases that require them?
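The suggestion of a constructor that takes only the primary IDs, plus separate methods for the alternatives, could look roughly like this. It is a hypothetical sketch (`JobVertexSketch` and its method names are invented, and Strings stand in for the ID types); the setter also turns the implicit length contract between operator IDs and their alternatives into an explicit check.

```java
import java.util.*;

// Hypothetical JobVertex variant; Strings stand in for JobVertexID and OperatorID.
final class JobVertexSketch {

    private final String id;
    private final List<String> operatorIDs;

    private final List<String> idAlternatives = new ArrayList<>();
    // One entry per operator (null = no alternative); empty until set explicitly.
    private List<String> operatorIDAlternatives = Collections.emptyList();

    // Only the primary IDs are required at construction time.
    JobVertexSketch(String id, List<String> operatorIDs) {
        this.id = id;
        this.operatorIDs = new ArrayList<>(operatorIDs);
    }

    // First method for alternatives: legacy IDs of the vertex itself.
    void addAlternativeID(String alternativeID) {
        idAlternatives.add(alternativeID);
    }

    // Second method: per-operator alternatives, with the length contract checked.
    void setOperatorIDAlternatives(List<String> alternatives) {
        if (alternatives.size() != operatorIDs.size()) {
            throw new IllegalArgumentException(
                    "Expected " + operatorIDs.size() + " alternatives, got " + alternatives.size());
        }
        this.operatorIDAlternatives = new ArrayList<>(alternatives);
    }

    String getID() {
        return id;
    }

    List<String> getIdAlternatives() {
        return Collections.unmodifiableList(idAlternatives);
    }

    List<String> getOperatorIDAlternatives() {
        return Collections.unmodifiableList(operatorIDAlternatives);
    }
}
```

Callers that never restore from old savepoints would use only the short constructor, and the same-typed lists can no longer be passed in the wrong positional order.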
[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator
[ https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984844#comment-15984844 ]

ASF GitHub Bot commented on FLINK-5892:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113415296

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java ---
@@ -125,6 +132,8 @@ public JobVertex(String name) {
     public JobVertex(String name, JobVertexID id) {
         this.name = name == null ? DEFAULT_NAME : name;
         this.id = id == null ? new JobVertexID() : id;
+        this.operatorIDs.add(new OperatorID(this.id.getLowerPart(), this.id.getUpperPart()));
+        this.operatorIdsAlternatives.add(null);
--- End diff --

Why is it required to add `null` here? That seems strange. Either it is not required, or it indicates some implicit contract about `operatorIdsAlternatives` that would at least justify a comment or (even better) a change. As far as I can see, it is just not required. Or am I missing something?
[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator
[ https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984846#comment-15984846 ]

ASF GitHub Bot commented on FLINK-5892:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113411306

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OperatorID.java ---
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph;
+
+import org.apache.flink.util.AbstractID;
+
+import javax.xml.bind.DatatypeConverter;
+
+/**
+ * A class for statistically unique operator IDs.
+ */
+public class OperatorID extends AbstractID {
+
+    private static final long serialVersionUID = 1L;
+
+    public OperatorID() {
+        super();
+    }
+    public OperatorID(byte[] bytes) {
+        super(bytes);
+    }
+
+    public OperatorID(long lowerPart, long upperPart) {
+        super(lowerPart, upperPart);
+    }
+
+    public OperatorID(AbstractID id) {
+        super(id);
+    }
+
+    public static OperatorID fromHexString(String hexString) {
--- End diff --

If my IDE is telling the truth, this method is never used and could be removed.
[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator
[ https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984845#comment-15984845 ]

ASF GitHub Bot commented on FLINK-5892:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113412507

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertexID.java ---
@@ -40,6 +40,10 @@ public JobVertexID(long lowerPart, long upperPart) {
         super(lowerPart, upperPart);
     }
+    public JobVertexID(AbstractID id) {
--- End diff --

Different subclasses of AbstractID are intended to introduce some kind of type safety. With this in mind, I feel like this is a rather opaque way of "casting" between IDs. Maybe some `convert` methods could make this a bit more explicit than offering a public constructor for this purpose?
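A sketch of the explicit conversion method suggested above, using minimal hypothetical stand-ins for `AbstractID`, `JobVertexID` and `OperatorID`. The part-wise copy mirrors `new OperatorID(this.id.getLowerPart(), this.id.getUpperPart())` from the `JobVertex` diff in this thread; the method name `fromJobVertexID` is an assumption.

```java
// Hypothetical, minimal stand-ins for Flink's AbstractID, JobVertexID and OperatorID.
class IDSketch {
    private final long lowerPart;
    private final long upperPart;

    IDSketch(long lowerPart, long upperPart) {
        this.lowerPart = lowerPart;
        this.upperPart = upperPart;
    }

    long getLowerPart() {
        return lowerPart;
    }

    long getUpperPart() {
        return upperPart;
    }
}

final class JobVertexIDSketch extends IDSketch {
    JobVertexIDSketch(long lowerPart, long upperPart) {
        super(lowerPart, upperPart);
    }
}

final class OperatorIDSketch extends IDSketch {
    OperatorIDSketch(long lowerPart, long upperPart) {
        super(lowerPart, upperPart);
    }

    // Named conversion: the call site states which ID type it converts from,
    // instead of a public constructor that accepts any AbstractID.
    static OperatorIDSketch fromJobVertexID(JobVertexIDSketch jobVertexID) {
        return new OperatorIDSketch(jobVertexID.getLowerPart(), jobVertexID.getUpperPart());
    }
}
```

Because the conversion is a named static method taking a specific ID type, an accidental conversion from some unrelated ID no longer compiles, which preserves the type safety the subclasses are meant to provide.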
[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator
[ https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984849#comment-15984849 ]

ASF GitHub Bot commented on FLINK-5892:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113416299

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java ---
@@ -39,8 +39,8 @@ public InputFormatVertex(String name, JobVertexID id) {
         super(name, id);
     }
-    public InputFormatVertex(String name, JobVertexID id, List alternativeIds) {
-        super(name, id, alternativeIds);
+    public InputFormatVertex(String name, JobVertexID id, List alternativeIds, List operatorIds, List alternativeOperatorIds) {
--- End diff --

See my comments on a similar constructor in `JobVertex`.
[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator
[ https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984843#comment-15984843 ]

ASF GitHub Bot commented on FLINK-5892:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113413839

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java ---
@@ -50,7 +50,14 @@
     /** The ID of the vertex. */
     private final JobVertexID id;
-    private final ArrayList idAlternatives = new ArrayList<>();
+    /** The alternative IDs of the vertex. */
+    private final ArrayList idAlternatives = new ArrayList<>();
--- End diff --

From the comments, this looks incorrect; I would expect a `List` here.
[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator
[ https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984847#comment-15984847 ]

ASF GitHub Bot commented on FLINK-5892:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113411211

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OperatorID.java ---
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph;
+
+import org.apache.flink.util.AbstractID;
+
+import javax.xml.bind.DatatypeConverter;
+
+/**
+ * A class for statistically unique operator IDs.
+ */
+public class OperatorID extends AbstractID {
+
+    private static final long serialVersionUID = 1L;
+
+    public OperatorID() {
+        super();
+    }
--- End diff --

Code style: I would introduce an empty line in between. The call to `super()` is OK, but also not required.
[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator
[ https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983195#comment-15983195 ] ASF GitHub Bot commented on FLINK-5892: --- GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/3770

[FLINK-5892] Restore state on the operator level

## General

This PR is a collaboration between @guoweiM and myself, enabling Flink to restore state on the operator level. This means that the topology of a job may change with regard to chains when restoring from a 1.3 savepoint, allowing the arbitrary addition, removal or modification of chains.

The cornerstone for this is a semantic change for savepoints; no structural changes have been made to the `SavepointV0/1/2` classes or their serialized format:

In 1.2 a savepoint contains the states of tasks. If a task consists of multiple operators, the stored TaskState internally contains a list of states, one entry for each operator.

In 1.3 a savepoint contains the states of operators only; the notion of tasks is eliminated. If a task consists of multiple operators, we instead store one TaskState for each operator. Internally, each of them contains a list of states with a length of 1.

## Implementation

For this to work, a number of changes had to be made.

First and foremost, we required a new `StateAssignmentOperation` that is aware of operators. (74881a2, 8be9c58, 4fa8bbd)

Since the SAO uses the `ExecutionGraph` classes to map the restored state, it was necessary to forward the IDs of all contained operators from the `StreamingJobGraphGenerator` to the `ExecutionJobVertex`. (73427c3)

The `PendingCheckpoint` class had to be adjusted to conform to the new semantics; received `SubtaskStates`, containing the state of a task, are broken down into SubtaskStates for the individual operators. (f7b8ef9)

## Tests

The majority of this PR (60% or so) consists of new tests.

A number of tests were added under flink-tests that test the migration path from 1.2 to 1.3. (d1efdb1) These tests first restore a job from a 1.2 savepoint without changes to the topology, verify that the state was restored correctly, and finally create a new savepoint. They then restore from this migrated 1.3 savepoint with changes to the topology for varying scenarios, and verify the correct restoration of state again.

A new test was also added to the `CheckpointCoordinatorTest` which tests support for topology changes without executing a job. (8b5430f9)

A number of existing tests had to be tweaked to run with the new changes, but these changes all boil down to extending existing mocks by a method or two. (b5430f9)

## Other changes

To make it more obvious that we are dealing with operators and not tasks, a new `OperatorID` class was introduced, and usages of `JobVertexID` in savepoint-related parts were replaced where appropriate. (fe74023)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 5982_operator_state

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3770.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #3770

commit abe1bb9416b2a4159a3667d6845a4a9776abdc4f
Author: zentol
Date: 2017-04-03T15:39:50Z
[prerequisite] Disable exception when assigning uid on chained operator

commit 74881a2788d034db67d99d6d32dbb2cf923aed53
Author: zentol
Date: 2017-04-04T10:53:56Z
[internal] Adjust SavepointLoader to new Savepoint semantics

commit f7b8ef943097cd994a4ef3d5594fea4027720f5a
Author: zentol
Date: 2017-04-04T13:02:55Z
[internal] adjust PendingCheckpoint to be in line with new semantics

commit 73427c3fc7d69f5d072d9d8a4809d449e0c5bdac
Author: zentol
Date: 2017-04-04T11:33:54Z
[internal] Get operator ID's into ExecutionGraph

commit 465805792932cb888393d9257fdefd828fa59343
Author: zentol
Date: 2017-04-25T16:07:16Z
[internals] Extract several utility methods from StateAssignmentOperation

commit 008e848715b7091c3deabc9251d9d673f5506e64
Author: guowei.mgw
Date: 2017-04-24T09:47:47Z
[internal] Add new StateAssignmentOperation

commit ffb93298ce90956b9886b3526258f6a814b7e0af
Author: zentol
Date: 2017-04-04T13:01:07Z
[internal] Integrate new StateAssignmentOperation version

commit d1efdb1c34d59f04147292b320528cd2bc838244
Author: zentol
Date: 2017-04-03T15:40:21Z
[tests] Add tests for chain modifications

commit
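The semantic change described in the PR (task-level entries in 1.2, one entry per operator in 1.3, then reassignment by operator ID into a possibly re-chained job) can be illustrated with a small, hypothetical model. This is not Flink's `PendingCheckpoint` or `StateAssignmentOperation` code. The class and method names, the use of plain strings for operator IDs and states, and the convention of returning `null` for operators without saved state are all assumptions made for illustration. It assumes Java 16+ for records.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model of task-level (1.2) vs operator-level (1.3) savepoint state.
 * None of these types are Flink classes; IDs and states are plain strings.
 */
public class OperatorLevelRestoreSketch {

    /** 1.2-style entry: one per task, with one state per chained operator, in chain order. */
    record TaskLevelState(String taskId, List<String> operatorIds, List<String> operatorStates) {}

    /** Breaks every task-level entry down into one entry per operator, keyed by operator ID. */
    static Map<String, String> splitIntoOperatorStates(List<TaskLevelState> tasks) {
        Map<String, String> byOperator = new LinkedHashMap<>();
        for (TaskLevelState task : tasks) {
            if (task.operatorIds().size() != task.operatorStates().size()) {
                throw new IllegalArgumentException("IDs and states differ in length for " + task.taskId());
            }
            for (int i = 0; i < task.operatorIds().size(); i++) {
                byOperator.put(task.operatorIds().get(i), task.operatorStates().get(i));
            }
        }
        return byOperator;
    }

    /**
     * Assigns operator states to the tasks of a possibly re-chained job. Each new task lists the
     * operator IDs it contains; an operator without saved state (e.g. a newly added one) gets null.
     * Saved state for operators that no longer exist is simply ignored in this sketch.
     */
    static Map<String, List<String>> assignToNewChains(
            Map<String, String> operatorStates, Map<String, List<String>> newChains) {
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> chain : newChains.entrySet()) {
            List<String> states = new ArrayList<>();
            for (String operatorId : chain.getValue()) {
                states.add(operatorStates.get(operatorId));
            }
            assignment.put(chain.getKey(), states);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Old job: task "t1" chains "source" and "map"; task "t2" runs "sink".
        List<TaskLevelState> saved = List.of(
                new TaskLevelState("t1", List.of("source", "map"), List.of("s-source", "s-map")),
                new TaskLevelState("t2", List.of("sink"), List.of("s-sink")));
        Map<String, String> perOperator = splitIntoOperatorStates(saved);

        // New job: "map" is now chained with "sink", and a new "filter" operator was added.
        Map<String, List<String>> newChains = new LinkedHashMap<>();
        newChains.put("n1", List.of("source"));
        newChains.put("n2", List.of("map", "filter", "sink"));

        // prints {n1=[s-source], n2=[s-map, null, s-sink]}
        System.out.println(assignToNewChains(perOperator, newChains));
    }
}
```

The key point the sketch tries to capture is that once state is keyed by operator rather than by task, the chain a given operator ends up in after a topology change no longer matters for finding its state.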
[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator
[ https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15944327#comment-15944327 ] Guowei Ma commented on FLINK-5892: -- Hi [~Zentol], I am discussing the proposal with [~srichter]. I think I will do it after the discussion finishes. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator
[ https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15942990#comment-15942990 ] Chesnay Schepler commented on FLINK-5892: - [~maguowei] What's your progress on this issue? -- This message was sent by Atlassian JIRA (v6.3.15#6346)