http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionVertex.java
new file mode 100644
index 0000000..9faf3fb
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionVertex.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+/**
+ * Common interface for the runtime {@link ExecutionVertex} and {@link 
ArchivedExecutionVertex}.
+ */
+public interface AccessExecutionVertex {
+       /**
+        * Returns the name of this execution vertex in the format "myTask 
(2/7)".
+        *
+        * @return name of this execution vertex
+        */
+       String getTaskNameWithSubtaskIndex();
+
+       /**
+        * Returns the subtask index of this execution vertex.
+        *
+        * @return subtask index of this execution vertex.
+        */
+       int getParallelSubtaskIndex();
+
+       /**
+        * Returns the current execution for this execution vertex.
+        *
+        * @return current execution
+        */
+       AccessExecution getCurrentExecutionAttempt();
+
+       /**
+        * Returns the current {@link ExecutionState} for this execution vertex.
+        *
+        * @return execution state for this execution vertex
+        */
+       ExecutionState getExecutionState();
+
+       /**
+        * Returns the timestamp for the given {@link ExecutionState}.
+        *
+        * @param state state for which the timestamp should be returned
+        * @return timestamp for the given state
+        */
+       long getStateTimestamp(ExecutionState state);
+
+       /**
+        * Returns the exception that caused the job to fail. This is the first 
root exception
+        * that was not recoverable and triggered job failure.
+        *
+        * @return failure exception as a string, or {@code "(null)"}
+        */
+       String getFailureCauseAsString();
+
+       /**
+        * Returns the {@link TaskManagerLocation} for this execution vertex.
+        *
+        * @return taskmanager location for this execution vertex.
+        */
+       TaskManagerLocation getCurrentAssignedResourceLocation();
+
+       /**
+        * Returns the execution for the given attempt number.
+        *
+        * @param attemptNumber attempt number of execution to be returned
+        * @return execution for the given attempt number
+        */
+       AccessExecution getPriorExecutionAttempt(int attemptNumber);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
new file mode 100644
index 0000000..0b2992f
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.ExceptionUtils;
+
+import java.io.Serializable;
+import java.util.Map;
+
+public class ArchivedExecution implements AccessExecution, Serializable {
+       private static final long serialVersionUID = 4817108757483345173L;
+       // 
--------------------------------------------------------------------------------------------
+
+       private final ExecutionAttemptID attemptId;
+
+       private final long[] stateTimestamps;
+
+       private final int attemptNumber;
+
+       private final ExecutionState state;
+
+       private final String failureCause;          // once assigned, never 
changes
+
+       private final TaskManagerLocation assignedResourceLocation; // for the 
archived execution
+
+       /* Continuously updated map of user-defined accumulators */
+       private final StringifiedAccumulatorResult[] userAccumulators;
+
+       /* Continuously updated map of internal accumulators */
+       private final Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> 
flinkAccumulators;
+       private final int parallelSubtaskIndex;
+
+       public ArchivedExecution(Execution execution) {
+               this.userAccumulators = 
execution.getUserAccumulatorsStringified();
+               this.flinkAccumulators = execution.getFlinkAccumulators();
+               this.attemptId = execution.getAttemptId();
+               this.attemptNumber = execution.getAttemptNumber();
+               this.stateTimestamps = execution.getStateTimestamps();
+               this.parallelSubtaskIndex = 
execution.getVertex().getParallelSubtaskIndex();
+               this.state = execution.getState();
+               this.failureCause = 
ExceptionUtils.stringifyException(execution.getFailureCause());
+               this.assignedResourceLocation = 
execution.getAssignedResourceLocation();
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //   Accessors
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public ExecutionAttemptID getAttemptId() {
+               return attemptId;
+       }
+
+       @Override
+       public int getAttemptNumber() {
+               return attemptNumber;
+       }
+
+       @Override
+       public long[] getStateTimestamps() {
+               return stateTimestamps;
+       }
+
+       @Override
+       public ExecutionState getState() {
+               return state;
+       }
+
+       @Override
+       public TaskManagerLocation getAssignedResourceLocation() {
+               return assignedResourceLocation;
+       }
+
+       @Override
+       public String getFailureCauseAsString() {
+               return failureCause;
+       }
+
+       @Override
+       public long getStateTimestamp(ExecutionState state) {
+               return this.stateTimestamps[state.ordinal()];
+       }
+
+       @Override
+       public StringifiedAccumulatorResult[] getUserAccumulatorsStringified() {
+               return userAccumulators;
+       }
+
+       @Override
+       public Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> 
getFlinkAccumulators() {
+               return flinkAccumulators;
+       }
+
+       @Override
+       public int getParallelSubtaskIndex() {
+               return parallelSubtaskIndex;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
new file mode 100644
index 0000000..493825a
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.checkpoint.ArchivedCheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.SerializedValue;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+public class ArchivedExecutionGraph implements AccessExecutionGraph, 
Serializable {
+       private static final long serialVersionUID = 7231383912742578428L;
+       // 
--------------------------------------------------------------------------------------------
+
+       /** The ID of the job this graph has been built for. */
+       private final JobID jobID;
+
+       /** The name of the original job graph. */
+       private final String jobName;
+
+       /** All job vertices that are part of this graph */
+       private final Map<JobVertexID, ArchivedExecutionJobVertex> tasks;
+
+       /** All vertices, in the order in which they were created **/
+       private final List<ArchivedExecutionJobVertex> verticesInCreationOrder;
+
+       /**
+        * Timestamps (in milliseconds as returned by {@code 
System.currentTimeMillis()} when
+        * the execution graph transitioned into a certain state. The index 
into this array is the
+        * ordinal of the enum value, i.e. the timestamp when the graph went 
into state "RUNNING" is
+        * at {@code stateTimestamps[RUNNING.ordinal()]}.
+        */
+       private final long[] stateTimestamps;
+
+       // ------ Configuration of the Execution -------
+
+       // ------ Execution status and progress. These values are volatile, and 
accessed under the lock -------
+
+       /** Current status of the job execution */
+       private final JobStatus state;
+
+       /**
+        * The exception that caused the job to fail. This is set to the first 
root exception
+        * that was not recoverable and triggered job failure
+        */
+       private final String failureCause;
+
+       // ------ Fields that are only relevant for archived execution graphs 
------------
+       private final String jsonPlan;
+       private final StringifiedAccumulatorResult[] archivedUserAccumulators;
+       private final ArchivedExecutionConfig archivedExecutionConfig;
+       private final boolean isStoppable;
+       private final Map<String, SerializedValue<Object>> 
serializedUserAccumulators;
+       private final ArchivedCheckpointStatsTracker tracker;
+
+       public ArchivedExecutionGraph(
+               JobID jobID,
+               String jobName,
+               Map<JobVertexID, ArchivedExecutionJobVertex> tasks,
+               List<ArchivedExecutionJobVertex> verticesInCreationOrder,
+               long[] stateTimestamps,
+               JobStatus state,
+               String failureCause,
+               String jsonPlan,
+               StringifiedAccumulatorResult[] archivedUserAccumulators,
+               Map<String, SerializedValue<Object>> serializedUserAccumulators,
+               ArchivedExecutionConfig executionConfig,
+               boolean isStoppable,
+               ArchivedCheckpointStatsTracker tracker
+       ) {
+               this.jobID = jobID;
+               this.jobName = jobName;
+               this.tasks = tasks;
+               this.verticesInCreationOrder = verticesInCreationOrder;
+               this.stateTimestamps = stateTimestamps;
+               this.state = state;
+               this.failureCause = failureCause;
+               this.jsonPlan = jsonPlan;
+               this.archivedUserAccumulators = archivedUserAccumulators;
+               this.serializedUserAccumulators = serializedUserAccumulators;
+               this.archivedExecutionConfig = executionConfig;
+               this.isStoppable = isStoppable;
+               this.tracker = tracker;
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       @Override
+       public String getJsonPlan() {
+               return jsonPlan;
+       }
+
+       @Override
+       public JobID getJobID() {
+               return jobID;
+       }
+
+       @Override
+       public String getJobName() {
+               return jobName;
+       }
+
+       @Override
+       public JobStatus getState() {
+               return state;
+       }
+
+       @Override
+       public String getFailureCauseAsString() {
+               return failureCause;
+       }
+
+       @Override
+       public ArchivedExecutionJobVertex getJobVertex(JobVertexID id) {
+               return this.tasks.get(id);
+       }
+
+       @Override
+       public Map<JobVertexID, AccessExecutionJobVertex> getAllVertices() {
+               return Collections.<JobVertexID, 
AccessExecutionJobVertex>unmodifiableMap(this.tasks);
+       }
+
+       @Override
+       public Iterable<ArchivedExecutionJobVertex> getVerticesTopologically() {
+               // we return a specific iterator that does not fail with 
concurrent modifications
+               // the list is append only, so it is safe for that
+               final int numElements = this.verticesInCreationOrder.size();
+
+               return new Iterable<ArchivedExecutionJobVertex>() {
+                       @Override
+                       public Iterator<ArchivedExecutionJobVertex> iterator() {
+                               return new 
Iterator<ArchivedExecutionJobVertex>() {
+                                       private int pos = 0;
+
+                                       @Override
+                                       public boolean hasNext() {
+                                               return pos < numElements;
+                                       }
+
+                                       @Override
+                                       public ArchivedExecutionJobVertex 
next() {
+                                               if (hasNext()) {
+                                                       return 
verticesInCreationOrder.get(pos++);
+                                               } else {
+                                                       throw new 
NoSuchElementException();
+                                               }
+                                       }
+
+                                       @Override
+                                       public void remove() {
+                                               throw new 
UnsupportedOperationException();
+                                       }
+                               };
+                       }
+               };
+       }
+
+       @Override
+       public Iterable<ArchivedExecutionVertex> getAllExecutionVertices() {
+               return new Iterable<ArchivedExecutionVertex>() {
+                       @Override
+                       public Iterator<ArchivedExecutionVertex> iterator() {
+                               return new 
AllVerticesIterator(getVerticesTopologically().iterator());
+                       }
+               };
+       }
+
+       @Override
+       public long getStatusTimestamp(JobStatus status) {
+               return this.stateTimestamps[status.ordinal()];
+       }
+
+       @Override
+       public CheckpointStatsTracker getCheckpointStatsTracker() {
+               return tracker;
+       }
+
+       /**
+        * Gets the internal flink accumulator map of maps which contains some 
metrics.
+        *
+        * @return A map of accumulators for every executed task.
+        */
+       @Override
+       public Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, 
Accumulator<?, ?>>> getFlinkAccumulators() {
+               Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, 
Accumulator<?, ?>>> flinkAccumulators =
+                       new HashMap<>();
+
+               for (AccessExecutionVertex vertex : getAllExecutionVertices()) {
+                       Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> 
taskAccs = vertex.getCurrentExecutionAttempt().getFlinkAccumulators();
+                       
flinkAccumulators.put(vertex.getCurrentExecutionAttempt().getAttemptId(), 
taskAccs);
+               }
+
+               return flinkAccumulators;
+       }
+
+       @Override
+       public boolean isArchived() {
+               return true;
+       }
+
+       public StringifiedAccumulatorResult[] getUserAccumulators() {
+               return archivedUserAccumulators;
+       }
+
+       public ArchivedExecutionConfig getArchivedExecutionConfig() {
+               return archivedExecutionConfig;
+       }
+
+       @Override
+       public boolean isStoppable() {
+               return isStoppable;
+       }
+
+       @Override
+       public StringifiedAccumulatorResult[] 
getAccumulatorResultsStringified() {
+               return archivedUserAccumulators;
+       }
+
+       @Override
+       public Map<String, SerializedValue<Object>> getAccumulatorsSerialized() 
{
+               return serializedUserAccumulators;
+       }
+
+       class AllVerticesIterator implements Iterator<ArchivedExecutionVertex> {
+
+               private final Iterator<ArchivedExecutionJobVertex> jobVertices;
+
+               private ArchivedExecutionVertex[] currVertices;
+
+               private int currPos;
+
+
+               public AllVerticesIterator(Iterator<ArchivedExecutionJobVertex> 
jobVertices) {
+                       this.jobVertices = jobVertices;
+               }
+
+
+               @Override
+               public boolean hasNext() {
+                       while (true) {
+                               if (currVertices != null) {
+                                       if (currPos < currVertices.length) {
+                                               return true;
+                                       } else {
+                                               currVertices = null;
+                                       }
+                               } else if (jobVertices.hasNext()) {
+                                       currVertices = 
jobVertices.next().getTaskVertices();
+                                       currPos = 0;
+                               } else {
+                                       return false;
+                               }
+                       }
+               }
+
+               @Override
+               public ArchivedExecutionVertex next() {
+                       if (hasNext()) {
+                               return currVertices[currPos++];
+                       } else {
+                               throw new NoSuchElementException();
+                       }
+               }
+
+               @Override
+               public void remove() {
+                       throw new UnsupportedOperationException();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
new file mode 100644
index 0000000..4857bf5
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.AccumulatorHelper;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import scala.Option;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.getAggregateJobVertexState;
+
+public class ArchivedExecutionJobVertex implements AccessExecutionJobVertex, 
Serializable {
+
+       private static final long serialVersionUID = -5768187638639437957L;
+       private final ArchivedExecutionVertex[] taskVertices;
+
+       private final JobVertexID id;
+
+       private final String name;
+
+       private final int parallelism;
+
+       private final int maxParallelism;
+
+       private final Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> 
aggregatedMetricAccumulators;
+       private final Option<OperatorCheckpointStats> checkpointStats;
+       private final StringifiedAccumulatorResult[] archivedUserAccumulators;
+
+       public ArchivedExecutionJobVertex(ExecutionJobVertex jobVertex) {
+               this.taskVertices = new 
ArchivedExecutionVertex[jobVertex.getTaskVertices().length];
+               for (int x = 0; x < taskVertices.length; x++) {
+                       taskVertices[x] = 
jobVertex.getTaskVertices()[x].archive();
+               }
+
+               aggregatedMetricAccumulators = 
jobVertex.getAggregatedMetricAccumulators();
+
+               Map<String, Accumulator<?, ?>> tmpArchivedUserAccumulators = 
new HashMap<>();
+               for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
+                       Map<String, Accumulator<?, ?>> next = 
vertex.getCurrentExecutionAttempt().getUserAccumulators();
+                       if (next != null) {
+                               
AccumulatorHelper.mergeInto(tmpArchivedUserAccumulators, next);
+                       }
+               }
+               archivedUserAccumulators = 
jobVertex.getAggregatedUserAccumulatorsStringified();
+
+               this.id = jobVertex.getJobVertexId();
+               this.name = jobVertex.getJobVertex().getName();
+               this.parallelism = jobVertex.getParallelism();
+               this.maxParallelism = jobVertex.getMaxParallelism();
+               CheckpointStatsTracker tracker = 
jobVertex.getGraph().getCheckpointStatsTracker();
+               checkpointStats = tracker != null
+                       ? tracker.getOperatorStats(this.id)
+                       : Option.<OperatorCheckpointStats>empty();
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //   Accessors
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public String getName() {
+               return name;
+       }
+
+       @Override
+       public int getParallelism() {
+               return parallelism;
+       }
+
+       @Override
+       public int getMaxParallelism() {
+               return maxParallelism;
+       }
+
+       @Override
+       public JobVertexID getJobVertexId() {
+               return id;
+       }
+
+       @Override
+       public ArchivedExecutionVertex[] getTaskVertices() {
+               return taskVertices;
+       }
+
+       @Override
+       public ExecutionState getAggregateState() {
+               int[] num = new int[ExecutionState.values().length];
+               for (ArchivedExecutionVertex vertex : this.taskVertices) {
+                       num[vertex.getExecutionState().ordinal()]++;
+               }
+
+               return getAggregateJobVertexState(num, parallelism);
+       }
+
+       public Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> 
getAggregatedMetricAccumulators() {
+               return this.aggregatedMetricAccumulators;
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Static / pre-assigned input splits
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public Option<OperatorCheckpointStats> getCheckpointStats() {
+               return checkpointStats;
+       }
+
+       @Override
+       public StringifiedAccumulatorResult[] 
getAggregatedUserAccumulatorsStringified() {
+               return archivedUserAccumulators;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
new file mode 100644
index 0000000..e1fb11a
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ArchivedExecutionVertex implements AccessExecutionVertex, 
Serializable {
+
+       private static final long serialVersionUID = -6708241535015028576L;
+       private final int subTaskIndex;
+
+       private final List<ArchivedExecution> priorExecutions;
+
+       /** The name in the format "myTask (2/7)", cached to avoid frequent 
string concatenations */
+       private final String taskNameWithSubtask;
+
+       private final ArchivedExecution currentExecution;    // this field must 
never be null
+
+       public ArchivedExecutionVertex(ExecutionVertex vertex) {
+               this.subTaskIndex = vertex.getParallelSubtaskIndex();
+               this.priorExecutions = new ArrayList<>();
+               for (Execution priorExecution : vertex.getPriorExecutions()) {
+                       priorExecutions.add(priorExecution.archive());
+               }
+               this.taskNameWithSubtask = vertex.getTaskNameWithSubtaskIndex();
+               this.currentExecution = 
vertex.getCurrentExecutionAttempt().archive();
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //   Accessors
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public String getTaskNameWithSubtaskIndex() {
+               return this.taskNameWithSubtask;
+       }
+
+       @Override
+       public int getParallelSubtaskIndex() {
+               return this.subTaskIndex;
+       }
+
+       @Override
+       public ArchivedExecution getCurrentExecutionAttempt() {
+               return currentExecution;
+       }
+
+       @Override
+       public ExecutionState getExecutionState() {
+               return currentExecution.getState();
+       }
+
+       @Override
+       public long getStateTimestamp(ExecutionState state) {
+               return currentExecution.getStateTimestamp(state);
+       }
+
+       @Override
+       public String getFailureCauseAsString() {
+               return currentExecution.getFailureCauseAsString();
+       }
+
+       @Override
+       public TaskManagerLocation getCurrentAssignedResourceLocation() {
+               return currentExecution.getAssignedResourceLocation();
+       }
+
+       @Override
+       public ArchivedExecution getPriorExecutionAttempt(int attemptNumber) {
+               if (attemptNumber >= 0 && attemptNumber < 
priorExecutions.size()) {
+                       return priorExecutions.get(attemptNumber);
+               } else {
+                       throw new IllegalArgumentException("attempt does not 
exist");
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index b92e3af..0b56931 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.executiongraph;
 
 import akka.dispatch.OnComplete;
 import akka.dispatch.OnFailure;
+import org.apache.flink.api.common.Archiveable;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
@@ -102,7 +103,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * occasional double-checking to ensure that the state after a completed call 
is as expected, and trigger correcting
  * actions if it is not. Many actions are also idempotent (like canceling).
  */
-public class Execution {
+public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution> {
 
        private static final AtomicReferenceFieldUpdater<Execution, 
ExecutionState> STATE_UPDATER =
                        AtomicReferenceFieldUpdater.newUpdater(Execution.class, 
ExecutionState.class, "state");
@@ -188,14 +189,17 @@ public class Execution {
                return vertex;
        }
 
+       @Override
        public ExecutionAttemptID getAttemptId() {
                return attemptId;
        }
 
+       @Override
        public int getAttemptNumber() {
                return attemptNumber;
        }
 
+       @Override
        public ExecutionState getState() {
                return state;
        }
@@ -204,6 +208,7 @@ public class Execution {
                return assignedResource;
        }
 
+       @Override
        public TaskManagerLocation getAssignedResourceLocation() {
                return assignedResourceLocation;
        }
@@ -212,10 +217,17 @@ public class Execution {
                return failureCause;
        }
 
+       @Override
+       public String getFailureCauseAsString() {
+               return ExceptionUtils.stringifyException(getFailureCause());
+       }
+
+       @Override
        public long[] getStateTimestamps() {
                return stateTimestamps;
        }
 
+       @Override
        public long getStateTimestamp(ExecutionState state) {
                return this.stateTimestamps[state.ordinal()];
        }
@@ -237,21 +249,6 @@ public class Execution {
        }
 
        /**
-        * This method cleans fields that are irrelevant for the archived 
execution attempt.
-        */
-       public void prepareForArchiving() {
-               if (assignedResource != null && assignedResource.isAlive()) {
-                       throw new IllegalStateException("Cannot archive 
Execution while the assigned resource is still running.");
-               }
-               assignedResource = null;
-
-               executionContext = null;
-
-               partialInputChannelDeploymentDescriptors.clear();
-               partialInputChannelDeploymentDescriptors = null;
-       }
-
-       /**
         * Sets the initial state for the execution. The serialized state is 
then shipped via the
         * {@link TaskDeploymentDescriptor} to the TaskManagers.
         *
@@ -1055,14 +1052,21 @@ public class Execution {
                return userAccumulators;
        }
 
+       @Override
        public StringifiedAccumulatorResult[] getUserAccumulatorsStringified() {
                return 
StringifiedAccumulatorResult.stringifyAccumulatorResults(userAccumulators);
        }
 
+       @Override
        public Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> 
getFlinkAccumulators() {
                return flinkAccumulators;
        }
 
+       @Override
+       public int getParallelSubtaskIndex() {
+               return getVertex().getParallelSubtaskIndex();
+       }
+
        // 
------------------------------------------------------------------------
        //  Standard utilities
        // 
------------------------------------------------------------------------
@@ -1072,4 +1076,9 @@ public class Execution {
                return String.format("Attempt #%d (%s) @ %s - [%s]", 
attemptNumber, vertex.getSimpleName(),
                                (assignedResource == null ? "(unassigned)" : 
assignedResource.toString()), state);
        }
+
+       @Override
+       public ArchivedExecution archive() {
+               return new ArchivedExecution(this);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 10f0e88..aa9406c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph;
 
+import org.apache.flink.api.common.ArchivedExecutionConfig;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.accumulators.Accumulator;
@@ -32,14 +33,16 @@ import 
org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.checkpoint.ArchivedCheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.stats.JobCheckpointStats;
+import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
-import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader;
-import org.apache.flink.runtime.executiongraph.archive.ExecutionConfigSummary;
+import org.apache.flink.api.common.Archiveable;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -58,8 +61,10 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.duration.FiniteDuration;
+import scala.Option;
 
 import java.io.IOException;
 import java.net.URL;
@@ -102,7 +107,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  *         address the message receiver.</li>
  * </ul>
  */
-public class ExecutionGraph {
+public class ExecutionGraph implements AccessExecutionGraph, 
Archiveable<ArchivedExecutionGraph> {
 
        private static final AtomicReferenceFieldUpdater<ExecutionGraph, 
JobStatus> STATE_UPDATER =
                        
AtomicReferenceFieldUpdater.newUpdater(ExecutionGraph.class, JobStatus.class, 
"state");
@@ -180,9 +185,6 @@ public class ExecutionGraph {
         * from results than need to be materialized. */
        private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES;
 
-       /** Flag to indicate whether the Graph has been archived */
-       private boolean isArchived = false;
-
        // ------ Execution status and progress. These values are volatile, and 
accessed under the lock -------
 
        /** Current status of the job execution */
@@ -222,9 +224,6 @@ public class ExecutionGraph {
        // ------ Fields that are only relevant for archived execution graphs 
------------
        private String jsonPlan;
 
-       /** Serializable summary of all job config values, e.g. for web 
interface */
-       private ExecutionConfigSummary executionConfigSummary;
-
        // 
--------------------------------------------------------------------------------------------
        //   Constructors
        // 
--------------------------------------------------------------------------------------------
@@ -304,16 +303,6 @@ public class ExecutionGraph {
                metricGroup.gauge(RESTARTING_TIME_METRIC_NAME, new 
RestartTimeGauge());
 
                this.kvStateLocationRegistry = new 
KvStateLocationRegistry(jobId, getAllVertices());
-
-               // create a summary of all relevant data accessed in the web 
interface's JobConfigHandler
-               try {
-                       ExecutionConfig executionConfig = 
serializedConfig.deserializeValue(userClassLoader);
-                       if (executionConfig != null) {
-                               this.executionConfigSummary = new 
ExecutionConfigSummary(executionConfig);
-                       }
-               } catch (IOException | ClassNotFoundException e) {
-                       LOG.error("Couldn't create ExecutionConfigSummary for 
job {} ", jobID, e);
-               }
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -344,8 +333,9 @@ public class ExecutionGraph {
                return scheduleMode;
        }
 
+       @Override
        public boolean isArchived() {
-               return isArchived;
+               return false;
        }
 
        public void enableSnapshotCheckpointing(
@@ -434,6 +424,7 @@ public class ExecutionGraph {
                return restartStrategy;
        }
 
+       @Override
        public CheckpointStatsTracker getCheckpointStatsTracker() {
                return checkpointStatsTracker;
        }
@@ -484,6 +475,7 @@ public class ExecutionGraph {
                this.jsonPlan = jsonPlan;
        }
 
+       @Override
        public String getJsonPlan() {
                return jsonPlan;
        }
@@ -492,14 +484,17 @@ public class ExecutionGraph {
                return slotProvider;
        }
 
+       @Override
        public JobID getJobID() {
                return jobID;
        }
 
+       @Override
        public String getJobName() {
                return jobName;
        }
 
+       @Override
        public boolean isStoppable() {
                return this.isStoppable;
        }
@@ -512,6 +507,7 @@ public class ExecutionGraph {
                return this.userClassLoader;
        }
 
+       @Override
        public JobStatus getState() {
                return state;
        }
@@ -520,14 +516,22 @@ public class ExecutionGraph {
                return failureCause;
        }
 
+       @Override
+       public String getFailureCauseAsString() {
+               return ExceptionUtils.stringifyException(failureCause);
+       }
+
+       @Override
        public ExecutionJobVertex getJobVertex(JobVertexID id) {
                return this.tasks.get(id);
        }
 
+       @Override
        public Map<JobVertexID, ExecutionJobVertex> getAllVertices() {
                return Collections.unmodifiableMap(this.tasks);
        }
 
+       @Override
        public Iterable<ExecutionJobVertex> getVerticesTopologically() {
                // we return a specific iterator that does not fail with 
concurrent modifications
                // the list is append only, so it is safe for that
@@ -566,6 +570,7 @@ public class ExecutionGraph {
                return Collections.unmodifiableMap(this.intermediateResults);
        }
 
+       @Override
        public Iterable<ExecutionVertex> getAllExecutionVertices() {
                return new Iterable<ExecutionVertex>() {
                        @Override
@@ -575,6 +580,7 @@ public class ExecutionGraph {
                };
        }
 
+       @Override
        public long getStatusTimestamp(JobStatus status) {
                return this.stateTimestamps[status.ordinal()];
        }
@@ -592,6 +598,7 @@ public class ExecutionGraph {
         * Gets the internal flink accumulator map of maps which contains some 
metrics.
         * @return A map of accumulators for every executed task.
         */
+       @Override
        public Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, 
Accumulator<?,?>>> getFlinkAccumulators() {
                Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, 
Accumulator<?, ?>>> flinkAccumulators =
                                new HashMap<ExecutionAttemptID, 
Map<AccumulatorRegistry.Metric, Accumulator<?, ?>>>();
@@ -627,6 +634,7 @@ public class ExecutionGraph {
         * @return The accumulator map with serialized accumulator values.
         * @throws IOException
         */
+       @Override
        public Map<String, SerializedValue<Object>> getAccumulatorsSerialized() 
throws IOException {
 
                Map<String, Accumulator<?, ?>> accumulatorMap = 
aggregateUserAccumulators();
@@ -643,6 +651,7 @@ public class ExecutionGraph {
         * Returns the a stringified version of the user-defined accumulators.
         * @return an Array containing the StringifiedAccumulatorResult objects
         */
+       @Override
        public StringifiedAccumulatorResult[] 
getAccumulatorResultsStringified() {
                Map<String, Accumulator<?, ?>> accumulatorMap = 
aggregateUserAccumulators();
                return 
StringifiedAccumulatorResult.stringifyAccumulatorResults(accumulatorMap);
@@ -926,51 +935,21 @@ public class ExecutionGraph {
        }
 
        /**
-        * This method cleans fields that are irrelevant for the archived 
execution attempt.
+        * Returns the serializable ArchivedExecutionConfig
+        * @return ArchivedExecutionConfig which may be null in case of errors
         */
-       public void prepareForArchiving() {
-               if (!state.isGloballyTerminalState()) {
-                       throw new IllegalStateException("Can only archive the 
job from a terminal state");
-               }
-
-               // clear the non-serializable fields
-               restartStrategy = null;
-               slotProvider = null;
-               checkpointCoordinator = null;
-               executionContext = null;
-               kvStateLocationRegistry = null;
-
-               for (ExecutionJobVertex vertex : verticesInCreationOrder) {
-                       vertex.prepareForArchiving();
-               }
-
-               intermediateResults.clear();
-               currentExecutions.clear();
-               requiredJarFiles.clear();
-               requiredClasspaths.clear();
-               jobStatusListeners.clear();
-               executionListeners.clear();
-
-               if (userClassLoader instanceof FlinkUserCodeClassLoader) {
-                       try {
-                               // close the classloader to free space of user 
jars immediately
-                               // otherwise we have to wait until garbage 
collection
-                               ((FlinkUserCodeClassLoader) 
userClassLoader).close();
-                       } catch (IOException e) {
-                               LOG.warn("Failed to close the user classloader 
for job {}", jobID, e);
+       @Override
+       public ArchivedExecutionConfig getArchivedExecutionConfig() {
+               // create a summary of all relevant data accessed in the web 
interface's JobConfigHandler
+               try {
+                       ExecutionConfig executionConfig = 
getSerializedExecutionConfig().deserializeValue(userClassLoader);
+                       if (executionConfig != null) {
+                               return executionConfig.archive();
                        }
-               }
-               userClassLoader = null;
-
-               isArchived = true;
-       }
-
-       /**
-        * Returns the serializable ExecutionConfigSummary
-        * @return ExecutionConfigSummary which may be null in case of errors
-        */
-       public ExecutionConfigSummary getExecutionConfigSummary() {
-               return executionConfigSummary;
+               } catch (IOException | ClassNotFoundException e) {
+                       LOG.error("Couldn't create ArchivedExecutionConfig for 
job {} ", jobID, e);
+               };
+               return null;
        }
 
        /**
@@ -1282,4 +1261,53 @@ public class ExecutionGraph {
                        }
                }
        }
+
+       @Override
+       public ArchivedExecutionGraph archive() {
+               Map<JobVertexID, OperatorCheckpointStats> operatorStats = new 
HashMap<>();
+               Map<JobVertexID, ArchivedExecutionJobVertex> archivedTasks = 
new HashMap<>();
+               List<ArchivedExecutionJobVertex> 
archivedVerticesInCreationOrder = new ArrayList<>();
+               for (ExecutionJobVertex task : verticesInCreationOrder) {
+                       ArchivedExecutionJobVertex archivedTask = 
task.archive();
+                       archivedVerticesInCreationOrder.add(archivedTask);
+                       archivedTasks.put(task.getJobVertexId(), archivedTask);
+                       Option<OperatorCheckpointStats> statsOption = 
task.getCheckpointStats();
+                       if (statsOption.isDefined()) {
+                               operatorStats.put(task.getJobVertexId(), 
statsOption.get());
+                       }
+               }
+
+               Option<JobCheckpointStats> jobStats;
+               if (getCheckpointStatsTracker() == null) {
+                       jobStats = Option.empty();
+               } else {
+                       jobStats = getCheckpointStatsTracker().getJobStats();
+               }
+
+               ArchivedCheckpointStatsTracker statsTracker = new 
ArchivedCheckpointStatsTracker(jobStats, operatorStats);
+
+               Map<String, SerializedValue<Object>> serializedUserAccumulators;
+               try {
+                       serializedUserAccumulators = 
getAccumulatorsSerialized();
+               } catch (Exception e) {
+                       LOG.warn("Error occurred while archiving user 
accumulators.", e);
+                       serializedUserAccumulators = Collections.emptyMap();
+               }
+
+               return new ArchivedExecutionGraph(
+                       getJobID(),
+                       getJobName(),
+                       archivedTasks,
+                       archivedVerticesInCreationOrder,
+                       stateTimestamps,
+                       getState(),
+                       getFailureCauseAsString(),
+                       getJsonPlan(),
+                       getAccumulatorResultsStringified(),
+                       serializedUserAccumulators,
+                       getArchivedExecutionConfig(),
+                       isStoppable(),
+                       statsTracker
+               );
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index ead0852..e7f16a2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -28,7 +28,10 @@ import org.apache.flink.core.io.LocatableInputSplit;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.api.common.Archiveable;
 import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
@@ -43,6 +46,7 @@ import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 
+import scala.Option;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.util.ArrayList;
@@ -51,7 +55,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class ExecutionJobVertex {
+public class ExecutionJobVertex implements AccessExecutionJobVertex, 
Archiveable<ArchivedExecutionJobVertex> {
 
        /** Use the same log for all ExecutionGraph classes */
        private static final Logger LOG = ExecutionGraph.LOG;
@@ -197,10 +201,17 @@ public class ExecutionJobVertex {
                return jobVertex;
        }
 
+       @Override
+       public String getName() {
+               return getJobVertex().getName();
+       }
+
+       @Override
        public int getParallelism() {
                return parallelism;
        }
 
+       @Override
        public int getMaxParallelism() {
                return maxParallelism;
        }
@@ -209,10 +220,12 @@ public class ExecutionJobVertex {
                return graph.getJobID();
        }
        
+       @Override
        public JobVertexID getJobVertexId() {
                return jobVertex.getID();
        }
        
+       @Override
        public ExecutionVertex[] getTaskVertices() {
                return taskVertices;
        }
@@ -241,6 +254,7 @@ public class ExecutionJobVertex {
                return numSubtasksInFinalState == parallelism;
        }
        
+       @Override
        public ExecutionState getAggregateState() {
                int[] num = new int[ExecutionState.values().length];
                for (ExecutionVertex vertex : this.taskVertices) {
@@ -250,6 +264,16 @@ public class ExecutionJobVertex {
                return getAggregateJobVertexState(num, parallelism);
        }
        
+       @Override
+       public Option<OperatorCheckpointStats> getCheckpointStats() {
+               CheckpointStatsTracker tracker = 
getGraph().getCheckpointStatsTracker();
+               if (tracker == null) {
+                       return Option.empty();
+               } else {
+                       return tracker.getOperatorStats(getJobVertexId());
+               }
+       }
+
        
//---------------------------------------------------------------------------------------------
        
        public void connectToPredecessors(Map<IntermediateDataSetID, 
IntermediateResult> intermediateDataSets) throws JobException {
@@ -371,36 +395,6 @@ public class ExecutionJobVertex {
                }
        }
        
-       /**
-        * This method cleans fields that are irrelevant for the archived 
execution attempt.
-        */
-       public void prepareForArchiving() {
-               
-               for (ExecutionVertex vertex : taskVertices) {
-                       vertex.prepareForArchiving();
-               }
-               
-               // clear intermediate results
-               inputs.clear();
-               producedDataSets = null;
-               
-               // reset shared groups
-               if (slotSharingGroup != null) {
-                       slotSharingGroup.clearTaskAssignment();
-               }
-               if (coLocationGroup != null) {
-                       coLocationGroup.resetConstraints();
-               }
-               
-               // reset splits and split assigner
-               splitAssigner = null;
-               if (inputSplits != null) {
-                       for (int i = 0; i < inputSplits.length; i++) {
-                               inputSplits[i] = null;
-                       }
-               }
-       }
-       
        
//---------------------------------------------------------------------------------------------
        //  Notifications
        
//---------------------------------------------------------------------------------------------
@@ -627,4 +621,9 @@ public class ExecutionJobVertex {
                        return ExecutionState.CREATED;
                }
        }
+
+       @Override
+       public ArchivedExecutionJobVertex archive() {
+               return new ArchivedExecutionJobVertex(this);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 4837803..96af91e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescript
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.api.common.Archiveable;
 import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -42,6 +43,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
@@ -72,7 +74,7 @@ import static 
org.apache.flink.runtime.execution.ExecutionState.FINISHED;
  * The ExecutionVertex is a parallel subtask of the execution. It may be 
executed once, or several times, each of
  * which time it spawns an {@link Execution}.
  */
-public class ExecutionVertex {
+public class ExecutionVertex implements AccessExecutionVertex, 
Archiveable<ArchivedExecutionVertex> {
 
        private static final Logger LOG = ExecutionGraph.LOG;
 
@@ -176,6 +178,7 @@ public class ExecutionVertex {
                return this.jobVertex.getJobVertex().getName();
        }
 
+       @Override
        public String getTaskNameWithSubtaskIndex() {
                return this.taskNameWithSubtask;
        }
@@ -188,6 +191,7 @@ public class ExecutionVertex {
                return this.jobVertex.getMaxParallelism();
        }
 
+       @Override
        public int getParallelSubtaskIndex() {
                return this.subTaskIndex;
        }
@@ -207,18 +211,26 @@ public class ExecutionVertex {
                return locationConstraint;
        }
 
+       @Override
        public Execution getCurrentExecutionAttempt() {
                return currentExecution;
        }
 
+       @Override
        public ExecutionState getExecutionState() {
                return currentExecution.getState();
        }
 
+       @Override
        public long getStateTimestamp(ExecutionState state) {
                return currentExecution.getStateTimestamp(state);
        }
 
+       @Override
+       public String getFailureCauseAsString() {
+               return ExceptionUtils.stringifyException(getFailureCause());
+       }
+
        public Throwable getFailureCause() {
                return currentExecution.getFailureCause();
        }
@@ -227,10 +239,12 @@ public class ExecutionVertex {
                return currentExecution.getAssignedResource();
        }
 
+       @Override
        public TaskManagerLocation getCurrentAssignedResourceLocation() {
                return currentExecution.getAssignedResourceLocation();
        }
 
+       @Override
        public Execution getPriorExecutionAttempt(int attemptNumber) {
                if (attemptNumber >= 0 && attemptNumber < 
priorExecutions.size()) {
                        return priorExecutions.get(attemptNumber);
@@ -240,6 +254,10 @@ public class ExecutionVertex {
                }
        }
 
+       List<Execution> getPriorExecutions() {
+               return priorExecutions;
+       }
+
        public ExecutionGraph getExecutionGraph() {
                return this.jobVertex.getGraph();
        }
@@ -537,31 +555,6 @@ public class ExecutionVertex {
                }
        }
 
-       /**
-        * This method cleans fields that are irrelevant for the archived 
execution attempt.
-        */
-       public void prepareForArchiving() throws IllegalStateException {
-               Execution execution = currentExecution;
-
-               // sanity check
-               if (!execution.isFinished()) {
-                       throw new IllegalStateException("Cannot archive 
ExecutionVertex that is not in a finished state.");
-               }
-
-               // prepare the current execution for archiving
-               execution.prepareForArchiving();
-
-               // prepare previous executions for archiving
-               for (Execution exec : priorExecutions) {
-                       exec.prepareForArchiving();
-               }
-
-               // clear the unnecessary fields in this class
-               this.resultPartitions = null;
-               this.inputEdges = null;
-               this.locationConstraint = null;
-       }
-
        public void cachePartitionInfo(PartialInputChannelDeploymentDescriptor 
partitionInfo){
                getCurrentExecutionAttempt().cachePartitionInfo(partitionInfo);
        }
@@ -708,4 +701,9 @@ public class ExecutionVertex {
        public String toString() {
                return getSimpleName();
        }
+
+       @Override
+       public ArchivedExecutionVertex archive() {
+               return new ArchivedExecutionVertex(this);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/archive/ExecutionConfigSummary.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/archive/ExecutionConfigSummary.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/archive/ExecutionConfigSummary.java
deleted file mode 100644
index ad4677f..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/archive/ExecutionConfigSummary.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.runtime.executiongraph.archive;
-
-import org.apache.flink.api.common.ExecutionConfig;
-
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.Map;
-
-/**
- * Serializable class which is created when archiving the job.
- * It can be used to display job information on the web interface
- * without having to keep the classloader around after job completion.
- */
-public class ExecutionConfigSummary implements Serializable {
-
-       private final String executionMode;
-       private final String restartStrategyDescription;
-       private final int parallelism;
-       private final boolean objectReuseEnabled;
-       private final Map<String, String> globalJobParameters;
-
-       public ExecutionConfigSummary(ExecutionConfig ec) {
-               executionMode = ec.getExecutionMode().name();
-               if (ec.getRestartStrategy() != null) {
-                       restartStrategyDescription = 
ec.getRestartStrategy().getDescription();
-               } else {
-                       restartStrategyDescription = "default";
-               }
-               parallelism = ec.getParallelism();
-               objectReuseEnabled = ec.isObjectReuseEnabled();
-               if (ec.getGlobalJobParameters() != null
-                               && ec.getGlobalJobParameters().toMap() != null) 
{
-                       globalJobParameters = 
ec.getGlobalJobParameters().toMap();
-               } else {
-                       globalJobParameters = Collections.emptyMap();
-               }
-       }
-
-       public String getExecutionMode() {
-               return executionMode;
-       }
-
-       public String getRestartStrategyDescription() {
-               return restartStrategyDescription;
-       }
-
-       public int getParallelism() {
-               return parallelism;
-       }
-
-       public boolean getObjectReuseEnabled() {
-               return objectReuseEnabled;
-       }
-
-       public Map<String, String> getGlobalJobParameters() {
-               return globalJobParameters;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
index 37a91b3..87df0d1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
@@ -27,9 +27,9 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
@@ -164,7 +164,7 @@ public final class WebMonitorUtils {
                }
        }
 
-       public static JobDetails createDetailsForJob(ExecutionGraph job) {
+       public static JobDetails createDetailsForJob(AccessExecutionGraph job) {
                JobStatus status = job.getState();
 
                long started = job.getStatusTimestamp(JobStatus.CREATED);
@@ -174,11 +174,11 @@ public final class WebMonitorUtils {
                long lastChanged = 0;
                int numTotalTasks = 0;
 
-               for (ExecutionJobVertex ejv : job.getVerticesTopologically()) {
-                       ExecutionVertex[] vertices = ejv.getTaskVertices();
+               for (AccessExecutionJobVertex ejv : 
job.getVerticesTopologically()) {
+                       AccessExecutionVertex[] vertices = 
ejv.getTaskVertices();
                        numTotalTasks += vertices.length;
 
-                       for (ExecutionVertex vertex : vertices) {
+                       for (AccessExecutionVertex vertex : vertices) {
                                ExecutionState state = 
vertex.getExecutionState();
                                countsPerStatus[state.ordinal()]++;
                                lastChanged = Math.max(lastChanged, 
vertex.getStateTimestamp(state));

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index cca0124..8f3b82a 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1685,12 +1685,9 @@ class JobManager(
           }(context.dispatcher))
 
           try {
-            eg.prepareForArchiving()
-
-            archive ! decorateMessage(ArchiveExecutionGraph(jobID, eg))
+            archive ! decorateMessage(ArchiveExecutionGraph(jobID, 
eg.archive()))
           } catch {
-            case t: Throwable => log.error(s"Could not prepare the execution 
graph $eg for " +
-              "archiving.", t)
+            case t: Throwable => log.error(s"Could not archive the execution 
graph $eg.", t)
           }
 
           futureOption

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
index 2d55b26..7f8fcd3 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
@@ -28,7 +28,7 @@ import org.apache.flink.runtime.messages.accumulators._
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils
 import org.apache.flink.runtime.{FlinkActor, LogMessages}
 import org.apache.flink.runtime.messages.webmonitor._
-import org.apache.flink.runtime.executiongraph.ExecutionGraph
+import org.apache.flink.runtime.executiongraph.{ArchivedExecutionGraph, 
ExecutionGraph}
 import org.apache.flink.runtime.messages.ArchiveMessages._
 import org.apache.flink.runtime.messages.JobManagerMessages._
 
@@ -66,7 +66,7 @@ class MemoryArchivist(private val max_entries: Int)
    * Map of execution graphs belonging to recently started jobs with the time 
stamp of the last
    * received job event. The insert order is preserved through a LinkedHashMap.
    */
-  protected val graphs = mutable.LinkedHashMap[JobID, ExecutionGraph]()
+  protected val graphs = mutable.LinkedHashMap[JobID, ArchivedExecutionGraph]()
 
   /* Counters for finished, canceled, and failed jobs */
   private[this] var finishedCnt: Int = 0

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
index c4e3f3e..435b736 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
@@ -19,14 +19,14 @@
 package org.apache.flink.runtime.messages
 
 import org.apache.flink.api.common.JobID
-import org.apache.flink.runtime.executiongraph.ExecutionGraph
+import org.apache.flink.runtime.executiongraph.{ArchivedExecutionGraph, 
ExecutionGraph}
 
 /**
  * This object contains the archive specific messages.
  */
 object ArchiveMessages {
   
-  case class ArchiveExecutionGraph(jobID: JobID, graph: ExecutionGraph)
+  case class ArchiveExecutionGraph(jobID: JobID, graph: ArchivedExecutionGraph)
 
   /**
    * Request the currently archived jobs in the archiver. The resulting 
response is [[ArchivedJobs]]
@@ -44,19 +44,19 @@ object ArchiveMessages {
    */
   case class RequestArchivedJob(jobID: JobID)
 
-  case class ArchivedJob(job: Option[ExecutionGraph])
+  case class ArchivedJob(job: Option[ArchivedExecutionGraph])
 
   /**
    * Response to [[RequestArchivedJobs]] message. The response contains the 
archived jobs.
    * @param jobs
    */
-  case class ArchivedJobs(jobs: Iterable[ExecutionGraph]){
-    def asJavaIterable: java.lang.Iterable[ExecutionGraph] = {
+  case class ArchivedJobs(jobs: Iterable[ArchivedExecutionGraph]){
+    def asJavaIterable: java.lang.Iterable[ArchivedExecutionGraph] = {
       import scala.collection.JavaConverters._
       jobs.asJava
     }
 
-    def asJavaCollection: java.util.Collection[ExecutionGraph] = {
+    def asJavaCollection: java.util.Collection[ArchivedExecutionGraph] = {
       import scala.collection.JavaConverters._
       jobs.asJavaCollection
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index 4cf6a02..3df8c26 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -26,7 +26,7 @@ import org.apache.flink.api.common.JobID
 import org.apache.flink.runtime.akka.ListeningBehaviour
 import org.apache.flink.runtime.blob.BlobKey
 import org.apache.flink.runtime.client.{JobStatusMessage, 
SerializedJobExecutionResult}
-import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, 
ExecutionGraph}
+import org.apache.flink.runtime.executiongraph.{AccessExecutionGraph, 
ExecutionAttemptID, ExecutionGraph}
 import org.apache.flink.runtime.instance.{Instance, InstanceID}
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID
 import org.apache.flink.runtime.jobgraph.{IntermediateDataSetID, JobGraph, 
JobStatus, JobVertexID}
@@ -371,7 +371,7 @@ object JobManagerMessages {
    * @param jobID
    * @param executionGraph
    */
-  case class JobFound(jobID: JobID, executionGraph: ExecutionGraph) extends 
JobResponse
+  case class JobFound(jobID: JobID, executionGraph: AccessExecutionGraph) 
extends JobResponse
 
   /**
    * Denotes that there is no job with [[jobID]] retrievable. This message can 
be the response of

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index ea4d322..0b2f4f3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -81,7 +81,7 @@ public class CoordinatorShutdownTest {
                                        new 
JobManagerMessages.RequestJob(testGraph.getJobID()),
                                        timeout);
                        
-                       ExecutionGraph graph = ((JobManagerMessages.JobFound) 
Await.result(jobRequestFuture, timeout)).executionGraph();
+                       ExecutionGraph graph = 
(ExecutionGraph)((JobManagerMessages.JobFound) Await.result(jobRequestFuture, 
timeout)).executionGraph();
                        
                        assertNotNull(graph);
                        graph.waitUntilFinished();
@@ -133,7 +133,7 @@ public class CoordinatorShutdownTest {
                                        new 
JobManagerMessages.RequestJob(testGraph.getJobID()),
                                        timeout);
 
-                       ExecutionGraph graph = ((JobManagerMessages.JobFound) 
Await.result(jobRequestFuture, timeout)).executionGraph();
+                       ExecutionGraph graph = 
(ExecutionGraph)((JobManagerMessages.JobFound) Await.result(jobRequestFuture, 
timeout)).executionGraph();
 
                        assertNotNull(graph);
                        graph.waitUntilFinished();

Reply via email to