[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state

2017-10-24 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16216997#comment-16216997 ]

ASF GitHub Bot commented on FLINK-7623:
---

Github user pnowojski closed the pull request at:

https://github.com/apache/flink/pull/4851


> Detecting whether an operator is restored doesn't work with chained state
> -
>
> Key: FLINK-7623
> URL: https://issues.apache.org/jira/browse/FLINK-7623
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, State Backends, Checkpointing
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Aljoscha Krettek
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.4.0
>
> Attachments: StreamingJob.java
>
>
> Originally reported on the ML: 
> https://lists.apache.org/thread.html/22a2cf83de3107aa81a03a921325a191c29df8aa8676798fcd497199@%3Cuser.flink.apache.org%3E
> If a chain contains several operators that have operator state, 
> {{context.isRestored()}} (of {{CheckpointedFunction}}) does not report the 
> correct value. This is best shown with the following minimal example, where 
> both the source and the flatMap have state:
> {code}
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env
>   .addSource(new MaSource()).uid("source-1")
>   .flatMap(new MaFlatMap()).uid("flatMap-1");
> env.execute("testing");
> {code}
> If I take a savepoint with these UIDs, then change "source-1" to "source-2" and 
> restore from the savepoint, {{context.isRestored()}} still reports {{true}} 
> for the source, even though the savepoint contains no state for "source-2".
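The difference between deriving the flag once for the whole task and deriving it per chained operator can be illustrated with a small plain-Java model (no Flink dependency). This is a hypothetical sketch, not Flink's `StreamTask` code: the restored snapshot is modelled as a `Map<String, byte[]>` keyed by operator uid (in Flink the key is an `OperatorID`, derived from the uid when one is set), and the method names are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: a task's restored snapshot maps operator uids to serialized state.
// A null map means the task starts without any snapshot.
public class ChainedRestoreModel {

    // Buggy variant: one flag for the whole chain, true whenever the task received any snapshot.
    public static boolean isRestoredPerTask(Map<String, byte[]> taskSnapshot, String operatorUid) {
        return taskSnapshot != null;
    }

    // Fixed variant: each chained operator checks for an entry under its own uid.
    public static boolean isRestoredPerOperator(Map<String, byte[]> taskSnapshot, String operatorUid) {
        return taskSnapshot != null && taskSnapshot.containsKey(operatorUid);
    }

    public static void main(String[] args) {
        // Savepoint taken with uids "source-1" and "flatMap-1".
        Map<String, byte[]> savepoint = new HashMap<>();
        savepoint.put("source-1", new byte[] {1});
        savepoint.put("flatMap-1", new byte[] {2});

        // Job restored after renaming the source uid to "source-2".
        System.out.println(isRestoredPerTask(savepoint, "source-2"));      // true (the reported bug)
        System.out.println(isRestoredPerOperator(savepoint, "source-2"));  // false
        System.out.println(isRestoredPerOperator(savepoint, "flatMap-1")); // true
    }
}
```

The per-task variant answers "was this task restored?", which is true for every operator in the chain as soon as one of them has state; the per-operator variant answers the question that `isRestored()` is meant to answer.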



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state

2017-10-24 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16216996#comment-16216996 ]

ASF GitHub Bot commented on FLINK-7623:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4851
  
Thanks!


[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state

2017-10-24 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16216967#comment-16216967 ]

ASF GitHub Bot commented on FLINK-7623:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4851
  
I merged, could you please close the PR?


[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state

2017-10-24 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16216863#comment-16216863 ]

ASF GitHub Bot commented on FLINK-7623:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4851
  
It's excellent that you separated the cleanup work from the actual change.


[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state

2017-10-24 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16216857#comment-16216857 ]

ASF GitHub Bot commented on FLINK-7623:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4851#discussion_r146551851
  
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java ---
@@ -370,6 +371,10 @@ public void endInput() {
}
}
 
+   public StreamConfigChainer setupOpertorChain(OperatorID headOperatorId, OneInputStreamOperator headOperator) {
--- End diff --

nit: typo, but I'll fix while merging


[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state

2017-10-20 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212437#comment-16212437 ]

ASF GitHub Bot commented on FLINK-7623:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/4851
  
That's great! In that case, I would approve this.


> Detecting whether an operator is restored doesn't work with chained state
> -
>
> Key: FLINK-7623
> URL: https://issues.apache.org/jira/browse/FLINK-7623
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, State Backends, Checkpointing
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Aljoscha Krettek
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
> Attachments: StreamingJob.java
>
>
> Originally reported on the ML: 
> https://lists.apache.org/thread.html/22a2cf83de3107aa81a03a921325a191c29df8aa8676798fcd497199@%3Cuser.flink.apache.org%3E
> If a chain contains several operators that have operator state, 
> {{context.isRestored()}} (of {{CheckpointedFunction}}) does not report the 
> correct value. This is best shown with the following minimal example, where 
> both the source and the flatMap have state:
> {code}
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env
>   .addSource(new MaSource()).uid("source-1")
>   .flatMap(new MaFlatMap()).uid("flatMap-1");
> env.execute("testing");
> {code}
> If I take a savepoint with these UIDs, then change "source-1" to "source-2" and 
> restore from the savepoint, {{context.isRestored()}} still reports {{true}} 
> for the source, even though the savepoint contains no state for "source-2".



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state

2017-10-20 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212433#comment-16212433 ]

ASF GitHub Bot commented on FLINK-7623:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/4851#discussion_r145921473
  
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java ---
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.OperatorInstanceID;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests ensuring correct behaviour of {@link org.apache.flink.runtime.state.ManagedInitializationContext#isRestored}
+ * method.
+ */
+public class RestoreStreamTaskTest extends TestLogger {
+   @Test
+   public void testRestore() throws Exception {
+       AcknowledgeStreamMockEnvironment environment1 = processRecords(
+           new OperatorID(42L, 42L),
+           new CounterOperator(),
+           new OperatorID(44L, 44L),
+           new CounterOperator(),
+           Optional.empty());
+
+       assertEquals(2, environment1.getCheckpointStateHandles().getSubtaskStateMappings().size());
+
+       TaskStateSnapshot stateHandles = environment1.getCheckpointStateHandles();
+
+       try {
+           AcknowledgeStreamMockEnvironment environment2 = processRecords(
+               new OperatorID(42L, 42L),
+               new RestoreCounterOperator(),
+               new OperatorID(44L, 44L),
+               new RestoreCounterOperator(),
+               Optional.of(stateHandles));
+
+           assertEquals(2, RestoreCounterOperator.RESTORE_COUNTER.get());
+       }
+       finally {
+           RestoreCounterOperator.RESTORE_COUNTER.getAndSet(0);
+       }
+   }
+
+   @Test
+   public void testRestoreWithNewId() throws Exception {
+       AcknowledgeStreamMockEnvironment environment1 = processRecords(
+           new OperatorID(42L, 42L),
+           new CounterOperator(),
+           new OperatorID(44L, 44L),
+           new CounterOperator(),
+           Optional.empty());
+
+

[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state

2017-10-20 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212431#comment-16212431 ]

ASF GitHub Bot commented on FLINK-7623:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/4851#discussion_r145921058
  
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/AcknowledgeStreamMockEnvironment.java ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+
+/**
+ * Stream environment that allows to wait for checkpoint acknowledgement.
+ */
+class AcknowledgeStreamMockEnvironment extends StreamMockEnvironment {
--- End diff --

`RocksDBAsyncSnapshotTest` does something very similar in an anonymous class.
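For context on the pattern being reviewed: a mock environment of this kind records the state handles reported when a checkpoint is acknowledged and lets the test thread block until that happens. The following is a minimal plain-Java sketch of the idea, assuming `java.util.concurrent.CountDownLatch` in place of Flink's `OneShotLatch`; the class and method names are hypothetical, and the snapshot is typed as `Object` rather than `TaskStateSnapshot`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a test environment that waits for a checkpoint acknowledgement.
public class AckRecorder {
    private final CountDownLatch acknowledged = new CountDownLatch(1);
    private volatile long checkpointId = -1L;
    private volatile Object stateHandles;

    /** Called by the (simulated) task thread when it acknowledges a checkpoint. */
    public void acknowledgeCheckpoint(long checkpointId, Object stateHandles) {
        this.checkpointId = checkpointId;
        this.stateHandles = stateHandles;
        acknowledged.countDown(); // release the waiting test thread
    }

    /** Blocks until a checkpoint has been acknowledged; returns false on timeout. */
    public boolean awaitAcknowledgement(long timeout, TimeUnit unit) throws InterruptedException {
        return acknowledged.await(timeout, unit);
    }

    public long getCheckpointId() {
        return checkpointId;
    }

    public Object getStateHandles() {
        return stateHandles;
    }

    public static void main(String[] args) throws Exception {
        AckRecorder env = new AckRecorder();
        // Simulate a task thread acknowledging checkpoint 1 asynchronously.
        Thread task = new Thread(() -> env.acknowledgeCheckpoint(1L, "state-of-checkpoint-1"));
        task.start();
        boolean acked = env.awaitAcknowledgement(10, TimeUnit.SECONDS);
        System.out.println(acked + " " + env.getCheckpointId()); // true 1
        task.join();
    }
}
```

Because the fields are written before `countDown()` and read after `await()` returns, the test thread is guaranteed to see the acknowledged values; factoring this into a shared helper class, instead of repeating it as an anonymous class, is the reuse the review comment points at.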


[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state

2017-10-20 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212429#comment-16212429 ]

ASF GitHub Bot commented on FLINK-7623:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/4851#discussion_r145920720
  
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java ---
@@ -0,0 +1,325 @@

[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state

2017-10-20 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212426#comment-16212426 ]

ASF GitHub Bot commented on FLINK-7623:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4851
  
It appears that the current behaviour is already what you wanted, @StefanRRichter:

- Operator participated in the checkpoint, data written -> `isRestored == true`
- Operator participated in the checkpoint, but did not receive state after rescaling -> `isRestored == true`
- Operator participated in the checkpoint, nothing checkpointed -> `isRestored == true`
- Operator never participated in a checkpoint, or has a new uid -> `isRestored == false`
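The four cases listed above boil down to one rule: the flag depends on whether the snapshot contains an entry for the operator, not on whether that entry (or this subtask's share of it) holds any data. A hypothetical plain-Java sketch of that rule follows. The `Map<String, List<Long>>` snapshot shape is an assumption, and the round-robin `redistribute` helper is a simplified stand-in for Flink's real operator-state repartitioning, used only to show a subtask that receives nothing after rescaling.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class IsRestoredCases {

    // Hypothetical snapshot shape: operator ID -> elements of that operator's list state.
    // An entry that exists but is empty models "participated, nothing checkpointed".

    /**
     * Simplified round-robin redistribution of one operator's list state over newParallelism
     * subtasks. Returns null when the snapshot has no entry for the operator.
     */
    public static List<List<Long>> redistribute(
            Map<String, List<Long>> snapshot, String operatorId, int newParallelism) {
        List<Long> elements = snapshot.get(operatorId);
        if (elements == null) {
            return null; // never took part in a checkpoint, or the uid changed
        }
        List<List<Long>> perSubtask = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            perSubtask.add(new ArrayList<>());
        }
        for (int i = 0; i < elements.size(); i++) {
            perSubtask.get(i % newParallelism).add(elements.get(i));
        }
        return perSubtask;
    }

    /** Restored iff the operator has an entry in the snapshot, even an empty one. */
    public static boolean isRestored(List<List<Long>> assignment) {
        return assignment != null;
    }

    public static void main(String[] args) {
        Map<String, List<Long>> snapshot = new HashMap<>();
        snapshot.put("with-data", Collections.singletonList(7L)); // data written
        snapshot.put("empty", Collections.<Long>emptyList());     // nothing checkpointed

        // Rescaled to parallelism 2: subtask 1 of "with-data" receives no elements.
        List<List<Long>> withData = redistribute(snapshot, "with-data", 2);
        System.out.println(withData.get(1).isEmpty() + " " + isRestored(withData)); // true true
        System.out.println(isRestored(redistribute(snapshot, "empty", 2)));          // true
        System.out.println(isRestored(redistribute(snapshot, "new-uid", 2)));        // false
    }
}
```

Keying the decision on the presence of an entry keeps the flag identical across all subtasks of an operator, so user code in `initializeState` does not behave differently depending on which subtask happened to receive data after rescaling.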



[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state

2017-10-20 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212417#comment-16212417 ]

ASF GitHub Bot commented on FLINK-7623:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4851#discussion_r145903567
  
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java ---
@@ -0,0 +1,325 @@

[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state

2017-10-20 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212420#comment-16212420 ]

ASF GitHub Bot commented on FLINK-7623:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4851#discussion_r145917932
  
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java ---
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.OperatorInstanceID;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests ensuring correct behaviour of {@link org.apache.flink.runtime.state.ManagedInitializationContext#isRestored}
+ * method.
+ */
+public class RestoreStreamTaskTest extends TestLogger {
+	@Test
+	public void testRestore() throws Exception {
+		AcknowledgeStreamMockEnvironment environment1 = processRecords(
+			new OperatorID(42L, 42L),
+			new CounterOperator(),
+			new OperatorID(44L, 44L),
+			new CounterOperator(),
+			Optional.empty());
+
+		assertEquals(2, environment1.getCheckpointStateHandles().getSubtaskStateMappings().size());
+
+		TaskStateSnapshot stateHandles = environment1.getCheckpointStateHandles();
+
+		try {
+			AcknowledgeStreamMockEnvironment environment2 = processRecords(
+				new OperatorID(42L, 42L),
+				new RestoreCounterOperator(),
+				new OperatorID(44L, 44L),
+				new RestoreCounterOperator(),
+				Optional.of(stateHandles));
+
+			assertEquals(2, RestoreCounterOperator.RESTORE_COUNTER.get());
+		}
+		finally {
+			RestoreCounterOperator.RESTORE_COUNTER.getAndSet(0);
+		}
+	}
+
+	@Test
+	public void testRestoreWithNewId() throws Exception {
+		AcknowledgeStreamMockEnvironment environment1 = processRecords(
+			new OperatorID(42L, 42L),
+			new CounterOperator(),
+			new OperatorID(44L, 44L),
+			new CounterOperator(),
+			Optional.empty());
+
+ 
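The test above counts restores through a static `AtomicLong` (`RESTORE_COUNTER`) that every `RestoreCounterOperator` instance increments, and it resets that counter in a `finally` block so a failed assertion cannot leak a non-zero count into the next test. Below is a minimal plain-Java sketch of that pattern, not Flink code: `SketchOperator`, `initializeState(boolean)`, `runChain` and `restoreCount` are hypothetical stand-ins for the operator classes and the `processRecords` helper in the diff, and the "snapshot" is simplified to a set of operator IDs.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

public class RestoreCounterSketch {

    /** Hypothetical stand-in for RestoreCounterOperator: counts how often it is restored. */
    static class SketchOperator {
        // Static, so every operator instance in the chain shares the same counter.
        static final AtomicLong RESTORE_COUNTER = new AtomicLong();

        void initializeState(boolean isRestored) {
            if (isRestored) {
                RESTORE_COUNTER.incrementAndGet();
            }
        }
    }

    /** Hypothetical stand-in for processRecords: initializes each chained operator by its ID. */
    static void runChain(List<Long> operatorIds, Set<Long> idsInSnapshot) {
        for (long id : operatorIds) {
            new SketchOperator().initializeState(idsInSnapshot.contains(id));
        }
    }

    /** Mirrors the shape of testRestore: run the chain, read the counter, always reset it. */
    static long restoreCount(List<Long> operatorIds, Set<Long> idsInSnapshot) {
        try {
            runChain(operatorIds, idsInSnapshot);
            return SketchOperator.RESTORE_COUNTER.get();
        } finally {
            SketchOperator.RESTORE_COUNTER.set(0);
        }
    }

    public static void main(String[] args) {
        // Both chained operators (IDs 42 and 44) are present in the snapshot.
        System.out.println(restoreCount(List.of(42L, 44L), Set.of(42L, 44L)));
    }
}
```

Keeping the counter static is what lets the test observe operators that the task instantiates internally; the `finally` reset is the price of that shared state.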

[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state

2017-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212419#comment-16212419
 ] 

ASF GitHub Bot commented on FLINK-7623:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4851#discussion_r145902939
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/AcknowledgeStreamMockEnvironment.java
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+
+/**
+ * Stream environment that allows to wait for checkpoint acknowledgement.
+ */
+class AcknowledgeStreamMockEnvironment extends StreamMockEnvironment {
--- End diff --

I have found only one other usage of `StreamMockEnvironment`, in 
`AsyncWaitOperatorTest#testStateSnapshotAndRestore`. Did you have something 
else in mind?
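`AcknowledgeStreamMockEnvironment` is described as a stream environment that lets a test wait for checkpoint acknowledgement, and it imports Flink's `OneShotLatch`. A minimal plain-Java sketch of that idea follows, assuming the environment stores the acknowledged state and then releases a one-shot latch. `java.util.concurrent.CountDownLatch` with a count of one stands in for `OneShotLatch`, and `AcknowledgingEnvironment`, `acknowledgeCheckpoint` and `awaitAcknowledgement` are hypothetical names, not the ones in the PR.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class AckWaitSketch {

    /** Hypothetical environment: stores the acknowledged snapshot and signals a waiting test. */
    static class AcknowledgingEnvironment {
        private final CountDownLatch ackLatch = new CountDownLatch(1); // one-shot signal
        private volatile String acknowledgedState;

        /** Called from the task thread once a checkpoint has been acknowledged. */
        void acknowledgeCheckpoint(String state) {
            this.acknowledgedState = state;
            ackLatch.countDown();
        }

        /** Blocks the test thread until the acknowledgement arrives or the timeout expires. */
        boolean awaitAcknowledgement(long timeout, TimeUnit unit) throws InterruptedException {
            return ackLatch.await(timeout, unit);
        }

        String getAcknowledgedState() {
            return acknowledgedState;
        }
    }

    public static void main(String[] args) throws Exception {
        AcknowledgingEnvironment env = new AcknowledgingEnvironment();
        Thread task = new Thread(() -> env.acknowledgeCheckpoint("state-of-checkpoint-1"));
        task.start();
        boolean acked = env.awaitAcknowledgement(10, TimeUnit.SECONDS);
        System.out.println(acked + " " + env.getAcknowledgedState());
        task.join();
    }
}
```

The latch gives the test a happens-before edge on the stored state, so the second phase of a restore test can safely read the snapshot that the first phase produced.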




> Detecting whether an operator is restored doesn't work with chained state
> -
>
> Key: FLINK-7623
> URL: https://issues.apache.org/jira/browse/FLINK-7623
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, State Backends, Checkpointing
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Aljoscha Krettek
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
> Attachments: StreamingJob.java
>
>
> Originally reported on the ML: 
> https://lists.apache.org/thread.html/22a2cf83de3107aa81a03a921325a191c29df8aa8676798fcd497199@%3Cuser.flink.apache.org%3E
> If we have a chain of operators where several of them have operator state, 
> detection of the {{context.isRestored()}} flag (of {{CheckpointedFunction}}) 
> does not work correctly. This is best illustrated by the following minimal example, 
> in which both the source and the flatMap have state:
> {code}
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env
>   .addSource(new MaSource()).uid("source-1")
>   .flatMap(new MaFlatMap()).uid("flatMap-1");
> env.execute("testing");
> {code}
> If I take a savepoint with these UIDs, then change "source-1" to "source-2" and 
> restore from the savepoint, {{context.isRestored()}} still reports {{true}} 
> for the source.
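The reported scenario can be restated as a small model. Assuming the savepoint is looked up by operator UID and that `isRestored()` is decided per operator rather than once for the whole chain (the expectation implied by the report, not a description of Flink's internal state layout), the renamed source should see `false` while the unchanged flatMap sees `true`. The `isRestored` and `restoredFlags` helpers and the set-of-UIDs "savepoint" below are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ChainedRestoreSketch {

    /** Hypothetical per-operator check: restored only if the savepoint has an entry for this UID. */
    static boolean isRestored(String uid, Set<String> uidsInSavepoint) {
        return uidsInSavepoint.contains(uid);
    }

    /** Evaluates the flag separately for every operator in the chain, in chain order. */
    static Map<String, Boolean> restoredFlags(List<String> chain, Set<String> uidsInSavepoint) {
        Map<String, Boolean> flags = new LinkedHashMap<>();
        for (String uid : chain) {
            flags.put(uid, isRestored(uid, uidsInSavepoint));
        }
        return flags;
    }

    public static void main(String[] args) {
        // Savepoint taken with the original UIDs.
        Set<String> savepoint = Set.of("source-1", "flatMap-1");
        // Job resubmitted after renaming the source.
        List<String> chain = List.of("source-2", "flatMap-1");
        System.out.println(restoredFlags(chain, savepoint)); // {source-2=false, flatMap-1=true}
    }
}
```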





[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state

2017-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212421#comment-16212421
 ] 

ASF GitHub Bot commented on FLINK-7623:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4851#discussion_r145917513
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java
 ---

[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state

2017-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212422#comment-16212422
 ] 

ASF GitHub Bot commented on FLINK-7623:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4851#discussion_r145903139
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java
 ---

[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state

2017-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212418#comment-16212418
 ] 

ASF GitHub Bot commented on FLINK-7623:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4851#discussion_r145915808
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java
 ---

[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state

2017-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212239#comment-16212239
 ] 

ASF GitHub Bot commented on FLINK-7623:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/4851#discussion_r145885868
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java
 ---

[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state

2017-10-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212183#comment-16212183
 ] 

ASF GitHub Bot commented on FLINK-7623:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/4851
  
Also, following up on our offline discussion, I would suggest these semantics: 
if an operator with a given ID participated in a checkpoint, it should be marked 
as restored, and all other cases should be derived from this definition. I believe 
this differs slightly from the current implementation. Both make sense, so I 
think we should agree on one. @pnowojski @aljoscha, which definition do you prefer?
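The rule proposed in this comment keys "restored" on participation in the checkpoint, not on whether the operator actually wrote any state. A minimal plain-Java sketch makes the consequence concrete: an operator whose ID appears in the snapshot with an empty state entry still counts as restored. The snapshot is modelled as a hypothetical `Map` from operator ID to state entries; `restoredByNonEmptyState` is a made-up contrasting rule for comparison only and is not claimed to be what the PR implements.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ParticipationRuleSketch {

    /**
     * Proposed rule: an operator counts as restored if its ID took part in the
     * checkpoint, i.e. the snapshot has an entry for it, even an empty one.
     */
    static boolean restoredByParticipation(long operatorId, Map<Long, List<String>> snapshot) {
        return snapshot.containsKey(operatorId);
    }

    /** Hypothetical contrasting rule, for comparison only: restored only if some state was written. */
    static boolean restoredByNonEmptyState(long operatorId, Map<Long, List<String>> snapshot) {
        List<String> state = snapshot.get(operatorId);
        return state != null && !state.isEmpty();
    }

    public static void main(String[] args) {
        Map<Long, List<String>> snapshot = new HashMap<>();
        snapshot.put(42L, List.of("count=3"));      // operator that wrote state
        snapshot.put(44L, Collections.emptyList()); // operator that participated with no state
        // Operator 46 did not take part in the checkpoint at all.
        for (long id : new long[] {42L, 44L, 46L}) {
            System.out.println(id + " participation=" + restoredByParticipation(id, snapshot)
                    + " nonEmptyState=" + restoredByNonEmptyState(id, snapshot));
        }
    }
}
```

The two rules only disagree for operator 44, which is exactly the kind of edge case the two definitions under discussion would need to settle.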


> Detecting whether an operator is restored doesn't work with chained state
> -
>
> Key: FLINK-7623
> URL: https://issues.apache.org/jira/browse/FLINK-7623
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, State Backends, Checkpointing
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Aljoscha Krettek
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
> Attachments: StreamingJob.java
>
>
> Originally reported on the ML: 
> https://lists.apache.org/thread.html/22a2cf83de3107aa81a03a921325a191c29df8aa8676798fcd497199@%3Cuser.flink.apache.org%3E
> If we have a chain of operators where several of them have operator state, 
> detection of the {{context.isRestored()}} flag (of {{CheckpointedFunction}}) 
> does not work correctly. This is best illustrated by the following minimal example, 
> in which both the source and the flatMap have state:
> {code}
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env
>   .addSource(new MaSource()).uid("source-1")
>   .flatMap(new MaFlatMap()).uid("flatMap-1");
> env.execute("testing");
> {code}
> If I take a savepoint with these UIDs, then change "source-1" to "source-2" and 
> restore from the savepoint, {{context.isRestored()}} still reports {{true}} 
> for the source.





[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state

2017-10-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212175#comment-16212175
 ] 

ASF GitHub Bot commented on FLINK-7623:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/4851#discussion_r145879846
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java
 ---

[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state

2017-10-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212174#comment-16212174
 ] 

ASF GitHub Bot commented on FLINK-7623:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/4851#discussion_r145879772
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java
 ---
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.OperatorInstanceID;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests ensuring correct behaviour of {@link org.apache.flink.runtime.state.ManagedInitializationContext#isRestored}
+ * method.
+ */
+public class RestoreStreamTaskTest extends TestLogger {
+	@Test
+	public void testRestore() throws Exception {
+		AcknowledgeStreamMockEnvironment environment1 = processRecords(
+			new OperatorID(42L, 42L),
+			new CounterOperator(),
+			new OperatorID(44L, 44L),
+			new CounterOperator(),
+			Optional.empty());
+
+		assertEquals(2, environment1.getCheckpointStateHandles().getSubtaskStateMappings().size());
+
+		TaskStateSnapshot stateHandles = environment1.getCheckpointStateHandles();
+
+		try {
+			AcknowledgeStreamMockEnvironment environment2 = processRecords(
+				new OperatorID(42L, 42L),
+				new RestoreCounterOperator(),
+				new OperatorID(44L, 44L),
+				new RestoreCounterOperator(),
+				Optional.of(stateHandles));
+
+			assertEquals(2, RestoreCounterOperator.RESTORE_COUNTER.get());
+		}
+		finally {
+			RestoreCounterOperator.RESTORE_COUNTER.getAndSet(0);
+		}
+	}
+
+	@Test
+	public void testRestoreWithNewId() throws Exception {
+		AcknowledgeStreamMockEnvironment environment1 = processRecords(
+			new OperatorID(42L, 42L),
+			new CounterOperator(),
+			new OperatorID(44L, 44L),
+			new CounterOperator(),
+			Optional.empty());
+
+

[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state

2017-10-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212165#comment-16212165
 ] 

ASF GitHub Bot commented on FLINK-7623:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/4851#discussion_r145878649
  
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java ---

[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state

2017-10-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212163#comment-16212163
 ] 

ASF GitHub Bot commented on FLINK-7623:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/4851#discussion_r145878457
  
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java ---

[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state

2017-10-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212158#comment-16212158
 ] 

ASF GitHub Bot commented on FLINK-7623:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/4851#discussion_r145878181
  
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java ---

[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state

2017-10-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212155#comment-16212155
 ] 

ASF GitHub Bot commented on FLINK-7623:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/4851#discussion_r145877990
  
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java ---

[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state

2017-10-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212153#comment-16212153
 ] 

ASF GitHub Bot commented on FLINK-7623:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/4851#discussion_r145877847
  
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java ---

[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state

2017-10-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212145#comment-16212145
 ] 

ASF GitHub Bot commented on FLINK-7623:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/4851#discussion_r145877337
  
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/AcknowledgeStreamMockEnvironment.java ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+
+/**
+ * Stream environment that allows to wait for checkpoint acknowledgement.
+ */
+class AcknowledgeStreamMockEnvironment extends StreamMockEnvironment {
--- End diff --

I did a similar refactoring in one of my pending PRs, but that's fine, because 
that PR will probably not make it into 1.4. One suggestion, though: if you 
search for subclasses of `StreamMockEnvironment`, there are more cases (some of 
them anonymous classes) that could be replaced by a proper dummy like this one.
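A minimal sketch of the "proper dummy" idea in plain Java, without Flink on the classpath: one named class records the acknowledged checkpoint and lets the test thread wait for it, instead of each test declaring its own anonymous `StreamMockEnvironment` subclass. The class and method names are hypothetical, and `java.util.concurrent.CountDownLatch` stands in for Flink's `OneShotLatch`, which the quoted diff imports.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical reusable test dummy: records the acknowledged checkpoint and lets
 * the test thread wait for it, instead of each test using an anonymous subclass.
 */
final class AcknowledgingEnvironmentSketch {
    // One-shot gate; CountDownLatch stands in for Flink's OneShotLatch here.
    private final CountDownLatch acknowledged = new CountDownLatch(1);
    private volatile long checkpointId = -1L;
    private volatile String stateHandles;

    /** Called from the (simulated) task thread once a checkpoint has completed. */
    void acknowledgeCheckpoint(long id, String handles) {
        this.checkpointId = id;
        this.stateHandles = handles;
        acknowledged.countDown(); // releases any thread blocked in awaitAcknowledgement
    }

    /** Blocks until acknowledged or the timeout expires; returns false on timeout or interrupt. */
    boolean awaitAcknowledgement(long timeout, TimeUnit unit) {
        try {
            return acknowledged.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status for the caller
            return false;
        }
    }

    long getCheckpointId() {
        return checkpointId;
    }

    String getStateHandles() {
        return stateHandles;
    }
}
```

Usage: the task thread calls `acknowledgeCheckpoint(...)`, and the test calls `awaitAcknowledgement(10, TimeUnit.SECONDS)` before inspecting `getStateHandles()`. Because the dummy is a named class, every test that needs to wait for an acknowledgement can share it.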


> Detecting whether an operator is restored doesn't work with chained state
> -
>
> Key: FLINK-7623
> URL: https://issues.apache.org/jira/browse/FLINK-7623
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, State Backends, Checkpointing
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Aljoscha Krettek
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
> Attachments: StreamingJob.java
>
>
> Originally reported on the ML: 
> https://lists.apache.org/thread.html/22a2cf83de3107aa81a03a921325a191c29df8aa8676798fcd497199@%3Cuser.flink.apache.org%3E
> If we have a chain of operators where several of them have operator state, 
> detection of the {{context.isRestored()}} flag (of {{CheckpointedFunction}}) 
> does not work correctly. It's best exemplified using this minimal example 
> where both the source and the flatMap have state:
> {code}
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env
>   .addSource(new MaSource()).uid("source-1")
>   .flatMap(new MaFlatMap()).uid("flatMap-1");
> env.execute("testing");
> {code}
> If I take a savepoint with these UIDs, then change "source-1" to "source-2" and 
> restore from the savepoint, {{context.isRestored()}} still reports {{true}} 
> for the source.
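To make the reported symptom concrete, here is a simplified model (not Flink's code) that contrasts a restore flag computed for the whole chain with one computed per operator uid. The class and method names are hypothetical, and the model only reproduces the symptom; it is not a claim about the actual root cause inside Flink.

```java
import java.util.List;
import java.util.Map;

/**
 * Simplified, hypothetical model (not Flink code): a savepoint maps operator uids to
 * their state, and a chain of operators is restored from it.
 */
final class RestoreFlagSketch {
    private RestoreFlagSketch() {}

    /** Chain-level flag: reports "restored" for every operator if any operator in the chain has state. */
    static boolean chainLevelIsRestored(List<String> chainUids, Map<String, String> savepoint) {
        for (String uid : chainUids) {
            if (savepoint.containsKey(uid)) {
                return true;
            }
        }
        return false;
    }

    /** Per-operator flag: true only if this operator's own uid has state in the savepoint. */
    static boolean operatorLevelIsRestored(String uid, Map<String, String> savepoint) {
        return savepoint.containsKey(uid);
    }
}
```

With a savepoint holding state for "source-1" and "flatMap-1", a chain running as "source-2" followed by "flatMap-1" gets `true` from the chain-level check, which matches the report, whereas the per-operator check returns `false` for "source-2" and `true` for "flatMap-1".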



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state

2017-10-19 Thread Piotr Nowojski (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16210951#comment-16210951
 ] 

Piotr Nowojski commented on FLINK-7623:
---

As [~srichter] suggested, I have checked a couple more cases. The current 
({{1.4-SNAPSHOT}}) behaviour is as follows:
# Operator participated in the checkpoint, data written -> {{isRestored == true}}
# Operator participated in the checkpoint, but did not receive any state after rescaling -> {{isRestored == true}}
# Operator never participated in the checkpoint, or has a new uid -> {{isRestored == false}}
# Operator participated in the checkpoint, but nothing was checkpointed -> {{isRestored == false}}

I think that this behaviour makes sense.
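The four cases above can be modelled in a few lines, assuming the flag depends only on whether the operator's uid has any checkpointed state, not on which elements a particular subtask receives after rescaling. Everything here is hypothetical: elements are assigned round-robin purely for illustration, and Flink's actual operator-state repartitioning may split lists differently.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Hypothetical model of restoring list-style operator state into a rescaled job. */
final class RescaleRestoreSketch {
    private RescaleRestoreSketch() {}

    /** What one parallel subtask sees on restore: the flag and the elements assigned to it. */
    static final class SubtaskRestore {
        final boolean isRestored;
        final List<String> state;

        SubtaskRestore(boolean isRestored, List<String> state) {
            this.isRestored = isRestored;
            this.state = state;
        }
    }

    static SubtaskRestore restore(String uid, Map<String, List<String>> snapshot,
                                  int subtaskIndex, int newParallelism) {
        // A missing uid (case 3) and an empty entry (case 4) both mean "nothing to restore".
        List<String> all = snapshot.getOrDefault(uid, Collections.emptyList());
        // The flag follows the operator as a whole, so a subtask that receives no
        // elements after rescaling still reports true (case 2).
        boolean restored = !all.isEmpty();
        List<String> mine = new ArrayList<>();
        for (int i = subtaskIndex; i < all.size(); i += newParallelism) {
            mine.add(all.get(i)); // round-robin assignment, for illustration only
        }
        return new SubtaskRestore(restored, mine);
    }
}
```

For example, restoring `"src" -> [a, b]` at parallelism 3 leaves subtask 2 with no elements but `isRestored == true` (case 2), while a uid absent from the snapshot (case 3) or mapped to an empty list (case 4) yields `false`.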


[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state

2017-10-18 Thread Piotr Nowojski (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16209552#comment-16209552
 ] 

Piotr Nowojski commented on FLINK-7623:
---

It seems this bug was indeed fixed in 
https://issues.apache.org/jira/browse/FLINK-7213. However, I would like to test 
whether {{isRestored()}} works correctly when scaling up.


[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state

2017-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16209537#comment-16209537
 ] 

ASF GitHub Bot commented on FLINK-7623:
---

GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/4851

[FLINK-7623][tests] Add tests to make sure operator is never restored when using new operator id

## What is the purpose of the change

This PR adds test coverage for the correct behaviour of the 
`ManagedInitializationContext#isRestored` flag: if an application is restarted 
and some operator has a new `uid`, the flag should return false for that operator. 
This bug was fixed by #4353.
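The property these tests check can be sketched as a plain-Java model, independent of Flink. This is a simplified illustration under stated assumptions, not Flink's implementation or the contents of `RestoreStreamTaskTest`: a savepoint is modelled as a map from operator `uid` to that operator's state, and `RestoreModel.restoredFlags` is a hypothetical helper that derives the flag for each operator in a chain from its own `uid` only.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of uid-based restore for a chain of operators; not Flink's classes. */
final class RestoreModel {

    private RestoreModel() {}

    /**
     * Derives the isRestored flag for every operator in a chain. Each operator looks up
     * state under its own uid only, so state owned by other operators in the same chain
     * never marks it as restored.
     */
    static Map<String, Boolean> restoredFlags(Map<String, List<String>> savepoint,
                                              List<String> chainUids) {
        // LinkedHashMap keeps the chain order for readable output.
        Map<String, Boolean> flags = new LinkedHashMap<>();
        for (String uid : chainUids) {
            // Hypothetical rule: restored iff the savepoint holds an entry for this uid.
            flags.put(uid, savepoint.containsKey(uid));
        }
        return flags;
    }
}
```

With a savepoint taken for `source-1` and `flatMap-1`, restoring a chain of `source-2` and `flatMap-1` yields `false` for the renamed source and `true` for the flatMap, which is the behaviour the PR asserts.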

## Brief change log

Please check the commit messages for the change log.

## Verifying this change

This PR adds `RestoreStreamTaskTest` and does not change any production code.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink f7623

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4851.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4851


commit a99d384fcd5b021b71dc703d0e9e8063bd72f89e
Author: Piotr Nowojski 
Date:   2017-10-18T13:02:02Z

[hotfix][streaming] Fix formatting in OperatorChain

commit 5a2972d5f02521166c70ad68ad3fac0df9fad2e8
Author: Piotr Nowojski 
Date:   2017-10-18T14:01:38Z

[hotfix][tests] Add easier way to chain operator in StreamTaskTestHarness

commit 26cabd7762458633be891b746d06595a873033b4
Author: Piotr Nowojski 
Date:   2017-10-18T15:18:19Z

[hotfix][tests] Extract AcknowledgeStreamMockEnvironment

commit 2d6b45a55b8c63df4635a2d5506eab4b5ab590c3
Author: Piotr Nowojski 
Date:   2017-10-18T13:01:37Z

[FLINK-7623][tests] Add tests to make sure operator is never restored when 
using new operator id






[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state

2017-10-16 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16205634#comment-16205634
 ] 

Aljoscha Krettek commented on FLINK-7623:
-

Yes, it's a blocker.



[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state

2017-10-16 Thread Piotr Nowojski (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16205605#comment-16205605
 ] 

Piotr Nowojski commented on FLINK-7623:
---

I will take a look at this either this week or next. [~aljoscha], could you 
confirm that this is a release blocker?



[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state

2017-09-21 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16174974#comment-16174974
 ] 

Aljoscha Krettek commented on FLINK-7623:
-

I think that would be incorrect, because it would then also report "not 
restored" when this particular instance had no state assigned even though the operator 
was still restored globally, i.e. some other parallel instances of this operator might have state.
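The scaling-up case can be sketched with a small plain-Java model. Round-robin redistribution of list-state partitions and all names below (`ScaleUpModel`, `redistribute`, `restoredPerSubtask`, `restoredGlobally`) are assumptions for illustration, not Flink's redistribution code. With two partitions in the savepoint and a new parallelism of four, two subtasks receive no state, so a per-subtask check reports them as not restored although the operator as a whole was restored.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of redistributing operator list state when scaling up; not Flink code. */
final class ScaleUpModel {

    private ScaleUpModel() {}

    /** Assigns state partitions to subtasks round-robin (an illustrative assumption). */
    static List<List<String>> redistribute(List<String> partitions, int newParallelism) {
        List<List<String>> assignment = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            assignment.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions.size(); p++) {
            assignment.get(p % newParallelism).add(partitions.get(p));
        }
        return assignment;
    }

    /** Per-subtask check: "restored" only if this subtask received at least one partition. */
    static boolean restoredPerSubtask(List<String> assigned) {
        return !assigned.isEmpty();
    }

    /** Global check: "restored" if the savepoint held any state for the operator at all. */
    static boolean restoredGlobally(List<String> partitionsInSavepoint) {
        return !partitionsInSavepoint.isEmpty();
    }
}
```

Here subtasks 2 and 3 get empty assignments, so `restoredPerSubtask` returns `false` for them while `restoredGlobally` returns `true`, which is the mismatch the comment points out.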

> Detecting whether an operator is restored doesn't work with chained state
> -
>
> Key: FLINK-7623
> URL: https://issues.apache.org/jira/browse/FLINK-7623
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, State Backends, Checkpointing
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
> Attachments: StreamingJob.java
>
>
> Originally reported on the ML: 
> https://lists.apache.org/thread.html/22a2cf83de3107aa81a03a921325a191c29df8aa8676798fcd497199@%3Cuser.flink.apache.org%3E
> If we have a chain of operators where multiple of them have operator state, 
> detection of the {{context.isRestored()}} flag (of {{CheckpointedFunction}}) 
> does not work correctly. It's best exemplified using this minimal example 
> where both the source and the flatMap have state:
> {code}
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env
>   .addSource(new MaSource()).uid("source-1")
>   .flatMap(new MaFlatMap()).uid("flatMap-1");
> env.execute("testing");
> {code}
> If I take a savepoint with these UIDs, then change "source-1" to "source-2" and 
> restore from the savepoint, {{context.isRestored()}} still reports {{true}} 
> for the source.





[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state

2017-09-18 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169987#comment-16169987
 ] 

Chesnay Schepler commented on FLINK-7623:
-

The culprit is this bit in the AbstractStreamOperator:
{code}
public final void initializeState(OperatorSubtaskState stateHandles) throws Exception {
    ...
    boolean restoring = (null != stateHandles);
    ...
{code}

Instead, this should be {{boolean restoring = stateHandles.hasState()}}.
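The two checks can be contrasted in a minimal plain-Java sketch. `SubtaskStateHandle` and `RestoreChecks` are hypothetical stand-ins, not Flink's `OperatorSubtaskState`; the sketch assumes, as the comment implies, that the renamed operator receives a non-null handle that carries no state. The `hasState()` variant is written null-safe, which is an addition of this sketch, since the expression as proposed would throw a `NullPointerException` when no handle is passed.

```java
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for a subtask's state handles; not Flink's OperatorSubtaskState. */
final class SubtaskStateHandle {
    private final List<String> partitions;

    SubtaskStateHandle(List<String> partitions) {
        this.partitions = partitions;
    }

    /** True if this handle actually carries at least one state partition. */
    boolean hasState() {
        return !partitions.isEmpty();
    }

    /** A non-null handle without any state, as assumed for the renamed operator. */
    static SubtaskStateHandle empty() {
        return new SubtaskStateHandle(Collections.emptyList());
    }
}

/** Hypothetical comparison of the original and the proposed restore checks. */
final class RestoreChecks {
    private RestoreChecks() {}

    /** Original check: any non-null handle counts as a restore, even an empty one. */
    static boolean restoringByNullCheck(SubtaskStateHandle handle) {
        return null != handle;
    }

    /** Proposed check, guarded against null because a fresh start passes no handle. */
    static boolean restoringByHasState(SubtaskStateHandle handle) {
        return handle != null && handle.hasState();
    }
}
```

For an empty handle the null check reports a restore while the `hasState()` check does not; a handle with at least one partition is reported as restored by both.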
