[GitHub] flink pull request #3770: [FLINK-5892] Restore state on the operator level

2017-04-28 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3770


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3770: [FLINK-5892] Restore state on the operator level

2017-04-28 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113922612
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
 ---
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.CompositeStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Simple container class which contains the raw/managed/legacy operator state and key-group state handles for the sub
+ * tasks of an operator.
+ */
+public class OperatorState implements CompositeStateHandle {
+
+   private static final long serialVersionUID = -4845578005863201810L;
+
+   /** id of the operator */
+   private final OperatorID operatorID;
+
+   /** handles to non-partitioned states, subtaskindex -> subtaskstate */
+   private final Map subtaskStates;
--- End diff --

Here and in a few other places in this class, we could add the word `operator` to the variable names. That would make it clear to users that we are now dealing with state on the operator level. It would also avoid confusing Flink veterans who have a certain mental mapping for the word `(Sub)TaskState` that they would have to update.



[GitHub] flink pull request #3770: [FLINK-5892] Restore state on the operator level

2017-04-28 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113928558
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
 ---
@@ -62,252 +69,390 @@ public StateAssignmentOperation(
}
 
public boolean assignStates() throws Exception {
-
-   // this tracks if we find missing node hash ids and already use secondary mappings
-   boolean expandedToLegacyIds = false;
-
+   Map localStates = new HashMap<>(taskStates);
Map localTasks = this.tasks;
 
-   for (Map.Entry taskGroupStateEntry : taskStates.entrySet()) {
-
-   TaskState taskState = taskGroupStateEntry.getValue();
-
-   //find vertex for state-
-
-   ExecutionJobVertex executionJobVertex = localTasks.get(taskGroupStateEntry.getKey());
-
-   // on the first time we can not find the execution job vertex for an id, we also consider alternative ids,
-   // for example as generated from older flink versions, to provide backwards compatibility.
-   if (executionJobVertex == null && !expandedToLegacyIds) {
-   localTasks = ExecutionJobVertex.includeLegacyJobVertexIDs(localTasks);
-   executionJobVertex = localTasks.get(taskGroupStateEntry.getKey());
-   expandedToLegacyIds = true;
-   logger.info("Could not find ExecutionJobVertex. Including legacy JobVertexIDs in search.");
-   }
+   Set allOperatorIDs = new HashSet<>();
+   for (ExecutionJobVertex executionJobVertex : tasks.values()) {
+   allOperatorIDs.addAll(Lists.newArrayList(executionJobVertex.getOperatorIDs()));
+   }
+   for (Map.Entry taskGroupStateEntry : taskStates.entrySet()) {
--- End diff --

Renaming `taskStates` and `taskGroupStateEntry` to something with `operator` instead of `task` in it would make this more readable, for example `operatorToStateMapping`. I guess this is just a leftover from the refactoring.



[GitHub] flink pull request #3770: [FLINK-5892] Restore state on the operator level

2017-04-28 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113929555
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
 ---
@@ -62,252 +69,390 @@ public StateAssignmentOperation(
}
 
public boolean assignStates() throws Exception {
-
-   // this tracks if we find missing node hash ids and already use secondary mappings
-   boolean expandedToLegacyIds = false;
-
+   Map localStates = new HashMap<>(taskStates);
Map localTasks = this.tasks;
 
-   for (Map.Entry taskGroupStateEntry : taskStates.entrySet()) {
-
-   TaskState taskState = taskGroupStateEntry.getValue();
-
-   //find vertex for state-
-
-   ExecutionJobVertex executionJobVertex = localTasks.get(taskGroupStateEntry.getKey());
-
-   // on the first time we can not find the execution job vertex for an id, we also consider alternative ids,
-   // for example as generated from older flink versions, to provide backwards compatibility.
-   if (executionJobVertex == null && !expandedToLegacyIds) {
-   localTasks = ExecutionJobVertex.includeLegacyJobVertexIDs(localTasks);
-   executionJobVertex = localTasks.get(taskGroupStateEntry.getKey());
-   expandedToLegacyIds = true;
-   logger.info("Could not find ExecutionJobVertex. Including legacy JobVertexIDs in search.");
-   }
+   Set allOperatorIDs = new HashSet<>();
+   for (ExecutionJobVertex executionJobVertex : tasks.values()) {
+   allOperatorIDs.addAll(Lists.newArrayList(executionJobVertex.getOperatorIDs()));
+   }
+   for (Map.Entry taskGroupStateEntry : taskStates.entrySet()) {
--- End diff --

It looks like we could factor this loop out into a private precondition method, something like `checkStateMappingCompleteness`. The previous loop and everything that works on the hash set could go there as well.
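Below is a minimal sketch of the suggested extraction. It makes some assumptions. Operator IDs are modeled as plain strings. `allowNonRestoredState` keeps its current meaning: state for operators that are missing from the new job is tolerated only when the flag is set. `checkStateMappingCompleteness` is the hypothetical name from the comment above. The class `StateMappingCheck` is illustrative and is not the PR's code.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for StateAssignmentOperation; operator IDs are plain strings here.
class StateMappingCheck {

    /**
     * Fails if restored state exists for an operator that is not part of the new job,
     * unless allowNonRestoredState is set.
     */
    static void checkStateMappingCompleteness(
            boolean allowNonRestoredState,
            Map<String, ?> operatorStates,
            Collection<List<String>> operatorIdsPerVertex) {

        // Collect the IDs of all operators contained in all job vertices.
        Set<String> allOperatorIds = new HashSet<>();
        for (List<String> vertexOperatorIds : operatorIdsPerVertex) {
            allOperatorIds.addAll(vertexOperatorIds);
        }

        // Every piece of restored state must be claimed by some operator.
        for (String operatorId : operatorStates.keySet()) {
            if (!allOperatorIds.contains(operatorId) && !allowNonRestoredState) {
                throw new IllegalStateException(
                    "There is no operator for the state " + operatorId);
            }
        }
    }
}
```

With the check in its own method, `assignStates()` could call it once up front and then focus on the actual assignment.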



[GitHub] flink pull request #3770: [FLINK-5892] Restore state on the operator level

2017-04-28 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113928105
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
 ---
@@ -62,252 +69,390 @@ public StateAssignmentOperation(
}
 
public boolean assignStates() throws Exception {
-
-   // this tracks if we find missing node hash ids and already use secondary mappings
-   boolean expandedToLegacyIds = false;
-
+   Map localStates = new HashMap<>(taskStates);
Map localTasks = this.tasks;
 
-   for (Map.Entry taskGroupStateEntry : taskStates.entrySet()) {
-
-   TaskState taskState = taskGroupStateEntry.getValue();
-
-   //find vertex for state-
-
-   ExecutionJobVertex executionJobVertex = localTasks.get(taskGroupStateEntry.getKey());
-
-   // on the first time we can not find the execution job vertex for an id, we also consider alternative ids,
-   // for example as generated from older flink versions, to provide backwards compatibility.
-   if (executionJobVertex == null && !expandedToLegacyIds) {
-   localTasks = ExecutionJobVertex.includeLegacyJobVertexIDs(localTasks);
-   executionJobVertex = localTasks.get(taskGroupStateEntry.getKey());
-   expandedToLegacyIds = true;
-   logger.info("Could not find ExecutionJobVertex. Including legacy JobVertexIDs in search.");
-   }
+   Set allOperatorIDs = new HashSet<>();
+   for (ExecutionJobVertex executionJobVertex : tasks.values()) {
+   allOperatorIDs.addAll(Lists.newArrayList(executionJobVertex.getOperatorIDs()));
--- End diff --

If we switch to an immutable list instead of an array, this code also saves one conversion to a list.



[GitHub] flink pull request #3770: [FLINK-5892] Restore state on the operator level

2017-04-28 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113920542
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
@@ -1025,11 +1026,11 @@ public boolean restoreLatestCheckpointedState(
LOG.info("Restoring from latest valid checkpoint: {}.", latest);
 
// re-assign the task states
-
-   final Map taskStates = latest.getTaskStates();
+   final Map operatorStates = latest.getOperatorStates();
 
StateAssignmentOperation stateAssignmentOperation =
-   new StateAssignmentOperation(LOG, tasks, taskStates, allowNonRestoredState);
+   new StateAssignmentOperation(LOG, tasks, operatorStates, allowNonRestoredState);
--- End diff --

Not sure why this is implemented so that a logger is passed to the `StateAssignmentOperation`. I guess the class should simply have its own logger, and I think this could be changed. It seems this was introduced earlier and is unrelated to this PR. Still, I wouldn't mind having it refactored to the normal logger scheme before we merge.



[GitHub] flink pull request #3770: [FLINK-5892] Restore state on the operator level

2017-04-28 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113924290
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
 ---
@@ -139,6 +158,10 @@ public ExecutionJobVertex(
this.serializedTaskInformation = null;
 
this.taskVertices = new ExecutionVertex[numTaskVertices];
+   List opIDs = jobVertex.getOperatorIDs();
+   this.operatorIDs = opIDs.toArray(new OperatorID[opIDs.size()]);
--- End diff --

How about making `operatorIDs` an immutable list instead of an array? I think all the operations you perform could also run on an array list. We could then enforce immutability so that nobody is tempted to modify the inner state of the original array (e.g. to reverse the element order for convenience in other parts of the code). Same for the alternative IDs.
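Here is a minimal sketch of the immutable-list idea. It assumes operator IDs can be modeled as strings, and `VertexWithOperatorIds` is a hypothetical stand-in for `ExecutionJobVertex`. It uses only the JDK's `Collections.unmodifiableList`; Guava's `ImmutableList.copyOf` would serve the same purpose. The static helper shows the caller side from `StateAssignmentOperation`, where `addAll` no longer needs a `Lists.newArrayList(...)` conversion.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified stand-in for ExecutionJobVertex; operator IDs are strings.
class VertexWithOperatorIds {

    // Read-only view over a private copy; callers cannot reorder or replace elements.
    private final List<String> operatorIds;

    VertexWithOperatorIds(List<String> operatorIdsFromJobVertex) {
        // Defensive copy, so later changes to the caller's list do not leak in.
        this.operatorIds = Collections.unmodifiableList(new ArrayList<>(operatorIdsFromJobVertex));
    }

    List<String> getOperatorIds() {
        return operatorIds;
    }

    // Caller side: addAll works directly on the list, no array-to-list conversion needed.
    static Set<String> collectAllOperatorIds(List<VertexWithOperatorIds> vertices) {
        Set<String> allOperatorIds = new HashSet<>();
        for (VertexWithOperatorIds vertex : vertices) {
            allOperatorIds.addAll(vertex.getOperatorIds());
        }
        return allOperatorIds;
    }
}
```

Any attempt to modify the returned list, such as `set(...)` or `Collections.reverse(...)`, throws `UnsupportedOperationException` instead of silently corrupting shared state.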



[GitHub] flink pull request #3770: [FLINK-5892] Restore state on the operator level

2017-04-28 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113919931
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java ---
@@ -125,6 +132,8 @@ public JobVertex(String name) {
public JobVertex(String name, JobVertexID id) {
this.name = name == null ? DEFAULT_NAME : name;
this.id = id == null ? new JobVertexID() : id;
+   this.operatorIDs.add(new OperatorID(this.id.getLowerPart(), this.id.getUpperPart()));
+   this.operatorIdsAlternatives.add(null);
--- End diff --

Yes, that is what I also expected. Just wanted to be really sure.



[GitHub] flink pull request #3770: [FLINK-5892] Restore state on the operator level

2017-04-28 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113903349
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java ---
@@ -125,6 +132,8 @@ public JobVertex(String name) {
public JobVertex(String name, JobVertexID id) {
this.name = name == null ? DEFAULT_NAME : name;
this.id = id == null ? new JobVertexID() : id;
+   this.operatorIDs.add(new OperatorID(this.id.getLowerPart(), this.id.getUpperPart()));
+   this.operatorIdsAlternatives.add(null);
--- End diff --

We only need a single alternative ID for each operatorID: the one set using `setUIDHash()`.

For savepoints created with 1.3, other alternatives aren't required, since 1.3 doesn't use the old hasher.

1.0-1.2 savepoints are converted to the 1.3 format using the alternative job vertex IDs (see `SavepointV2#convertToOperatorStateSavepointV2`), which is why we can't remove them. After the conversion, however, they are identical to a 1.3 savepoint.



[GitHub] flink pull request #3770: [FLINK-5892] Restore state on the operator level

2017-04-28 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113898244
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java ---
@@ -125,6 +132,8 @@ public JobVertex(String name) {
public JobVertex(String name, JobVertexID id) {
this.name = name == null ? DEFAULT_NAME : name;
this.id = id == null ? new JobVertexID() : id;
+   this.operatorIDs.add(new OperatorID(this.id.getLowerPart(), this.id.getUpperPart()));
+   this.operatorIdsAlternatives.add(null);
--- End diff --

OK, in this case it really seems better to make this explicit, as you suggested. I was also wondering whether `operatorIdsAlternatives` should be a List; I just want to make sure that at most one alternative ID must be maintained per operator. But I think that we can always determine the savepoint version. We then only need compatibility with the hasher version that was valid for that savepoint version.



[GitHub] flink pull request #3770: [FLINK-5892] Restore state on the operator level

2017-04-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113482975
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java ---
@@ -133,11 +142,15 @@ public JobVertex(String name, JobVertexID id) {
 * @param name The name of the new job vertex.
 * @param primaryId The id of the job vertex.
 * @param alternativeIds The alternative ids of the job vertex.
+* @param operatorIds The ids of all operators contained in this job vertex.
+* @param alternativeOperatorIds The alternative ids of all operators contained in this job vertex-
 */
-   public JobVertex(String name, JobVertexID primaryId, List alternativeIds) {
+   public JobVertex(String name, JobVertexID primaryId, List alternativeIds, List operatorIds, List alternativeOperatorIds) {
--- End diff --

We could add a simple OperatorIDs pojo that encapsulates both lists.
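Here is a minimal sketch of such a POJO. It assumes string IDs, and the class name `OperatorIDs` is taken from the comment above purely as an illustration. The point is that the equal-length contract between the two lists is checked once, in the constructor, instead of being implied at every call site.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical POJO bundling the two parallel lists; IDs are strings for brevity.
final class OperatorIDs {

    private final List<String> generatedIds;
    // Entry i is the alternative for generatedIds.get(i), or null if there is none.
    private final List<String> alternativeIds;

    OperatorIDs(List<String> generatedIds, List<String> alternativeIds) {
        if (generatedIds.size() != alternativeIds.size()) {
            throw new IllegalArgumentException(
                "Expected one alternative entry per operator, got "
                    + alternativeIds.size() + " for " + generatedIds.size() + " operators");
        }
        // Defensive, read-only copies; ArrayList accepts the null placeholders.
        this.generatedIds = Collections.unmodifiableList(new ArrayList<>(generatedIds));
        this.alternativeIds = Collections.unmodifiableList(new ArrayList<>(alternativeIds));
    }

    List<String> getGeneratedIds() {
        return generatedIds;
    }

    List<String> getAlternativeIds() {
        return alternativeIds;
    }

    int size() {
        return generatedIds.size();
    }
}
```

The `JobVertex` constructor would then take one `OperatorIDs` argument instead of two same-typed lists.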



[GitHub] flink pull request #3770: [FLINK-5892] Restore state on the operator level

2017-04-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113482888
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java ---
@@ -133,11 +142,15 @@ public JobVertex(String name, JobVertexID id) {
 * @param name The name of the new job vertex.
 * @param primaryId The id of the job vertex.
 * @param alternativeIds The alternative ids of the job vertex.
+* @param operatorIds The ids of all operators contained in this job vertex.
+* @param alternativeOperatorIds The alternative ids of all operators contained in this job vertex-
 */
-   public JobVertex(String name, JobVertexID primaryId, List alternativeIds) {
+   public JobVertex(String name, JobVertexID primaryId, List alternativeIds, List operatorIds, List alternativeOperatorIds) {
--- End diff --

At the time of JobVertex generation we don't know whether we need them, so we have to provide them eagerly.



[GitHub] flink pull request #3770: [FLINK-5892] Restore state on the operator level

2017-04-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113482738
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java ---
@@ -125,6 +132,8 @@ public JobVertex(String name) {
public JobVertex(String name, JobVertexID id) {
this.name = name == null ? DEFAULT_NAME : name;
this.id = id == null ? new JobVertexID() : id;
+   this.operatorIDs.add(new OperatorID(this.id.getLowerPart(), this.id.getUpperPart()));
+   this.operatorIdsAlternatives.add(null);
--- End diff --

There's an implicit contract that `operatorIDs` must have the same length as `operatorIdsAlternatives`. We could store them as a pair instead.
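Here is a minimal sketch of the pair idea, assuming string IDs. `OperatorIDPair` and `JobVertexSketch` are hypothetical names. Each pair holds the generated ID and the optional user-defined one (the ID set via `setUIDHash()`, per the discussion in this thread). The lookup rule in `getIdForStateLookup()` is an assumption for illustration only: it lets the user-defined ID, if present, take precedence. With a single list of pairs, the `add(null)` placeholder from the diff disappears, and the two lengths can no longer diverge.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

// Hypothetical pair of the generated operator ID and the optional user-defined one.
final class OperatorIDPair {
    private final String generatedId;
    private final String userDefinedId; // null if no user-defined hash was set

    OperatorIDPair(String generatedId, String userDefinedId) {
        this.generatedId = Objects.requireNonNull(generatedId, "generatedId");
        this.userDefinedId = userDefinedId;
    }

    static OperatorIDPair generatedOnly(String generatedId) {
        return new OperatorIDPair(generatedId, null);
    }

    String getGeneratedId() {
        return generatedId;
    }

    String getUserDefinedId() {
        return userDefinedId;
    }

    // Assumption: the user-defined ID, if present, takes precedence when looking up state.
    String getIdForStateLookup() {
        return userDefinedId != null ? userDefinedId : generatedId;
    }
}

// One list of pairs replaces the two parallel lists, so their lengths cannot diverge.
class JobVertexSketch {
    private final List<OperatorIDPair> operatorIds = new ArrayList<>();

    JobVertexSketch(String vertexId) {
        // Default case from the diff: the vertex ID doubles as the single operator's ID, no alternative.
        operatorIds.add(OperatorIDPair.generatedOnly(vertexId));
    }

    List<OperatorIDPair> getOperatorIds() {
        return Collections.unmodifiableList(operatorIds);
    }
}
```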



[GitHub] flink pull request #3770: [FLINK-5892] Restore state on the operator level

2017-04-26 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113416026
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java ---
@@ -133,11 +142,15 @@ public JobVertex(String name, JobVertexID id) {
 * @param name The name of the new job vertex.
 * @param primaryId The id of the job vertex.
 * @param alternativeIds The alternative ids of the job vertex.
+* @param operatorIds The ids of all operators contained in this job vertex.
+* @param alternativeOperatorIds The alternative ids of all operators contained in this job vertex-
 */
-   public JobVertex(String name, JobVertexID primaryId, List alternativeIds) {
+   public JobVertex(String name, JobVertexID primaryId, List alternativeIds, List operatorIds, List alternativeOperatorIds) {
--- End diff --

Again, the generic type on the third parameter seems off. I also suggest breaking the parameter list across lines, as it is very long. On top of that, there are many parameters of the same type, which callers can easily mix up. This, together with the number of arguments, makes me wonder whether it would make sense to have just the actual IDs in the constructor. We could then add 2 methods that provide the alternative IDs for the cases that require them.
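Here is a minimal sketch of that API shape, under stated assumptions: IDs are strings, and `JobVertexApiSketch`, `setAlternativeVertexIds` and `setAlternativeOperatorIds` are hypothetical names, not Flink's API. The constructor takes only the IDs every vertex has. Only the backwards-compatibility path calls the two setters.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical shape of the suggested JobVertex API; IDs are strings and all names are illustrative.
class JobVertexApiSketch {
    private final String name;
    private final String id;
    private final List<String> operatorIds;
    private final List<String> alternativeVertexIds = new ArrayList<>();
    private final List<String> alternativeOperatorIds = new ArrayList<>();

    // The constructor only takes the IDs every vertex has.
    JobVertexApiSketch(String name, String id, List<String> operatorIds) {
        this.name = name;
        this.id = id;
        this.operatorIds = new ArrayList<>(operatorIds);
    }

    // Only callers that need backwards compatibility provide alternatives.
    void setAlternativeVertexIds(List<String> ids) {
        alternativeVertexIds.clear();
        alternativeVertexIds.addAll(ids);
    }

    // Entries may be null; if provided, the length must match the operator IDs.
    void setAlternativeOperatorIds(List<String> ids) {
        if (ids.size() != operatorIds.size()) {
            throw new IllegalArgumentException(
                "Expected " + operatorIds.size() + " alternative operator IDs, got " + ids.size());
        }
        alternativeOperatorIds.clear();
        alternativeOperatorIds.addAll(ids);
    }

    String getName() { return name; }
    String getId() { return id; }
    List<String> getOperatorIds() { return Collections.unmodifiableList(operatorIds); }
    List<String> getAlternativeVertexIds() { return Collections.unmodifiableList(alternativeVertexIds); }
    List<String> getAlternativeOperatorIds() { return Collections.unmodifiableList(alternativeOperatorIds); }
}
```

There is a trade-off: the vertex is no longer fully initialized by its constructor. An empty alternatives list then means "no alternatives", and the length check moves into the setter.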



[GitHub] flink pull request #3770: [FLINK-5892] Restore state on the operator level

2017-04-26 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113411211
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OperatorID.java 
---
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph;
+
+import org.apache.flink.util.AbstractID;
+
+import javax.xml.bind.DatatypeConverter;
+
+/**
+ * A class for statistically unique operator IDs.
+ */
+public class OperatorID extends AbstractID {
+
+   private static final long serialVersionUID = 1L;
+
+   public OperatorID() {
+   super();
+   }
--- End diff --

Code style: I would add an empty line in between. The call to `super()` is OK, but not required.



[GitHub] flink pull request #3770: [FLINK-5892] Restore state on the operator level

2017-04-26 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113415296
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java ---
@@ -125,6 +132,8 @@ public JobVertex(String name) {
public JobVertex(String name, JobVertexID id) {
this.name = name == null ? DEFAULT_NAME : name;
this.id = id == null ? new JobVertexID() : id;
+   this.operatorIDs.add(new OperatorID(this.id.getLowerPart(), this.id.getUpperPart()));
+   this.operatorIdsAlternatives.add(null);
--- End diff --

Why is it required to add `null` here? That seems strange. Either it is not required, or it indicates some implicit contract about `operatorIdsAlternatives`. That contract would at least justify a comment or, even better, a change. As far as I can see, it is just not required. Or am I missing something?



[GitHub] flink pull request #3770: [FLINK-5892] Restore state on the operator level

2017-04-26 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113412507
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertexID.java 
---
@@ -40,6 +40,10 @@ public JobVertexID(long lowerPart, long upperPart) {
super(lowerPart, upperPart);
}
 
+   public JobVertexID(AbstractID id) {
--- End diff --

Different subclasses of `AbstractID` are intended to introduce some kind of type safety. With this in mind, this feels like a not very transparent way of "casting" between IDs. Maybe some `convert` methods could make this a bit more explicit than offering a public constructor for this purpose?
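Here is a minimal sketch of explicit conversion methods. To stay self-contained, it uses a small hypothetical stand-in for `AbstractID` (two `long` halves) rather than the real Flink class. `fromOperatorId` and `toOperatorId` are illustrative names. In this sketch, equality also takes the concrete class into account. An operator ID and a job vertex ID with identical bits are therefore not equal, and every crossing between ID types goes through a named, searchable method.

```java
// Minimal stand-in for Flink's AbstractID (two longs); not the real class.
abstract class AbstractIdSketch {
    private final long lowerPart;
    private final long upperPart;

    AbstractIdSketch(long lowerPart, long upperPart) {
        this.lowerPart = lowerPart;
        this.upperPart = upperPart;
    }

    long getLowerPart() { return lowerPart; }
    long getUpperPart() { return upperPart; }

    // IDs of different subclasses are never equal, even with identical bits.
    @Override
    public boolean equals(Object obj) {
        if (obj == this) return true;
        if (obj == null || obj.getClass() != getClass()) return false;
        AbstractIdSketch other = (AbstractIdSketch) obj;
        return lowerPart == other.lowerPart && upperPart == other.upperPart;
    }

    @Override
    public int hashCode() {
        return Long.hashCode(lowerPart) * 31 + Long.hashCode(upperPart);
    }
}

final class OperatorIdSketch extends AbstractIdSketch {
    OperatorIdSketch(long lowerPart, long upperPart) { super(lowerPart, upperPart); }
}

final class JobVertexIdSketch extends AbstractIdSketch {
    JobVertexIdSketch(long lowerPart, long upperPart) { super(lowerPart, upperPart); }

    // Explicit, searchable conversions instead of a public JobVertexID(AbstractID) constructor.
    static JobVertexIdSketch fromOperatorId(OperatorIdSketch operatorId) {
        return new JobVertexIdSketch(operatorId.getLowerPart(), operatorId.getUpperPart());
    }

    OperatorIdSketch toOperatorId() {
        return new OperatorIdSketch(getLowerPart(), getUpperPart());
    }
}
```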



[GitHub] flink pull request #3770: [FLINK-5892] Restore state on the operator level

2017-04-26 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113416299
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
 ---
@@ -39,8 +39,8 @@ public InputFormatVertex(String name, JobVertexID id) {
super(name, id);
}
 
-   public InputFormatVertex(String name, JobVertexID id, List alternativeIds) {
-   super(name, id, alternativeIds);
+   public InputFormatVertex(String name, JobVertexID id, List alternativeIds, List operatorIds, List alternativeOperatorIds) {
--- End diff --

See my comments on a similar constructor in `JobVertex`.



[GitHub] flink pull request #3770: [FLINK-5892] Restore state on the operator level

2017-04-26 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113411306
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OperatorID.java 
---
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph;
+
+import org.apache.flink.util.AbstractID;
+
+import javax.xml.bind.DatatypeConverter;
+
+/**
+ * A class for statistically unique operator IDs.
+ */
+public class OperatorID extends AbstractID {
+
+   private static final long serialVersionUID = 1L;
+
+   public OperatorID() {
+   super();
+   }
+   public OperatorID(byte[] bytes) {
+   super(bytes);
+   }
+
+   public OperatorID(long lowerPart, long upperPart) {
+   super(lowerPart, upperPart);
+   }
+
+   public OperatorID(AbstractID id) {
+   super(id);
+   }
+
+   public static OperatorID fromHexString(String hexString) {
--- End diff --

If my IDE is telling the truth, this method is never used and could be removed.



[GitHub] flink pull request #3770: [FLINK-5892] Restore state on the operator level

2017-04-26 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3770#discussion_r113413839
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java ---
@@ -50,7 +50,14 @@
/** The ID of the vertex. */
private final JobVertexID id;
 
-   private final ArrayList idAlternatives = new ArrayList<>();
+   /** The alternative IDs of the vertex. */
+   private final ArrayList idAlternatives = new ArrayList<>();
--- End diff --

Judging from the comments, this looks incorrect; I would expect a `List` here.



[GitHub] flink pull request #3770: [FLINK-5892] Restore state on the operator level

2017-04-25 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/3770

[FLINK-5892] Restore state on the operator level

## General
This PR is a collaboration between @guoweiM and myself. It enables Flink to restore state on the operator level. This means that, when restoring from a 1.3 savepoint, the topology of a job may change with regard to chains. Chains can be added, removed, or modified arbitrarily.

The cornerstone for this is a semantic change for savepoints. No structural changes have been made to the `SavepointV0/1/2` classes or their serialized format:

In 1.2, a savepoint contains the states of tasks. If a task consists of multiple operators, the stored TaskState internally contains a list of states, one entry for each operator.

In 1.3, a savepoint contains the states of operators only; the notion of tasks is eliminated. If a task consists of multiple operators, we store one TaskState for each operator instead. Internally, each of them contains a list of states with a length of 1.
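Here is a minimal sketch of the semantic change described above. It makes several assumptions. IDs and states are plain strings. Subtasks and parallelism are ignored. Each task's operator IDs are listed in the same order as its states. `splitTaskStatesByOperator` is a hypothetical helper, not the PR's conversion code. It turns one 1.2-style entry per task into one 1.3-style entry per operator, each holding a list of length 1.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: IDs and states are strings; subtasks/parallelism are ignored.
class SavepointSemanticsSketch {

    /**
     * Turns 1.2-style entries (one per task, holding one state per chained operator)
     * into 1.3-style entries (one per operator, each holding a list of length 1).
     * operatorIdsPerTask must list each task's operator IDs in the same order as its states.
     */
    static Map<String, List<String>> splitTaskStatesByOperator(
            Map<String, List<String>> statesPerTask,
            Map<String, List<String>> operatorIdsPerTask) {

        Map<String, List<String>> statesPerOperator = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> task : statesPerTask.entrySet()) {
            List<String> states = task.getValue();
            List<String> operatorIds = operatorIdsPerTask.get(task.getKey());
            if (operatorIds == null || operatorIds.size() != states.size()) {
                throw new IllegalArgumentException(
                    "Operator IDs do not match the states of task " + task.getKey());
            }
            for (int i = 0; i < states.size(); i++) {
                // One entry per operator, wrapping its single state in a list of length 1.
                statesPerOperator.put(operatorIds.get(i), Collections.singletonList(states.get(i)));
            }
        }
        return statesPerOperator;
    }
}
```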

## Implementation

In order for this to work a number of changes had to be made.

First and foremost, we required a new `StateAssignmentOperation` that was aware of operators.
(74881a2, 8be9c58, 4fa8bbd)

The SAO uses the `ExecutionGraph` classes to map the restored state. It was therefore necessary to forward the IDs of all contained operators from the `StreamingJobGraphGenerator` to the `ExecutionJobVertex`.
(73427c3)

The `PendingCheckpoint` class had to be adjusted to conform to the new 
semantics; received `SubtaskStates`, containing the state of a task, are broken 
down into SubtaskStates for the individual operators.
(f7b8ef9)

## Tests

The majority of this PR (60% or so) consists of new tests.

A number of tests were added under flink-tests that test the migration path 
from 1.2 to 1.3.
(d1efdb1)

These tests first restore a job from a 1.2 savepoint, without changes to 
the topology, verify that the state was restored correctly and finally create a 
new savepoint. They then restore from this migrated 1.3 savepoint, with changes 
to the topology for varying scenarios, and verify the correct restoration of 
state again.

A new test was also added to the `CheckpointCoordinatorTest` which tests 
the support for topology changes without executing a job.
(8b5430f9)

A number of existing tests had to be tweaked to run with the new changes, 
but these changes all boil down to extending existing mocks by a method or two.
(b5430f9)

## Other changes

To make it more obvious that we deal with operators and not tasks a new 
`OperatorID` class was introduced, and usages of `JobVertexID` in 
savepoint-related parts were replaced when appropriate.
(fe74023)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 5982_operator_state

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3770.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3770


commit abe1bb9416b2a4159a3667d6845a4a9776abdc4f
Author: zentol 
Date:   2017-04-03T15:39:50Z

[prerequisite] Disable exception when assigning uid on chained operator

commit 74881a2788d034db67d99d6d32dbb2cf923aed53
Author: zentol 
Date:   2017-04-04T10:53:56Z

[internal] Adjust SavepointLoader to new Savepoint semantics

commit f7b8ef943097cd994a4ef3d5594fea4027720f5a
Author: zentol 
Date:   2017-04-04T13:02:55Z

[internal] adjust PendingCheckpoint to be in line with new semantics

commit 73427c3fc7d69f5d072d9d8a4809d449e0c5bdac
Author: zentol 
Date:   2017-04-04T11:33:54Z

[internal] Get operator ID's into ExecutionGraph

commit 465805792932cb888393d9257fdefd828fa59343
Author: zentol 
Date:   2017-04-25T16:07:16Z

[internals] Extract several utility methods from StateAssignmentOperation

commit 008e848715b7091c3deabc9251d9d673f5506e64
Author: guowei.mgw 
Date:   2017-04-24T09:47:47Z

[internal] Add new StateAssignmentOperation

commit ffb93298ce90956b9886b3526258f6a814b7e0af
Author: zentol 
Date:   2017-04-04T13:01:07Z

[internal] Integrate new StateAssignmentOperation version

commit d1efdb1c34d59f04147292b320528cd2bc838244
Author: zentol 
Date:   2017-04-03T15:40:21Z

[tests] Add tests for chain modifications

commit 8b45b5a77f2cc499fdbb41d8198ac0a2e25bb1d7
Author: zentol 
Date:   2017-04-24T11:58:07Z

[tests] Adjust existing tests

commit b5430f98bfbb56e49f9a8b21fe5b1e5dd7358714
Author: guowei.mgw 
Date:   2017-04-24T10:13:44Z