[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state
[ https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16216997#comment-16216997 ]

ASF GitHub Bot commented on FLINK-7623:
---

Github user pnowojski closed the pull request at:

    https://github.com/apache/flink/pull/4851

> Detecting whether an operator is restored doesn't work with chained state
> ---
>
>                 Key: FLINK-7623
>                 URL: https://issues.apache.org/jira/browse/FLINK-7623
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API, State Backends, Checkpointing
>    Affects Versions: 1.4.0, 1.3.2
>            Reporter: Aljoscha Krettek
>            Assignee: Piotr Nowojski
>            Priority: Blocker
>             Fix For: 1.4.0
>
>         Attachments: StreamingJob.java
>
>
> Originally reported on the ML:
> https://lists.apache.org/thread.html/22a2cf83de3107aa81a03a921325a191c29df8aa8676798fcd497199@%3Cuser.flink.apache.org%3E
> If we have a chain of operators where multiple of them have operator state,
> detection of the {{context.isRestored()}} flag (of {{CheckpointedFunction}})
> does not work correctly. It's best exemplified using this minimal example
> where both the source and the flatMap have state:
> {code}
> final StreamExecutionEnvironment env =
>     StreamExecutionEnvironment.getExecutionEnvironment();
> env
>     .addSource(new MaSource()).uid("source-1")
>     .flatMap(new MaFlatMap()).uid("flatMap-1");
> env.execute("testing");
> {code}
> If I do a savepoint with these UIDs, then change "source-1" to "source-2" and
> restore from the savepoint {{context.isRestored()}} still reports {{true}}
> for the source.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
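The report comes down to whether the flag is derived per task (the whole chain) or per operator. Below is a minimal, Flink-free Java sketch that contrasts the two, assuming (the thread does not spell this out) that the faulty behaviour is equivalent to computing a single flag for the whole chain from whether any chained uid appears in the savepoint. `ChainRestoreModel` and its methods are hypothetical names, not Flink API.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical, Flink-free model of how an isRestored() flag might be derived for a chain. */
public class ChainRestoreModel {

    /** Assumed faulty variant: one flag for the whole chain, true if ANY chained uid is in the savepoint. */
    public static boolean taskLevelIsRestored(List<String> chainUids, Set<String> uidsInSavepoint) {
        return chainUids.stream().anyMatch(uidsInSavepoint::contains);
    }

    /** Expected variant: each operator looks up only its own uid. */
    public static boolean operatorLevelIsRestored(String uid, Set<String> uidsInSavepoint) {
        return uidsInSavepoint.contains(uid);
    }

    public static void main(String[] args) {
        // Savepoint taken while the job used uids "source-1" and "flatMap-1".
        Set<String> savepoint = new HashSet<>(Arrays.asList("source-1", "flatMap-1"));
        // Job restarted after renaming the source's uid to "source-2".
        List<String> chain = Arrays.asList("source-2", "flatMap-1");

        for (String uid : chain) {
            System.out.println(uid
                + ": task-level=" + taskLevelIsRestored(chain, savepoint)
                + ", operator-level=" + operatorLevelIsRestored(uid, savepoint));
        }
        // source-2: task-level=true, operator-level=false
        // flatMap-1: task-level=true, operator-level=true
    }
}
```

With the per-chain variant the renamed source inherits the flatMap's restore status, which matches the symptom in the report; the per-operator variant reports `false` for it.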
[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state
[ https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16216996#comment-16216996 ]

ASF GitHub Bot commented on FLINK-7623:
---

Github user pnowojski commented on the issue:

    https://github.com/apache/flink/pull/4851

    Thanks!
[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state
[ https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16216967#comment-16216967 ]

ASF GitHub Bot commented on FLINK-7623:
---

Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/4851

    I merged, could you please close the PR?
[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state
[ https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16216863#comment-16216863 ]

ASF GitHub Bot commented on FLINK-7623:
---

Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/4851

    It's excellent that you separated the cleanup work from the actual change.
[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state
[ https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16216857#comment-16216857 ]

ASF GitHub Bot commented on FLINK-7623:
---

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4851#discussion_r146551851

    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java ---
    @@ -370,6 +371,10 @@ public void endInput() {
     		}
     	}
     
    +	public StreamConfigChainer setupOpertorChain(OperatorID headOperatorId, OneInputStreamOperator headOperator) {
    --- End diff --

    nit: typo, but I'll fix while merging
[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state
[ https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212437#comment-16212437 ]

ASF GitHub Bot commented on FLINK-7623:
---

Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/4851

    That's great! In that case I would approve this
[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state
[ https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212433#comment-16212433 ]

ASF GitHub Bot commented on FLINK-7623:
---

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4851#discussion_r145921473

    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java ---
    @@ -0,0 +1,325 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.runtime.tasks;
    +
    +import org.apache.flink.api.common.state.ListState;
    +import org.apache.flink.api.common.state.ListStateDescriptor;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeutils.base.LongSerializer;
    +import org.apache.flink.api.common.typeutils.base.StringSerializer;
    +import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
    +import org.apache.flink.runtime.checkpoint.CheckpointOptions;
    +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
    +import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
    +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
    +import org.apache.flink.runtime.jobgraph.OperatorID;
    +import org.apache.flink.runtime.jobgraph.OperatorInstanceID;
    +import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
    +import org.apache.flink.runtime.state.StateInitializationContext;
    +import org.apache.flink.runtime.state.StateSnapshotContext;
    +import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    +import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.streaming.util.TestHarnessUtil;
    +import org.apache.flink.util.TestLogger;
    +
    +import org.junit.Test;
    +
    +import java.util.Collections;
    +import java.util.Optional;
    +import java.util.concurrent.ConcurrentLinkedQueue;
    +import java.util.concurrent.atomic.AtomicLong;
    +
    +import static org.junit.Assert.assertEquals;
    +
    +/**
    + * Tests ensuring correct behaviour of {@link org.apache.flink.runtime.state.ManagedInitializationContext#isRestored}
    + * method.
    + */
    +public class RestoreStreamTaskTest extends TestLogger {
    +	@Test
    +	public void testRestore() throws Exception {
    +		AcknowledgeStreamMockEnvironment environment1 = processRecords(
    +			new OperatorID(42L, 42L),
    +			new CounterOperator(),
    +			new OperatorID(44L, 44L),
    +			new CounterOperator(),
    +			Optional.empty());
    +
    +		assertEquals(2, environment1.getCheckpointStateHandles().getSubtaskStateMappings().size());
    +
    +		TaskStateSnapshot stateHandles = environment1.getCheckpointStateHandles();
    +
    +		try {
    +			AcknowledgeStreamMockEnvironment environment2 = processRecords(
    +				new OperatorID(42L, 42L),
    +				new RestoreCounterOperator(),
    +				new OperatorID(44L, 44L),
    +				new RestoreCounterOperator(),
    +				Optional.of(stateHandles));
    +
    +			assertEquals(2, RestoreCounterOperator.RESTORE_COUNTER.get());
    +		}
    +		finally {
    +			RestoreCounterOperator.RESTORE_COUNTER.getAndSet(0);
    +		}
    +	}
    +
    +	@Test
    +	public void testRestoreWithNewId() throws Exception {
    +		AcknowledgeStreamMockEnvironment environment1 = processRecords(
    +			new OperatorID(42L, 42L),
    +			new CounterOperator(),
    +			new OperatorID(44L, 44L),
    +			new CounterOperator(),
    +			Optional.empty());
    +
    +
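The quoted `testRestore` counts restores through a static `AtomicLong` (`RestoreCounterOperator.RESTORE_COUNTER`) and resets it in a `finally` block. Because the operator classes are cut off in the quoted diff, here is a hypothetical, Flink-free sketch of that counting pattern only: `CountingOperator` and `restoreChain` are illustrative names, and a plain `Set` of operator IDs stands in for the restored `TaskStateSnapshot`.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

public class RestoreCounterSketch {

    /** Hypothetical stand-in for RestoreCounterOperator: counts restores in a shared static counter. */
    public static class CountingOperator {
        public static final AtomicLong RESTORE_COUNTER = new AtomicLong();

        private final String operatorId;

        public CountingOperator(String operatorId) {
            this.operatorId = operatorId;
        }

        /** Mimics initializeState(): bump the counter only if this operator's own ID was restored. */
        public void initializeState(Set<String> restoredIds) {
            if (restoredIds.contains(operatorId)) {
                RESTORE_COUNTER.incrementAndGet();
            }
        }
    }

    /** Initializes every operator of a chain and returns how many of them reported a restore. */
    public static long restoreChain(List<String> chainIds, Set<String> snapshotIds) {
        try {
            for (String id : chainIds) {
                new CountingOperator(id).initializeState(snapshotIds);
            }
            // The value is read before the finally block runs, so the reset does not affect it.
            return CountingOperator.RESTORE_COUNTER.get();
        } finally {
            // Reset the static counter so one test cannot leak state into the next.
            CountingOperator.RESTORE_COUNTER.set(0);
        }
    }

    public static void main(String[] args) {
        Set<String> snapshot = new HashSet<>(Arrays.asList("42", "44"));
        System.out.println(restoreChain(Arrays.asList("42", "44"), snapshot)); // 2
        System.out.println(restoreChain(Arrays.asList("42", "43"), snapshot)); // 1
    }
}
```

The static counter is what lets the test observe operators that the task instantiates internally; the `finally` reset matters because JUnit may run several tests in the same JVM.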
[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state
[ https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212431#comment-16212431 ]

ASF GitHub Bot commented on FLINK-7623:
---

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4851#discussion_r145921058

    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/AcknowledgeStreamMockEnvironment.java ---
    @@ -0,0 +1,65 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.runtime.tasks;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.testutils.OneShotLatch;
    +import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
    +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
    +import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
    +
    +/**
    + * Stream environment that allows to wait for checkpoint acknowledgement.
    + */
    +class AcknowledgeStreamMockEnvironment extends StreamMockEnvironment {
    --- End diff --

    `RocksDBAsyncSnapshotTest` does something very similar in anonymous class
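`AcknowledgeStreamMockEnvironment` is described as a stream environment that lets a test wait for a checkpoint acknowledgement, and it imports Flink's `OneShotLatch`. Its body is not shown in the thread, so the following is only a plain-Java guess at the idea: a `java.util.concurrent.CountDownLatch` with a count of one stands in for `OneShotLatch`, a generic `S` stands in for `TaskStateSnapshot`, and `AcknowledgingEnvironment` is a hypothetical name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class AckWaitSketch {

    /** Hypothetical mock environment: remembers the acknowledged snapshot and wakes up the waiting test. */
    public static class AcknowledgingEnvironment<S> {
        // A latch with count 1 behaves like a one-shot latch: it opens once and stays open.
        private final CountDownLatch checkpointLatch = new CountDownLatch(1);
        private volatile long checkpointId = -1L;
        private volatile S checkpointState;

        /** Called by the (simulated) task thread when a checkpoint completes. */
        public void acknowledgeCheckpoint(long id, S state) {
            this.checkpointId = id;
            this.checkpointState = state;
            checkpointLatch.countDown();
        }

        /** Blocks the test thread until an acknowledgement arrives; returns false on timeout. */
        public boolean awaitAcknowledgement(long timeout, TimeUnit unit) throws InterruptedException {
            return checkpointLatch.await(timeout, unit);
        }

        public long getCheckpointId() {
            return checkpointId;
        }

        public S getCheckpointState() {
            return checkpointState;
        }
    }

    public static void main(String[] args) throws Exception {
        AcknowledgingEnvironment<String> env = new AcknowledgingEnvironment<>();
        // Simulate a task thread that finishes checkpoint 1 asynchronously.
        Thread task = new Thread(() -> env.acknowledgeCheckpoint(1L, "state-of-op-42"));
        task.start();
        boolean acked = env.awaitAcknowledgement(5, TimeUnit.SECONDS);
        task.join();
        System.out.println(acked + " " + env.getCheckpointId() + " " + env.getCheckpointState());
    }
}
```

Pulling this into a named class, rather than an anonymous subclass per test, is what lets both `testRestore` and `testRestoreWithNewId` read the captured snapshot back through one getter.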
[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state
[ https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212429#comment-16212429 ]

ASF GitHub Bot commented on FLINK-7623:
---

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4851#discussion_r145920720

    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java ---
[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state
[ https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212426#comment-16212426 ]

ASF GitHub Bot commented on FLINK-7623:
---

Github user pnowojski commented on the issue:

    https://github.com/apache/flink/pull/4851

    It appears that the current behaviour is as you wished, @StefanRRichter:
    - Operator participated in checkpoint, data written -> `isRestored == true`
    - Operator participated in checkpoint, but did not receive state after rescaling -> `isRestored == true`
    - Operator participated in checkpoint, nothing checkpointed -> `isRestored == true`
    - Operator never participated in checkpoint, or has a new uid -> `isRestored == false`
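Read together, the four cases say the flag depends on whether the restored snapshot has an entry for the operator's own ID, not on whether that entry holds any data. A Flink-free sketch of that rule follows, assuming a `Map` from operator ID to (possibly empty) state handles as a stand-in for the real snapshot; `IsRestoredRule` is a hypothetical name, and the thread does not show how the fix represents this internally.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class IsRestoredRule {

    /**
     * Hypothetical rule: an operator counts as restored iff the job was started from a snapshot
     * AND that snapshot has an entry for this operator's ID, even if the entry carries no state.
     */
    public static boolean isRestored(Optional<Map<String, List<String>>> snapshot, String operatorId) {
        return snapshot.map(s -> s.containsKey(operatorId)).orElse(false);
    }

    public static void main(String[] args) {
        Map<String, List<String>> snapshot = new HashMap<>();
        snapshot.put("with-data", Collections.singletonList("handle-0")); // state was written
        snapshot.put("rescaled-away", Collections.emptyList());           // no state assigned after rescaling
        snapshot.put("nothing-checkpointed", Collections.emptyList());    // participated, wrote nothing
        Optional<Map<String, List<String>>> restored = Optional.of(snapshot);

        System.out.println(isRestored(restored, "with-data"));            // true
        System.out.println(isRestored(restored, "rescaled-away"));        // true
        System.out.println(isRestored(restored, "nothing-checkpointed")); // true
        System.out.println(isRestored(restored, "new-uid"));              // false
        System.out.println(isRestored(Optional.empty(), "with-data"));    // false (fresh start)
    }
}
```

Keying on presence rather than content is what keeps an operator that legitimately checkpointed nothing from being mistaken for a brand-new one.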
[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state
[ https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212417#comment-16212417 ]

ASF GitHub Bot commented on FLINK-7623:
---

Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4851#discussion_r145903567

    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java ---
[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state
[ https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212420#comment-16212420 ] ASF GitHub Bot commented on FLINK-7623: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4851#discussion_r145917932 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java --- @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.checkpoint.StateAssignmentOperation; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobgraph.OperatorInstanceID; +import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.TestHarnessUtil; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.Collections; +import java.util.Optional; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicLong; + +import static org.junit.Assert.assertEquals; + +/** + * Tests ensuring correct behaviour of {@link org.apache.flink.runtime.state.ManagedInitializationContext#isRestored} + * method. 
+ */ +public class RestoreStreamTaskTest extends TestLogger { + @Test + public void testRestore() throws Exception { + AcknowledgeStreamMockEnvironment environment1 = processRecords( + new OperatorID(42L, 42L), + new CounterOperator(), + new OperatorID(44L, 44L), + new CounterOperator(), + Optional.empty()); + + assertEquals(2, environment1.getCheckpointStateHandles().getSubtaskStateMappings().size()); + + TaskStateSnapshot stateHandles = environment1.getCheckpointStateHandles(); + + try { + AcknowledgeStreamMockEnvironment environment2 = processRecords( + new OperatorID(42L, 42L), + new RestoreCounterOperator(), + new OperatorID(44L, 44L), + new RestoreCounterOperator(), + Optional.of(stateHandles)); + + assertEquals(2, RestoreCounterOperator.RESTORE_COUNTER.get()); + } + finally { + RestoreCounterOperator.RESTORE_COUNTER.getAndSet(0); + } + } + + @Test + public void testRestoreWithNewId() throws Exception { + AcknowledgeStreamMockEnvironment environment1 = processRecords( + new OperatorID(42L, 42L), + new CounterOperator(), + new OperatorID(44L, 44L), + new CounterOperator(), + Optional.empty()); + +
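The visible part of `testRestore` works in two phases. The first run snapshots a chain of two operators with IDs 42 and 44, so the snapshot holds two per-operator entries. The second run restores the chain from that snapshot and uses the static `RESTORE_COUNTER` to count how many operators observed `isRestored()`. The counter is reset in `finally`.

The following Flink-free sketch models only that counting logic. It assumes that an operator counts as restored exactly when its ID is a key in the snapshot. `RestoreCounterSketch`, `initializeChain` and `countRestored` are hypothetical names. They are not part of the test or of Flink.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, Flink-free model of the counting done in testRestore.
// Assumption: an operator is "restored" iff its ID is a key in the snapshot.
final class RestoreCounterSketch {

    // Plays the role of RestoreCounterOperator.RESTORE_COUNTER: shared static
    // state, so it has to be reset between runs.
    static final AtomicLong RESTORE_COUNTER = new AtomicLong();

    // Simulates initializing each chained operator from the task's snapshot.
    static void initializeChain(List<Long> operatorIds, Map<Long, String> snapshot) {
        for (Long id : operatorIds) {
            if (snapshot.containsKey(id)) {
                RESTORE_COUNTER.incrementAndGet();
            }
        }
    }

    // Runs one restore phase and returns how many operators reported restored.
    // The counter is always reset afterwards, like the finally block in the test.
    static long countRestored(List<Long> operatorIds, Map<Long, String> snapshot) {
        try {
            initializeChain(operatorIds, snapshot);
            return RESTORE_COUNTER.get();
        } finally {
            RESTORE_COUNTER.set(0);
        }
    }
}
```

Take a snapshot holding entries for 42 and 44. Calling `countRestored(Arrays.asList(42L, 44L), snapshot)` returns 2, which matches the assertion in `testRestore`. Under this model, replacing 44 with an ID absent from the snapshot yields 1.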
[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state
[ https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212419#comment-16212419 ] ASF GitHub Bot commented on FLINK-7623: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4851#discussion_r145902939 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/AcknowledgeStreamMockEnvironment.java --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; + +/** + * Stream environment that allows to wait for checkpoint acknowledgement. + */ +class AcknowledgeStreamMockEnvironment extends StreamMockEnvironment { --- End diff -- I have found only one more usage of `StreamMockEnvironment` in `AsyncWaitOperatorTest#testStateSnapshotAndRestore`. 
Did you mean something more? > Detecting whether an operator is restored doesn't work with chained state > - > > Key: FLINK-7623 > URL: https://issues.apache.org/jira/browse/FLINK-7623 > Project: Flink > Issue Type: Bug > Components: DataStream API, State Backends, Checkpointing >Affects Versions: 1.4.0, 1.3.2 >Reporter: Aljoscha Krettek >Assignee: Piotr Nowojski >Priority: Blocker > Fix For: 1.4.0, 1.3.3 > > Attachments: StreamingJob.java > > > Originally reported on the ML: > https://lists.apache.org/thread.html/22a2cf83de3107aa81a03a921325a191c29df8aa8676798fcd497199@%3Cuser.flink.apache.org%3E > If we have a chain of operators where multiple of them have operator state, > detection of the {{context.isRestored()}} flag (of {{CheckpointedFunction}}) > does not work correctly. It's best exemplified using this minimal example > where both the source and the flatMap have state: > {code} > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env > .addSource(new MaSource()).uid("source-1") > .flatMap(new MaFlatMap()).uid("flatMap-1"); > env.execute("testing"); > {code} > If I do a savepoint with these UIDs, then change "source-1" to "source-2" and > restore from the savepoint {{context.isRestored()}} still reports {{true}} > for the source. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
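The diff describes `AcknowledgeStreamMockEnvironment` as a stream environment that lets a test wait for a checkpoint acknowledgement, and it imports Flink's `OneShotLatch`. Below is a plain-Java sketch of that waiting pattern. It substitutes `java.util.concurrent.CountDownLatch` for `OneShotLatch`. The class and method names (`AckRecordingEnvironmentSketch`, `acknowledge`, `awaitAcknowledgement`) are hypothetical and are not the PR's actual API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a mock environment that records the state
// acknowledged for a checkpoint and lets a test block until that happens.
// CountDownLatch is used here in place of Flink's OneShotLatch.
final class AckRecordingEnvironmentSketch<S> {
    private final CountDownLatch ackLatch = new CountDownLatch(1);
    private volatile long checkpointId = -1L;
    private volatile S acknowledgedState;

    // Called by the (simulated) task once its snapshot is complete.
    void acknowledge(long id, S state) {
        this.checkpointId = id;
        this.acknowledgedState = state;
        ackLatch.countDown(); // releases any thread blocked in awaitAcknowledgement
    }

    // Blocks until an acknowledgement arrived; returns false on timeout.
    boolean awaitAcknowledgement(long timeout, TimeUnit unit) throws InterruptedException {
        return ackLatch.await(timeout, unit);
    }

    long getCheckpointId() {
        return checkpointId;
    }

    S getAcknowledgedState() {
        return acknowledgedState;
    }
}
```

A test would start the task on another thread and call `awaitAcknowledgement` with a timeout. Only after that returns would it inspect the recorded state. The timeout keeps a missing acknowledgement from hanging the test run.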
[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state
[ https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212421#comment-16212421 ] ASF GitHub Bot commented on FLINK-7623: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4851#discussion_r145917513 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java --- @@ -0,0 +1,325 @@
[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state
[ https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212422#comment-16212422 ] ASF GitHub Bot commented on FLINK-7623: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4851#discussion_r145903139 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java --- @@ -0,0 +1,325 @@
[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state
[ https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212418#comment-16212418 ] ASF GitHub Bot commented on FLINK-7623: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4851#discussion_r145915808 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java --- @@ -0,0 +1,325 @@
[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state
[ https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212239#comment-16212239 ] ASF GitHub Bot commented on FLINK-7623: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/4851#discussion_r145885868 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java --- @@ -0,0 +1,325 @@
[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state
[ https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212183#comment-16212183 ] ASF GitHub Bot commented on FLINK-7623: --- Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/4851 Also, following our offline discussion, I would suggest that this behaves as follows: if an operator with a given ID participated in a checkpoint, it should be marked as restored. All cases should be derived from this definition. I believe this is slightly different from the current implementation. Both make sense, so I think we should agree on one. @pnowojski @aljoscha which is the better definition for you?
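The definition proposed in the comment above says that an operator with a given ID that participated in the checkpoint is marked as restored. A few lines of plain Java can contrast it with the reported symptom. This is a simplified model, not Flink code, and it rests on three assumptions:

- A snapshot is a map from operator UID to serialized state.
- An operator that took part in the checkpoint appears as a key, even if its state is empty.
- The task-level variant approximates the buggy behaviour: every chained operator sees `isRestored() == true` as soon as the task receives any state.

`RestoreFlagSketch` and both of its methods are hypothetical.

```java
import java.util.Map;

// Hypothetical model contrasting two definitions of isRestored() for chained
// operators. A snapshot maps operator UIDs to serialized state; an operator that
// participated in the checkpoint appears as a key, even with empty state.
final class RestoreFlagSketch {

    // Approximation of the reported behaviour: the flag reflects only whether
    // the task as a whole received state, so operatorUid is ignored.
    static boolean taskLevelIsRestored(Map<String, byte[]> snapshot, String operatorUid) {
        return !snapshot.isEmpty();
    }

    // Proposed definition: restored iff an operator with this ID took part in
    // the checkpoint, i.e. its UID is a key of the snapshot.
    static boolean operatorLevelIsRestored(Map<String, byte[]> snapshot, String operatorUid) {
        return snapshot.containsKey(operatorUid);
    }
}
```

Consider the scenario from the issue: a savepoint is taken with `source-1` and `flatMap-1`, then restored with the source renamed to `source-2`.

- `taskLevelIsRestored(snapshot, "source-2")` returns `true`, which is the reported bug.
- `operatorLevelIsRestored(snapshot, "source-2")` returns `false`.
- `operatorLevelIsRestored(snapshot, "flatMap-1")` stays `true`.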
[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state
[ https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212175#comment-16212175 ] ASF GitHub Bot commented on FLINK-7623: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/4851#discussion_r145879846 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java --- @@ -0,0 +1,325 @@
[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state
[ https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212174#comment-16212174 ] ASF GitHub Bot commented on FLINK-7623: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/4851#discussion_r145879772 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java --- @@ -0,0 +1,325 @@
[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state
[ https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212165#comment-16212165 ] ASF GitHub Bot commented on FLINK-7623: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/4851#discussion_r145878649 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java --- @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.checkpoint.StateAssignmentOperation; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobgraph.OperatorInstanceID; +import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.TestHarnessUtil; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.Collections; +import java.util.Optional; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicLong; + +import static org.junit.Assert.assertEquals; + +/** + * Tests ensuring correct behaviour of {@link org.apache.flink.runtime.state.ManagedInitializationContext#isRestored} + * method. 
+ */ +public class RestoreStreamTaskTest extends TestLogger { + @Test + public void testRestore() throws Exception { + AcknowledgeStreamMockEnvironment environment1 = processRecords( + new OperatorID(42L, 42L), + new CounterOperator(), + new OperatorID(44L, 44L), + new CounterOperator(), + Optional.empty()); + + assertEquals(2, environment1.getCheckpointStateHandles().getSubtaskStateMappings().size()); + + TaskStateSnapshot stateHandles = environment1.getCheckpointStateHandles(); + + try { + AcknowledgeStreamMockEnvironment environment2 = processRecords( + new OperatorID(42L, 42L), + new RestoreCounterOperator(), + new OperatorID(44L, 44L), + new RestoreCounterOperator(), + Optional.of(stateHandles)); + + assertEquals(2, RestoreCounterOperator.RESTORE_COUNTER.get()); + } + finally { + RestoreCounterOperator.RESTORE_COUNTER.getAndSet(0); + } + } + + @Test + public void testRestoreWithNewId() throws Exception { + AcknowledgeStreamMockEnvironment environment1 = processRecords( + new OperatorID(42L, 42L), + new CounterOperator(), + new OperatorID(44L, 44L), + new CounterOperator(), + Optional.empty()); + +
[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state
[ https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212163#comment-16212163 ] ASF GitHub Bot commented on FLINK-7623: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/4851#discussion_r145878457 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java --- @@ -0,0 +1,325 @@
[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state
[ https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212158#comment-16212158 ] ASF GitHub Bot commented on FLINK-7623: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/4851#discussion_r145878181 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java --- @@ -0,0 +1,325 @@
[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state
[ https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212155#comment-16212155 ] ASF GitHub Bot commented on FLINK-7623: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/4851#discussion_r145877990 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java --- @@ -0,0 +1,325 @@
[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state
[ https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212153#comment-16212153 ] ASF GitHub Bot commented on FLINK-7623: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/4851#discussion_r145877847 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java --- @@ -0,0 +1,325 @@
[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state
[ https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212145#comment-16212145 ] ASF GitHub Bot commented on FLINK-7623: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/4851#discussion_r145877337 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/AcknowledgeStreamMockEnvironment.java --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; + +/** + * Stream environment that allows to wait for checkpoint acknowledgement. + */ +class AcknowledgeStreamMockEnvironment extends StreamMockEnvironment { --- End diff -- I did a similar refactoring in one of my pending PRs, but it's ok because that one will probably not make it into 1.4. 
What I would still suggest: if you search for subclasses of `StreamMockEnvironment`, you will find more cases (some of them anonymous classes) that could be replaced by a proper dummy like this one. > Detecting whether an operator is restored doesn't work with chained state > - > > Key: FLINK-7623 > URL: https://issues.apache.org/jira/browse/FLINK-7623 > Project: Flink > Issue Type: Bug > Components: DataStream API, State Backends, Checkpointing >Affects Versions: 1.4.0, 1.3.2 >Reporter: Aljoscha Krettek >Assignee: Piotr Nowojski >Priority: Blocker > Fix For: 1.4.0, 1.3.3 > > Attachments: StreamingJob.java > > > Originally reported on the ML: > https://lists.apache.org/thread.html/22a2cf83de3107aa81a03a921325a191c29df8aa8676798fcd497199@%3Cuser.flink.apache.org%3E > If we have a chain of operators where multiple of them have operator state, > detection of the {{context.isRestored()}} flag (of {{CheckpointedFunction}}) > does not work correctly. It's best exemplified using this minimal example > where both the source and the flatMap have state: > {code} > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env > .addSource(new MaSource()).uid("source-1") > .flatMap(new MaFlatMap()).uid("flatMap-1"); > env.execute("testing"); > {code} > If I do a savepoint with these UIDs, then change "source-1" to "source-2" and > restore from the savepoint {{context.isRestored()}} still reports {{true}} > for the source. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
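The purpose of a dedicated dummy such as `AcknowledgeStreamMockEnvironment` ("allows to wait for checkpoint acknowledgement") can be sketched without Flink. The environment records the snapshot passed to its acknowledge callback and then releases a latch, so the test thread can block until the checkpoint has completed. This is a minimal sketch under stated assumptions. `AckRecorder`, `acknowledgeCheckpoint` and `awaitAck` are hypothetical names, and a plain `java.util.concurrent.CountDownLatch` stands in for Flink's `OneShotLatch`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a mock environment that lets a test wait for a checkpoint acknowledgement. */
public class AckRecorder<S> {
    // Released once, when the first acknowledgement arrives.
    private final CountDownLatch ackLatch = new CountDownLatch(1);
    // Written by the task thread, read by the test thread after the latch opens.
    private volatile long checkpointId = -1L;
    private volatile S snapshot;

    /** Called by the (simulated) task thread when a checkpoint is acknowledged. */
    public void acknowledgeCheckpoint(long id, S state) {
        this.checkpointId = id;
        this.snapshot = state;
        ackLatch.countDown();
    }

    /** Blocks the calling thread until an acknowledgement arrives or the timeout elapses. */
    public boolean awaitAck(long timeout, TimeUnit unit) throws InterruptedException {
        return ackLatch.await(timeout, unit);
    }

    public long getCheckpointId() {
        return checkpointId;
    }

    public S getSnapshot() {
        return snapshot;
    }

    public static void main(String[] args) throws Exception {
        AckRecorder<String> recorder = new AckRecorder<>();
        // Simulate the task acknowledging checkpoint 7 from another thread.
        Thread task = new Thread(() -> recorder.acknowledgeCheckpoint(7L, "state-of-chain"));
        task.start();
        boolean acked = recorder.awaitAck(5, TimeUnit.SECONDS);
        task.join();
        System.out.println(acked + " " + recorder.getCheckpointId() + " " + recorder.getSnapshot());
    }
}
```

A reusable, named class like this is one way to act on the review suggestion. Each test would share a single waiting mechanism instead of re-implementing it in an anonymous subclass.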
[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state
[ https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16210951#comment-16210951 ] Piotr Nowojski commented on FLINK-7623: --- As [~srichter] suggested, I have checked a couple more cases, and the current ({{1.4-SNAPSHOT}}) behaviour is as follows:
# Operator participated in checkpoint, data written -> {{isRestored == true}}
# Operator participated in checkpoint, but did not receive state after rescaling -> {{isRestored == true}}
# Operator never participated in checkpoint, or has a new uid -> {{isRestored == false}}
# Operator participated in checkpoint, nothing checkpointed -> {{isRestored == false}}

I think that this behaviour makes sense.
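The four cases above reduce to one rule. The flag is decided per operator ID for the whole job, not per parallel subtask. The model below is a simplified sketch of that rule as described in the comment, and it is not Flink's actual implementation. The snapshot is represented as a hypothetical map from operator uid to the state entries written by all of that operator's subtasks. The value `"offset=17"` is made up for illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Simplified, hypothetical model of the isRestored() cases listed above (not Flink code). */
public class IsRestoredModel {

    /**
     * @param snapshot the job-wide snapshot being restored from, if any, keyed by operator uid;
     *                 each value holds the state entries written by all parallel subtasks
     * @param uid      the uid of the operator asking for the flag
     */
    public static boolean isRestored(Optional<Map<String, List<String>>> snapshot, String uid) {
        if (!snapshot.isPresent()) {
            return false; // fresh start: nothing to restore from
        }
        // A new or unknown uid has no entry; an operator that wrote nothing has an empty list.
        List<String> state = snapshot.get().getOrDefault(uid, Collections.emptyList());
        // Decided globally: it does not matter which subtask ends up receiving the entries.
        return !state.isEmpty();
    }

    public static void main(String[] args) {
        Map<String, List<String>> savepoint = new HashMap<>();
        savepoint.put("source-1", Arrays.asList("offset=17"));
        savepoint.put("flatMap-1", Collections.emptyList()); // participated, checkpointed nothing

        System.out.println(isRestored(Optional.of(savepoint), "source-1"));  // cases 1 and 2: true
        System.out.println(isRestored(Optional.of(savepoint), "source-2"));  // case 3, new uid: false
        System.out.println(isRestored(Optional.of(savepoint), "flatMap-1")); // case 4: false
        System.out.println(isRestored(Optional.empty(), "source-1"));       // no savepoint: false
    }
}
```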
[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state
[ https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16209552#comment-16209552 ] Piotr Nowojski commented on FLINK-7623: --- It seems that this bug was indeed fixed in https://issues.apache.org/jira/browse/FLINK-7213. However, I would like to test whether {{isRestored()}} works correctly when scaling up.
[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state
[ https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16209537#comment-16209537 ] ASF GitHub Bot commented on FLINK-7623: --- GitHub user pnowojski opened a pull request: https://github.com/apache/flink/pull/4851 [FLINK-7623][tests] Add tests to make sure operator is never restored when using new operator id

## What is the purpose of the change

This PR adds test coverage for the correct behaviour of the `ManagedInitializationContext#isRestored` flag: if an application is restarted and some operator has a new `uid`, the flag should return false for that operator. This bug was fixed by #4353.

## Brief change log

Please check the commit messages for the change log.

## Verifying this change

This PR adds `RestoreStreamTaskTest` and does not change any production code.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink f7623

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4851.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #4851

commit a99d384fcd5b021b71dc703d0e9e8063bd72f89e
Author: Piotr Nowojski
Date: 2017-10-18T13:02:02Z
[hotfix][streaming] Fix formatting in OperatorChain

commit 5a2972d5f02521166c70ad68ad3fac0df9fad2e8
Author: Piotr Nowojski
Date: 2017-10-18T14:01:38Z
[hotfix][tests] Add easier way to chain operator in StreamTaskTestHarness

commit 26cabd7762458633be891b746d06595a873033b4
Author: Piotr Nowojski
Date: 2017-10-18T15:18:19Z
[hotfix][tests] Extract AcknowledgeStreamMockEnvironment

commit 2d6b45a55b8c63df4635a2d5506eab4b5ab590c3
Author: Piotr Nowojski
Date: 2017-10-18T13:01:37Z
[FLINK-7623][tests] Add tests to make sure operator is never restored when using new operator id
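The symptom in the issue can be modelled with the two chained operators used in `RestoreStreamTaskTest` (operator IDs 42 and 44). This is a simplified sketch, not Flink's code. It assumes that the buggy behaviour amounted to a single restore flag for the whole task, set whenever the chain received any state. In the fixed behaviour, each operator looks up state under its own operator ID. The replacement ID `43L` and both method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of restoring a chain of operators (not Flink's actual code). */
public class ChainedRestoreModel {

    /** Assumed buggy variant: one flag for the whole task, true if the task got any state. */
    public static List<Boolean> taskLevelFlags(Map<Long, String> snapshot, List<Long> chainIds) {
        boolean anyState = !snapshot.isEmpty();
        List<Boolean> flags = new ArrayList<>();
        for (int i = 0; i < chainIds.size(); i++) {
            flags.add(anyState); // every chained operator sees the same answer
        }
        return flags;
    }

    /** Per-operator variant: each operator checks for state under its own operator ID. */
    public static List<Boolean> operatorLevelFlags(Map<Long, String> snapshot, List<Long> chainIds) {
        List<Boolean> flags = new ArrayList<>();
        for (Long id : chainIds) {
            flags.add(snapshot.containsKey(id));
        }
        return flags;
    }

    public static void main(String[] args) {
        // Snapshot taken with two chained operators, IDs 42 and 44.
        Map<Long, String> snapshot = new HashMap<>();
        snapshot.put(42L, "source state");
        snapshot.put(44L, "flatMap state");

        // Restore after the first operator got a new ID (e.g. its uid was changed).
        List<Long> newChain = Arrays.asList(43L, 44L);

        System.out.println(taskLevelFlags(snapshot, newChain));     // [true, true]: the reported symptom
        System.out.println(operatorLevelFlags(snapshot, newChain)); // [false, true]: expected behaviour
    }
}
```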
[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state
[ https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16205634#comment-16205634 ] Aljoscha Krettek commented on FLINK-7623: - Yes, it's a blocker.
[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state
[ https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16205605#comment-16205605 ] Piotr Nowojski commented on FLINK-7623: --- I will take a look at this either this week or next. [~aljoscha], could you confirm that this is a release blocker?
[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state
[ https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16174974#comment-16174974 ] Aljoscha Krettek commented on FLINK-7623: - I think that would be incorrect, because it would then also report "not restored" when this operator instance was assigned no state even though the operator as a whole was restored, i.e. other parallel instances of this operator might have received state.
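Aljoscha's objection can be illustrated with a small, hypothetical model: one operator with parallelism 2 restored from a savepoint that contains a single operator-state handle. The round-robin assignment below is an illustrative assumption, not Flink's actual redistribution scheme, and all names are invented for this sketch. A purely per-instance check reports "not restored" for subtask 1, even though the operator as a whole was restored.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical model (not Flink code): per-instance vs. operator-wide
 * notion of "restored" for an operator with several parallel subtasks.
 */
public class GlobalVsLocalRestoreSketch {

    /** Round-robin assignment of state handles to subtasks (an assumption for illustration). */
    static List<List<String>> assign(List<String> handles, int parallelism) {
        List<List<String>> perSubtask = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            perSubtask.add(new ArrayList<>());
        }
        for (int i = 0; i < handles.size(); i++) {
            perSubtask.get(i % parallelism).add(handles.get(i));
        }
        return perSubtask;
    }

    /** Per-instance check, in the spirit of hasState(): did THIS subtask receive any handle? */
    static boolean restoredLocally(List<String> handlesForSubtask) {
        return !handlesForSubtask.isEmpty();
    }

    /** Operator-wide check: did ANY parallel instance receive state? */
    static boolean restoredGlobally(List<List<String>> perSubtask) {
        return perSubtask.stream().anyMatch(h -> !h.isEmpty());
    }

    public static void main(String[] args) {
        // One handle in the savepoint, redistributed to two parallel subtasks.
        List<List<String>> perSubtask = assign(Collections.singletonList("flatMap-1/handle-0"), 2);

        System.out.println(restoredGlobally(perSubtask));       // true
        System.out.println(restoredLocally(perSubtask.get(0))); // true
        System.out.println(restoredLocally(perSubtask.get(1))); // false
    }
}
```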
[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state
[ https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169987#comment-16169987 ] Chesnay Schepler commented on FLINK-7623: - The culprit is this bit in {{AbstractStreamOperator}}:
{code}
public final void initializeState(OperatorSubtaskState stateHandles) throws Exception {
    ...
    boolean restoring = (null != stateHandles);
    ...
{code}
Instead, this should be {{boolean restoring = stateHandles.hasState();}}
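The difference between the two checks can be shown with a simplified stand-in for the per-operator state object. `SubtaskState` and the helper methods below are hypothetical, not Flink's `OperatorSubtaskState`; they only assume, as the comment above implies, that an operator in a restored chain can receive a non-null state object whose handle collections are empty. The `null` guard in `restoringByHasState` is an addition of this sketch: since the original check implies `stateHandles` can be `null`, the bare one-liner would throw a `NullPointerException` in that case.

```java
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical model (not Flink's classes) of the two "is restoring" checks.
 */
public class RestoreFlagSketch {

    /** Stand-in for a per-operator state object: possibly empty lists of handles. */
    static final class SubtaskState {
        final List<String> managedOperatorState;
        final List<String> rawOperatorState;

        SubtaskState(List<String> managed, List<String> raw) {
            this.managedOperatorState = managed;
            this.rawOperatorState = raw;
        }

        /** True only if at least one handle is actually present. */
        boolean hasState() {
            return !managedOperatorState.isEmpty() || !rawOperatorState.isEmpty();
        }
    }

    /** Original check: any non-null object counts as "restoring". */
    static boolean restoringByNullCheck(SubtaskState stateHandles) {
        return null != stateHandles;
    }

    /** Proposed check, guarded against null (the guard is this sketch's assumption). */
    static boolean restoringByHasState(SubtaskState stateHandles) {
        return stateHandles != null && stateHandles.hasState();
    }

    public static void main(String[] args) {
        // The renamed source gets an empty, but non-null, state object.
        SubtaskState sourceState = new SubtaskState(Collections.emptyList(), Collections.emptyList());
        // The flatMap gets one operator-state handle from the savepoint.
        SubtaskState flatMapState =
                new SubtaskState(Collections.singletonList("flatMap-1/handle-0"), Collections.emptyList());

        System.out.println(restoringByNullCheck(sourceState)); // true  (the reported bug)
        System.out.println(restoringByHasState(sourceState));  // false
        System.out.println(restoringByHasState(flatMapState)); // true
    }
}
```

Note that this per-instance check is exactly what the later objection in this thread is about: it cannot tell "this subtask got no state" apart from "the operator was not restored at all".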