[FLINK-3327] ExecutionConfig to JobGraph

This closes #1583
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0f8d76c6 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0f8d76c6 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0f8d76c6 Branch: refs/heads/master Commit: 0f8d76c6f71e0b4a07ed11a112c3bd21c300e966 Parents: d0a390f Author: kl0u <kklou...@gmail.com> Authored: Fri Mar 11 22:31:12 2016 +0100 Committer: zentol <ches...@apache.org> Committed: Sat Mar 12 00:07:49 2016 +0100 ---------------------------------------------------------------------- .../flink/api/common/ExecutionConfig.java | 6 +- .../java/org/apache/flink/api/common/Plan.java | 11 --- .../plantranslate/JobGraphGenerator.java | 16 +--- .../BackPressureStatsTrackerITCase.java | 3 +- .../StackTraceSampleCoordinatorITCase.java | 3 +- .../deployment/TaskDeploymentDescriptor.java | 30 ++++--- .../flink/runtime/execution/Environment.java | 8 ++ .../runtime/executiongraph/ExecutionGraph.java | 18 ++--- .../runtime/executiongraph/ExecutionVertex.java | 4 +- .../apache/flink/runtime/jobgraph/JobGraph.java | 82 ++++++++++---------- .../jobgraph/tasks/AbstractInvokable.java | 27 +------ .../flink/runtime/operators/DataSinkTask.java | 20 +---- .../flink/runtime/operators/DataSourceTask.java | 18 +---- .../runtime/taskmanager/RuntimeEnvironment.java | 11 ++- .../apache/flink/runtime/taskmanager/Task.java | 9 ++- .../flink/runtime/jobmanager/JobManager.scala | 3 +- .../checkpoint/CoordinatorShutdownTest.java | 5 +- ...ExecutionGraphCheckpointCoordinatorTest.java | 2 + .../client/JobClientActorRecoveryITCase.java | 3 +- .../runtime/client/JobClientActorTest.java | 3 +- .../TaskDeploymentDescriptorTest.java | 12 ++- .../ExecutionGraphConstructionTest.java | 13 +++- .../ExecutionGraphDeploymentTest.java | 3 + .../ExecutionGraphRestartTest.java | 36 +++++---- .../ExecutionGraphSignalsTest.java | 2 + .../executiongraph/ExecutionGraphTestUtils.java | 2 + .../ExecutionStateProgressTest.java | 2 + 
.../executiongraph/LocalInputSplitsTest.java | 7 +- .../executiongraph/PointwisePatternTest.java | 8 ++ .../TerminalStateDeadlockTest.java | 2 + .../VertexLocationConstraintTest.java | 19 +++-- .../executiongraph/VertexSlotSharingTest.java | 2 + .../PartialConsumePipelinedResultTest.java | 3 +- .../flink/runtime/jobgraph/JobGraphTest.java | 25 +++--- .../jobgraph/jsonplan/JsonGeneratorTest.java | 5 +- .../runtime/jobmanager/JobManagerTest.java | 7 +- .../flink/runtime/jobmanager/JobSubmitTest.java | 5 +- .../SlotCountExceedingParallelismTest.java | 3 +- .../StandaloneSubmittedJobGraphStoreTest.java | 3 +- .../ZooKeeperSubmittedJobGraphsStoreITCase.java | 3 +- .../ScheduleOrUpdateConsumersTest.java | 2 + .../LeaderChangeStateCleanupTest.java | 3 +- .../operators/testutils/DummyEnvironment.java | 7 ++ .../operators/testutils/MockEnvironment.java | 9 +++ .../runtime/taskmanager/TaskAsyncCallTest.java | 3 +- .../runtime/taskmanager/TaskCancelTest.java | 3 +- .../runtime/taskmanager/TaskManagerTest.java | 45 +++++++---- .../flink/runtime/taskmanager/TaskStopTest.java | 2 + .../flink/runtime/taskmanager/TaskTest.java | 3 +- .../TaskManagerLossFailsTasksTest.scala | 5 +- .../jobmanager/CoLocationConstraintITCase.scala | 3 +- .../runtime/jobmanager/JobManagerITCase.scala | 48 ++++++------ .../runtime/jobmanager/RecoveryITCase.scala | 16 +++- .../runtime/jobmanager/SlotSharingITCase.scala | 6 +- .../TaskManagerFailsWithSlotSharingITCase.scala | 6 +- .../api/graph/StreamingJobGraphGenerator.java | 14 +--- .../streaming/api/RestartStrategyTest.java | 6 +- .../graph/StreamingJobGraphGeneratorTest.java | 10 ++- .../partitioner/RescalePartitionerTest.java | 2 + .../runtime/tasks/StreamMockEnvironment.java | 18 ++++- .../tasks/StreamTaskAsyncCheckpointTest.java | 6 ++ .../streaming/runtime/tasks/StreamTaskTest.java | 3 +- .../runtime/tasks/StreamTaskTestHarness.java | 10 +-- .../JobSubmissionFailsITCase.java | 7 +- .../flink/test/javaApiOperators/MapITCase.java | 33 ++++++++ 
.../JobManagerHACheckpointRecoveryITCase.java | 3 +- .../JobManagerHAJobGraphRecoveryITCase.java | 3 +- .../runtime/NetworkStackThroughputITCase.java | 3 +- .../ZooKeeperLeaderElectionITCase.java | 3 +- .../flink/test/web/WebFrontendITCase.java | 5 +- .../jobmanager/JobManagerFailsITCase.scala | 5 +- .../JobManagerLeaderSessionIDITSuite.scala | 3 +- .../taskmanager/TaskManagerFailsITCase.scala | 11 ++- 73 files changed, 442 insertions(+), 308 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java index 9642fa9..9b0ff52 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java @@ -59,9 +59,6 @@ public class ExecutionConfig implements Serializable { private static final long serialVersionUID = 1L; - // Key for storing it in the Job Configuration - public static final String CONFIG_KEY = "runtime.config"; - /** * The constant to use for the parallelism, if the system should use the number * of currently available slots. 
@@ -648,7 +645,8 @@ public class ExecutionConfig implements Serializable { Objects.equals(executionMode, other.executionMode) && useClosureCleaner == other.useClosureCleaner && parallelism == other.parallelism && - restartStrategyConfiguration.equals(other.restartStrategyConfiguration) && + ((restartStrategyConfiguration == null && other.restartStrategyConfiguration == null) || + restartStrategyConfiguration.equals(other.restartStrategyConfiguration)) && forceKryo == other.forceKryo && objectReuse == other.objectReuse && autoTypeRegistrationEnabled == other.autoTypeRegistrationEnabled && http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-core/src/main/java/org/apache/flink/api/common/Plan.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java index d81fcd1..3e5cb61 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java @@ -38,7 +38,6 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry; import org.apache.flink.api.common.operators.GenericDataSinkBase; import org.apache.flink.api.common.operators.Operator; -import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.util.Visitable; @@ -295,16 +294,6 @@ public class Plan implements Visitable<Operator<?>> { } /** - * Returns the specified restart strategy configuration. This configuration defines the used - * restart strategy to be used at runtime. 
- * - * @return The specified restart strategy configuration - */ - public RestartStrategies.RestartStrategyConfiguration getRestartStrategyConfiguration() { - return getExecutionConfig().getRestartStrategy(); - } - - /** * Gets the optimizer post-pass class for this job. The post-pass typically creates utility classes * for data types and is specific to a particular data model (record, tuple, Scala, ...) * http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java index f59c347..c7aaa7d 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java @@ -20,7 +20,6 @@ package org.apache.flink.optimizer.plantranslate; import com.fasterxml.jackson.core.JsonFactory; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.aggregators.AggregatorRegistry; import org.apache.flink.api.common.aggregators.AggregatorWithName; @@ -81,11 +80,9 @@ import org.apache.flink.runtime.operators.chaining.ChainedDriver; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.apache.flink.runtime.operators.util.LocalStrategy; import org.apache.flink.runtime.operators.util.TaskConfig; -import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.StringUtils; import org.apache.flink.util.Visitor; -import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -217,9 +214,7 @@ public class JobGraphGenerator implements 
Visitor<PlanNode> { // ----------- finalize the job graph ----------- // create the job graph object - JobGraph graph = new JobGraph(jobId, program.getJobName()); - - graph.setRestartStrategyConfiguration(program.getOriginalPlan().getRestartStrategyConfiguration()); + JobGraph graph = new JobGraph(jobId, program.getJobName(), program.getOriginalPlan().getExecutionConfig()); graph.setAllowQueuedScheduling(false); graph.setSessionTimeout(program.getOriginalPlan().getSessionTimeout()); @@ -238,15 +233,6 @@ public class JobGraphGenerator implements Visitor<PlanNode> { DistributedCache.writeFileInfoToConfig(e.getKey(), e.getValue(), graph.getJobConfiguration()); } - try { - InstantiationUtil.writeObjectToConfig( - program.getOriginalPlan().getExecutionConfig(), - graph.getJobConfiguration(), - ExecutionConfig.CONFIG_KEY); - } catch (IOException e) { - throw new RuntimeException("Config object could not be written to Job Configuration: " + e); - } - // release all references again this.vertices = null; this.chainedTasks = null; http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java index 2659185..1f0b2ef 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.webmonitor; import akka.actor.ActorSystem; import akka.testkit.JavaTestKit; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.configuration.ConfigConstants; import 
org.apache.flink.configuration.Configuration; import org.apache.flink.core.memory.MemoryType; @@ -92,7 +93,7 @@ public class BackPressureStatsTrackerITCase extends TestLogger { final FiniteDuration deadline = new FiniteDuration(60, TimeUnit.SECONDS); // The JobGraph - final JobGraph jobGraph = new JobGraph(); + final JobGraph jobGraph = new JobGraph(new ExecutionConfig()); final int parallelism = 4; final JobVertex task = new JobVertex("Task"); http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java index 63c3712..c6ce315 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.webmonitor; import akka.actor.ActorSystem; import akka.testkit.JavaTestKit; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; @@ -76,7 +77,7 @@ public class StackTraceSampleCoordinatorITCase extends TestLogger { final FiniteDuration deadline = new FiniteDuration(60, TimeUnit.SECONDS); // The JobGraph - final JobGraph jobGraph = new JobGraph(); + final JobGraph jobGraph = new JobGraph(new ExecutionConfig()); final int parallelism = 1; final JobVertex task = new JobVertex("Task"); 
http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java index 983ad38..60b8ba6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.deployment; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.TaskInfo; import org.apache.flink.configuration.Configuration; @@ -88,6 +89,9 @@ public final class TaskDeploymentDescriptor implements Serializable { private final SerializedValue<StateHandle<?>> operatorState; + /** The execution configuration (see {@link ExecutionConfig}) related to the specific job. 
*/ + private final ExecutionConfig executionConfig; + private long recoveryTimestamp; /** @@ -95,9 +99,9 @@ public final class TaskDeploymentDescriptor implements Serializable { */ public TaskDeploymentDescriptor( JobID jobID, JobVertexID vertexID, ExecutionAttemptID executionId, - String taskName, int indexInSubtaskGroup, int numberOfSubtasks, int attemptNumber, - Configuration jobConfiguration, Configuration taskConfiguration, String invokableClassName, - List<ResultPartitionDeploymentDescriptor> producedPartitions, + ExecutionConfig executionConfig, String taskName, int indexInSubtaskGroup, int numberOfSubtasks, + int attemptNumber, Configuration jobConfiguration, Configuration taskConfiguration, + String invokableClassName, List<ResultPartitionDeploymentDescriptor> producedPartitions, List<InputGateDeploymentDescriptor> inputGates, List<BlobKey> requiredJarFiles, List<URL> requiredClasspaths, int targetSlotNumber, SerializedValue<StateHandle<?>> operatorState, @@ -111,6 +115,7 @@ public final class TaskDeploymentDescriptor implements Serializable { this.jobID = checkNotNull(jobID); this.vertexID = checkNotNull(vertexID); this.executionId = checkNotNull(executionId); + this.executionConfig = checkNotNull(executionConfig); this.taskName = checkNotNull(taskName); this.indexInSubtaskGroup = indexInSubtaskGroup; this.numberOfSubtasks = numberOfSubtasks; @@ -129,16 +134,23 @@ public final class TaskDeploymentDescriptor implements Serializable { public TaskDeploymentDescriptor( JobID jobID, JobVertexID vertexID, ExecutionAttemptID executionId, - String taskName, int indexInSubtaskGroup, int numberOfSubtasks, int attemptNumber, - Configuration jobConfiguration, Configuration taskConfiguration, String invokableClassName, - List<ResultPartitionDeploymentDescriptor> producedPartitions, + ExecutionConfig executionConfig, String taskName, int indexInSubtaskGroup, int numberOfSubtasks, + int attemptNumber, Configuration jobConfiguration, Configuration taskConfiguration, + 
String invokableClassName, List<ResultPartitionDeploymentDescriptor> producedPartitions, List<InputGateDeploymentDescriptor> inputGates, List<BlobKey> requiredJarFiles, List<URL> requiredClasspaths, int targetSlotNumber) { - this(jobID, vertexID, executionId, taskName, indexInSubtaskGroup, numberOfSubtasks, attemptNumber, - jobConfiguration, taskConfiguration, invokableClassName, producedPartitions, - inputGates, requiredJarFiles, requiredClasspaths, targetSlotNumber, null, -1); + this(jobID, vertexID, executionId, executionConfig, taskName, indexInSubtaskGroup, + numberOfSubtasks, attemptNumber, jobConfiguration, taskConfiguration, invokableClassName, + producedPartitions, inputGates, requiredJarFiles, requiredClasspaths, targetSlotNumber, null, -1); + } + + /** + * Returns the execution configuration (see {@link ExecutionConfig}) related to the specific job. + */ + public ExecutionConfig getExecutionConfig() { + return executionConfig; } /** http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java index 7332151..a10c463 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.execution; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.TaskInfo; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; @@ -46,6 +47,13 @@ import java.util.concurrent.Future; public interface Environment { /** + * Returns the job specific {@link ExecutionConfig}. 
+ * + * @return The execution configuration associated with the current job. + * */ + ExecutionConfig getExecutionConfig(); + + /** * Returns the ID of the job that the task belongs to. * * @return the ID of the job from the original job graph http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index c17cf15..026f6b2 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.executiongraph; import akka.actor.ActorSystem; +import com.google.common.base.Preconditions; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.accumulators.AccumulatorHelper; @@ -60,7 +61,6 @@ import org.apache.flink.runtime.util.SerializableObject; import org.apache.flink.runtime.util.SerializedThrowable; import org.apache.flink.util.SerializedValue; import org.apache.flink.util.ExceptionUtils; -import org.apache.flink.util.InstantiationUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -180,6 +180,9 @@ public class ExecutionGraph implements Serializable { // ------ Configuration of the Execution ------- + /** The execution configuration (see {@link ExecutionConfig}) related to this specific job. */ + private ExecutionConfig executionConfig; + /** Flag to indicate whether the scheduler may queue tasks for execution, or needs to be able * to deploy them immediately. 
*/ private boolean allowQueuedScheduling = false; @@ -234,7 +237,6 @@ public class ExecutionGraph implements Serializable { private ExecutionContext executionContext; // ------ Fields that are only relevant for archived execution graphs ------------ - private ExecutionConfig executionConfig; private String jsonPlan; @@ -250,6 +252,7 @@ public class ExecutionGraph implements Serializable { JobID jobId, String jobName, Configuration jobConfig, + ExecutionConfig config, FiniteDuration timeout, RestartStrategy restartStrategy) { this( @@ -257,6 +260,7 @@ public class ExecutionGraph implements Serializable { jobId, jobName, jobConfig, + config, timeout, restartStrategy, new ArrayList<BlobKey>(), @@ -270,6 +274,7 @@ public class ExecutionGraph implements Serializable { JobID jobId, String jobName, Configuration jobConfig, + ExecutionConfig config, FiniteDuration timeout, RestartStrategy restartStrategy, List<BlobKey> requiredJarFiles, @@ -302,7 +307,7 @@ public class ExecutionGraph implements Serializable { this.requiredJarFiles = requiredJarFiles; this.requiredClasspaths = requiredClasspaths; - + this.executionConfig = Preconditions.checkNotNull(config); this.timeout = timeout; this.restartStrategy = restartStrategy; @@ -942,12 +947,7 @@ public class ExecutionGraph implements Serializable { if (!state.isTerminalState()) { throw new IllegalStateException("Can only archive the job from a terminal state"); } - // "unpack" execution config before we throw away the usercode classloader. 
- try { - executionConfig = (ExecutionConfig) InstantiationUtil.readObjectFromConfig(jobConfiguration, ExecutionConfig.CONFIG_KEY,userClassLoader); - } catch (Exception e) { - LOG.warn("Error deserializing the execution config while archiving the execution graph", e); - } + // clear the non-serializable fields userClassLoader = null; scheduler = null; http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index f2d30b5..80430bc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.executiongraph; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; @@ -665,10 +666,11 @@ public class ExecutionVertex implements Serializable { consumedPartitions.add(new InputGateDeploymentDescriptor(resultId, queueToRequest, partitions)); } + ExecutionConfig config = getExecutionGraph().getExecutionConfig(); List<BlobKey> jarFiles = getExecutionGraph().getRequiredJarFiles(); List<URL> classpaths = getExecutionGraph().getRequiredClasspaths(); - return new TaskDeploymentDescriptor(getJobId(), getJobvertexId(), executionId, getTaskName(), + return new TaskDeploymentDescriptor(getJobId(), getJobvertexId(), executionId, config, getTaskName(), subTaskIndex, getTotalNumberOfParallelSubtasks(), attemptNumber, getExecutionGraph().getJobConfiguration(), 
jobVertex.getJobVertex().getConfiguration(), jobVertex.getJobVertex().getInvokableClassName(), producedPartitions, consumedPartitions, jarFiles, classpaths, targetSlot.getRoot().getSlotNumber(), http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java index f99d754..ed714a4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.jobgraph; +import com.google.common.base.Preconditions; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; @@ -33,15 +35,14 @@ import java.io.Serializable; import java.net.InetSocketAddress; import java.net.URL; import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; import java.util.LinkedHashMap; -import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Collections; import java.util.Set; +import java.util.LinkedHashSet; +import java.util.Iterator; -import org.apache.flink.api.common.restartstrategy.RestartStrategies; /** * The JobGraph represents a Flink dataflow program, at the low level that the JobManager accepts. * All programs from higher level APIs are transformed into JobGraphs. @@ -79,9 +80,7 @@ public class JobGraph implements Serializable { /** Name of this job. 
*/ private final String jobName; - /** Configuration which defines which restart strategy to use for the job recovery */ - private RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration; - + private final ExecutionConfig executionConfig; /** The number of seconds after which the corresponding ExecutionGraph is removed at the * job manager after it has been executed. */ @@ -102,60 +101,74 @@ public class JobGraph implements Serializable { // -------------------------------------------------------------------------------------------- /** - * Constructs a new job graph with no name and a random job ID. + * Constructs a new job graph with no name, a random job ID, and the given + * {@link ExecutionConfig}. + * + * @param config The {@link ExecutionConfig} for the job. */ - public JobGraph() { - this((String) null); + public JobGraph(ExecutionConfig config) { + this((String) null, config); } /** - * Constructs a new job graph with the given name, a random job ID. + * Constructs a new job graph with the given name, the given {@link ExecutionConfig}, + * and a random job ID. * - * @param jobName The name of the job + * @param jobName The name of the job. + * @param config The execution configuration of the job. */ - public JobGraph(String jobName) { - this(null, jobName); + public JobGraph(String jobName, ExecutionConfig config) { + this(null, jobName, config); } /** - * Constructs a new job graph with the given name and a random job ID if null supplied as an id. + * Constructs a new job graph with the given job ID (or a random ID, if {@code null} is passed), + * the given name and the given execution configuration (see {@link ExecutionConfig}). * * @param jobId The id of the job. A random ID is generated, if {@code null} is passed. * @param jobName The name of the job. + * @param config The execution configuration of the job. 
*/ - public JobGraph(JobID jobId, String jobName) { + public JobGraph(JobID jobId, String jobName, ExecutionConfig config) { this.jobID = jobId == null ? new JobID() : jobId; this.jobName = jobName == null ? "(unnamed job)" : jobName; + this.executionConfig = Preconditions.checkNotNull(config); } /** - * Constructs a new job graph with no name and a random job ID if null supplied as an id. + * Constructs a new job graph with no name, a random job ID, the given {@link ExecutionConfig}, and + * the given job vertices. * + * @param config The execution configuration of the job. * @param vertices The vertices to add to the graph. */ - public JobGraph(JobVertex... vertices) { - this(null, vertices); + public JobGraph(ExecutionConfig config, JobVertex... vertices) { + this(null, config, vertices); } /** - * Constructs a new job graph with the given name and a random job ID. + * Constructs a new job graph with the given name, the given {@link ExecutionConfig}, a random job ID, + * and the given job vertices. * * @param jobName The name of the job. + * @param config The execution configuration of the job. * @param vertices The vertices to add to the graph. */ - public JobGraph(String jobName, JobVertex... vertices) { - this(null, jobName, vertices); + public JobGraph(String jobName, ExecutionConfig config, JobVertex... vertices) { + this(null, jobName, config, vertices); } /** - * Constructs a new job graph with the given name and a random job ID if null supplied as an id. + * Constructs a new job graph with the given name, the given {@link ExecutionConfig}, + * the given jobId or a random one if null supplied, and the given job vertices. * * @param jobId The id of the job. A random ID is generated, if {@code null} is passed. * @param jobName The name of the job. + * @param config The execution configuration of the job. * @param vertices The vertices to add to the graph. */ - public JobGraph(JobID jobId, String jobName, JobVertex... 
vertices) { - this(jobId, jobName); + public JobGraph(JobID jobId, String jobName, ExecutionConfig config, JobVertex... vertices) { + this(jobId, jobName, config); for (JobVertex vertex : vertices) { addVertex(vertex); @@ -192,23 +205,8 @@ public class JobGraph implements Serializable { return this.jobConfiguration; } - /** - * Sets the restart strategy configuration. This configuration specifies the restart strategy - * to be used by the ExecutionGraph in case of a restart. - * - * @param restartStrategyConfiguration Restart strategy configuration to be set - */ - public void setRestartStrategyConfiguration(RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration) { - this.restartStrategyConfiguration = restartStrategyConfiguration; - } - - /** - * Gets the restart strategy configuration - * - * @return Restart strategy configuration to be used - */ - public RestartStrategies.RestartStrategyConfiguration getRestartStrategyConfiguration() { - return restartStrategyConfiguration; + public ExecutionConfig getExecutionConfig() { + return this.executionConfig; } /** http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java index 30f32a1..d7dfaf5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.operators.BatchTask; -import 
org.apache.flink.util.InstantiationUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,9 +41,6 @@ public abstract class AbstractInvokable { /** The environment assigned to this invokable. */ private Environment environment; - /** The execution config, cached from the deserialization from the JobConfiguration */ - private ExecutionConfig executionConfig; - /** * Starts the execution. * @@ -125,29 +121,10 @@ public abstract class AbstractInvokable { } /** - * Returns the global ExecutionConfig, obtained from the job configuration. + * Returns the global ExecutionConfig. */ public ExecutionConfig getExecutionConfig() { - if (executionConfig != null) { - return executionConfig; - } - - try { - executionConfig = (ExecutionConfig) InstantiationUtil.readObjectFromConfig( - getJobConfiguration(), - ExecutionConfig.CONFIG_KEY, - getUserCodeClassLoader()); - - if (executionConfig == null) { - LOG.warn("Environment did not contain an ExecutionConfig - using a default config."); - executionConfig = new ExecutionConfig(); - } - return executionConfig; - } - catch (Exception e) { - LOG.warn("Could not load ExecutionConfig from Environment, returning default ExecutionConfig", e); - return new ExecutionConfig(); - } + return this.environment.getExecutionConfig(); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java index 6972e1d..21e8784 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java @@ -41,13 +41,10 @@ import org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext; import 
org.apache.flink.runtime.operators.util.ReaderIterator; import org.apache.flink.runtime.operators.util.TaskConfig; import org.apache.flink.runtime.plugable.DeserializationDelegate; -import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.MutableObjectIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; - /** * DataSinkTask which is executed by a task manager. The task hands the data to an output format. * @@ -112,21 +109,8 @@ public class DataSinkTask<IT> extends AbstractInvokable { LOG.debug(getLogString("Rich Sink detected. Initializing runtime context.")); } - ExecutionConfig executionConfig; - try { - ExecutionConfig c = InstantiationUtil.readObjectFromConfig( - getJobConfiguration(), - ExecutionConfig.CONFIG_KEY, - getUserCodeClassLoader()); - if (c != null) { - executionConfig = c; - } else { - LOG.warn("The execution config returned by the configuration was null"); - executionConfig = new ExecutionConfig(); - } - } catch (IOException | ClassNotFoundException e) { - throw new RuntimeException("Could not load ExecutionConfig from Job Configuration: " + e); - } + ExecutionConfig executionConfig = getExecutionConfig(); + boolean objectReuseEnabled = executionConfig.isObjectReuseEnabled(); try { http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java index 93a4f9c..960faf7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java @@ -36,11 +36,9 @@ import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubExcepti import 
org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext; import org.apache.flink.runtime.operators.util.TaskConfig; import org.apache.flink.util.Collector; -import org.apache.flink.util.InstantiationUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -104,21 +102,7 @@ public class DataSourceTask<OT> extends AbstractInvokable { LOG.debug(getLogString("Rich Source detected. Initializing runtime context.")); } - ExecutionConfig executionConfig; - try { - ExecutionConfig c = (ExecutionConfig) InstantiationUtil.readObjectFromConfig( - getJobConfiguration(), - ExecutionConfig.CONFIG_KEY, - getUserCodeClassLoader()); - if (c != null) { - executionConfig = c; - } else { - LOG.warn("ExecutionConfig from job configuration is null. Creating empty config"); - executionConfig = new ExecutionConfig(); - } - } catch (IOException | ClassNotFoundException e) { - throw new RuntimeException("Could not load ExecutionConfig from Job Configuration: ", e); - } + ExecutionConfig executionConfig = getExecutionConfig(); boolean objectReuseEnabled = executionConfig.isObjectReuseEnabled(); http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java index 0fddde4..51e7e34 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.taskmanager; +import org.apache.flink.api.common.ExecutionConfig; import 
org.apache.flink.api.common.JobID; import org.apache.flink.api.common.TaskInfo; import org.apache.flink.configuration.Configuration; @@ -55,7 +56,8 @@ public class RuntimeEnvironment implements Environment { private final Configuration jobConfiguration; private final Configuration taskConfiguration; - + private final ExecutionConfig executionConfig; + private final ClassLoader userCodeClassLoader; private final MemoryManager memManager; @@ -80,6 +82,7 @@ public class RuntimeEnvironment implements Environment { JobID jobId, JobVertexID jobVertexId, ExecutionAttemptID executionId, + ExecutionConfig executionConfig, TaskInfo taskInfo, Configuration jobConfiguration, Configuration taskConfiguration, @@ -99,6 +102,7 @@ public class RuntimeEnvironment implements Environment { this.jobVertexId = checkNotNull(jobVertexId); this.executionId = checkNotNull(executionId); this.taskInfo = checkNotNull(taskInfo); + this.executionConfig = checkNotNull(executionConfig); this.jobConfiguration = checkNotNull(jobConfiguration); this.taskConfiguration = checkNotNull(taskConfiguration); this.userCodeClassLoader = checkNotNull(userCodeClassLoader); @@ -117,6 +121,11 @@ public class RuntimeEnvironment implements Environment { // ------------------------------------------------------------------------ @Override + public ExecutionConfig getExecutionConfig() { + return this.executionConfig; + } + + @Override public JobID getJobID() { return jobId; } http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index f2d6025..d531e43 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.taskmanager; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.cache.DistributedCache; @@ -217,6 +218,9 @@ public class Task implements Runnable { private volatile long recoveryTs; + /** The job specific execution configuration (see {@link ExecutionConfig}). */ + private final ExecutionConfig executionConfig; + /** * <p><b>IMPORTANT:</b> This constructor may not start any work that would need to * be undone in the case of a failing task deployment.</p> @@ -245,6 +249,7 @@ public class Task implements Runnable { this.nameOfInvokableClass = checkNotNull(tdd.getInvokableClassName()); this.operatorState = tdd.getOperatorState(); this.recoveryTs = tdd.getRecoveryTimestamp(); + this.executionConfig = checkNotNull(tdd.getExecutionConfig()); this.memoryManager = checkNotNull(memManager); this.ioManager = checkNotNull(ioManager); @@ -493,8 +498,8 @@ public class Task implements Runnable { TaskInputSplitProvider splitProvider = new TaskInputSplitProvider(jobManager, jobId, vertexId, executionId, userCodeClassLoader, actorAskTimeout); - Environment env = new RuntimeEnvironment(jobId, vertexId, executionId, taskInfo, - jobConfiguration, taskConfiguration, + Environment env = new RuntimeEnvironment(jobId, vertexId, executionId, + executionConfig, taskInfo, jobConfiguration, taskConfiguration, userCodeClassLoader, memoryManager, ioManager, broadcastVariableManager, accumulatorRegistry, splitProvider, distributedCacheEntries, http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index e94a40c..95305f3 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -945,7 +945,7 @@ class JobManager( throw new JobSubmissionException(jobId, "The given job is empty") } - val restartStrategy = Option(jobGraph.getRestartStrategyConfiguration()) + val restartStrategy = Option(jobGraph.getExecutionConfig.getRestartStrategy()) .map(RestartStrategyFactory.createRestartStrategy(_)) match { case Some(strategy) => strategy case None => defaultRestartStrategy @@ -964,6 +964,7 @@ class JobManager( jobGraph.getJobID, jobGraph.getName, jobGraph.getJobConfiguration, + jobGraph.getExecutionConfig, timeout, restartStrategy, jobGraph.getUserJarBlobKeys, http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java index 1c666e5..03ff83d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.checkpoint; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.ListeningBehaviour; @@ -60,7 +61,7 @@ public class CoordinatorShutdownTest { vertex.setInvokableClass(Tasks.NoOpInvokable.class); List<JobVertexID> vertexIdList = Collections.singletonList(vertex.getID()); - 
JobGraph testGraph = new JobGraph("test job", vertex); + JobGraph testGraph = new JobGraph("test job", new ExecutionConfig(), vertex); testGraph.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList, 5000, 60000, 0L, Integer.MAX_VALUE)); @@ -112,7 +113,7 @@ public class CoordinatorShutdownTest { vertex.setInvokableClass(Tasks.NoOpInvokable.class); List<JobVertexID> vertexIdList = Collections.singletonList(vertex.getID()); - JobGraph testGraph = new JobGraph("test job", vertex); + JobGraph testGraph = new JobGraph("test job", new ExecutionConfig(), vertex); testGraph.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList, 5000, 60000, 0L, Integer.MAX_VALUE)); http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java index 5d83bf2..c788bef 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.checkpoint; import akka.actor.ActorSystem; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; @@ -48,6 +49,7 @@ public class ExecutionGraphCheckpointCoordinatorTest { new JobID(), "test", new Configuration(), + new ExecutionConfig(), new FiniteDuration(1, TimeUnit.DAYS), new NoRestartStrategy(), Collections.<BlobKey>emptyList(), 
http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorRecoveryITCase.java index aeb521c..865760e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorRecoveryITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorRecoveryITCase.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.client; import akka.actor.PoisonPill; import org.apache.curator.test.TestingServer; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; @@ -94,7 +95,7 @@ public class JobClientActorRecoveryITCase extends TestLogger { JobVertex blockingVertex = new JobVertex("Blocking Vertex"); blockingVertex.setInvokableClass(BlockingTask.class); blockingVertex.setParallelism(1); - final JobGraph jobGraph = new JobGraph("Blocking Test Job", blockingVertex); + final JobGraph jobGraph = new JobGraph("Blocking Test Job", new ExecutionConfig(), blockingVertex); final Promise<JobExecutionResult> promise = new scala.concurrent.impl.Promise.DefaultPromise<>(); Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow(); http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java index 
00ad632..ee1fd60 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java @@ -25,6 +25,7 @@ import akka.actor.Props; import akka.pattern.Patterns; import akka.testkit.JavaTestKit; import akka.util.Timeout; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.FlinkUntypedActor; @@ -47,7 +48,7 @@ import java.util.concurrent.TimeUnit; public class JobClientActorTest extends TestLogger { private static ActorSystem system; - private static JobGraph testJobGraph = new JobGraph("Test Job"); + private static JobGraph testJobGraph = new JobGraph("Test Job", new ExecutionConfig()); @BeforeClass public static void setup() { http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java index 92b642a..e839c97 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java @@ -26,6 +26,7 @@ import java.net.URL; import java.util.ArrayList; import java.util.List; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; @@ -54,10 +55,12 @@ public class TaskDeploymentDescriptorTest { final List<InputGateDeploymentDescriptor> inputGates = new 
ArrayList<InputGateDeploymentDescriptor>(0); final List<BlobKey> requiredJars = new ArrayList<BlobKey>(0); final List<URL> requiredClasspaths = new ArrayList<URL>(0); - - final TaskDeploymentDescriptor orig = new TaskDeploymentDescriptor(jobID, vertexID, execId, taskName, - indexInSubtaskGroup, currentNumberOfSubtasks, attemptNumber, jobConfiguration, taskConfiguration, - invokableClass.getName(), producedResults, inputGates, requiredJars, requiredClasspaths, 47); + final ExecutionConfig executionConfig = new ExecutionConfig(); + + final TaskDeploymentDescriptor orig = new TaskDeploymentDescriptor(jobID, vertexID, execId, + executionConfig, taskName, indexInSubtaskGroup, currentNumberOfSubtasks, attemptNumber, + jobConfiguration, taskConfiguration, invokableClass.getName(), producedResults, inputGates, + requiredJars, requiredClasspaths, 47); final TaskDeploymentDescriptor copy = CommonTestUtils.createCopySerializable(orig); @@ -75,6 +78,7 @@ public class TaskDeploymentDescriptorTest { assertEquals(orig.getAttemptNumber(), copy.getAttemptNumber()); assertEquals(orig.getProducedPartitions(), copy.getProducedPartitions()); assertEquals(orig.getInputGates(), copy.getInputGates()); + assertEquals(orig.getExecutionConfig(), copy.getExecutionConfig()); assertEquals(orig.getRequiredJarFiles(), copy.getRequiredJarFiles()); assertEquals(orig.getRequiredClasspaths(), copy.getRequiredClasspaths()); http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java index ee372dd..d845d01 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java @@ -26,6 +26,7 @@ import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; @@ -106,6 +107,7 @@ public class ExecutionGraphConstructionTest { jobId, jobName, cfg, + new ExecutionConfig(), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); try { @@ -150,6 +152,7 @@ public class ExecutionGraphConstructionTest { jobId, jobName, cfg, + new ExecutionConfig(), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); try { @@ -217,6 +220,7 @@ public class ExecutionGraphConstructionTest { jobId, jobName, cfg, + new ExecutionConfig(), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); try { @@ -471,6 +475,7 @@ public class ExecutionGraphConstructionTest { jobId, jobName, cfg, + new ExecutionConfig(), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); try { @@ -527,6 +532,7 @@ public class ExecutionGraphConstructionTest { jobId, jobName, cfg, + new ExecutionConfig(), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); try { @@ -588,6 +594,7 @@ public class ExecutionGraphConstructionTest { jobId, jobName, cfg, + new ExecutionConfig(), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); try { @@ -633,6 +640,7 @@ public class ExecutionGraphConstructionTest { jobId, jobName, cfg, + new ExecutionConfig(), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); @@ -696,14 +704,15 @@ public class ExecutionGraphConstructionTest { // isolated vertex JobVertex v8 = new JobVertex("vertex8"); v8.setParallelism(2); - - JobGraph jg = new 
JobGraph(jobId, jobName, v1, v2, v3, v4, v5, v6, v7, v8); + + JobGraph jg = new JobGraph(jobId, jobName, new ExecutionConfig(), v1, v2, v3, v4, v5, v6, v7, v8); ExecutionGraph eg = new ExecutionGraph( TestingUtils.defaultExecutionContext(), jobId, jobName, cfg, + new ExecutionConfig(), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java index 6362732..7a9cee7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java @@ -30,6 +30,7 @@ import java.util.Comparator; import java.util.List; import java.util.Map; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; @@ -84,6 +85,7 @@ public class ExecutionGraphDeploymentTest { jobId, "some job", new Configuration(), + new ExecutionConfig(), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); @@ -287,6 +289,7 @@ public class ExecutionGraphDeploymentTest { jobId, "some job", new Configuration(), + new ExecutionConfig(), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java index 646e195..0837927 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.executiongraph; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.configuration.Configuration; @@ -72,13 +73,14 @@ public class ExecutionGraphRestartTest extends TestLogger { sender.setInvokableClass(Tasks.NoOpInvokable.class); sender.setParallelism(NUM_TASKS); - JobGraph jobGraph = new JobGraph("Pointwise job", sender); + JobGraph jobGraph = new JobGraph("Pointwise job", new ExecutionConfig(), sender); ExecutionGraph eg = new ExecutionGraph( TestingUtils.defaultExecutionContext(), new JobID(), "test job", new Configuration(), + new ExecutionConfig(), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); @@ -127,14 +129,15 @@ public class ExecutionGraphRestartTest extends TestLogger { groupVertex.setStrictlyCoLocatedWith(groupVertex2); //initiate and schedule job - JobGraph jobGraph = new JobGraph("Pointwise job", groupVertex, groupVertex2); + JobGraph jobGraph = new JobGraph("Pointwise job", new ExecutionConfig(), groupVertex, groupVertex2); ExecutionGraph eg = new ExecutionGraph( TestingUtils.defaultExecutionContext(), new JobID(), "test job", new Configuration(), - AkkaUtils.getDefaultTimeout(), - new FixedDelayRestartStrategy(1, 0L)); + new ExecutionConfig(), + AkkaUtils.getDefaultTimeout(), + new FixedDelayRestartStrategy(1, 0L)); 
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); assertEquals(JobStatus.CREATED, eg.getState()); @@ -181,13 +184,14 @@ public class ExecutionGraphRestartTest extends TestLogger { sender.setInvokableClass(Tasks.NoOpInvokable.class); sender.setParallelism(NUM_TASKS); - JobGraph jobGraph = new JobGraph("Pointwise job", sender); + JobGraph jobGraph = new JobGraph("Pointwise job", new ExecutionConfig(), sender); ExecutionGraph eg = new ExecutionGraph( TestingUtils.defaultExecutionContext(), new JobID(), "Test job", new Configuration(), + new ExecutionConfig(), AkkaUtils.getDefaultTimeout(), new FixedDelayRestartStrategy(1, 1000)); eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); @@ -216,6 +220,7 @@ public class ExecutionGraphRestartTest extends TestLogger { new JobID(), "TestJob", new Configuration(), + new ExecutionConfig(), AkkaUtils.getDefaultTimeout(), // We want to manually control the restart and delay new FixedDelayRestartStrategy(Integer.MAX_VALUE, Long.MAX_VALUE)); @@ -224,7 +229,7 @@ public class ExecutionGraphRestartTest extends TestLogger { jobVertex.setInvokableClass(Tasks.NoOpInvokable.class); jobVertex.setParallelism(NUM_TASKS); - JobGraph jobGraph = new JobGraph("TestJob", jobVertex); + JobGraph jobGraph = new JobGraph("TestJob", new ExecutionConfig(), jobVertex); executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); @@ -274,6 +279,7 @@ public class ExecutionGraphRestartTest extends TestLogger { new JobID(), "TestJob", new Configuration(), + new ExecutionConfig(), AkkaUtils.getDefaultTimeout(), // We want to manually control the restart and delay new FixedDelayRestartStrategy(Integer.MAX_VALUE, Long.MAX_VALUE)); @@ -289,7 +295,7 @@ public class ExecutionGraphRestartTest extends TestLogger { jobVertex.setInvokableClass(Tasks.NoOpInvokable.class); jobVertex.setParallelism(NUM_TASKS); - JobGraph jobGraph = new JobGraph("TestJob", jobVertex); + JobGraph jobGraph = new 
JobGraph("TestJob", new ExecutionConfig(), jobVertex); executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); @@ -349,13 +355,14 @@ public class ExecutionGraphRestartTest extends TestLogger { sender.setInvokableClass(Tasks.NoOpInvokable.class); sender.setParallelism(NUM_TASKS); - JobGraph jobGraph = new JobGraph("Pointwise job", sender); + JobGraph jobGraph = new JobGraph("Pointwise job", new ExecutionConfig(), sender); ExecutionGraph eg = spy(new ExecutionGraph( TestingUtils.defaultExecutionContext(), new JobID(), "Test job", new Configuration(), + new ExecutionConfig(), AkkaUtils.getDefaultTimeout(), new FixedDelayRestartStrategy(1, 1000))); @@ -419,13 +426,14 @@ public class ExecutionGraphRestartTest extends TestLogger { receiver.setInvokableClass(Tasks.NoOpInvokable.class); receiver.setParallelism(1); - JobGraph jobGraph = new JobGraph("Pointwise job", sender, receiver); + JobGraph jobGraph = new JobGraph("Pointwise job", new ExecutionConfig(), sender, receiver); ExecutionGraph eg = new ExecutionGraph( TestingUtils.defaultExecutionContext(), new JobID(), "test job", new Configuration(), + new ExecutionConfig(), AkkaUtils.getDefaultTimeout(), new FixedDelayRestartStrategy(1, 1000)); @@ -510,8 +518,8 @@ public class ExecutionGraphRestartTest extends TestLogger { vertex.setInvokableClass(Tasks.NoOpInvokable.class); vertex.setParallelism(1); - JobGraph jobGraph = new JobGraph("Test Job", vertex); - jobGraph.setRestartStrategyConfiguration(RestartStrategies.fixedDelayRestart( + JobGraph jobGraph = new JobGraph("Test Job", new ExecutionConfig(), vertex); + jobGraph.getExecutionConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart( Integer.MAX_VALUE, Integer.MAX_VALUE)); ExecutionGraph eg = new ExecutionGraph( @@ -519,6 +527,7 @@ public class ExecutionGraphRestartTest extends TestLogger { new JobID(), "test job", new Configuration(), + new ExecutionConfig(), AkkaUtils.getDefaultTimeout(), new FixedDelayRestartStrategy(1, 
1000000)); @@ -561,8 +570,8 @@ public class ExecutionGraphRestartTest extends TestLogger { vertex.setInvokableClass(Tasks.NoOpInvokable.class); vertex.setParallelism(1); - JobGraph jobGraph = new JobGraph("Test Job", vertex); - jobGraph.setRestartStrategyConfiguration(RestartStrategies.fixedDelayRestart( + JobGraph jobGraph = new JobGraph("Test Job", new ExecutionConfig(), vertex); + jobGraph.getExecutionConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart( Integer.MAX_VALUE, Integer.MAX_VALUE)); ExecutionGraph eg = new ExecutionGraph( @@ -570,6 +579,7 @@ public class ExecutionGraphRestartTest extends TestLogger { new JobID(), "test job", new Configuration(), + new ExecutionConfig(), AkkaUtils.getDefaultTimeout(), new FixedDelayRestartStrategy(1, 1000000)); http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java index 7cc91c0..d1bb680 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.StoppingException; @@ -127,6 +128,7 @@ public class ExecutionGraphSignalsTest { jobId, jobName, cfg, + new ExecutionConfig(), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); eg.attachJobGraph(ordered); 
http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java index ca07fbf..c85ca13 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java @@ -25,6 +25,7 @@ import static org.mockito.Mockito.spy; import java.lang.reflect.Field; import java.net.InetAddress; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.akka.AkkaUtils; @@ -174,6 +175,7 @@ public class ExecutionGraphTestUtils { new JobID(), "test job", new Configuration(), + new ExecutionConfig(), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java index 5dd5ba6..1ff90e1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java @@ -24,6 +24,7 @@ import static org.mockito.Mockito.mock; import java.util.Arrays; +import org.apache.flink.api.common.ExecutionConfig; 
import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; @@ -53,6 +54,7 @@ public class ExecutionStateProgressTest { jid, "test job", new Configuration(), + new ExecutionConfig(), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); graph.attachJobGraph(Arrays.asList(ajv)); http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java index a2e2482..2d7d6f5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java @@ -25,6 +25,7 @@ import static org.mockito.Mockito.when; import java.net.InetAddress; import java.util.concurrent.TimeUnit; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.io.StrictlyLocalAssignment; import org.apache.flink.core.io.InputSplitAssigner; import org.apache.flink.core.io.InputSplitSource; @@ -264,13 +265,14 @@ public class LocalInputSplitsTest { vertex.setInvokableClass(DummyInvokable.class); vertex.setInputSplitSource(new TestInputSplitSource(splits)); - JobGraph jobGraph = new JobGraph("test job", vertex); + JobGraph jobGraph = new JobGraph("test job", new ExecutionConfig(), vertex); ExecutionGraph eg = new ExecutionGraph( TestingUtils.defaultExecutionContext(), jobGraph.getJobID(), jobGraph.getName(), jobGraph.getJobConfiguration(), + new ExecutionConfig(), TIMEOUT, new NoRestartStrategy()); @@ -328,13 +330,14 @@ public class LocalInputSplitsTest { 
vertex.setInvokableClass(DummyInvokable.class); vertex.setInputSplitSource(new TestInputSplitSource(splits)); - JobGraph jobGraph = new JobGraph("test job", vertex); + JobGraph jobGraph = new JobGraph("test job", new ExecutionConfig(), vertex); ExecutionGraph eg = new ExecutionGraph( TestingUtils.defaultExecutionContext(), jobGraph.getJobID(), jobGraph.getName(), jobGraph.getJobConfiguration(), + new ExecutionConfig(), TIMEOUT, new NoRestartStrategy()); http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java index c1afe04..cbeeded 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.testingUtils.TestingUtils; @@ -63,6 +64,7 @@ public class PointwisePatternTest { jobId, jobName, cfg, + new ExecutionConfig(), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); try { @@ -104,6 +106,7 @@ public class PointwisePatternTest { jobId, jobName, cfg, + new ExecutionConfig(), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); try { @@ -146,6 +149,7 @@ public class PointwisePatternTest { jobId, jobName, cfg, + new ExecutionConfig(), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); try { @@ -189,6 
+193,7 @@ public class PointwisePatternTest { jobId, jobName, cfg, + new ExecutionConfig(), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); try { @@ -230,6 +235,7 @@ public class PointwisePatternTest { jobId, jobName, cfg, + new ExecutionConfig(), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); try { @@ -291,6 +297,7 @@ public class PointwisePatternTest { jobId, jobName, cfg, + new ExecutionConfig(), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); try { @@ -343,6 +350,7 @@ public class PointwisePatternTest { jobId, jobName, cfg, + new ExecutionConfig(), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); try { http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java index 2f0d5e7..f747ff3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.executiongraph; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.execution.ExecutionState; @@ -185,6 +186,7 @@ public class TerminalStateDeadlockTest { jobId, "test graph", EMPTY_CONFIG, + new ExecutionConfig(), TIMEOUT, new FixedDelayRestartStrategy(1, 0)); } http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java index 3670358..25c9d70 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java @@ -26,6 +26,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.concurrent.TimeUnit; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.instance.DummyActorGateway; import org.apache.flink.runtime.instance.HardwareDescription; @@ -74,13 +75,14 @@ public class VertexLocationConstraintTest { JobVertex jobVertex = new JobVertex("test vertex", new JobVertexID()); jobVertex.setInvokableClass(DummyInvokable.class); jobVertex.setParallelism(2); - JobGraph jg = new JobGraph("test job", jobVertex); + JobGraph jg = new JobGraph("test job", new ExecutionConfig(), jobVertex); ExecutionGraph eg = new ExecutionGraph( TestingUtils.defaultExecutionContext(), jg.getJobID(), jg.getName(), jg.getJobConfiguration(), + new ExecutionConfig(), timeout, new NoRestartStrategy()); eg.attachJobGraph(Collections.singletonList(jobVertex)); @@ -146,13 +148,14 @@ public class VertexLocationConstraintTest { JobVertex jobVertex = new JobVertex("test vertex", new JobVertexID()); jobVertex.setInvokableClass(DummyInvokable.class); jobVertex.setParallelism(2); - JobGraph jg = new JobGraph("test job", jobVertex); + JobGraph jg = new JobGraph("test job", new ExecutionConfig(), jobVertex); ExecutionGraph eg = new ExecutionGraph( TestingUtils.defaultExecutionContext(), jg.getJobID(), jg.getName(), jg.getJobConfiguration(), + new ExecutionConfig(), 
timeout, new NoRestartStrategy()); eg.attachJobGraph(Collections.singletonList(jobVertex)); @@ -222,13 +225,14 @@ public class VertexLocationConstraintTest { jobVertex1.setSlotSharingGroup(sharingGroup); jobVertex2.setSlotSharingGroup(sharingGroup); - JobGraph jg = new JobGraph("test job", jobVertex1, jobVertex2); + JobGraph jg = new JobGraph("test job", new ExecutionConfig(), jobVertex1, jobVertex2); ExecutionGraph eg = new ExecutionGraph( TestingUtils.defaultExecutionContext(), jg.getJobID(), jg.getName(), jg.getJobConfiguration(), + new ExecutionConfig(), timeout, new NoRestartStrategy()); eg.attachJobGraph(Arrays.asList(jobVertex1, jobVertex2)); @@ -289,13 +293,14 @@ public class VertexLocationConstraintTest { JobVertex jobVertex = new JobVertex("test vertex", new JobVertexID()); jobVertex.setInvokableClass(DummyInvokable.class); jobVertex.setParallelism(1); - JobGraph jg = new JobGraph("test job", jobVertex); + JobGraph jg = new JobGraph("test job", new ExecutionConfig(), jobVertex); ExecutionGraph eg = new ExecutionGraph( TestingUtils.defaultExecutionContext(), jg.getJobID(), jg.getName(), jg.getJobConfiguration(), + new ExecutionConfig(), timeout, new NoRestartStrategy()); eg.attachJobGraph(Collections.singletonList(jobVertex)); @@ -354,7 +359,7 @@ public class VertexLocationConstraintTest { jobVertex1.setParallelism(1); jobVertex2.setParallelism(1); - JobGraph jg = new JobGraph("test job", jobVertex1, jobVertex2); + JobGraph jg = new JobGraph("test job", new ExecutionConfig(), jobVertex1, jobVertex2); SlotSharingGroup sharingGroup = new SlotSharingGroup(); jobVertex1.setSlotSharingGroup(sharingGroup); @@ -365,6 +370,7 @@ public class VertexLocationConstraintTest { jg.getJobID(), jg.getName(), jg.getJobConfiguration(), + new ExecutionConfig(), timeout, new NoRestartStrategy()); eg.attachJobGraph(Arrays.asList(jobVertex1, jobVertex2)); @@ -397,13 +403,14 @@ public class VertexLocationConstraintTest { public void testArchivingClearsFields() { try { JobVertex 
vertex = new JobVertex("test vertex", new JobVertexID()); - JobGraph jg = new JobGraph("test job", vertex); + JobGraph jg = new JobGraph("test job", new ExecutionConfig(), vertex); ExecutionGraph eg = new ExecutionGraph( TestingUtils.defaultExecutionContext(), jg.getJobID(), jg.getName(), jg.getJobConfiguration(), + new ExecutionConfig(), timeout, new NoRestartStrategy()); eg.attachJobGraph(Collections.singletonList(vertex)); http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java index 5c7297e..5110249 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; @@ -75,6 +76,7 @@ public class VertexSlotSharingTest { new JobID(), "test job", new Configuration(), + new ExecutionConfig(), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); eg.attachJobGraph(vertices); http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java index 854be5f..317eed7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.io.network.partition; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.io.network.api.reader.BufferReader; @@ -89,7 +90,7 @@ public class PartialConsumePipelinedResultTest { sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); final JobGraph jobGraph = new JobGraph( - "Partial Consume of Pipelined Result", sender, receiver); + "Partial Consume of Pipelined Result", new ExecutionConfig(), sender, receiver); final SlotSharingGroup slotSharingGroup = new SlotSharingGroup( sender.getID(), receiver.getID()); http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java index ca047e8..68b05b2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java @@ -22,6 +22,7 @@ import static org.junit.Assert.*; import java.util.List; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.core.testutils.CommonTestUtils; import org.junit.Test; @@ -31,7 +32,7 @@ public class 
JobGraphTest { @Test public void testSerialization() { try { - JobGraph jg = new JobGraph("The graph"); + JobGraph jg = new JobGraph("The graph", new ExecutionConfig()); // add some configuration values { @@ -89,8 +90,9 @@ public class JobGraphTest { target2.connectNewDataSetAsInput(intermediate2, DistributionPattern.POINTWISE); intermediate2.connectNewDataSetAsInput(intermediate1, DistributionPattern.POINTWISE); intermediate1.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE); - - JobGraph graph = new JobGraph("TestGraph", source1, source2, intermediate1, intermediate2, target1, target2); + + JobGraph graph = new JobGraph("TestGraph", new ExecutionConfig(), + source1, source2, intermediate1, intermediate2, target1, target2); List<JobVertex> sorted = graph.getVerticesSortedTopologicallyFromSources(); assertEquals(6, sorted.size()); @@ -133,8 +135,9 @@ public class JobGraphTest { l12.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE); l13.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE); - - JobGraph graph = new JobGraph("TestGraph", source1, source2, root, l11, l13, l12, l2); + + JobGraph graph = new JobGraph("TestGraph", new ExecutionConfig(), + source1, source2, root, l11, l13, l12, l2); List<JobVertex> sorted = graph.getVerticesSortedTopologicallyFromSources(); assertEquals(7, sorted.size()); @@ -179,8 +182,8 @@ public class JobGraphTest { op2.connectNewDataSetAsInput(op1, DistributionPattern.POINTWISE); op2.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE); op3.connectNewDataSetAsInput(op2, DistributionPattern.POINTWISE); - - JobGraph graph = new JobGraph("TestGraph", source, op1, op2, op3); + + JobGraph graph = new JobGraph("TestGraph", new ExecutionConfig(), source, op1, op2, op3); List<JobVertex> sorted = graph.getVerticesSortedTopologicallyFromSources(); assertEquals(4, sorted.size()); @@ -208,8 +211,8 @@ public class JobGraphTest { v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE); 
v3.connectNewDataSetAsInput(v2, DistributionPattern.POINTWISE); v4.connectNewDataSetAsInput(v3, DistributionPattern.POINTWISE); - - JobGraph jg = new JobGraph("Cyclic Graph", v1, v2, v3, v4); + + JobGraph jg = new JobGraph("Cyclic Graph", new ExecutionConfig(), v1, v2, v3, v4); try { jg.getVerticesSortedTopologicallyFromSources(); fail("Failed to raise error on topologically sorting cyclic graph."); @@ -240,8 +243,8 @@ public class JobGraphTest { v3.connectNewDataSetAsInput(v2, DistributionPattern.POINTWISE); v4.connectNewDataSetAsInput(v3, DistributionPattern.POINTWISE); target.connectNewDataSetAsInput(v3, DistributionPattern.POINTWISE); - - JobGraph jg = new JobGraph("Cyclic Graph", v1, v2, v3, v4, source, target); + + JobGraph jg = new JobGraph("Cyclic Graph", new ExecutionConfig(), v1, v2, v3, v4, source, target); try { jg.getVerticesSortedTopologicallyFromSources(); fail("Failed to raise error on topologically sorting cyclic graph."); http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java index 46fb694..612f64f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobgraph.jsonplan; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.TextNode; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import 
org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -65,8 +66,8 @@ public class JsonGeneratorTest { sink1.connectNewDataSetAsInput(join2, DistributionPattern.POINTWISE); sink2.connectNewDataSetAsInput(join1, DistributionPattern.ALL_TO_ALL); - - JobGraph jg = new JobGraph("my job", source1, source2, source3, + + JobGraph jg = new JobGraph("my job", new ExecutionConfig(), source1, source2, source3, intermediate1, intermediate2, join1, join2, sink1, sink2); String plan = JsonPlanGenerator.generatePlan(jg); http://git-wip-us.apache.org/repos/asf/flink/blob/0f8d76c6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java index 27c32a0..d283e91 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java @@ -23,6 +23,7 @@ import akka.testkit.JavaTestKit; import com.typesafe.config.Config; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; @@ -122,7 +123,7 @@ public class JobManagerTest { sender.setInvokableClass(Tasks.BlockingNoOpInvokable.class); // just block sender.createAndAddResultDataSet(rid, PIPELINED); - final JobGraph jobGraph = new JobGraph("Blocking test job", sender); + final JobGraph jobGraph = new JobGraph("Blocking test job", new ExecutionConfig(), sender); final JobID jid = jobGraph.getJobID(); final ActorGateway jobManagerGateway = cluster.getLeaderGateway( @@ -246,7 +247,7 @@ public class JobManagerTest { sender.setParallelism(2); 
sender.setInvokableClass(StoppableInvokable.class); - final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", sender); + final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", new ExecutionConfig(), sender); final JobID jid = jobGraph.getJobID(); final ActorGateway jobManagerGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION()); @@ -294,7 +295,7 @@ public class JobManagerTest { sender.setParallelism(1); sender.setInvokableClass(Tasks.BlockingNoOpInvokable.class); // just block - final JobGraph jobGraph = new JobGraph("Non-Stoppable batching test job", sender); + final JobGraph jobGraph = new JobGraph("Non-Stoppable batching test job", new ExecutionConfig(), sender); final JobID jid = jobGraph.getJobID(); final ActorGateway jobManagerGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());