[FLINK-3327] Attach ExecutionConfig to JobGraph

This closes #1583


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0f8d76c6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0f8d76c6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0f8d76c6

Branch: refs/heads/master
Commit: 0f8d76c6f71e0b4a07ed11a112c3bd21c300e966
Parents: d0a390f
Author: kl0u <kklou...@gmail.com>
Authored: Fri Mar 11 22:31:12 2016 +0100
Committer: zentol <ches...@apache.org>
Committed: Sat Mar 12 00:07:49 2016 +0100

----------------------------------------------------------------------
 .../flink/api/common/ExecutionConfig.java       |  6 +-
 .../java/org/apache/flink/api/common/Plan.java  | 11 ---
 .../plantranslate/JobGraphGenerator.java        | 16 +---
 .../BackPressureStatsTrackerITCase.java         |  3 +-
 .../StackTraceSampleCoordinatorITCase.java      |  3 +-
 .../deployment/TaskDeploymentDescriptor.java    | 30 ++++---
 .../flink/runtime/execution/Environment.java    |  8 ++
 .../runtime/executiongraph/ExecutionGraph.java  | 18 ++---
 .../runtime/executiongraph/ExecutionVertex.java |  4 +-
 .../apache/flink/runtime/jobgraph/JobGraph.java | 82 ++++++++++----------
 .../jobgraph/tasks/AbstractInvokable.java       | 27 +------
 .../flink/runtime/operators/DataSinkTask.java   | 20 +----
 .../flink/runtime/operators/DataSourceTask.java | 18 +----
 .../runtime/taskmanager/RuntimeEnvironment.java | 11 ++-
 .../apache/flink/runtime/taskmanager/Task.java  |  9 ++-
 .../flink/runtime/jobmanager/JobManager.scala   |  3 +-
 .../checkpoint/CoordinatorShutdownTest.java     |  5 +-
 ...ExecutionGraphCheckpointCoordinatorTest.java |  2 +
 .../client/JobClientActorRecoveryITCase.java    |  3 +-
 .../runtime/client/JobClientActorTest.java      |  3 +-
 .../TaskDeploymentDescriptorTest.java           | 12 ++-
 .../ExecutionGraphConstructionTest.java         | 13 +++-
 .../ExecutionGraphDeploymentTest.java           |  3 +
 .../ExecutionGraphRestartTest.java              | 36 +++++----
 .../ExecutionGraphSignalsTest.java              |  2 +
 .../executiongraph/ExecutionGraphTestUtils.java |  2 +
 .../ExecutionStateProgressTest.java             |  2 +
 .../executiongraph/LocalInputSplitsTest.java    |  7 +-
 .../executiongraph/PointwisePatternTest.java    |  8 ++
 .../TerminalStateDeadlockTest.java              |  2 +
 .../VertexLocationConstraintTest.java           | 19 +++--
 .../executiongraph/VertexSlotSharingTest.java   |  2 +
 .../PartialConsumePipelinedResultTest.java      |  3 +-
 .../flink/runtime/jobgraph/JobGraphTest.java    | 25 +++---
 .../jobgraph/jsonplan/JsonGeneratorTest.java    |  5 +-
 .../runtime/jobmanager/JobManagerTest.java      |  7 +-
 .../flink/runtime/jobmanager/JobSubmitTest.java |  5 +-
 .../SlotCountExceedingParallelismTest.java      |  3 +-
 .../StandaloneSubmittedJobGraphStoreTest.java   |  3 +-
 .../ZooKeeperSubmittedJobGraphsStoreITCase.java |  3 +-
 .../ScheduleOrUpdateConsumersTest.java          |  2 +
 .../LeaderChangeStateCleanupTest.java           |  3 +-
 .../operators/testutils/DummyEnvironment.java   |  7 ++
 .../operators/testutils/MockEnvironment.java    |  9 +++
 .../runtime/taskmanager/TaskAsyncCallTest.java  |  3 +-
 .../runtime/taskmanager/TaskCancelTest.java     |  3 +-
 .../runtime/taskmanager/TaskManagerTest.java    | 45 +++++++----
 .../flink/runtime/taskmanager/TaskStopTest.java |  2 +
 .../flink/runtime/taskmanager/TaskTest.java     |  3 +-
 .../TaskManagerLossFailsTasksTest.scala         |  5 +-
 .../jobmanager/CoLocationConstraintITCase.scala |  3 +-
 .../runtime/jobmanager/JobManagerITCase.scala   | 48 ++++++------
 .../runtime/jobmanager/RecoveryITCase.scala     | 16 +++-
 .../runtime/jobmanager/SlotSharingITCase.scala  |  6 +-
 .../TaskManagerFailsWithSlotSharingITCase.scala |  6 +-
 .../api/graph/StreamingJobGraphGenerator.java   | 14 +---
 .../streaming/api/RestartStrategyTest.java      |  6 +-
 .../graph/StreamingJobGraphGeneratorTest.java   | 10 ++-
 .../partitioner/RescalePartitionerTest.java     |  2 +
 .../runtime/tasks/StreamMockEnvironment.java    | 18 ++++-
 .../tasks/StreamTaskAsyncCheckpointTest.java    |  6 ++
 .../streaming/runtime/tasks/StreamTaskTest.java |  3 +-
 .../runtime/tasks/StreamTaskTestHarness.java    | 10 +--
 .../JobSubmissionFailsITCase.java               |  7 +-
 .../flink/test/javaApiOperators/MapITCase.java  | 33 ++++++++
 .../JobManagerHACheckpointRecoveryITCase.java   |  3 +-
 .../JobManagerHAJobGraphRecoveryITCase.java     |  3 +-
 .../runtime/NetworkStackThroughputITCase.java   |  3 +-
 .../ZooKeeperLeaderElectionITCase.java          |  3 +-
 .../flink/test/web/WebFrontendITCase.java       |  5 +-
 .../jobmanager/JobManagerFailsITCase.scala      |  5 +-
 .../JobManagerLeaderSessionIDITSuite.scala      |  3 +-
 .../taskmanager/TaskManagerFailsITCase.scala    | 11 ++-
 73 files changed, 442 insertions(+), 308 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 9642fa9..9b0ff52 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -59,9 +59,6 @@ public class ExecutionConfig implements Serializable {
 
        private static final long serialVersionUID = 1L;
 
-       // Key for storing it in the Job Configuration
-       public static final String CONFIG_KEY = "runtime.config";
-
        /**
         * The constant to use for the parallelism, if the system should use the number
         *  of currently available slots.
@@ -648,7 +645,8 @@ public class ExecutionConfig implements Serializable {
                                Objects.equals(executionMode, other.executionMode) &&
                                useClosureCleaner == other.useClosureCleaner &&
                                parallelism == other.parallelism &&
-                               restartStrategyConfiguration.equals(other.restartStrategyConfiguration) &&
+                               ((restartStrategyConfiguration == null && other.restartStrategyConfiguration == null) ||
+                               restartStrategyConfiguration.equals(other.restartStrategyConfiguration)) &&
                                forceKryo == other.forceKryo &&
                                objectReuse == other.objectReuse &&
                                autoTypeRegistrationEnabled == other.autoTypeRegistrationEnabled &&

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
index d81fcd1..3e5cb61 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
@@ -38,7 +38,6 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
 import org.apache.flink.api.common.operators.GenericDataSinkBase;
 import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.Visitable;
@@ -295,16 +294,6 @@ public class Plan implements Visitable<Operator<?>> {
        }
 
        /**
-        * Returns the specified restart strategy configuration. This configuration defines the used
-        * restart strategy to be used at runtime.
-        *
-        * @return The specified restart strategy configuration
-        */
-       public RestartStrategies.RestartStrategyConfiguration getRestartStrategyConfiguration() {
-               return getExecutionConfig().getRestartStrategy();
-       }
-
-       /**
         * Gets the optimizer post-pass class for this job. The post-pass typically creates utility classes
         * for data types and is specific to a particular data model (record, tuple, Scala, ...)
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index f59c347..c7aaa7d 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -20,7 +20,6 @@ package org.apache.flink.optimizer.plantranslate;
 
 import com.fasterxml.jackson.core.JsonFactory;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.aggregators.AggregatorRegistry;
 import org.apache.flink.api.common.aggregators.AggregatorWithName;
@@ -81,11 +80,9 @@ import org.apache.flink.runtime.operators.chaining.ChainedDriver;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
 import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.StringUtils;
 import org.apache.flink.util.Visitor;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -217,9 +214,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
                // ----------- finalize the job graph -----------
                
                // create the job graph object
-               JobGraph graph = new JobGraph(jobId, program.getJobName());
-
-               graph.setRestartStrategyConfiguration(program.getOriginalPlan().getRestartStrategyConfiguration());
+               JobGraph graph = new JobGraph(jobId, program.getJobName(), program.getOriginalPlan().getExecutionConfig());
                graph.setAllowQueuedScheduling(false);
                graph.setSessionTimeout(program.getOriginalPlan().getSessionTimeout());
 
@@ -238,15 +233,6 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
                        DistributedCache.writeFileInfoToConfig(e.getKey(), e.getValue(), graph.getJobConfiguration());
                }
 
-               try {
-                       InstantiationUtil.writeObjectToConfig(
-                                       program.getOriginalPlan().getExecutionConfig(),
-                                       graph.getJobConfiguration(),
-                                       ExecutionConfig.CONFIG_KEY);
-               } catch (IOException e) {
-                       throw new RuntimeException("Config object could not be written to Job Configuration: " + e);
-               }
-
                // release all references again
                this.vertices = null;
                this.chainedTasks = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
index 2659185..1f0b2ef 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.webmonitor;
 
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.MemoryType;
@@ -92,7 +93,7 @@ public class BackPressureStatsTrackerITCase extends 
TestLogger {
                        final FiniteDuration deadline = new FiniteDuration(60, 
TimeUnit.SECONDS);
 
                        // The JobGraph
-                       final JobGraph jobGraph = new JobGraph();
+                       final JobGraph jobGraph = new JobGraph(new 
ExecutionConfig());
                        final int parallelism = 4;
 
                        final JobVertex task = new JobVertex("Task");

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
index 63c3712..c6ce315 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.webmonitor;
 
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -76,7 +77,7 @@ public class StackTraceSampleCoordinatorITCase extends 
TestLogger {
                        final FiniteDuration deadline = new FiniteDuration(60, 
TimeUnit.SECONDS);
 
                        // The JobGraph
-                       final JobGraph jobGraph = new JobGraph();
+                       final JobGraph jobGraph = new JobGraph(new 
ExecutionConfig());
                        final int parallelism = 1;
 
                        final JobVertex task = new JobVertex("Task");

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index 983ad38..60b8ba6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.deployment;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.configuration.Configuration;
@@ -88,6 +89,9 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
 
        private final SerializedValue<StateHandle<?>> operatorState;
 
+       /** The execution configuration (see {@link ExecutionConfig}) related 
to the specific job. */
+       private final ExecutionConfig executionConfig;
+
        private long recoveryTimestamp;
                
        /**
@@ -95,9 +99,9 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
         */
        public TaskDeploymentDescriptor(
                        JobID jobID, JobVertexID vertexID, ExecutionAttemptID 
executionId,
-                       String taskName, int indexInSubtaskGroup, int 
numberOfSubtasks, int attemptNumber,
-                       Configuration jobConfiguration, Configuration 
taskConfiguration, String invokableClassName,
-                       List<ResultPartitionDeploymentDescriptor> 
producedPartitions,
+                       ExecutionConfig executionConfig, String taskName, int 
indexInSubtaskGroup, int numberOfSubtasks,
+                       int attemptNumber, Configuration jobConfiguration, 
Configuration taskConfiguration,
+                       String invokableClassName, 
List<ResultPartitionDeploymentDescriptor> producedPartitions,
                        List<InputGateDeploymentDescriptor> inputGates,
                        List<BlobKey> requiredJarFiles, List<URL> 
requiredClasspaths,
                        int targetSlotNumber, SerializedValue<StateHandle<?>> 
operatorState,
@@ -111,6 +115,7 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
                this.jobID = checkNotNull(jobID);
                this.vertexID = checkNotNull(vertexID);
                this.executionId = checkNotNull(executionId);
+               this.executionConfig = checkNotNull(executionConfig);
                this.taskName = checkNotNull(taskName);
                this.indexInSubtaskGroup = indexInSubtaskGroup;
                this.numberOfSubtasks = numberOfSubtasks;
@@ -129,16 +134,23 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
 
        public TaskDeploymentDescriptor(
                        JobID jobID, JobVertexID vertexID, ExecutionAttemptID 
executionId,
-                       String taskName, int indexInSubtaskGroup, int 
numberOfSubtasks, int attemptNumber,
-                       Configuration jobConfiguration, Configuration 
taskConfiguration, String invokableClassName,
-                       List<ResultPartitionDeploymentDescriptor> 
producedPartitions,
+                       ExecutionConfig executionConfig, String taskName, int 
indexInSubtaskGroup, int numberOfSubtasks,
+                       int attemptNumber, Configuration jobConfiguration, 
Configuration taskConfiguration,
+                       String invokableClassName, 
List<ResultPartitionDeploymentDescriptor> producedPartitions,
                        List<InputGateDeploymentDescriptor> inputGates,
                        List<BlobKey> requiredJarFiles, List<URL> 
requiredClasspaths,
                        int targetSlotNumber) {
 
-               this(jobID, vertexID, executionId, taskName, 
indexInSubtaskGroup, numberOfSubtasks, attemptNumber,
-                               jobConfiguration, taskConfiguration, 
invokableClassName, producedPartitions,
-                               inputGates, requiredJarFiles, 
requiredClasspaths, targetSlotNumber, null, -1);
+               this(jobID, vertexID, executionId, executionConfig, taskName, 
indexInSubtaskGroup,
+                               numberOfSubtasks, attemptNumber, 
jobConfiguration, taskConfiguration, invokableClassName,
+                               producedPartitions, inputGates, 
requiredJarFiles, requiredClasspaths, targetSlotNumber, null, -1);
+       }
+
+       /**
+        * Returns the execution configuration (see {@link ExecutionConfig}) 
related to the specific job.
+        */
+       public ExecutionConfig getExecutionConfig() {
+               return executionConfig;
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index 7332151..a10c463 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.execution;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
@@ -46,6 +47,13 @@ import java.util.concurrent.Future;
 public interface Environment {
 
        /**
+        * Returns the job specific {@link ExecutionConfig}.
+        *
+        * @return The execution configuration associated with the current job.
+        * */
+       ExecutionConfig getExecutionConfig();
+
+       /**
         * Returns the ID of the job that the task belongs to.
         *
         * @return the ID of the job from the original job graph

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index c17cf15..026f6b2 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.executiongraph;
 
 import akka.actor.ActorSystem;
 
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
@@ -60,7 +61,6 @@ import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.runtime.util.SerializedThrowable;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.InstantiationUtil;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -180,6 +180,9 @@ public class ExecutionGraph implements Serializable {
 
        // ------ Configuration of the Execution -------
 
+       /** The execution configuration (see {@link ExecutionConfig}) related 
to this specific job. */
+       private ExecutionConfig executionConfig;
+
        /** Flag to indicate whether the scheduler may queue tasks for 
execution, or needs to be able
         * to deploy them immediately. */
        private boolean allowQueuedScheduling = false;
@@ -234,7 +237,6 @@ public class ExecutionGraph implements Serializable {
        private ExecutionContext executionContext;
 
        // ------ Fields that are only relevant for archived execution graphs 
------------
-       private ExecutionConfig executionConfig;
 
        private String jsonPlan;
 
@@ -250,6 +252,7 @@ public class ExecutionGraph implements Serializable {
                        JobID jobId,
                        String jobName,
                        Configuration jobConfig,
+                       ExecutionConfig config,
                        FiniteDuration timeout,
                        RestartStrategy restartStrategy) {
                this(
@@ -257,6 +260,7 @@ public class ExecutionGraph implements Serializable {
                        jobId,
                        jobName,
                        jobConfig,
+                       config,
                        timeout,
                        restartStrategy,
                        new ArrayList<BlobKey>(),
@@ -270,6 +274,7 @@ public class ExecutionGraph implements Serializable {
                        JobID jobId,
                        String jobName,
                        Configuration jobConfig,
+                       ExecutionConfig config,
                        FiniteDuration timeout,
                        RestartStrategy restartStrategy,
                        List<BlobKey> requiredJarFiles,
@@ -302,7 +307,7 @@ public class ExecutionGraph implements Serializable {
 
                this.requiredJarFiles = requiredJarFiles;
                this.requiredClasspaths = requiredClasspaths;
-
+               this.executionConfig = Preconditions.checkNotNull(config);
                this.timeout = timeout;
 
                this.restartStrategy = restartStrategy;
@@ -942,12 +947,7 @@ public class ExecutionGraph implements Serializable {
                if (!state.isTerminalState()) {
                        throw new IllegalStateException("Can only archive the 
job from a terminal state");
                }
-               // "unpack" execution config before we throw away the usercode 
classloader.
-               try {
-                       executionConfig = (ExecutionConfig) 
InstantiationUtil.readObjectFromConfig(jobConfiguration, 
ExecutionConfig.CONFIG_KEY,userClassLoader);
-               } catch (Exception e) {
-                       LOG.warn("Error deserializing the execution config 
while archiving the execution graph", e);
-               }
+
                // clear the non-serializable fields
                userClassLoader = null;
                scheduler = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index f2d30b5..80430bc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
@@ -665,10 +666,11 @@ public class ExecutionVertex implements Serializable {
                        consumedPartitions.add(new 
InputGateDeploymentDescriptor(resultId, queueToRequest, partitions));
                }
 
+               ExecutionConfig config = 
getExecutionGraph().getExecutionConfig();
                List<BlobKey> jarFiles = 
getExecutionGraph().getRequiredJarFiles();
                List<URL> classpaths = 
getExecutionGraph().getRequiredClasspaths();
 
-               return new TaskDeploymentDescriptor(getJobId(), 
getJobvertexId(), executionId, getTaskName(),
+               return new TaskDeploymentDescriptor(getJobId(), 
getJobvertexId(), executionId, config, getTaskName(),
                                subTaskIndex, 
getTotalNumberOfParallelSubtasks(), attemptNumber, 
getExecutionGraph().getJobConfiguration(),
                                jobVertex.getJobVertex().getConfiguration(), 
jobVertex.getJobVertex().getInvokableClassName(),
                                producedPartitions, consumedPartitions, 
jarFiles, classpaths, targetSlot.getRoot().getSlotNumber(),

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index f99d754..ed714a4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.jobgraph;
 
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
@@ -33,15 +35,14 @@ import java.io.Serializable;
 import java.net.InetSocketAddress;
 import java.net.URL;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Collections;
 import java.util.Set;
+import java.util.LinkedHashSet;
+import java.util.Iterator;
 
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 /**
  * The JobGraph represents a Flink dataflow program, at the low level that the 
JobManager accepts.
  * All programs from higher level APIs are transformed into JobGraphs.
@@ -79,9 +80,7 @@ public class JobGraph implements Serializable {
        /** Name of this job. */
        private final String jobName;
 
-       /** Configuration which defines which restart strategy to use for the 
job recovery */
-       private RestartStrategies.RestartStrategyConfiguration 
restartStrategyConfiguration;
-       
+       private final ExecutionConfig executionConfig;
 
        /** The number of seconds after which the corresponding ExecutionGraph 
is removed at the
         * job manager after it has been executed. */
@@ -102,60 +101,74 @@ public class JobGraph implements Serializable {
        // 
--------------------------------------------------------------------------------------------
 
        /**
-        * Constructs a new job graph with no name and a random job ID.
+        * Constructs a new job graph with no name, a random job ID, and the 
given
+        * {@link ExecutionConfig}.
+        *
+        * @param config The {@link ExecutionConfig} for the job.
         */
-       public JobGraph() {
-               this((String) null);
+       public JobGraph(ExecutionConfig config) {
+               this((String) null, config);
        }
 
        /**
-        * Constructs a new job graph with the given name, a random job ID.
+        * Constructs a new job graph with the given name, the given {@link 
ExecutionConfig},
+        * and a random job ID.
         *
-        * @param jobName The name of the job
+        * @param jobName The name of the job.
+        * @param config The execution configuration of the job.
         */
-       public JobGraph(String jobName) {
-               this(null, jobName);
+       public JobGraph(String jobName, ExecutionConfig config) {
+               this(null, jobName, config);
        }
 
        /**
-        * Constructs a new job graph with the given name and a random job ID 
if null supplied as an id.
+        * Constructs a new job graph with the given job ID (or a random ID, if 
{@code null} is passed),
+        * the given name and the given execution configuration (see {@link 
ExecutionConfig}).
         *
         * @param jobId The id of the job. A random ID is generated, if {@code 
null} is passed.
         * @param jobName The name of the job.
+        * @param config The execution configuration of the job.
         */
-       public JobGraph(JobID jobId, String jobName) {
+       public JobGraph(JobID jobId, String jobName, ExecutionConfig config) {
                this.jobID = jobId == null ? new JobID() : jobId;
                this.jobName = jobName == null ? "(unnamed job)" : jobName;
+               this.executionConfig = Preconditions.checkNotNull(config);
        }
 
        /**
-        * Constructs a new job graph with no name and a random job ID if null 
supplied as an id.
+        * Constructs a new job graph with no name, a random job ID, the given 
{@link ExecutionConfig}, and
+        * the given job vertices.
         *
+        * @param config The execution configuration of the job.
         * @param vertices The vertices to add to the graph.
         */
-       public JobGraph(JobVertex... vertices) {
-               this(null, vertices);
+       public JobGraph(ExecutionConfig config, JobVertex... vertices) {
+               this(null, config, vertices);
        }
 
        /**
-        * Constructs a new job graph with the given name and a random job ID.
+        * Constructs a new job graph with the given name, the given {@link 
ExecutionConfig}, a random job ID,
+        * and the given job vertices.
         *
         * @param jobName The name of the job.
+        * @param config The execution configuration of the job.
         * @param vertices The vertices to add to the graph.
         */
-       public JobGraph(String jobName, JobVertex... vertices) {
-               this(null, jobName, vertices);
+       public JobGraph(String jobName, ExecutionConfig config, JobVertex... 
vertices) {
+               this(null, jobName, config, vertices);
        }
 
        /**
-        * Constructs a new job graph with the given name and a random job ID 
if null supplied as an id.
+        * Constructs a new job graph with the given name, the given {@link 
ExecutionConfig},
+        * the given jobId or a random one if null supplied, and the given job 
vertices.
         *
         * @param jobId The id of the job. A random ID is generated, if {@code 
null} is passed.
         * @param jobName The name of the job.
+        * @param config The execution configuration of the job.
         * @param vertices The vertices to add to the graph.
         */
-       public JobGraph(JobID jobId, String jobName, JobVertex... vertices) {
-               this(jobId, jobName);
+       public JobGraph(JobID jobId, String jobName, ExecutionConfig config, 
JobVertex... vertices) {
+               this(jobId, jobName, config);
 
                for (JobVertex vertex : vertices) {
                        addVertex(vertex);
@@ -192,23 +205,8 @@ public class JobGraph implements Serializable {
                return this.jobConfiguration;
        }
 
-       /**
-        * Sets the restart strategy configuration. This configuration specifies the restart strategy
-        * to be used by the ExecutionGraph in case of a restart.
-        *
-        * @param restartStrategyConfiguration Restart strategy configuration to be set
-        */
-       public void setRestartStrategyConfiguration(RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration) {
-               this.restartStrategyConfiguration = restartStrategyConfiguration;
-       }
-
-       /**
-        * Gets the restart strategy configuration
-        *
-        * @return Restart strategy configuration to be used
-        */
-       public RestartStrategies.RestartStrategyConfiguration getRestartStrategyConfiguration() {
-               return restartStrategyConfiguration;
+       public ExecutionConfig getExecutionConfig() {
+               return this.executionConfig;
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
index 30f32a1..d7dfaf5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.BatchTask;
-import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,9 +41,6 @@ public abstract class AbstractInvokable {
        /** The environment assigned to this invokable. */
        private Environment environment;
 
-       /** The execution config, cached from the deserialization from the JobConfiguration */
-       private ExecutionConfig executionConfig;
-
        /**
         * Starts the execution.
         *
@@ -125,29 +121,10 @@ public abstract class AbstractInvokable {
        }
 
        /**
-        * Returns the global ExecutionConfig, obtained from the job configuration.
+        * Returns the global ExecutionConfig.
         */
        public ExecutionConfig getExecutionConfig() {
-               if (executionConfig != null) {
-                       return executionConfig;
-               }
-
-               try {
-                       executionConfig = (ExecutionConfig) InstantiationUtil.readObjectFromConfig(
-                                       getJobConfiguration(),
-                                       ExecutionConfig.CONFIG_KEY,
-                                       getUserCodeClassLoader());
-
-                       if (executionConfig == null) {
-                               LOG.warn("Environment did not contain an ExecutionConfig - using a default config.");
-                               executionConfig = new ExecutionConfig();
-                       }
-                       return executionConfig;
-               }
-               catch (Exception e) {
-                       LOG.warn("Could not load ExecutionConfig from Environment, returning default ExecutionConfig", e);
-                       return new ExecutionConfig();
-               }
+               return this.environment.getExecutionConfig();
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
index 6972e1d..21e8784 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
@@ -41,13 +41,10 @@ import org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext;
 import org.apache.flink.runtime.operators.util.ReaderIterator;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
-import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.MutableObjectIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-
 /**
  * DataSinkTask which is executed by a task manager. The task hands the data to an output format.
  * 
@@ -112,21 +109,8 @@ public class DataSinkTask<IT> extends AbstractInvokable {
                        LOG.debug(getLogString("Rich Sink detected. Initializing runtime context."));
                }
 
-               ExecutionConfig executionConfig;
-               try {
-                       ExecutionConfig c = InstantiationUtil.readObjectFromConfig(
-                                       getJobConfiguration(),
-                                       ExecutionConfig.CONFIG_KEY,
-                                       getUserCodeClassLoader());
-                       if (c != null) {
-                               executionConfig = c;
-                       } else {
-                               LOG.warn("The execution config returned by the configuration was null");
-                               executionConfig = new ExecutionConfig();
-                       }
-               } catch (IOException | ClassNotFoundException e) {
-                       throw new RuntimeException("Could not load ExecutionConfig from Job Configuration: " + e);
-               }
+               ExecutionConfig executionConfig = getExecutionConfig();
+
                boolean objectReuseEnabled = executionConfig.isObjectReuseEnabled();
                
                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
index 93a4f9c..960faf7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
@@ -36,11 +36,9 @@ import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubExcepti
 import org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -104,21 +102,7 @@ public class DataSourceTask<OT> extends AbstractInvokable {
                        LOG.debug(getLogString("Rich Source detected. Initializing runtime context."));
                }
 
-               ExecutionConfig executionConfig;
-               try {
-                       ExecutionConfig c = (ExecutionConfig) InstantiationUtil.readObjectFromConfig(
-                                       getJobConfiguration(),
-                                       ExecutionConfig.CONFIG_KEY,
-                                       getUserCodeClassLoader());
-                       if (c != null) {
-                               executionConfig = c;
-                       } else {
-                               LOG.warn("ExecutionConfig from job configuration is null. Creating empty config");
-                               executionConfig = new ExecutionConfig();
-                       }
-               } catch (IOException | ClassNotFoundException e) {
-                       throw new RuntimeException("Could not load ExecutionConfig from Job Configuration: ", e);
-               }
+               ExecutionConfig executionConfig = getExecutionConfig();
 
                boolean objectReuseEnabled = executionConfig.isObjectReuseEnabled();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index 0fddde4..51e7e34 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.taskmanager;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.configuration.Configuration;
@@ -55,7 +56,8 @@ public class RuntimeEnvironment implements Environment {
        
        private final Configuration jobConfiguration;
        private final Configuration taskConfiguration;
-       
+       private final ExecutionConfig executionConfig;
+
        private final ClassLoader userCodeClassLoader;
 
        private final MemoryManager memManager;
@@ -80,6 +82,7 @@ public class RuntimeEnvironment implements Environment {
                        JobID jobId,
                        JobVertexID jobVertexId,
                        ExecutionAttemptID executionId,
+                       ExecutionConfig executionConfig,
                        TaskInfo taskInfo,
                        Configuration jobConfiguration,
                        Configuration taskConfiguration,
@@ -99,6 +102,7 @@ public class RuntimeEnvironment implements Environment {
                this.jobVertexId = checkNotNull(jobVertexId);
                this.executionId = checkNotNull(executionId);
                this.taskInfo = checkNotNull(taskInfo);
+               this.executionConfig = checkNotNull(executionConfig);
                this.jobConfiguration = checkNotNull(jobConfiguration);
                this.taskConfiguration = checkNotNull(taskConfiguration);
                this.userCodeClassLoader = checkNotNull(userCodeClassLoader);
@@ -117,6 +121,11 @@ public class RuntimeEnvironment implements Environment {
        // ------------------------------------------------------------------------
 
        @Override
+       public ExecutionConfig getExecutionConfig() {
+               return this.executionConfig;
+       }
+
+       @Override
        public JobID getJobID() {
                return jobId;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index f2d6025..d531e43 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.taskmanager;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.cache.DistributedCache;
@@ -217,6 +218,9 @@ public class Task implements Runnable {
 
        private volatile long recoveryTs;
 
+       /** The job specific execution configuration (see {@link ExecutionConfig}). */
+       private final ExecutionConfig executionConfig;
+
        /**
         * <p><b>IMPORTANT:</b> This constructor may not start any work that would need to
         * be undone in the case of a failing task deployment.</p>
@@ -245,6 +249,7 @@ public class Task implements Runnable {
                this.nameOfInvokableClass = checkNotNull(tdd.getInvokableClassName());
                this.operatorState = tdd.getOperatorState();
                this.recoveryTs = tdd.getRecoveryTimestamp();
+               this.executionConfig = checkNotNull(tdd.getExecutionConfig());
 
                this.memoryManager = checkNotNull(memManager);
                this.ioManager = checkNotNull(ioManager);
@@ -493,8 +498,8 @@ public class Task implements Runnable {
                        TaskInputSplitProvider splitProvider = new TaskInputSplitProvider(jobManager,
                                        jobId, vertexId, executionId, userCodeClassLoader, actorAskTimeout);
 
-                       Environment env = new RuntimeEnvironment(jobId, vertexId, executionId, taskInfo,
-                                       jobConfiguration, taskConfiguration,
+                       Environment env = new RuntimeEnvironment(jobId, vertexId, executionId,
+                                       executionConfig, taskInfo, jobConfiguration, taskConfiguration,
                                        userCodeClassLoader, memoryManager, ioManager,
                                        broadcastVariableManager, accumulatorRegistry,
                                        splitProvider, distributedCacheEntries,

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index e94a40c..95305f3 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -945,7 +945,7 @@ class JobManager(
           throw new JobSubmissionException(jobId, "The given job is empty")
         }
 
-        val restartStrategy = Option(jobGraph.getRestartStrategyConfiguration())
+        val restartStrategy = Option(jobGraph.getExecutionConfig.getRestartStrategy())
           .map(RestartStrategyFactory.createRestartStrategy(_)) match {
             case Some(strategy) => strategy
             case None => defaultRestartStrategy
@@ -964,6 +964,7 @@ class JobManager(
               jobGraph.getJobID,
               jobGraph.getName,
               jobGraph.getJobConfiguration,
+              jobGraph.getExecutionConfig,
               timeout,
               restartStrategy,
               jobGraph.getUserJarBlobKeys,

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index 1c666e5..03ff83d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
@@ -60,7 +61,7 @@ public class CoordinatorShutdownTest {
                        vertex.setInvokableClass(Tasks.NoOpInvokable.class);
                        List<JobVertexID> vertexIdList = Collections.singletonList(vertex.getID());
                        
-                       JobGraph testGraph = new JobGraph("test job", vertex);
+                       JobGraph testGraph = new JobGraph("test job", new ExecutionConfig(), vertex);
                        testGraph.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList, 
                                        5000, 60000, 0L, Integer.MAX_VALUE));
                        
@@ -112,7 +113,7 @@ public class CoordinatorShutdownTest {
                        vertex.setInvokableClass(Tasks.NoOpInvokable.class);
                        List<JobVertexID> vertexIdList = Collections.singletonList(vertex.getID());
 
-                       JobGraph testGraph = new JobGraph("test job", vertex);
+                       JobGraph testGraph = new JobGraph("test job", new ExecutionConfig(), vertex);
                        testGraph.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList,
                                        5000, 60000, 0L, Integer.MAX_VALUE));
                        

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index 5d83bf2..c788bef 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.checkpoint;
 
 import akka.actor.ActorSystem;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -48,6 +49,7 @@ public class ExecutionGraphCheckpointCoordinatorTest {
                        new JobID(),
                        "test",
                        new Configuration(),
+                       new ExecutionConfig(),
                        new FiniteDuration(1, TimeUnit.DAYS),
                        new NoRestartStrategy(),
                        Collections.<BlobKey>emptyList(),

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorRecoveryITCase.java
index aeb521c..865760e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorRecoveryITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.client;
 
 import akka.actor.PoisonPill;
 import org.apache.curator.test.TestingServer;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -94,7 +95,7 @@ public class JobClientActorRecoveryITCase extends TestLogger {
                JobVertex blockingVertex = new JobVertex("Blocking Vertex");
                blockingVertex.setInvokableClass(BlockingTask.class);
                blockingVertex.setParallelism(1);
-               final JobGraph jobGraph = new JobGraph("Blocking Test Job", blockingVertex);
+               final JobGraph jobGraph = new JobGraph("Blocking Test Job", new ExecutionConfig(), blockingVertex);
                final Promise<JobExecutionResult> promise = new scala.concurrent.impl.Promise.DefaultPromise<>();
 
                Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
index 00ad632..ee1fd60 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
@@ -25,6 +25,7 @@ import akka.actor.Props;
 import akka.pattern.Patterns;
 import akka.testkit.JavaTestKit;
 import akka.util.Timeout;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.FlinkUntypedActor;
@@ -47,7 +48,7 @@ import java.util.concurrent.TimeUnit;
 public class JobClientActorTest extends TestLogger {
 
        private static ActorSystem system;
-       private static JobGraph testJobGraph = new JobGraph("Test Job");
+       private static JobGraph testJobGraph = new JobGraph("Test Job", new ExecutionConfig());
 
        @BeforeClass
        public static void setup() {

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
index 92b642a..e839c97 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
@@ -26,6 +26,7 @@ import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -54,10 +55,12 @@ public class TaskDeploymentDescriptorTest {
                        final List<InputGateDeploymentDescriptor> inputGates = new ArrayList<InputGateDeploymentDescriptor>(0);
                        final List<BlobKey> requiredJars = new ArrayList<BlobKey>(0);
                        final List<URL> requiredClasspaths = new ArrayList<URL>(0);
-       
-                       final TaskDeploymentDescriptor orig = new TaskDeploymentDescriptor(jobID, vertexID, execId, taskName,
-                               indexInSubtaskGroup, currentNumberOfSubtasks, attemptNumber, jobConfiguration, taskConfiguration,
-                               invokableClass.getName(), producedResults, inputGates, requiredJars, requiredClasspaths, 47);
+                       final ExecutionConfig executionConfig = new ExecutionConfig();
+
+                       final TaskDeploymentDescriptor orig = new TaskDeploymentDescriptor(jobID, vertexID, execId,
+                               executionConfig, taskName, indexInSubtaskGroup, currentNumberOfSubtasks, attemptNumber,
+                               jobConfiguration, taskConfiguration, invokableClass.getName(), producedResults, inputGates,
+                               requiredJars, requiredClasspaths, 47);
        
                        final TaskDeploymentDescriptor copy = CommonTestUtils.createCopySerializable(orig);
        
@@ -75,6 +78,7 @@ public class TaskDeploymentDescriptorTest {
                        assertEquals(orig.getAttemptNumber(), copy.getAttemptNumber());
                        assertEquals(orig.getProducedPartitions(), copy.getProducedPartitions());
                        assertEquals(orig.getInputGates(), copy.getInputGates());
+                       assertEquals(orig.getExecutionConfig(), copy.getExecutionConfig());
 
                        assertEquals(orig.getRequiredJarFiles(), copy.getRequiredJarFiles());
                        assertEquals(orig.getRequiredClasspaths(), copy.getRequiredClasspaths());

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
index ee372dd..d845d01 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -106,6 +107,7 @@ public class ExecutionGraphConstructionTest {
                        jobId, 
                        jobName, 
                        cfg,
+                       new ExecutionConfig(),
                        AkkaUtils.getDefaultTimeout(),
                        new NoRestartStrategy());
                try {
@@ -150,6 +152,7 @@ public class ExecutionGraphConstructionTest {
                        jobId, 
                        jobName, 
                        cfg, 
+                       new ExecutionConfig(),
                        AkkaUtils.getDefaultTimeout(),
                        new NoRestartStrategy());
                try {
@@ -217,6 +220,7 @@ public class ExecutionGraphConstructionTest {
                        jobId, 
                        jobName, 
                        cfg, 
+                       new ExecutionConfig(),
                        AkkaUtils.getDefaultTimeout(),
                        new NoRestartStrategy());
                try {
@@ -471,6 +475,7 @@ public class ExecutionGraphConstructionTest {
                        jobId, 
                        jobName, 
                        cfg, 
+                       new ExecutionConfig(),
                        AkkaUtils.getDefaultTimeout(),
                        new NoRestartStrategy());
                try {
@@ -527,6 +532,7 @@ public class ExecutionGraphConstructionTest {
                        jobId, 
                        jobName, 
                        cfg, 
+                       new ExecutionConfig(),
                        AkkaUtils.getDefaultTimeout(),
                        new NoRestartStrategy());
                try {
@@ -588,6 +594,7 @@ public class ExecutionGraphConstructionTest {
                                jobId, 
                                jobName, 
                                cfg, 
+                               new ExecutionConfig(),
                                AkkaUtils.getDefaultTimeout(),
                                new NoRestartStrategy());
                        try {
@@ -633,6 +640,7 @@ public class ExecutionGraphConstructionTest {
                                jobId, 
                                jobName,
                                cfg, 
+                               new ExecutionConfig(),
                                AkkaUtils.getDefaultTimeout(),
                                new NoRestartStrategy());
 
@@ -696,14 +704,15 @@ public class ExecutionGraphConstructionTest {
                        // isolated vertex
                        JobVertex v8 = new JobVertex("vertex8");
                        v8.setParallelism(2);
-                       
-                       JobGraph jg = new JobGraph(jobId, jobName, v1, v2, v3, v4, v5, v6, v7, v8);
+
+                       JobGraph jg = new JobGraph(jobId, jobName, new ExecutionConfig(), v1, v2, v3, v4, v5, v6, v7, v8);
                        
                        ExecutionGraph eg = new ExecutionGraph(
                                TestingUtils.defaultExecutionContext(), 
                                jobId, 
                                jobName, 
                                cfg, 
+                               new ExecutionConfig(),
                                AkkaUtils.getDefaultTimeout(),
                                new NoRestartStrategy());
                        

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 6362732..7a9cee7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -30,6 +30,7 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -84,6 +85,7 @@ public class ExecutionGraphDeploymentTest {
                                jobId, 
                                "some job", 
                                new Configuration(), 
+                               new ExecutionConfig(),
                                AkkaUtils.getDefaultTimeout(),
                                new NoRestartStrategy());
 
@@ -287,6 +289,7 @@ public class ExecutionGraphDeploymentTest {
                        jobId, 
                        "some job", 
                        new Configuration(),
+                       new ExecutionConfig(),
                        AkkaUtils.getDefaultTimeout(),
                        new NoRestartStrategy());
                

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 646e195..0837927 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.Configuration;
@@ -72,13 +73,14 @@ public class ExecutionGraphRestartTest extends TestLogger {
                sender.setInvokableClass(Tasks.NoOpInvokable.class);
                sender.setParallelism(NUM_TASKS);
 
-               JobGraph jobGraph = new JobGraph("Pointwise job", sender);
+               JobGraph jobGraph = new JobGraph("Pointwise job", new 
ExecutionConfig(), sender);
 
                ExecutionGraph eg = new ExecutionGraph(
                                TestingUtils.defaultExecutionContext(),
                                new JobID(),
                                "test job",
                                new Configuration(),
+                               new ExecutionConfig(),
                                AkkaUtils.getDefaultTimeout(),
                                new NoRestartStrategy());
                
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
@@ -127,14 +129,15 @@ public class ExecutionGraphRestartTest extends TestLogger {
                groupVertex.setStrictlyCoLocatedWith(groupVertex2);
                
                //initiate and schedule job
-               JobGraph jobGraph = new JobGraph("Pointwise job", groupVertex, 
groupVertex2);
+               JobGraph jobGraph = new JobGraph("Pointwise job", new 
ExecutionConfig(), groupVertex, groupVertex2);
                ExecutionGraph eg = new ExecutionGraph(
                        TestingUtils.defaultExecutionContext(),
                        new JobID(),
                        "test job",
                        new Configuration(),
-                               AkkaUtils.getDefaultTimeout(),
-                               new FixedDelayRestartStrategy(1, 0L));
+                       new ExecutionConfig(),
+                       AkkaUtils.getDefaultTimeout(),
+                       new FixedDelayRestartStrategy(1, 0L));
                
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
 
                assertEquals(JobStatus.CREATED, eg.getState());
@@ -181,13 +184,14 @@ public class ExecutionGraphRestartTest extends TestLogger {
                sender.setInvokableClass(Tasks.NoOpInvokable.class);
                sender.setParallelism(NUM_TASKS);
 
-               JobGraph jobGraph = new JobGraph("Pointwise job", sender);
+               JobGraph jobGraph = new JobGraph("Pointwise job", new 
ExecutionConfig(), sender);
 
                ExecutionGraph eg = new ExecutionGraph(
                                TestingUtils.defaultExecutionContext(),
                                new JobID(),
                                "Test job",
                                new Configuration(),
+                               new ExecutionConfig(),
                                AkkaUtils.getDefaultTimeout(),
                                new FixedDelayRestartStrategy(1, 1000));
                
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
@@ -216,6 +220,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
                                new JobID(),
                                "TestJob",
                                new Configuration(),
+                               new ExecutionConfig(),
                                AkkaUtils.getDefaultTimeout(),
                                // We want to manually control the restart and 
delay
                                new 
FixedDelayRestartStrategy(Integer.MAX_VALUE, Long.MAX_VALUE));
@@ -224,7 +229,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
                jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
                jobVertex.setParallelism(NUM_TASKS);
 
-               JobGraph jobGraph = new JobGraph("TestJob", jobVertex);
+               JobGraph jobGraph = new JobGraph("TestJob", new 
ExecutionConfig(), jobVertex);
 
                
executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
 
@@ -274,6 +279,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
                                new JobID(),
                                "TestJob",
                                new Configuration(),
+                               new ExecutionConfig(),
                                AkkaUtils.getDefaultTimeout(),
                                // We want to manually control the restart and 
delay
                                new 
FixedDelayRestartStrategy(Integer.MAX_VALUE, Long.MAX_VALUE));
@@ -289,7 +295,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
                jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
                jobVertex.setParallelism(NUM_TASKS);
 
-               JobGraph jobGraph = new JobGraph("TestJob", jobVertex);
+               JobGraph jobGraph = new JobGraph("TestJob", new 
ExecutionConfig(), jobVertex);
 
                
executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
 
@@ -349,13 +355,14 @@ public class ExecutionGraphRestartTest extends TestLogger {
                sender.setInvokableClass(Tasks.NoOpInvokable.class);
                sender.setParallelism(NUM_TASKS);
 
-               JobGraph jobGraph = new JobGraph("Pointwise job", sender);
+               JobGraph jobGraph = new JobGraph("Pointwise job", new 
ExecutionConfig(), sender);
 
                ExecutionGraph eg = spy(new ExecutionGraph(
                        TestingUtils.defaultExecutionContext(),
                        new JobID(),
                        "Test job",
                        new Configuration(),
+                       new ExecutionConfig(),
                        AkkaUtils.getDefaultTimeout(),
                        new FixedDelayRestartStrategy(1, 1000)));
 
@@ -419,13 +426,14 @@ public class ExecutionGraphRestartTest extends TestLogger {
                receiver.setInvokableClass(Tasks.NoOpInvokable.class);
                receiver.setParallelism(1);
 
-               JobGraph jobGraph = new JobGraph("Pointwise job", sender, 
receiver);
+               JobGraph jobGraph = new JobGraph("Pointwise job", new 
ExecutionConfig(), sender, receiver);
 
                ExecutionGraph eg = new ExecutionGraph(
                        TestingUtils.defaultExecutionContext(),
                        new JobID(),
                        "test job",
                        new Configuration(),
+                       new ExecutionConfig(),
                        AkkaUtils.getDefaultTimeout(),
                        new FixedDelayRestartStrategy(1, 1000));
 
@@ -510,8 +518,8 @@ public class ExecutionGraphRestartTest extends TestLogger {
                vertex.setInvokableClass(Tasks.NoOpInvokable.class);
                vertex.setParallelism(1);
 
-               JobGraph jobGraph = new JobGraph("Test Job", vertex);
-               
jobGraph.setRestartStrategyConfiguration(RestartStrategies.fixedDelayRestart(
+               JobGraph jobGraph = new JobGraph("Test Job", new 
ExecutionConfig(), vertex);
+               
jobGraph.getExecutionConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(
                                Integer.MAX_VALUE, Integer.MAX_VALUE));
 
                ExecutionGraph eg = new ExecutionGraph(
@@ -519,6 +527,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
                                new JobID(),
                                "test job",
                                new Configuration(),
+                               new ExecutionConfig(),
                                AkkaUtils.getDefaultTimeout(),
                                new FixedDelayRestartStrategy(1, 1000000));
 
@@ -561,8 +570,8 @@ public class ExecutionGraphRestartTest extends TestLogger {
                vertex.setInvokableClass(Tasks.NoOpInvokable.class);
                vertex.setParallelism(1);
 
-               JobGraph jobGraph = new JobGraph("Test Job", vertex);
-               
jobGraph.setRestartStrategyConfiguration(RestartStrategies.fixedDelayRestart(
+               JobGraph jobGraph = new JobGraph("Test Job", new 
ExecutionConfig(), vertex);
+               
jobGraph.getExecutionConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(
                                Integer.MAX_VALUE, Integer.MAX_VALUE));
 
                ExecutionGraph eg = new ExecutionGraph(
@@ -570,6 +579,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
                                new JobID(),
                                "test job",
                                new Configuration(),
+                               new ExecutionConfig(),
                                AkkaUtils.getDefaultTimeout(),
                                new FixedDelayRestartStrategy(1, 1000000));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
index 7cc91c0..d1bb680 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.StoppingException;
@@ -127,6 +128,7 @@ public class ExecutionGraphSignalsTest {
                        jobId,
                        jobName,
                        cfg,
+                       new ExecutionConfig(),
                        AkkaUtils.getDefaultTimeout(),
                        new NoRestartStrategy());
                eg.attachJobGraph(ordered);

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index ca07fbf..c85ca13 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.spy;
 import java.lang.reflect.Field;
 import java.net.InetAddress;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -174,6 +175,7 @@ public class ExecutionGraphTestUtils {
                        new JobID(), 
                        "test job", 
                        new Configuration(), 
+                       new ExecutionConfig(),
                        AkkaUtils.getDefaultTimeout(),
                        new NoRestartStrategy());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
index 5dd5ba6..1ff90e1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
@@ -24,6 +24,7 @@ import static org.mockito.Mockito.mock;
 
 import java.util.Arrays;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
@@ -53,6 +54,7 @@ public class ExecutionStateProgressTest {
                                jid, 
                                "test job", 
                                new Configuration(), 
+                new ExecutionConfig(),
                                AkkaUtils.getDefaultTimeout(),
                                new NoRestartStrategy());
                        graph.attachJobGraph(Arrays.asList(ajv));

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
index a2e2482..2d7d6f5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.when;
 import java.net.InetAddress;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.io.StrictlyLocalAssignment;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.core.io.InputSplitSource;
@@ -264,13 +265,14 @@ public class LocalInputSplitsTest {
                        vertex.setInvokableClass(DummyInvokable.class);
                        vertex.setInputSplitSource(new 
TestInputSplitSource(splits));
                        
-                       JobGraph jobGraph = new JobGraph("test job", vertex);
+                       JobGraph jobGraph = new JobGraph("test job", new 
ExecutionConfig(), vertex);
                        
                        ExecutionGraph eg = new ExecutionGraph(
                                TestingUtils.defaultExecutionContext(), 
                                jobGraph.getJobID(),
                                jobGraph.getName(),  
                                jobGraph.getJobConfiguration(),
+                               new ExecutionConfig(),
                                TIMEOUT,
                                new NoRestartStrategy());
                        
@@ -328,13 +330,14 @@ public class LocalInputSplitsTest {
                vertex.setInvokableClass(DummyInvokable.class);
                vertex.setInputSplitSource(new TestInputSplitSource(splits));
                
-               JobGraph jobGraph = new JobGraph("test job", vertex);
+               JobGraph jobGraph = new JobGraph("test job", new 
ExecutionConfig(), vertex);
                
                ExecutionGraph eg = new ExecutionGraph(
                        TestingUtils.defaultExecutionContext(),
                        jobGraph.getJobID(),
                        jobGraph.getName(),  
                        jobGraph.getJobConfiguration(),
+                       new ExecutionConfig(),
                        TIMEOUT,
                        new NoRestartStrategy());
                

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
index c1afe04..cbeeded 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -63,6 +64,7 @@ public class PointwisePatternTest {
                        jobId, 
                        jobName, 
                        cfg,
+                       new ExecutionConfig(),
                        AkkaUtils.getDefaultTimeout(),
                        new NoRestartStrategy());
                try {
@@ -104,6 +106,7 @@ public class PointwisePatternTest {
                        jobId, 
                        jobName, 
                        cfg, 
+                       new ExecutionConfig(),
                        AkkaUtils.getDefaultTimeout(),
                        new NoRestartStrategy());
                try {
@@ -146,6 +149,7 @@ public class PointwisePatternTest {
                        jobId, 
                        jobName, 
                        cfg, 
+                       new ExecutionConfig(),
                        AkkaUtils.getDefaultTimeout(),
                        new NoRestartStrategy());
                try {
@@ -189,6 +193,7 @@ public class PointwisePatternTest {
                        jobId, 
                        jobName,
                        cfg, 
+                       new ExecutionConfig(),
                        AkkaUtils.getDefaultTimeout(),
                        new NoRestartStrategy());
                try {
@@ -230,6 +235,7 @@ public class PointwisePatternTest {
                        jobId, 
                        jobName, 
                        cfg, 
+                       new ExecutionConfig(),
                        AkkaUtils.getDefaultTimeout(),
                        new NoRestartStrategy());
                try {
@@ -291,6 +297,7 @@ public class PointwisePatternTest {
                        jobId, 
                        jobName, 
                        cfg, 
+                       new ExecutionConfig(),
                        AkkaUtils.getDefaultTimeout(),
                        new NoRestartStrategy());
                try {
@@ -343,6 +350,7 @@ public class PointwisePatternTest {
                        jobId, 
                        jobName, 
                        cfg, 
+                       new ExecutionConfig(),
                        AkkaUtils.getDefaultTimeout(),
                        new NoRestartStrategy());
                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
index 2f0d5e7..f747ff3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -185,6 +186,7 @@ public class TerminalStateDeadlockTest {
                                jobId,
                                "test graph",
                                EMPTY_CONFIG,
+                               new ExecutionConfig(),
                                TIMEOUT,
                                new FixedDelayRestartStrategy(1, 0));
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
index 3670358..25c9d70 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
@@ -26,6 +26,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.instance.DummyActorGateway;
 import org.apache.flink.runtime.instance.HardwareDescription;
@@ -74,13 +75,14 @@ public class VertexLocationConstraintTest {
                        JobVertex jobVertex = new JobVertex("test vertex", new 
JobVertexID());
                        jobVertex.setInvokableClass(DummyInvokable.class);
                        jobVertex.setParallelism(2);
-                       JobGraph jg = new JobGraph("test job", jobVertex);
+                       JobGraph jg = new JobGraph("test job", new 
ExecutionConfig(), jobVertex);
                        
                        ExecutionGraph eg = new ExecutionGraph(
                                        TestingUtils.defaultExecutionContext(),
                                        jg.getJobID(),
                                        jg.getName(),
                                        jg.getJobConfiguration(),
+                                       new ExecutionConfig(),
                                        timeout,
                                        new NoRestartStrategy());
                        eg.attachJobGraph(Collections.singletonList(jobVertex));
@@ -146,13 +148,14 @@ public class VertexLocationConstraintTest {
                        JobVertex jobVertex = new JobVertex("test vertex", new 
JobVertexID());
                        jobVertex.setInvokableClass(DummyInvokable.class);
                        jobVertex.setParallelism(2);
-                       JobGraph jg = new JobGraph("test job", jobVertex);
+                       JobGraph jg = new JobGraph("test job", new 
ExecutionConfig(), jobVertex);
                        
                        ExecutionGraph eg = new ExecutionGraph(
                                        TestingUtils.defaultExecutionContext(),
                                        jg.getJobID(),
                                        jg.getName(),
                                        jg.getJobConfiguration(),
+                                       new ExecutionConfig(),
                                        timeout,
                                        new NoRestartStrategy());
                        eg.attachJobGraph(Collections.singletonList(jobVertex));
@@ -222,13 +225,14 @@ public class VertexLocationConstraintTest {
                        jobVertex1.setSlotSharingGroup(sharingGroup);
                        jobVertex2.setSlotSharingGroup(sharingGroup);
                        
-                       JobGraph jg = new JobGraph("test job", jobVertex1, 
jobVertex2);
+                       JobGraph jg = new JobGraph("test job", new 
ExecutionConfig(), jobVertex1, jobVertex2);
                        
                        ExecutionGraph eg = new ExecutionGraph(
                                        TestingUtils.defaultExecutionContext(),
                                        jg.getJobID(),
                                        jg.getName(),
                                        jg.getJobConfiguration(),
+                                       new ExecutionConfig(),
                                        timeout,
                                        new NoRestartStrategy());
                        eg.attachJobGraph(Arrays.asList(jobVertex1, 
jobVertex2));
@@ -289,13 +293,14 @@ public class VertexLocationConstraintTest {
                        JobVertex jobVertex = new JobVertex("test vertex", new 
JobVertexID());
                        jobVertex.setInvokableClass(DummyInvokable.class);
                        jobVertex.setParallelism(1);
-                       JobGraph jg = new JobGraph("test job", jobVertex);
+                       JobGraph jg = new JobGraph("test job", new 
ExecutionConfig(), jobVertex);
                        
                        ExecutionGraph eg = new ExecutionGraph(
                                        TestingUtils.defaultExecutionContext(),
                                        jg.getJobID(),
                                        jg.getName(),
                                        jg.getJobConfiguration(),
+                                       new ExecutionConfig(),
                                        timeout,
                                        new NoRestartStrategy());
                        eg.attachJobGraph(Collections.singletonList(jobVertex));
@@ -354,7 +359,7 @@ public class VertexLocationConstraintTest {
                        jobVertex1.setParallelism(1);
                        jobVertex2.setParallelism(1);
                        
-                       JobGraph jg = new JobGraph("test job", jobVertex1, 
jobVertex2);
+                       JobGraph jg = new JobGraph("test job", new 
ExecutionConfig(), jobVertex1, jobVertex2);
                        
                        SlotSharingGroup sharingGroup = new SlotSharingGroup();
                        jobVertex1.setSlotSharingGroup(sharingGroup);
@@ -365,6 +370,7 @@ public class VertexLocationConstraintTest {
                                        jg.getJobID(),
                                        jg.getName(),
                                        jg.getJobConfiguration(),
+                                       new ExecutionConfig(),
                                        timeout,
                                        new NoRestartStrategy());
                        eg.attachJobGraph(Arrays.asList(jobVertex1, 
jobVertex2));
@@ -397,13 +403,14 @@ public class VertexLocationConstraintTest {
        public void testArchivingClearsFields() {
                try {
                        JobVertex vertex = new JobVertex("test vertex", new 
JobVertexID());
-                       JobGraph jg = new JobGraph("test job", vertex);
+                       JobGraph jg = new JobGraph("test job", new 
ExecutionConfig(), vertex);
                        
                        ExecutionGraph eg = new ExecutionGraph(
                                        TestingUtils.defaultExecutionContext(),
                                        jg.getJobID(),
                                        jg.getName(),
                                        jg.getJobConfiguration(),
+                                       new ExecutionConfig(),
                                        timeout,
                                        new NoRestartStrategy());
                        eg.attachJobGraph(Collections.singletonList(vertex));

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
index 5c7297e..5110249 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
@@ -75,6 +76,7 @@ public class VertexSlotSharingTest {
                                        new JobID(),
                                        "test job",
                                        new Configuration(),
+                                       new ExecutionConfig(),
                                        AkkaUtils.getDefaultTimeout(),
                                        new NoRestartStrategy());
                        eg.attachJobGraph(vertices);

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index 854be5f..317eed7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.network.api.reader.BufferReader;
@@ -89,7 +90,7 @@ public class PartialConsumePipelinedResultTest {
                                sender, DistributionPattern.POINTWISE, 
ResultPartitionType.PIPELINED);
 
                final JobGraph jobGraph = new JobGraph(
-                               "Partial Consume of Pipelined Result", sender, 
receiver);
+                               "Partial Consume of Pipelined Result", new 
ExecutionConfig(), sender, receiver);
 
                final SlotSharingGroup slotSharingGroup = new SlotSharingGroup(
                                sender.getID(), receiver.getID());

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
index ca047e8..68b05b2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.*;
 
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.junit.Test;
@@ -31,7 +32,7 @@ public class JobGraphTest {
        @Test
        public void testSerialization() {
                try {
-                       JobGraph jg = new JobGraph("The graph");
+                       JobGraph jg = new JobGraph("The graph", new ExecutionConfig());
                        
                        // add some configuration values
                        {
@@ -89,8 +90,9 @@ public class JobGraphTest {
                        target2.connectNewDataSetAsInput(intermediate2, DistributionPattern.POINTWISE);
                        intermediate2.connectNewDataSetAsInput(intermediate1, DistributionPattern.POINTWISE);
                        intermediate1.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE);
-                       
-                       JobGraph graph = new JobGraph("TestGraph", source1, source2, intermediate1, intermediate2, target1, target2);
+
+                       JobGraph graph = new JobGraph("TestGraph", new ExecutionConfig(),
+                               source1, source2, intermediate1, intermediate2, target1, target2);
                        List<JobVertex> sorted = graph.getVerticesSortedTopologicallyFromSources();
                        
                        assertEquals(6, sorted.size());
@@ -133,8 +135,9 @@ public class JobGraphTest {
                        l12.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE);
                        
                        l13.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE);
-                       
-                       JobGraph graph = new JobGraph("TestGraph", source1, source2, root, l11, l13, l12, l2);
+
+                       JobGraph graph = new JobGraph("TestGraph", new ExecutionConfig(),
+                               source1, source2, root, l11, l13, l12, l2);
                        List<JobVertex> sorted = graph.getVerticesSortedTopologicallyFromSources();
                        
                        assertEquals(7,  sorted.size());
@@ -179,8 +182,8 @@ public class JobGraphTest {
                        op2.connectNewDataSetAsInput(op1, DistributionPattern.POINTWISE);
                        op2.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE);
                        op3.connectNewDataSetAsInput(op2, DistributionPattern.POINTWISE);
-                       
-                       JobGraph graph = new JobGraph("TestGraph", source, op1, op2, op3);
+
+                       JobGraph graph = new JobGraph("TestGraph", new ExecutionConfig(), source, op1, op2, op3);
                        List<JobVertex> sorted = graph.getVerticesSortedTopologicallyFromSources();
                        
                        assertEquals(4,  sorted.size());
@@ -208,8 +211,8 @@ public class JobGraphTest {
                        v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE);
                        v3.connectNewDataSetAsInput(v2, DistributionPattern.POINTWISE);
                        v4.connectNewDataSetAsInput(v3, DistributionPattern.POINTWISE);
-                       
-                       JobGraph jg = new JobGraph("Cyclic Graph", v1, v2, v3, v4);
+
+                       JobGraph jg = new JobGraph("Cyclic Graph", new ExecutionConfig(), v1, v2, v3, v4);
                        try {
                                jg.getVerticesSortedTopologicallyFromSources();
                                fail("Failed to raise error on topologically sorting cyclic graph.");
@@ -240,8 +243,8 @@ public class JobGraphTest {
                        v3.connectNewDataSetAsInput(v2, DistributionPattern.POINTWISE);
                        v4.connectNewDataSetAsInput(v3, DistributionPattern.POINTWISE);
                        target.connectNewDataSetAsInput(v3, DistributionPattern.POINTWISE);
-                       
-                       JobGraph jg = new JobGraph("Cyclic Graph", v1, v2, v3, v4, source, target);
+
+                       JobGraph jg = new JobGraph("Cyclic Graph", new ExecutionConfig(), v1, v2, v3, v4, source, target);
                        try {
                                jg.getVerticesSortedTopologicallyFromSources();
                                fail("Failed to raise error on topologically sorting cyclic graph.");

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java
index 46fb694..612f64f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobgraph.jsonplan;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.TextNode;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -65,8 +66,8 @@ public class JsonGeneratorTest {
                        
                        sink1.connectNewDataSetAsInput(join2, DistributionPattern.POINTWISE);
                        sink2.connectNewDataSetAsInput(join1, DistributionPattern.ALL_TO_ALL);
-                       
-                       JobGraph jg = new JobGraph("my job", source1, source2, source3,
+
+                       JobGraph jg = new JobGraph("my job", new ExecutionConfig(), source1, source2, source3,
                                        intermediate1, intermediate2, join1, join2, sink1, sink2);
                        
                        String plan = JsonPlanGenerator.generatePlan(jg);

http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 27c32a0..d283e91 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -23,6 +23,7 @@ import akka.testkit.JavaTestKit;
 
 import com.typesafe.config.Config;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -122,7 +123,7 @@ public class JobManagerTest {
                                sender.setInvokableClass(Tasks.BlockingNoOpInvokable.class); // just block
                                sender.createAndAddResultDataSet(rid, PIPELINED);
 
-                               final JobGraph jobGraph = new JobGraph("Blocking test job", sender);
+                               final JobGraph jobGraph = new JobGraph("Blocking test job", new ExecutionConfig(), sender);
                                final JobID jid = jobGraph.getJobID();
 
                                final ActorGateway jobManagerGateway = cluster.getLeaderGateway(
@@ -246,7 +247,7 @@ public class JobManagerTest {
                                sender.setParallelism(2);
                                sender.setInvokableClass(StoppableInvokable.class);
 
-                               final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", sender);
+                               final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", new ExecutionConfig(), sender);
                                final JobID jid = jobGraph.getJobID();
 
                                final ActorGateway jobManagerGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
@@ -294,7 +295,7 @@ public class JobManagerTest {
                                sender.setParallelism(1);
                                sender.setInvokableClass(Tasks.BlockingNoOpInvokable.class); // just block
 
-                               final JobGraph jobGraph = new JobGraph("Non-Stoppable batching test job", sender);
+                               final JobGraph jobGraph = new JobGraph("Non-Stoppable batching test job", new ExecutionConfig(), sender);
                                final JobID jid = jobGraph.getJobID();
 
                                final ActorGateway jobManagerGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());

Reply via email to