Repository: flink
Updated Branches:
  refs/heads/master fe5de7e8e -> ff70cc3af


[FLINK-7107] [flip6] Add option to start a Flip-6 Yarn session cluster

The Flip-6 Yarn session cluster can now be started with yarn-session.sh 
--flip6. Per
default, the old Yarn application master will be started.

This closes #4465.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a570aa5c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a570aa5c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a570aa5c

Branch: refs/heads/master
Commit: a570aa5c0c7d9a18e6cdcb689dcad9ff173dc2ac
Parents: fe5de7e
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Wed Aug 2 16:40:13 2017 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Wed Aug 9 11:56:02 2017 +0200

----------------------------------------------------------------------
 ...CliFrontendYarnAddressConfigurationTest.java |  3 +-
 .../flink/yarn/FlinkYarnSessionCliTest.java     |  5 +++-
 .../flink/yarn/cli/FlinkYarnSessionCli.java     | 30 ++++++++++++++++----
 3 files changed, 30 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a570aa5c/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
index 0d57e20..1fed554 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
@@ -364,7 +364,8 @@ public class CliFrontendYarnAddressConfigurationTest 
extends TestLogger {
                        // override cluster descriptor to replace the YarnClient
                        protected AbstractYarnClusterDescriptor 
getClusterDescriptor(
                                        Configuration configuration,
-                                       String configurationDirecotry) {
+                                       String configurationDirecotry,
+                                       boolean flip6) {
                                return new 
TestingYarnClusterDescriptor(configuration, configurationDirecotry);
                        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a570aa5c/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index c7c25ff..8eef8f0 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -184,7 +184,10 @@ public class FlinkYarnSessionCliTest extends TestLogger {
                }
 
                @Override
-               protected AbstractYarnClusterDescriptor 
getClusterDescriptor(Configuration configuration, String 
configurationDirectory) {
+               protected AbstractYarnClusterDescriptor getClusterDescriptor(
+                       Configuration configuration,
+                       String configurationDirectory,
+                       boolean flip6) {
                        return new JarAgnosticClusterDescriptor(configuration, 
configurationDirectory);
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/a570aa5c/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 4a214e2..5d8abac 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -36,6 +36,7 @@ import org.apache.flink.util.Preconditions;
 import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
 import org.apache.flink.yarn.YarnClusterClient;
 import org.apache.flink.yarn.YarnClusterDescriptor;
+import org.apache.flink.yarn.YarnClusterDescriptorV2;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -111,6 +112,7 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
        private final Option slots;
        private final Option detached;
        private final Option zookeeperNamespace;
+       private final Option flip6;
 
        /**
         * @deprecated Streaming mode has been deprecated without replacement. 
Set the
@@ -156,6 +158,7 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
                streaming = new Option(shortPrefix + "st", longPrefix + 
"streaming", false, "Start Flink in streaming mode");
                name = new Option(shortPrefix + "nm", longPrefix + "name", 
true, "Set a custom name for the application on YARN");
                zookeeperNamespace = new Option(shortPrefix + "z", longPrefix + 
"zookeeperNamespace", true, "Namespace to create the Zookeeper sub-paths for 
high availability mode");
+               flip6 = new Option(shortPrefix + "f6", longPrefix + "flip6", 
false, "Specify this option to start a Flip-6 Yarn session cluster.");
 
                allOptions = new Options();
                allOptions.addOption(flinkJar);
@@ -172,6 +175,7 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
                allOptions.addOption(name);
                allOptions.addOption(applicationId);
                allOptions.addOption(zookeeperNamespace);
+               allOptions.addOption(flip6);
        }
 
        /**
@@ -266,7 +270,8 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
 
                AbstractYarnClusterDescriptor yarnClusterDescriptor = 
getClusterDescriptor(
                        configuration,
-                       configurationDirectory);
+                       configurationDirectory,
+                       cmd.hasOption(flip6.getOpt()));
 
                // Jar Path
                Path localJarPath;
@@ -552,7 +557,10 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
                                        : 
config.getString(HighAvailabilityOptions.HA_CLUSTER_ID, applicationID);
                        config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, 
zkNamespace);
 
-                       AbstractYarnClusterDescriptor yarnDescriptor = 
getClusterDescriptor(config, configurationDirectory);
+                       AbstractYarnClusterDescriptor yarnDescriptor = 
getClusterDescriptor(
+                               config,
+                               configurationDirectory,
+                               cmdLine.hasOption(flip6.getOpt()));
                        return yarnDescriptor.retrieve(applicationID);
                } else {
                        throw new UnsupportedOperationException("Could not 
resume a Yarn cluster.");
@@ -609,7 +617,10 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
 
                // Query cluster for metrics
                if (cmd.hasOption(query.getOpt())) {
-                       AbstractYarnClusterDescriptor yarnDescriptor = 
getClusterDescriptor(configuration, configurationDirectory);
+                       AbstractYarnClusterDescriptor yarnDescriptor = 
getClusterDescriptor(
+                               configuration,
+                               configurationDirectory,
+                               cmd.hasOption(flip6.getOpt()));
                        String description;
                        try {
                                description = 
yarnDescriptor.getClusterDescription();
@@ -622,7 +633,10 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
                        return 0;
                } else if (cmd.hasOption(applicationId.getOpt())) {
 
-                       AbstractYarnClusterDescriptor yarnDescriptor = 
getClusterDescriptor(configuration, configurationDirectory);
+                       AbstractYarnClusterDescriptor yarnDescriptor = 
getClusterDescriptor(
+                               configuration,
+                               configurationDirectory,
+                               cmd.hasOption(flip6.getOpt()));
 
                        //configure ZK namespace depending on the value passed
                        String zkNamespace = 
cmd.hasOption(zookeeperNamespace.getOpt()) ?
@@ -764,7 +778,11 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
                return new File(propertiesFileLocation, YARN_PROPERTIES_FILE + 
currentUser);
        }
 
-       protected AbstractYarnClusterDescriptor 
getClusterDescriptor(Configuration configuration, String 
configurationDirectory) {
-               return new YarnClusterDescriptor(configuration, 
configurationDirectory);
+       protected AbstractYarnClusterDescriptor 
getClusterDescriptor(Configuration configuration, String 
configurationDirectory, boolean flip6) {
+               if (flip6) {
+                       return new YarnClusterDescriptorV2(configuration, 
configurationDirectory);
+               } else {
+                       return new YarnClusterDescriptor(configuration, 
configurationDirectory);
+               }
        }
 }

Reply via email to