This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executor-impl
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c08abac7556e7e63b092dd37d46c9e73df2697cf
Author: Kostas Kloudas <kklou...@gmail.com>
AuthorDate: Sat Nov 16 22:24:26 2019 +0100

    [FLINK-XXXXX] Update the DeploymentTarget setting
---
 .../flink/client/cli/AbstractCustomCommandLine.java       |  3 ++-
 .../flink/client/deployment/StandaloneClientFactory.java  |  3 ++-
 .../client/deployment/ClusterClientServiceLoaderTest.java |  3 ++-
 .../org/apache/flink/yarn/YarnClusterClientFactory.java   |  6 +++++-
 .../org/apache/flink/yarn/cli/FlinkYarnSessionCli.java    |  6 +++++-
 .../apache/flink/yarn/YarnClusterClientFactoryTest.java   | 15 +++++++++++++--
 6 files changed, 29 insertions(+), 7 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java b/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java
index f32d4f8..b8431cf 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.client.cli;
 
+import org.apache.flink.client.deployment.executors.StandaloneSessionClusterExecutor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
@@ -73,7 +74,7 @@ public abstract class AbstractCustomCommandLine implements CustomCommandLine {
        @Override
        public Configuration applyCommandLineOptionsToConfiguration(CommandLine commandLine) throws FlinkException {
                final Configuration resultingConfiguration = new Configuration(configuration);
-               resultingConfiguration.setString(DeploymentOptions.TARGET, getId());
+               resultingConfiguration.setString(DeploymentOptions.TARGET, StandaloneSessionClusterExecutor.NAME);
 
                if (commandLine.hasOption(addressOption.getOpt())) {
                        String addressWithPort = commandLine.getOptionValue(addressOption.getOpt());
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java
index b10204b..9597ff8 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.client.deployment;
 
+import org.apache.flink.client.deployment.executors.StandaloneSessionClusterExecutor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 
@@ -35,7 +36,7 @@ public class StandaloneClientFactory implements ClusterClientFactory<StandaloneC
        @Override
        public boolean isCompatibleWith(Configuration configuration) {
                checkNotNull(configuration);
-               return ID.equalsIgnoreCase(configuration.getString(DeploymentOptions.TARGET));
+               return StandaloneSessionClusterExecutor.NAME.equalsIgnoreCase(configuration.getString(DeploymentOptions.TARGET));
        }
 
        @Override
diff --git a/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java b/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java
index a084021..b7a9953 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.client.deployment;
 
+import org.apache.flink.client.deployment.executors.StandaloneSessionClusterExecutor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 
@@ -57,7 +58,7 @@ public class ClusterClientServiceLoaderTest {
        @Test
        public void testStandaloneClusterClientFactoryDiscovery() {
                final Configuration config = new Configuration();
-               config.setString(DeploymentOptions.TARGET, StandaloneClientFactory.ID);
+               config.setString(DeploymentOptions.TARGET, StandaloneSessionClusterExecutor.NAME);
 
                ClusterClientFactory<StandaloneClusterId> factory = serviceLoaderUnderTest.getClusterClientFactory(config);
                assertTrue(factory instanceof StandaloneClientFactory);
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
index 7605470..ad391d8 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
@@ -25,6 +25,8 @@ import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
+import org.apache.flink.yarn.executors.YarnJobClusterExecutor;
+import org.apache.flink.yarn.executors.YarnSessionClusterExecutor;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.client.api.YarnClient;
@@ -45,7 +47,9 @@ public class YarnClusterClientFactory implements ClusterClientFactory<Applicatio
        @Override
        public boolean isCompatibleWith(Configuration configuration) {
                checkNotNull(configuration);
-               return ID.equalsIgnoreCase(configuration.getString(DeploymentOptions.TARGET));
+               final String deploymentTarget = configuration.getString(DeploymentOptions.TARGET);
+               return YarnJobClusterExecutor.NAME.equalsIgnoreCase(deploymentTarget) ||
+                               YarnSessionClusterExecutor.NAME.equalsIgnoreCase(deploymentTarget);
        }
 
        @Override
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 362deda..388dea0 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -47,6 +47,8 @@ import org.apache.flink.yarn.YarnClusterClientFactory;
 import org.apache.flink.yarn.YarnClusterDescriptor;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal;
+import org.apache.flink.yarn.executors.YarnJobClusterExecutor;
+import org.apache.flink.yarn.executors.YarnSessionClusterExecutor;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.HelpFormatter;
@@ -346,7 +348,6 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
        public Configuration applyCommandLineOptionsToConfiguration(CommandLine commandLine) throws FlinkException {
                // we ignore the addressOption because it can only contain "yarn-cluster"
                final Configuration effectiveConfiguration = new Configuration(configuration);
-               effectiveConfiguration.setString(DeploymentOptions.TARGET, getId());
 
                applyDescriptorOptionToConfig(commandLine, effectiveConfiguration);
 
@@ -361,6 +362,9 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
 
                        effectiveConfiguration.setString(HA_CLUSTER_ID, zooKeeperNamespace);
                        effectiveConfiguration.setString(YarnConfigOptions.APPLICATION_ID, ConverterUtils.toString(applicationId));
+                       effectiveConfiguration.setString(DeploymentOptions.TARGET, YarnSessionClusterExecutor.NAME);
+               } else {
+                       effectiveConfiguration.setString(DeploymentOptions.TARGET, YarnJobClusterExecutor.NAME);
                }
 
                if (commandLine.hasOption(jmMemory.getOpt())) {
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterClientFactoryTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterClientFactoryTest.java
index 931313a..508c11e 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterClientFactoryTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterClientFactoryTest.java
@@ -23,6 +23,8 @@ import org.apache.flink.client.deployment.ClusterClientServiceLoader;
 import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.yarn.executors.YarnJobClusterExecutor;
+import org.apache.flink.yarn.executors.YarnSessionClusterExecutor;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.junit.Test;
@@ -35,9 +37,18 @@ import static org.junit.Assert.assertTrue;
 public class YarnClusterClientFactoryTest {
 
        @Test
-       public void testYarnClusterClientFactoryDiscovery() {
+       public void testYarnClusterClientFactoryDiscoveryWithPerJobExecutor() {
+               testYarnClusterClientFactoryDiscoveryHelper(YarnJobClusterExecutor.NAME);
+       }
+
+       @Test
+       public void testYarnClusterClientFactoryDiscoveryWithSessionExecutor() {
+               testYarnClusterClientFactoryDiscoveryHelper(YarnSessionClusterExecutor.NAME);
+       }
+
+       private void testYarnClusterClientFactoryDiscoveryHelper(final String targetName) {
                final Configuration configuration = new Configuration();
-               configuration.setString(DeploymentOptions.TARGET, YarnClusterClientFactory.ID);
+               configuration.setString(DeploymentOptions.TARGET, targetName);
 
                final ClusterClientServiceLoader serviceLoader = new DefaultClusterClientServiceLoader();
                final ClusterClientFactory<ApplicationId> factory = serviceLoader.getClusterClientFactory(configuration);

Reply via email to