Repository: flink Updated Branches: refs/heads/master fe5de7e8e -> ff70cc3af
[FLINK-7107] [flip6] Add option to start a Flip-6 Yarn session cluster The Flip-6 Yarn session cluster can now be started with yarn-session.sh --flip6. By default, the old Yarn application master will be started. This closes #4465. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a570aa5c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a570aa5c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a570aa5c Branch: refs/heads/master Commit: a570aa5c0c7d9a18e6cdcb689dcad9ff173dc2ac Parents: fe5de7e Author: Till Rohrmann <trohrm...@apache.org> Authored: Wed Aug 2 16:40:13 2017 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Wed Aug 9 11:56:02 2017 +0200 ---------------------------------------------------------------------- ...CliFrontendYarnAddressConfigurationTest.java | 3 +- .../flink/yarn/FlinkYarnSessionCliTest.java | 5 +++- .../flink/yarn/cli/FlinkYarnSessionCli.java | 30 ++++++++++++++++---- 3 files changed, 30 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a570aa5c/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java index 0d57e20..1fed554 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java @@ -364,7 +364,8 @@ public class CliFrontendYarnAddressConfigurationTest extends TestLogger { // override cluster descriptor to replace the YarnClient protected AbstractYarnClusterDescriptor 
getClusterDescriptor( Configuration configuration, - String configurationDirecotry) { + String configurationDirecotry, + boolean flip6) { return new TestingYarnClusterDescriptor(configuration, configurationDirecotry); } http://git-wip-us.apache.org/repos/asf/flink/blob/a570aa5c/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java index c7c25ff..8eef8f0 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java @@ -184,7 +184,10 @@ public class FlinkYarnSessionCliTest extends TestLogger { } @Override - protected AbstractYarnClusterDescriptor getClusterDescriptor(Configuration configuration, String configurationDirectory) { + protected AbstractYarnClusterDescriptor getClusterDescriptor( + Configuration configuration, + String configurationDirectory, + boolean flip6) { return new JarAgnosticClusterDescriptor(configuration, configurationDirectory); } } http://git-wip-us.apache.org/repos/asf/flink/blob/a570aa5c/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java index 4a214e2..5d8abac 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java @@ -36,6 +36,7 @@ import org.apache.flink.util.Preconditions; import org.apache.flink.yarn.AbstractYarnClusterDescriptor; import 
org.apache.flink.yarn.YarnClusterClient; import org.apache.flink.yarn.YarnClusterDescriptor; +import org.apache.flink.yarn.YarnClusterDescriptorV2; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; @@ -111,6 +112,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient> private final Option slots; private final Option detached; private final Option zookeeperNamespace; + private final Option flip6; /** * @deprecated Streaming mode has been deprecated without replacement. Set the @@ -156,6 +158,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient> streaming = new Option(shortPrefix + "st", longPrefix + "streaming", false, "Start Flink in streaming mode"); name = new Option(shortPrefix + "nm", longPrefix + "name", true, "Set a custom name for the application on YARN"); zookeeperNamespace = new Option(shortPrefix + "z", longPrefix + "zookeeperNamespace", true, "Namespace to create the Zookeeper sub-paths for high availability mode"); + flip6 = new Option(shortPrefix + "f6", longPrefix + "flip6", false, "Specify this option to start a Flip-6 Yarn session cluster."); allOptions = new Options(); allOptions.addOption(flinkJar); @@ -172,6 +175,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient> allOptions.addOption(name); allOptions.addOption(applicationId); allOptions.addOption(zookeeperNamespace); + allOptions.addOption(flip6); } /** @@ -266,7 +270,8 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient> AbstractYarnClusterDescriptor yarnClusterDescriptor = getClusterDescriptor( configuration, - configurationDirectory); + configurationDirectory, + cmd.hasOption(flip6.getOpt())); // Jar Path Path localJarPath; @@ -552,7 +557,10 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient> : config.getString(HighAvailabilityOptions.HA_CLUSTER_ID, applicationID); 
config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace); - AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor(config, configurationDirectory); + AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor( + config, + configurationDirectory, + cmdLine.hasOption(flip6.getOpt())); return yarnDescriptor.retrieve(applicationID); } else { throw new UnsupportedOperationException("Could not resume a Yarn cluster."); @@ -609,7 +617,10 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient> // Query cluster for metrics if (cmd.hasOption(query.getOpt())) { - AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor(configuration, configurationDirectory); + AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor( + configuration, + configurationDirectory, + cmd.hasOption(flip6.getOpt())); String description; try { description = yarnDescriptor.getClusterDescription(); @@ -622,7 +633,10 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient> return 0; } else if (cmd.hasOption(applicationId.getOpt())) { - AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor(configuration, configurationDirectory); + AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor( + configuration, + configurationDirectory, + cmd.hasOption(flip6.getOpt())); //configure ZK namespace depending on the value passed String zkNamespace = cmd.hasOption(zookeeperNamespace.getOpt()) ? 
@@ -764,7 +778,11 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient> return new File(propertiesFileLocation, YARN_PROPERTIES_FILE + currentUser); } - protected AbstractYarnClusterDescriptor getClusterDescriptor(Configuration configuration, String configurationDirectory) { - return new YarnClusterDescriptor(configuration, configurationDirectory); + protected AbstractYarnClusterDescriptor getClusterDescriptor(Configuration configuration, String configurationDirectory, boolean flip6) { + if (flip6) { + return new YarnClusterDescriptorV2(configuration, configurationDirectory); + } else { + return new YarnClusterDescriptor(configuration, configurationDirectory); + } } }