This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch executors-clean in repository https://gitbox.apache.org/repos/asf/flink.git
commit fb182525e8962faa483b678d061046062f64924d Author: Kostas Kloudas <kklou...@gmail.com> AuthorDate: Mon Nov 18 15:48:35 2019 +0100 [hotfix] CliFrontend.run() merges configurations into one --- .../org/apache/flink/client/cli/CliFrontend.java | 41 ++++++++++++++-------- .../flink/client/cli/ExecutionConfigAccessor.java | 5 +-- 2 files changed, 29 insertions(+), 17 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java index 38243fc..d82b377 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java @@ -208,33 +208,44 @@ public class CliFrontend { throw new CliArgsException("Could not build the program from JAR file.", e); } - final CustomCommandLine customCommandLine = getActiveCustomCommandLine(commandLine); - final Configuration executorConfig = customCommandLine.applyCommandLineOptionsToConfiguration(commandLine); - final List<URL> jobJars = program.getJobJarAndDependencies(); - final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromProgramOptions(programOptions, jobJars); - final Configuration executionConfig = executionParameters.getConfiguration(); + final Configuration effectiveConfiguration = + getEffectiveConfiguration(commandLine, programOptions, jobJars); try { - runProgram(executorConfig, executionConfig, program); + runProgram(effectiveConfiguration, program); } finally { program.deleteExtractedLibraries(); } } + private Configuration getEffectiveConfiguration( + final CommandLine commandLine, + final ProgramOptions programOptions, + final List<URL> jobJars) throws FlinkException { + + final CustomCommandLine customCommandLine = getActiveCustomCommandLine(checkNotNull(commandLine)); + final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromProgramOptions( + checkNotNull(programOptions), + checkNotNull(jobJars)); + + final Configuration executorConfig = customCommandLine.applyCommandLineOptionsToConfiguration(commandLine); + final Configuration effectiveConfiguration = new Configuration(executorConfig); + return executionParameters.applyToConfiguration(effectiveConfiguration); + } + private <ClusterID> void runProgram( - Configuration executorConfig, - Configuration executionConfig, + Configuration configuration, PackagedProgram program) throws ProgramInvocationException, FlinkException { - final ClusterClientFactory<ClusterID> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(executorConfig); + final ClusterClientFactory<ClusterID> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(configuration); checkNotNull(clusterClientFactory); - final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(executorConfig); + final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration); try { - final ClusterID clusterId = clusterClientFactory.getClusterId(executorConfig); - final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromConfiguration(executionConfig); + final ClusterID clusterId = clusterClientFactory.getClusterId(configuration); + final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromConfiguration(configuration); final ClusterClient<ClusterID> client; // directly deploy the job if the cluster is started in job mode and detached @@ -243,7 +254,7 @@ public class CliFrontend { final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism); - final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(executorConfig); + final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration); client = clusterDescriptor.deployJobCluster( clusterSpecification, jobGraph, @@ -264,7 +275,7 @@ public class CliFrontend { } else { // also in job mode we have to deploy a session cluster because the job // might consist of multiple parts (e.g. when using collect) - final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(executorConfig); + final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration); client = clusterDescriptor.deploySessionCluster(clusterSpecification); // if not running in detached mode, add a shutdown hook to shut down cluster if client exits // there's a race-condition here if cli is killed before shutdown hook is installed @@ -279,7 +290,7 @@ public class CliFrontend { int userParallelism = executionParameters.getParallelism(); LOG.debug("User parallelism is set to {}", userParallelism); - executeProgram(executionConfig, program, client); + executeProgram(configuration, program, client); } finally { if (clusterId == null && !executionParameters.getDetachedMode()) { // terminate the cluster only if we have started it before and if it's not detached diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java index 9e570e1..f55560b 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java @@ -77,8 +77,9 @@ public class ExecutionConfigAccessor { return new ExecutionConfigAccessor(configuration); } - public Configuration getConfiguration() { - return configuration; + Configuration applyToConfiguration(final Configuration baseConfiguration) { + baseConfiguration.addAll(configuration); + return baseConfiguration; } public List<URL> getJars() {