This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch executor-impl in repository https://gitbox.apache.org/repos/asf/flink.git
commit c08abac7556e7e63b092dd37d46c9e73df2697cf Author: Kostas Kloudas <kklou...@gmail.com> AuthorDate: Sat Nov 16 22:24:26 2019 +0100 [FLINK-XXXXX] Update the DeploymentTarget setting --- .../flink/client/cli/AbstractCustomCommandLine.java | 3 ++- .../flink/client/deployment/StandaloneClientFactory.java | 3 ++- .../client/deployment/ClusterClientServiceLoaderTest.java | 3 ++- .../org/apache/flink/yarn/YarnClusterClientFactory.java | 6 +++++- .../org/apache/flink/yarn/cli/FlinkYarnSessionCli.java | 6 +++++- .../apache/flink/yarn/YarnClusterClientFactoryTest.java | 15 +++++++++++++-- 6 files changed, 29 insertions(+), 7 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java b/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java index f32d4f8..b8431cf 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java @@ -18,6 +18,7 @@ package org.apache.flink.client.cli; +import org.apache.flink.client.deployment.executors.StandaloneSessionClusterExecutor; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.configuration.HighAvailabilityOptions; @@ -73,7 +74,7 @@ public abstract class AbstractCustomCommandLine implements CustomCommandLine { @Override public Configuration applyCommandLineOptionsToConfiguration(CommandLine commandLine) throws FlinkException { final Configuration resultingConfiguration = new Configuration(configuration); - resultingConfiguration.setString(DeploymentOptions.TARGET, getId()); + resultingConfiguration.setString(DeploymentOptions.TARGET, StandaloneSessionClusterExecutor.NAME); if (commandLine.hasOption(addressOption.getOpt())) { String addressWithPort = commandLine.getOptionValue(addressOption.getOpt()); diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java index b10204b..9597ff8 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java @@ -18,6 +18,7 @@ package org.apache.flink.client.deployment; +import org.apache.flink.client.deployment.executors.StandaloneSessionClusterExecutor; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DeploymentOptions; @@ -35,7 +36,7 @@ public class StandaloneClientFactory implements ClusterClientFactory<StandaloneC @Override public boolean isCompatibleWith(Configuration configuration) { checkNotNull(configuration); - return ID.equalsIgnoreCase(configuration.getString(DeploymentOptions.TARGET)); + return StandaloneSessionClusterExecutor.NAME.equalsIgnoreCase(configuration.getString(DeploymentOptions.TARGET)); } @Override diff --git a/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java b/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java index a084021..b7a9953 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java @@ -18,6 +18,7 @@ package org.apache.flink.client.deployment; +import org.apache.flink.client.deployment.executors.StandaloneSessionClusterExecutor; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DeploymentOptions; @@ -57,7 +58,7 @@ public class ClusterClientServiceLoaderTest { @Test public void testStandaloneClusterClientFactoryDiscovery() { final Configuration config = new Configuration(); - 
config.setString(DeploymentOptions.TARGET, StandaloneClientFactory.ID); + config.setString(DeploymentOptions.TARGET, StandaloneSessionClusterExecutor.NAME); ClusterClientFactory<StandaloneClusterId> factory = serviceLoaderUnderTest.getClusterClientFactory(config); assertTrue(factory instanceof StandaloneClientFactory); diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java index 7605470..ad391d8 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java @@ -25,6 +25,8 @@ import org.apache.flink.configuration.ConfigurationUtils; import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.yarn.configuration.YarnConfigOptions; +import org.apache.flink.yarn.executors.YarnJobClusterExecutor; +import org.apache.flink.yarn.executors.YarnSessionClusterExecutor; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.client.api.YarnClient; @@ -45,7 +47,9 @@ public class YarnClusterClientFactory implements ClusterClientFactory<Applicatio @Override public boolean isCompatibleWith(Configuration configuration) { checkNotNull(configuration); - return ID.equalsIgnoreCase(configuration.getString(DeploymentOptions.TARGET)); + final String deploymentTarget = configuration.getString(DeploymentOptions.TARGET); + return YarnJobClusterExecutor.NAME.equalsIgnoreCase(deploymentTarget) || + YarnSessionClusterExecutor.NAME.equalsIgnoreCase(deploymentTarget); } @Override diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java index 362deda..388dea0 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java +++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java @@ -47,6 +47,8 @@ import org.apache.flink.yarn.YarnClusterClientFactory; import org.apache.flink.yarn.YarnClusterDescriptor; import org.apache.flink.yarn.configuration.YarnConfigOptions; import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal; +import org.apache.flink.yarn.executors.YarnJobClusterExecutor; +import org.apache.flink.yarn.executors.YarnSessionClusterExecutor; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.HelpFormatter; @@ -346,7 +348,6 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine { public Configuration applyCommandLineOptionsToConfiguration(CommandLine commandLine) throws FlinkException { // we ignore the addressOption because it can only contain "yarn-cluster" final Configuration effectiveConfiguration = new Configuration(configuration); - effectiveConfiguration.setString(DeploymentOptions.TARGET, getId()); applyDescriptorOptionToConfig(commandLine, effectiveConfiguration); @@ -361,6 +362,9 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine { effectiveConfiguration.setString(HA_CLUSTER_ID, zooKeeperNamespace); effectiveConfiguration.setString(YarnConfigOptions.APPLICATION_ID, ConverterUtils.toString(applicationId)); + effectiveConfiguration.setString(DeploymentOptions.TARGET, YarnSessionClusterExecutor.NAME); + } else { + effectiveConfiguration.setString(DeploymentOptions.TARGET, YarnJobClusterExecutor.NAME); } if (commandLine.hasOption(jmMemory.getOpt())) { diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterClientFactoryTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterClientFactoryTest.java index 931313a..508c11e 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterClientFactoryTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterClientFactoryTest.java @@ -23,6 +23,8 @@ import 
org.apache.flink.client.deployment.ClusterClientServiceLoader; import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DeploymentOptions; +import org.apache.flink.yarn.executors.YarnJobClusterExecutor; +import org.apache.flink.yarn.executors.YarnSessionClusterExecutor; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.junit.Test; @@ -35,9 +37,18 @@ import static org.junit.Assert.assertTrue; public class YarnClusterClientFactoryTest { @Test - public void testYarnClusterClientFactoryDiscovery() { + public void testYarnClusterClientFactoryDiscoveryWithPerJobExecutor() { + testYarnClusterClientFactoryDiscoveryHelper(YarnJobClusterExecutor.NAME); + } + + @Test + public void testYarnClusterClientFactoryDiscoveryWithSessionExecutor() { + testYarnClusterClientFactoryDiscoveryHelper(YarnSessionClusterExecutor.NAME); + } + + private void testYarnClusterClientFactoryDiscoveryHelper(final String targetName) { final Configuration configuration = new Configuration(); - configuration.setString(DeploymentOptions.TARGET, YarnClusterClientFactory.ID); + configuration.setString(DeploymentOptions.TARGET, targetName); final ClusterClientServiceLoader serviceLoader = new DefaultClusterClientServiceLoader(); final ClusterClientFactory<ApplicationId> factory = serviceLoader.getClusterClientFactory(configuration);