[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

2017-08-15 Thread StefanRRichter
Github user StefanRRichter closed the pull request at:

https://github.com/apache/flink/pull/4353


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

2017-08-14 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/4353#discussion_r133027325
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java
 ---
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.CompositeStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.StateUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This class encapsulates state handles to the snapshots of all operator 
instances executed within one task. A task
+ * can run multiple operator instances as a result of operator chaining, 
and all operator instances from the chain can
+ * register their state under their operator id. Each operator instance is 
a physical execution responsible for
+ * processing a partition of the data that goes through a logical 
operator. This partitioning happens to parallelize
+ * execution of logical operators, e.g. distributing a map function.
+ *
+ * One instance of this class contains the information that one task 
will send to acknowledge a checkpoint request by
+ * the checkpoint coordinator. Tasks run operator instances in parallel, 
so the union of all
+ * {@link TaskStateSnapshot} that are collected by the checkpoint 
coordinator from all tasks represent the whole
+ * state of a job at the time of the checkpoint.
+ *
+ * This class should be called TaskState once the old class with this 
name that we keep for backwards
+ * compatibility goes away.
+ */
+public class TaskStateSnapshot implements CompositeStateHandle {
--- End diff --

This class is indeed intended to be immutable. Beyond what it currently 
enforces, do you suggest using immutable collections internally?




[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

2017-08-14 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/4353#discussion_r133026125
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java
 ---
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.CompositeStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.StateUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This class encapsulates state handles to the snapshots of all operator 
instances executed within one task. A task
+ * can run multiple operator instances as a result of operator chaining, 
and all operator instances from the chain can
+ * register their state under their operator id. Each operator instance is 
a physical execution responsible for
+ * processing a partition of the data that goes through a logical 
operator. This partitioning happens to parallelize
+ * execution of logical operators, e.g. distributing a map function.
+ *
+ * One instance of this class contains the information that one task 
will send to acknowledge a checkpoint request by
+ * the checkpoint coordinator. Tasks run operator instances in parallel, 
so the union of all
+ * {@link TaskStateSnapshot} that are collected by the checkpoint 
coordinator from all tasks represent the whole
+ * state of a job at the time of the checkpoint.
+ *
+ * This class should be called TaskState once the old class with this 
name that we keep for backwards
+ * compatibility goes away.
+ */
+public class TaskStateSnapshot implements CompositeStateHandle {
+
+   private static final long serialVersionUID = 1L;
+
+   /** Mapping from an operator id to the state of one subtask of this operator */
+   private final Map<OperatorID, OperatorSubtaskState> subtaskStatesByOperatorID;
--- End diff --

Hmm, considering default load factors and large sizes, I would pick a 
linear array scan with a hit rate of at least 30% over random-access 
iteration with a 100% hit rate. For all sizes expected in this class (which 
fit in cache), it should not matter. A `LinkedHashMap` also consumes a bit 
more memory, so I would tend to keep the `HashMap`.




[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

2017-08-14 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4353#discussion_r133010887
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java
 ---
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.CompositeStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.StateUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This class encapsulates state handles to the snapshots of all operator 
instances executed within one task. A task
+ * can run multiple operator instances as a result of operator chaining, 
and all operator instances from the chain can
+ * register their state under their operator id. Each operator instance is 
a physical execution responsible for
+ * processing a partition of the data that goes through a logical 
operator. This partitioning happens to parallelize
+ * execution of logical operators, e.g. distributing a map function.
+ *
+ * One instance of this class contains the information that one task 
will send to acknowledge a checkpoint request by
+ * the checkpoint coordinator. Tasks run operator instances in parallel, 
so the union of all
+ * {@link TaskStateSnapshot} that are collected by the checkpoint 
coordinator from all tasks represent the whole
+ * state of a job at the time of the checkpoint.
+ *
+ * This class should be called TaskState once the old class with this 
name that we keep for backwards
+ * compatibility goes away.
+ */
+public class TaskStateSnapshot implements CompositeStateHandle {
--- End diff --

Would it make sense to make this immutable? It looks like this should not 
be modified any more after fully constructing it. This would also make it clear 
that methods iterating over the state, or returning sets / iterables can never 
fail with concurrent modifications.

For example the `size` method is considered a "best effort" method for info 
purposes only, and should not fail with an exception (it currently could fail 
with a `ConcurrentModificationException`).
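
A minimal sketch of the immutability suggested here, under stated assumptions: `ImmutableSnapshotSketch` is a hypothetical stand-in and not the actual `TaskStateSnapshot`, with `String` keys in place of `OperatorID` and `Long` values in place of `OperatorSubtaskState`. It copies the input map once and exposes only an unmodifiable view, so iteration in `size()` cannot encounter a concurrent structural modification.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for TaskStateSnapshot: String replaces OperatorID,
// Long (a state size in bytes) replaces OperatorSubtaskState.
final class ImmutableSnapshotSketch {

    private final Map<String, Long> stateSizesByOperator;

    ImmutableSnapshotSketch(Map<String, Long> source) {
        // Defensive copy plus unmodifiable view: nothing can change the
        // copied map after construction.
        this.stateSizesByOperator =
            Collections.unmodifiableMap(new HashMap<>(source));
    }

    // Callers may iterate over the view, but any write attempt throws
    // UnsupportedOperationException.
    Map<String, Long> entries() {
        return stateSizesByOperator;
    }

    // Sums all values; safe to call from any thread because the backing
    // map is never modified after the constructor returns.
    long size() {
        long sum = 0L;
        for (long stateSize : stateSizesByOperator.values()) {
            sum += stateSize;
        }
        return sum;
    }

    public static void main(String[] args) {
        Map<String, Long> source = new HashMap<>();
        source.put("op-1", 10L);
        source.put("op-2", 32L);
        ImmutableSnapshotSketch snapshot = new ImmutableSnapshotSketch(source);
        source.put("op-3", 100L); // later changes to the source are not visible
        System.out.println(snapshot.size());
    }
}
```

The defensive copy costs one extra map allocation per snapshot. Whether that is acceptable for the real class is a judgment call for the PR author.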




[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

2017-08-14 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4353#discussion_r133021771
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 ---
@@ -878,14 +873,17 @@ public void 
testSuccessfulCheckpointSubsumesUnsuccessful() {
}
long checkpointId2 = pending2.getCheckpointId();
 
-   Map operatorStates2 = 
pending2.getOperatorStates();
+   TaskStateSnapshot taskOperatorSubtaskStates2_1 = 
spy(new TaskStateSnapshot());
--- End diff --

Same as above, spying necessary?




[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

2017-08-14 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4353#discussion_r133013315
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
 ---
@@ -164,8 +168,16 @@ public void acknowledgeCheckpoint(
throw new RuntimeException(e);
}
 
+   boolean hasManagedKeyedState = false;
+   for (Map.Entry<OperatorID, OperatorSubtaskState> entry : checkpointStateHandles.getSubtaskStateMappings()) {
+   OperatorSubtaskState state = 
entry.getValue();
+   if (state != null) {
+   hasManagedKeyedState |= 
state.getManagedKeyedState() != null;
+   }
+   }
+
// should be one k/v state
--- End diff --

"should be **at least** one k/v state"?
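
To illustrate why "at least one" is the more accurate wording, here is a small sketch of the accumulation pattern used in the diff. `KeyedStateCheckSketch` and its `String` types are hypothetical stand-ins for the Flink classes: the map value plays the role of `getManagedKeyedState()` and is `null` when an operator has no managed keyed state.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in: keys represent operator ids, values represent
// managed keyed state handles (null means "no managed keyed state").
final class KeyedStateCheckSketch {

    // Returns true as soon as any entry carries a non-null handle. The
    // flag is only ever OR-ed, so two or more matches also yield true.
    static boolean hasManagedKeyedState(Map<String, String> handleByOperator) {
        boolean found = false;
        for (String handle : handleByOperator.values()) {
            found |= (handle != null);
        }
        return found;
    }

    public static void main(String[] args) {
        Map<String, String> states = new HashMap<>();
        states.put("op-1", null);
        states.put("op-2", "keyed-handle-a");
        states.put("op-3", "keyed-handle-b");
        System.out.println(hasManagedKeyedState(states));
    }
}
```

Because the flag never counts matches, an assertion on it can only establish "one or more", not "exactly one".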




[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

2017-08-14 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4353#discussion_r133021720
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 ---
@@ -850,18 +843,20 @@ public void 
testSuccessfulCheckpointSubsumesUnsuccessful() {
OperatorID opID2 = 
OperatorID.fromJobVertexID(ackVertex2.getJobvertexId());
OperatorID opID3 = 
OperatorID.fromJobVertexID(ackVertex3.getJobvertexId());
 
-   Map operatorStates1 = 
pending1.getOperatorStates();
+   TaskStateSnapshot taskOperatorSubtaskStates1_1 = 
spy(new TaskStateSnapshot());
--- End diff --

Is spying necessary here? There seem to be no `verify()` calls on this 
type...




[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

2017-08-14 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4353#discussion_r133009796
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java
 ---
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.CompositeStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.StateUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This class encapsulates state handles to the snapshots of all operator 
instances executed within one task. A task
+ * can run multiple operator instances as a result of operator chaining, 
and all operator instances from the chain can
+ * register their state under their operator id. Each operator instance is 
a physical execution responsible for
+ * processing a partition of the data that goes through a logical 
operator. This partitioning happens to parallelize
+ * execution of logical operators, e.g. distributing a map function.
+ *
+ * One instance of this class contains the information that one task 
will send to acknowledge a checkpoint request by
+ * the checkpoint coordinator. Tasks run operator instances in parallel, 
so the union of all
+ * {@link TaskStateSnapshot} that are collected by the checkpoint 
coordinator from all tasks represent the whole
+ * state of a job at the time of the checkpoint.
+ *
+ * This class should be called TaskState once the old class with this 
name that we keep for backwards
+ * compatibility goes away.
+ */
+public class TaskStateSnapshot implements CompositeStateHandle {
+
+   private static final long serialVersionUID = 1L;
+
+   /** Mapping from an operator id to the state of one subtask of this 
operator */
+   private final Map<OperatorID, OperatorSubtaskState> subtaskStatesByOperatorID;
--- End diff --

A `LinkedHashMap` has a slightly more predictable iteration performance 
(list traversal) compared to a `HashMap` (search through sparse table array). 
There are a lot of value iterations done in this class, but we also should have 
pretty full hash tables (since we never delete), so not sure how much 
difference it makes...
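
A rough sketch of the trade-off, with `MapIterationSketch` and the `String`/`Long` types as hypothetical stand-ins. Per the JDK documentation, iterating a `HashMap` takes time proportional to its capacity plus its size, while iterating a `LinkedHashMap` takes time proportional to its size only. The oversized initial capacity below is an assumption chosen to make the table sparse; it is not something the PR does.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

final class MapIterationSketch {

    // The same value iteration works on both map types; only the
    // traversal strategy underneath differs.
    static long sumValues(Map<String, Long> map) {
        long sum = 0L;
        for (long value : map.values()) {
            sum += value;
        }
        return sum;
    }

    public static void main(String[] args) {
        // Oversized on purpose: HashMap iteration walks every bucket of
        // the table, LinkedHashMap follows its linked entry list instead.
        Map<String, Long> sparse = new HashMap<>(1 << 16);
        Map<String, Long> linked = new LinkedHashMap<>(1 << 16);
        for (int i = 0; i < 4; i++) {
            sparse.put("op-" + i, (long) i);
            linked.put("op-" + i, (long) i);
        }
        // Both produce the same sum; LinkedHashMap additionally
        // guarantees insertion order.
        System.out.println(sumValues(sparse) + " " + sumValues(linked));
    }
}
```

With a map that is filled once and never shrunk, the table stays dense, which is the reason given above for expecting little practical difference.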




[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

2017-08-14 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4353#discussion_r133018189
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
 ---
@@ -185,44 +184,66 @@ private void assignAttemptState(ExecutionJobVertex 
executionJobVertex, List

[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

2017-08-14 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4353#discussion_r133016663
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
 ---
@@ -75,31 +103,79 @@
 */
private final long stateSize;
 
+   @VisibleForTesting
+   public OperatorSubtaskState(StreamStateHandle legacyOperatorState) {
+
+   this(legacyOperatorState,
+   Collections.emptyList(),
+   Collections.emptyList(),
+   Collections.emptyList(),
+   Collections.emptyList());
+   }
+
+   /**
+* Empty state.
+*/
+   public OperatorSubtaskState() {
--- End diff --

Minor optimization: One could make this constructor `private` and have a 
field `OperatorSubtaskState.EMPTY` as a placeholder for the empty states. I'll 
leave it to you whether you think it is worth doing.
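
The shared-placeholder idea could look roughly like the sketch below. `SubtaskStateSketch`, its single `handles` field, and the `of` factory are hypothetical simplifications; the real `OperatorSubtaskState` has several collections and other constructors that are not modeled here.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;

// Hypothetical, simplified stand-in for OperatorSubtaskState.
final class SubtaskStateSketch {

    // Single shared instance that represents "no state".
    static final SubtaskStateSketch EMPTY =
        new SubtaskStateSketch(Collections.<String>emptyList());

    private final Collection<String> handles;

    // Private, so callers cannot allocate further empty instances.
    private SubtaskStateSketch(Collection<String> handles) {
        this.handles = handles;
    }

    // Factory (an assumption of this sketch) that reuses EMPTY when
    // there is nothing to store.
    static SubtaskStateSketch of(Collection<String> handles) {
        return handles.isEmpty() ? EMPTY : new SubtaskStateSketch(handles);
    }

    boolean hasState() {
        return !handles.isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(of(Collections.<String>emptyList()) == EMPTY);
        System.out.println(of(Arrays.asList("handle-1")).hasState());
    }
}
```

Sharing one instance is only safe if the class is immutable, which ties in with the immutability discussion elsewhere in this review.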




[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

2017-08-14 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4353#discussion_r133022095
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 ---
@@ -553,31 +551,29 @@ public void testTriggerAndConfirmSimpleCheckpoint() {
assertFalse(checkpoint.isDiscarded());
assertFalse(checkpoint.isFullyAcknowledged());
 
-   OperatorID opID1 = 
OperatorID.fromJobVertexID(vertex1.getJobvertexId());
-   OperatorID opID2 = 
OperatorID.fromJobVertexID(vertex2.getJobvertexId());
-
-   Map operatorStates = 
checkpoint.getOperatorStates();
-
-   operatorStates.put(opID1, new SpyInjectingOperatorState(
-   opID1, 
vertex1.getTotalNumberOfParallelSubtasks(), vertex1.getMaxParallelism()));
-   operatorStates.put(opID2, new SpyInjectingOperatorState(
-   opID2, 
vertex2.getTotalNumberOfParallelSubtasks(), vertex2.getMaxParallelism()));
-
// check that the vertices received the trigger 
checkpoint message
{
verify(vertex1.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp), 
any(CheckpointOptions.class));
verify(vertex2.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp), 
any(CheckpointOptions.class));
}
 
+   OperatorID opID1 = 
OperatorID.fromJobVertexID(vertex1.getJobvertexId());
+   OperatorID opID2 = 
OperatorID.fromJobVertexID(vertex2.getJobvertexId());
+   TaskStateSnapshot taskOperatorSubtaskStates1 = 
mock(TaskStateSnapshot.class);
--- End diff --

Why not create a proper `TaskStateSnapshot` with one entry, rather than 
mocking?




[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

2017-07-31 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4353#discussion_r130295736
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 ---
@@ -208,13 +208,13 @@ public MetricGroup getMetricGroup() {
}
 
@Override
-   public final void initializeState(OperatorStateHandles stateHandles) 
throws Exception {
+   public final void initializeState(OperatorSubtaskState stateHandles) 
throws Exception {
 
Collection keyedStateHandlesRaw = null;
Collection operatorStateHandlesRaw = null;
Collection operatorStateHandlesBackend = 
null;
 
-   boolean restoring = null != stateHandles;
+   boolean restoring = (null != stateHandles);
--- End diff --

+1 to keep the parentheses.

I think we should let contributors use such styles at their discretion.




[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

2017-07-26 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/4353#discussion_r129532719
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 ---
@@ -208,13 +208,13 @@ public MetricGroup getMetricGroup() {
}
 
@Override
-   public final void initializeState(OperatorStateHandles stateHandles) 
throws Exception {
+   public final void initializeState(OperatorSubtaskState stateHandles) 
throws Exception {
 
Collection keyedStateHandlesRaw = null;
Collection operatorStateHandlesRaw = null;
Collection operatorStateHandlesBackend = 
null;
 
-   boolean restoring = null != stateHandles;
+   boolean restoring = (null != stateHandles);
--- End diff --

I like to do this when generating a boolean out of a `!=` or `==` 
comparison because I find this easier to read in the presence of more than one 
`=` character.




[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

2017-07-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4353#discussion_r129272281
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.CompositeStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.StateUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This class encapsulates state handles to the snapshots of all operator 
instances executed within one task. A task
+ * can run multiple operator instances as a result of operator chaining, 
and all operator instances from the chain can
+ * register their state under their operator id. Each operator instance is 
a physical execution responsible for
+ * processing a partition of the data that goes through a logical 
operator. This partitioning happens to parallelize
+ * execution of logical operators, e.g. distributing a map function.
+ * One instance of this class contains the information that one task 
will send to acknowledge a checkpoint request by t
--- End diff --

add empty line before paragraph




[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

2017-07-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4353#discussion_r129277176
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ---
@@ -867,81 +845,60 @@ public String toString() {
 
AsyncCheckpointRunnable(
StreamTask owner,
-   List 
nonPartitionedStateHandles,
-   List 
snapshotInProgressList,
+   Map 
nonPartitionedStateHandles,
+   Map 
operatorSnapshotsInProgress,
CheckpointMetaData checkpointMetaData,
CheckpointMetrics checkpointMetrics,
long asyncStartNanos) {
 
this.owner = Preconditions.checkNotNull(owner);
-   this.snapshotInProgressList = 
Preconditions.checkNotNull(snapshotInProgressList);
+   this.operatorSnapshotsInProgress = 
Preconditions.checkNotNull(operatorSnapshotsInProgress);
this.checkpointMetaData = 
Preconditions.checkNotNull(checkpointMetaData);
this.checkpointMetrics = 
Preconditions.checkNotNull(checkpointMetrics);
this.nonPartitionedStateHandles = 
nonPartitionedStateHandles;
this.asyncStartNanos = asyncStartNanos;
-
-   if (!snapshotInProgressList.isEmpty()) {
-   // TODO Currently only the head operator of a 
chain can have keyed state, so simply access it directly.
-   int headIndex = snapshotInProgressList.size() - 
1;
-   OperatorSnapshotResult snapshotInProgress = 
snapshotInProgressList.get(headIndex);
-   if (null != snapshotInProgress) {
-   this.futureKeyedBackendStateHandles = 
snapshotInProgress.getKeyedStateManagedFuture();
-   this.futureKeyedStreamStateHandles = 
snapshotInProgress.getKeyedStateRawFuture();
-   }
-   }
}
 
@Override
public void run() {
FileSystemSafetyNet.initializeSafetyNetForThread();
try {
-   // Keyed state handle future, currently only 
one (the head) operator can have this
-   KeyedStateHandle keyedStateHandleBackend = 
FutureUtil.runIfNotDoneAndGet(futureKeyedBackendStateHandles);
-   KeyedStateHandle keyedStateHandleStream = 
FutureUtil.runIfNotDoneAndGet(futureKeyedStreamStateHandles);
-
-   List operatorStatesBackend 
= new ArrayList<>(snapshotInProgressList.size());
-   List operatorStatesStream 
= new ArrayList<>(snapshotInProgressList.size());
-
-   for (OperatorSnapshotResult snapshotInProgress 
: snapshotInProgressList) {
-   if (null != snapshotInProgress) {
-   operatorStatesBackend.add(
-   
FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateManagedFuture()));
-   operatorStatesStream.add(
-   
FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateRawFuture()));
-   } else {
-   operatorStatesBackend.add(null);
-   operatorStatesStream.add(null);
-   }
-   }
+   boolean hasState = false;
+   final TaskStateSnapshot 
taskOperatorSubtaskStates =
+   new 
TaskStateSnapshot(operatorSnapshotsInProgress.size());
 
-   final long asyncEndNanos = System.nanoTime();
-   final long asyncDurationMillis = (asyncEndNanos 
- asyncStartNanos) / 1_000_000;
+   for (Map.Entry<OperatorID, OperatorSnapshotResult> entry : operatorSnapshotsInProgress.entrySet()) {
 
-   
checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis);
+   OperatorID operatorID = entry.getKey();
+   OperatorSnapshotResult 
snapshotInProgress = entry.getValue();
 
-   ChainedStateHandle 

[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

2017-07-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4353#discussion_r129272254
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.CompositeStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.StateUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This class encapsulates state handles to the snapshots of all operator 
instances executed within one task. A task
+ * can run multiple operator instances as a result of operator chaining, 
and all operator instances from the chain can
+ * register their state under their operator id. Each operator instance is 
a physical execution responsible for
+ * processing a partition of the data that goes through a logical 
operator. This partitioning happens to parallelize
+ * execution of logical operators, e.g. distributing a map function.
+ * One instance of this class contains the information that one task 
will send to acknowledge a checkpoint request by t
+ * he checkpoint coordinator. Tasks run operator instances in parallel, so 
the union of all
+ * {@link TaskStateSnapshot} that are collected by the checkpoint 
coordinator from all tasks represent the whole
+ * state of a job at the time of the checkpoint.
+ * This class should be called TaskState once the old class with this 
name that we keep for backwards
--- End diff --

add empty line before paragraph




[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

2017-07-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4353#discussion_r129275823
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 ---
@@ -208,13 +208,13 @@ public MetricGroup getMetricGroup() {
}
 
@Override
-   public final void initializeState(OperatorStateHandles stateHandles) 
throws Exception {
+   public final void initializeState(OperatorSubtaskState stateHandles) 
throws Exception {
 
Collection keyedStateHandlesRaw = null;
Collection operatorStateHandlesRaw = null;
Collection operatorStateHandlesBackend = 
null;
 
-   boolean restoring = null != stateHandles;
+   boolean restoring = (null != stateHandles);
--- End diff --

why did you add the parentheses?




[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

2017-07-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4353#discussion_r129278519
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
 ---
@@ -18,20 +18,43 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.state.CompositeStateHandle;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
 
 /**
- * Container for the state of one parallel subtask of an operator. This is 
part of the {@link OperatorState}.
+ * This class encapsulates the state for one parallel instance of an 
operator. The complete state of a (logical)
+ * operator (e.g. a flatmap operator) consists of the union of all {@link 
OperatorSubtaskState}s from all
+ * parallel tasks that physically execute parallelized, physical instances 
of the operator.
+ *
+ * The full state of the logical operator is represented by {@link 
OperatorState} which consists of
+ * {@link OperatorSubtaskState}s.
+ *
+ * Typically, we expect all collections in this class to be of size 0 
or 1, because there up to one state handle
--- End diff --

because there **is**




[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

2017-07-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4353#discussion_r129274874
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
 ---
@@ -118,10 +118,22 @@ public void testSetState() {
PendingCheckpoint pending = 
coord.getPendingCheckpoints().values().iterator().next();
final long checkpointId = pending.getCheckpointId();
 
-   SubtaskState checkpointStateHandles = new 
SubtaskState(serializedState, null, null, serializedKeyGroupStates, null);
-   coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, new 
CheckpointMetrics(), checkpointStateHandles));
-   coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, statefulExec2.getAttemptId(), checkpointId, new 
CheckpointMetrics(), checkpointStateHandles));
-   coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, statefulExec3.getAttemptId(), checkpointId, new 
CheckpointMetrics(), checkpointStateHandles));
+   final TaskStateSnapshot subtaskStates = new 
TaskStateSnapshot();
+
+   subtaskStates.putSubtaskStateByOperatorID(
+   OperatorID.fromJobVertexID(statefulId),
+   new OperatorSubtaskState(
+   serializedState.get(0),
+   
Collections.emptyList(),
+   
Collections.emptyList(),
+   
Collections.singletonList(serializedKeyGroupStates),
+   
Collections.emptyList()));
+
+   //SubtaskState checkpointStateHandles = new 
SubtaskState(serializedState, null, null, serializedKeyGroupStates, null);
--- End diff --

what's up with this line?




[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

2017-07-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4353#discussion_r129269706
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java
 ---
@@ -89,6 +89,10 @@ private GroupByStateNameResults groupByStateName(
 
for (OperatorStateHandle psh : previousParallelSubtaskStates) {
 
+   if(psh == null) {
--- End diff --

missing space after `if`




[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

2017-07-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4353#discussion_r129274620
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 ---
@@ -3631,16 +3673,16 @@ public void 
testSavepointsAreNotAddedToCompletedCheckpointStore() throws Excepti

completedCheckpointStore.getLatestCheckpoint().getCheckpointID() == 
checkpointIDCounter.getLast());
}
 
-   private static final class SpyInjectingOperatorState extends 
OperatorState {
-
-   private static final long serialVersionUID = 
-4004437428483663815L;
-
-   public SpyInjectingOperatorState(OperatorID taskID, int 
parallelism, int maxParallelism) {
-   super(taskID, parallelism, maxParallelism);
-   }
-
-   public void putState(int subtaskIndex, OperatorSubtaskState 
subtaskState) {
-   super.putState(subtaskIndex, spy(subtaskState));
-   }
-   }
+// private static final class SpyInjectingOperatorState extends 
OperatorState {
--- End diff --

you can remove this (which is _really_ great...)




[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

2017-07-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4353#discussion_r129278337
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
 ---
@@ -75,31 +103,84 @@
 */
private final long stateSize;
 
+   @VisibleForTesting
+   public OperatorSubtaskState(StreamStateHandle legacyOperatorState) {
--- End diff --

should this constructor call the other one?
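
For illustration only, here is a minimal sketch of what delegating from the
test-only constructor to the full one could look like. `DelegatingStateSketch`
and its `String` fields are hypothetical stand-ins, not the actual
`OperatorSubtaskState` signature; the point is only that validation and field
setup then live in a single constructor.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

/** Hypothetical container; state handles are modelled as plain strings. */
final class DelegatingStateSketch {

    private final String legacyOperatorState;
    private final List<String> managedOperatorState;

    /** Convenience constructor for tests: delegates instead of repeating the field setup. */
    DelegatingStateSketch(String legacyOperatorState) {
        this(legacyOperatorState, Collections.<String>emptyList());
    }

    /** Full constructor: the only place where arguments are checked and fields assigned. */
    DelegatingStateSketch(String legacyOperatorState, List<String> managedOperatorState) {
        this.legacyOperatorState = legacyOperatorState;
        this.managedOperatorState = Collections.unmodifiableList(
            new ArrayList<>(Objects.requireNonNull(managedOperatorState)));
    }

    String getLegacyOperatorState() {
        return legacyOperatorState;
    }

    List<String> getManagedOperatorState() {
        return managedOperatorState;
    }
}
```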




[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

2017-07-25 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/4353#discussion_r129266693
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
 ---
@@ -18,20 +18,40 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.state.CompositeStateHandle;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
 
 /**
- * Container for the state of one parallel subtask of an operator. This is 
part of the {@link OperatorState}.
+ * This class encapsulates the state for one parallel instance of an 
operator. The complete state of a (logical)
+ * operator (e.g. a flatmap operator) consists of the union of all {@link 
OperatorSubtaskState}s from all
+ * parallel tasks that physically execute parallelized, physical instances 
of the operator.
+ * The full state of the logical operator is represented by {@link 
OperatorState} which consists of
+ * {@link OperatorSubtaskState}s.
+ * Typically, we expect all collections in this class to be of size 0 
or 1, because there up to one state handle
+ * produced per state type (e.g. managed-keyed, raw-operator, ...). In 
particular, this holds when taking a snapshot.
+ * The purpose of having the state handles in collections is that this 
class is also reused in restoring state.
+ * Under normal circumstances, the expected size of each collection is 
still 0 or 1, except for scale-down. In
--- End diff --

In the master, we used two different classes for this purpose: 
`OperatorSubtaskState` to report from task to master, and `TaskStateHandles` to 
restore from master to task. The difference is that in the first all fields 
are singletons, while in the second all are collections. Otherwise, their 
purpose is identical, so I collapsed them into one class.
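
A simplified sketch of the collapsed design, for illustration only:
`SubtaskStateSketch` and its `String` handles are hypothetical stand-ins, not
the actual Flink classes, and the merge in `forRestore` is an assumption that
glosses over how handles are really reassigned on rescaling. It only shows why
one collection-based container can serve both directions: a snapshot yields
collections of size 0 or 1, while a restore after scale-down may hold several
handles.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

/** Hypothetical unified per-subtask state container; handles are plain strings. */
final class SubtaskStateSketch {

    private final List<String> managedKeyedState;

    SubtaskStateSketch(Collection<String> managedKeyedState) {
        this.managedKeyedState =
            Collections.unmodifiableList(new ArrayList<>(managedKeyedState));
    }

    /** Reporting a snapshot: at most one handle per state type, so the size is 0 or 1. */
    static SubtaskStateSketch forSnapshot(String handleOrNull) {
        return new SubtaskStateSketch(
            handleOrNull == null
                ? Collections.<String>emptyList()
                : Collections.singletonList(handleOrNull));
    }

    /** Restoring after scale-down: one new subtask takes over handles of several old ones. */
    static SubtaskStateSketch forRestore(Collection<SubtaskStateSketch> previousSubtasks) {
        List<String> merged = new ArrayList<>();
        for (SubtaskStateSketch previous : previousSubtasks) {
            merged.addAll(previous.managedKeyedState);
        }
        return new SubtaskStateSketch(merged);
    }

    List<String> getManagedKeyedState() {
        return managedKeyedState;
    }
}

final class SubtaskStateSketchDemo {
    public static void main(String[] args) {
        SubtaskStateSketch a = SubtaskStateSketch.forSnapshot("keyed-handle-0");
        SubtaskStateSketch b = SubtaskStateSketch.forSnapshot("keyed-handle-1");
        // Two old subtasks are folded into one new subtask.
        SubtaskStateSketch restored = SubtaskStateSketch.forRestore(Arrays.asList(a, b));
        System.out.println(restored.getManagedKeyedState());
    }
}
```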




[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

2017-07-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4353#discussion_r129019373
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
 ---
@@ -164,8 +168,16 @@ public void acknowledgeCheckpoint(
throw new RuntimeException(e);
}
 
+   boolean hasKeyedManagedKeyedState = false;
--- End diff --

-> `hasManagedKeyedState`?




[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

2017-07-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4353#discussion_r129020085
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
 ---
@@ -164,6 +269,7 @@ public long getStateSize() {
 
// 

 
+
--- End diff --

remove this empty line




[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

2017-07-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4353#discussion_r129020863
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
 ---
@@ -18,20 +18,40 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.state.CompositeStateHandle;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
 
 /**
- * Container for the state of one parallel subtask of an operator. This is 
part of the {@link OperatorState}.
+ * This class encapsulates the state for one parallel instance of an 
operator. The complete state of a (logical)
+ * operator (e.g. a flatmap operator) consists of the union of all {@link 
OperatorSubtaskState}s from all
+ * parallel tasks that physically execute parallelized, physical instances 
of the operator.
+ * The full state of the logical operator is represented by {@link 
OperatorState} which consists of
+ * {@link OperatorSubtaskState}s.
+ * Typically, we expect all collections in this class to be of size 0 
or 1, because there up to one state handle
+ * produced per state type (e.g. managed-keyed, raw-operator, ...). In 
particular, this holds when taking a snapshot.
+ * The purpose of having the state handles in collections is that this 
class is also reused in restoring state.
+ * Under normal circumstances, the expected size of each collection is 
still 0 or 1, except for scale-down. In
--- End diff --

How come we don't need this in the current master, where this class is also 
used for restoring state?




[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

2017-07-17 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4353#discussion_r127709120
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
 ---
@@ -18,20 +18,40 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.state.CompositeStateHandle;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
 
 /**
- * Container for the state of one parallel subtask of an operator. This is 
part of the {@link OperatorState}.
+ * This class encapsulates the state for one parallel instance of an 
operator. The complete state of a (logical)
+ * operator (e.g. a flatmap operator) consists of the union of all {@link 
OperatorSubtaskState}s from all
+ * parallel tasks that physically execute parallelized, physical instances 
of the operator.
+ * The full state of the logical operator is represented by {@link 
OperatorState} which consists of
--- End diff --

please add an empty line before the `<p>` tag so we have to make fewer 
changes when activating checkstyle.




[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

2017-07-17 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4353#discussion_r127708906
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
 ---
@@ -18,20 +18,40 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.state.CompositeStateHandle;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
 
 /**
- * Container for the state of one parallel subtask of an operator. This is 
part of the {@link OperatorState}.
+ * This class encapsulates the state for one parallel instance of an 
operator. The complete state of a (logical)
+ * operator (e.g. a flatmap operator) consists of the union of all {@link 
OperatorSubtaskState}s from all
+ * parallel tasks that physically execute parallelized, physical instances 
of the operator.
+ * The full state of the logical operator is represented by {@link 
OperatorState} which consists of
+ * {@link OperatorSubtaskState}s.
+ * Typically, we expect all collections in this class to be of size 0 
or 1, because there up to one state handle
+ * produced per state type (e.g. managed-keyed, raw-operator, ...). In 
particular, this holds when taking a snapshot.
+ * The purpose of having the state handles in collections is that this 
class is also reused in restoring state.
+ * Under normal circumstances, the expected size of each collection is 
still 0 or 1, except for scale-down. In
+ * scale-down, one operator subtask can become responsible for the state 
of multiple previous subtasks. The collections
+ * can then store all the state handles that are relevant to build up the 
new subtask state.
+ * There is no collection for legacy state because it is nor rescalable.
--- End diff --

typo: nor -> not




[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...

2017-07-17 Thread StefanRRichter
GitHub user StefanRRichter opened a pull request:

https://github.com/apache/flink/pull/4353

[FLINK-7213] Introduce state management by OperatorID in TaskManager

FLINK-5892 introduced the job manager / checkpoint coordinator part of 
managing state on the operator level instead of the task level by introducing 
explicit operator_id -> state mappings.

However, this explicit mapping was not introduced on the task manager side, 
so the explicit mapping is still converted into a mapping that follows the 
implicit operator chain order.

This PR introduces this part and offers explicit state management by 
operator_id in the task manager.
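
To illustrate the difference between the two kinds of mapping, here is a
minimal sketch. All names (`OperatorStateLookupSketch`, the `String` ids and
handles) are hypothetical and not Flink API; it only contrasts a lookup by
position in the operator chain with a lookup by explicit operator id.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: state lookup by chain position versus by explicit operator id. */
final class OperatorStateLookupSketch {

    /** Implicit mapping: the i-th entry belongs to the i-th operator in the chain. */
    static String lookupByChainIndex(List<String> stateInChainOrder, int chainIndex) {
        return stateInChainOrder.get(chainIndex);
    }

    /** Explicit mapping: each operator finds its state under its own id (null if absent). */
    static String lookupByOperatorId(Map<String, String> stateByOperatorId, String operatorId) {
        return stateByOperatorId.get(operatorId);
    }

    public static void main(String[] args) {
        List<String> chainOrdered = Arrays.asList("state-of-source", "state-of-map");

        Map<String, String> byId = new HashMap<>();
        byId.put("op-source", "state-of-source");
        byId.put("op-map", "state-of-map");

        // The index-based lookup depends on the chain order staying the same.
        System.out.println(lookupByChainIndex(chainOrdered, 1));
        // The id-based lookup does not depend on the position in the chain.
        System.out.println(lookupByOperatorId(byId, "op-map"));
    }
}
```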

Furthermore, this PR also introduces `TaskStateSnapshot` as a unified 
abstraction to replace `TaskStateHandles` and `SubtaskState`, which were always 
very similar, except that one offered collections of state handles (to support 
scaling in on restore) while the other only contained single objects (because 
each state is snapshotted into one state handle).

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StefanRRichter/flink tmpBU

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4353.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4353


commit d68dd39b343595120f62fc8280b2f3c5f0ee7503
Author: Stefan Richter 
Date:   2017-06-26T16:07:59Z

[FLINK-7213] Introduce state management by OperatorID in TaskManager

commit 11cdd85668aa18f8e5bab0e6cac9ba082bfea95c
Author: Stefan Richter 
Date:   2017-07-11T15:10:03Z

[FLINK-7213] Introduce TaskStateSnapshot to unify TaskStateHandles and 
SubtaskState



