http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java new file mode 100644 index 0000000..b12952a --- /dev/null +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java @@ -0,0 +1,379 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.yarn; + +import org.apache.flink.client.CliFrontend; +import org.apache.flink.client.FlinkYarnSessionCli; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileWriter; +import java.io.FilenameFilter; +import java.io.IOException; +import java.io.PrintStream; +import java.lang.reflect.Field; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + + +/** + * This base class allows to use the MiniYARNCluster. + * The cluster is re-used for all tests. + * + * This class is located in a different package which is build after flink-dist. This way, + * we can use the YARN uberjar of flink to start a Flink YARN session. + */ +public abstract class YarnTestBase { + private static final Logger LOG = LoggerFactory.getLogger(YARNSessionFIFOIT.class); + + private final static PrintStream originalStdout = System.out; + private final static PrintStream originalStderr = System.err; + + + // Temp directory which is deleted after the unit test. 
+ private static TemporaryFolder tmp = new TemporaryFolder(); + + protected static MiniYARNCluster yarnCluster = null; + + protected static File flinkUberjar; + + protected static final Configuration yarnConfiguration; + static { + yarnConfiguration = new YarnConfiguration(); + yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512); + yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 4096); // 4096 is the available memory anyways + yarnConfiguration.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true); + yarnConfiguration.setBoolean(YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, true); + yarnConfiguration.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); + yarnConfiguration.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 2); + yarnConfiguration.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600); + yarnConfiguration.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); + yarnConfiguration.setInt(YarnConfiguration.NM_VCORES, 666); // memory is overwritten in the MiniYARNCluster. + // so we have to change the number of cores for testing. + } + + // This code is taken from: http://stackoverflow.com/a/7201825/568695 + // it changes the environment variables of this JVM. Use only for testing purposes! 
+ private static void setEnv(Map<String, String> newenv) { + try { + Class<?> processEnvironmentClass = Class.forName("java.lang.ProcessEnvironment"); + Field theEnvironmentField = processEnvironmentClass.getDeclaredField("theEnvironment"); + theEnvironmentField.setAccessible(true); + Map<String, String> env = (Map<String, String>) theEnvironmentField.get(null); + env.putAll(newenv); + Field theCaseInsensitiveEnvironmentField = processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment"); + theCaseInsensitiveEnvironmentField.setAccessible(true); + Map<String, String> cienv = (Map<String, String>) theCaseInsensitiveEnvironmentField.get(null); + cienv.putAll(newenv); + } catch (NoSuchFieldException e) { + try { + Class[] classes = Collections.class.getDeclaredClasses(); + Map<String, String> env = System.getenv(); + for (Class cl : classes) { + if ("java.util.Collections$UnmodifiableMap".equals(cl.getName())) { + Field field = cl.getDeclaredField("m"); + field.setAccessible(true); + Object obj = field.get(env); + Map<String, String> map = (Map<String, String>) obj; + map.clear(); + map.putAll(newenv); + } + } + } catch (Exception e2) { + throw new RuntimeException(e2); + } + } catch (Exception e1) { + throw new RuntimeException(e1); + } + } + + /** + * Sleep a bit between the tests (we are re-using the YARN cluster for the tests) + */ + @After + public void sleep() { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + Assert.fail("Should not happen"); + } + } + + @Before + public void checkClusterEmpty() throws IOException, YarnException { + YarnClient yarnClient = YarnClient.createYarnClient(); + yarnClient.init(yarnConfiguration); + yarnClient.start(); + List<ApplicationReport> apps = yarnClient.getApplications(); + for(ApplicationReport app : apps) { + if(app.getYarnApplicationState() != YarnApplicationState.FINISHED) { + Assert.fail("There is at least one application on the cluster is not finished"); + } + } + } + + /** + * Locate a 
file or diretory directory + */ + public static File findFile(String startAt, FilenameFilter fnf) { + File root = new File(startAt); + String[] files = root.list(); + if(files == null) { + return null; + } + for(String file : files) { + + File f = new File(startAt + File.separator + file); + if(f.isDirectory()) { + File r = findFile(f.getAbsolutePath(), fnf); + if(r != null) { + return r; + } + } else if (fnf.accept(f.getParentFile(), f.getName())) { + return f; + } + + } + return null; + } + + /** + * Filter to find root dir of the flink-yarn dist. + */ + public static class RootDirFilenameFilter implements FilenameFilter { + @Override + public boolean accept(File dir, String name) { + return name.endsWith("yarn-uberjar.jar") && dir.toString().contains("/lib"); + } + } + public static class ContainsName implements FilenameFilter { + private String name; + private String excludeInPath = null; + + public ContainsName(String name) { + this.name = name; + } + + public ContainsName(String name, String excludeInPath) { + this.name = name; + this.excludeInPath = excludeInPath; + } + + @Override + public boolean accept(File dir, String name) { + if(excludeInPath == null) { + return name.contains(this.name); + } else { + return name.contains(this.name) && !dir.toString().contains(excludeInPath); + } + } + } + + public static File writeYarnSiteConfigXML(Configuration yarnConf) throws IOException { + tmp.create(); + File yarnSiteXML = new File(tmp.newFolder().getAbsolutePath() + "/yarn-site.xml"); + + FileWriter writer = new FileWriter(yarnSiteXML); + yarnConf.writeXml(writer); + writer.flush(); + writer.close(); + return yarnSiteXML; + } + + public static void startYARNWithConfig(Configuration conf) { + flinkUberjar = findFile(".", new RootDirFilenameFilter()); + Assert.assertNotNull(flinkUberjar); + String flinkDistRootDir = flinkUberjar.getParentFile().getParent(); + + if (!flinkUberjar.exists()) { + Assert.fail("Unable to locate yarn-uberjar.jar"); + } + + try { + 
LOG.info("Starting up MiniYARN cluster"); + if (yarnCluster == null) { + yarnCluster = new MiniYARNCluster(YARNSessionFIFOIT.class.getName(), 2, 1, 1); + + yarnCluster.init(conf); + yarnCluster.start(); + } + + Map<String, String> map = new HashMap<String, String>(System.getenv()); + File flinkConfFilePath = findFile(flinkDistRootDir, new ContainsName("flink-conf.yaml")); + Assert.assertNotNull(flinkConfFilePath); + map.put("FLINK_CONF_DIR", flinkConfFilePath.getParent()); + File yarnConfFile = writeYarnSiteConfigXML(conf); + map.put("YARN_CONF_DIR", yarnConfFile.getParentFile().getAbsolutePath()); + setEnv(map); + + Assert.assertTrue(yarnCluster.getServiceState() == Service.STATE.STARTED); + } catch (Exception ex) { + ex.printStackTrace(); + LOG.error("setup failure", ex); + Assert.fail(); + } + } + + /** + * Default @BeforeClass impl. Overwrite this for passing a different configuration + */ + @BeforeClass + public static void setup() { + startYARNWithConfig(yarnConfiguration); + } + + // -------------------------- Runner -------------------------- // + + private static ByteArrayOutputStream outContent; + private static ByteArrayOutputStream errContent; + enum RunTypes { + YARN_SESSION, CLI_FRONTEND + } + + protected void runWithArgs(String[] args, String expect, RunTypes type) { + LOG.info("Running with args "+ Arrays.toString(args)); + + outContent = new ByteArrayOutputStream(); + errContent = new ByteArrayOutputStream(); + System.setOut(new PrintStream(outContent)); + System.setErr(new PrintStream(errContent)); + + + final int START_TIMEOUT_SECONDS = 60; + + Runner runner = new Runner(args, type); + runner.start(); + + boolean expectedStringSeen = false; + for(int second = 0; second < START_TIMEOUT_SECONDS; second++) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + Assert.fail("Interruption not expected"); + } + // check output for correct TaskManager startup. 
+ if(outContent.toString().contains(expect) + || errContent.toString().contains(expect) ) { + expectedStringSeen = true; + LOG.info("Found expected output in redirected streams"); + // send "stop" command to command line interface + runner.sendStop(); + // wait for the thread to stop + try { + runner.join(1000); + } catch (InterruptedException e) { + LOG.warn("Interrupted while stopping runner", e); + } + LOG.warn("stopped"); + break; + } + // check if thread died + if(!runner.isAlive()) { + sendOutput(); + Assert.fail("Runner thread died before the test was finished. Return value = "+runner.getReturnValue()); + } + } + + sendOutput(); + Assert.assertTrue("During the timeout period of " + START_TIMEOUT_SECONDS + " seconds the " + + "expected string did not show up", expectedStringSeen); + LOG.info("Test was successful"); + } + + private static void sendOutput() { + System.setOut(originalStdout); + System.setErr(originalStderr); + + LOG.info("Sending stdout content through logger: \n\n"+outContent.toString()+"\n\n"); + LOG.info("Sending stderr content through logger: \n\n"+errContent.toString()+"\n\n"); + } + + public static class Runner extends Thread { + private final String[] args; + private int returnValue; + private RunTypes type; + private FlinkYarnSessionCli yCli; + + public Runner(String[] args, RunTypes type) { + this.args = args; + this.type = type; + } + + public int getReturnValue() { + return returnValue; + } + + @Override + public void run() { + switch(type) { + case YARN_SESSION: + yCli = new FlinkYarnSessionCli("", ""); + returnValue = yCli.run(args); + break; + case CLI_FRONTEND: + CliFrontend cli = new CliFrontend(); + returnValue = cli.parseParameters(args); + break; + default: + throw new RuntimeException("Unknown type " + type); + } + + if(returnValue != 0) { + Assert.fail("The YARN session returned with non-null value="+returnValue); + } + } + + public void sendStop() { + if(yCli != null) { + yCli.stop(); + } + } + } + + // 
-------------------------- Tear down -------------------------- // + + @AfterClass + public static void tearDown() { + //shutdown YARN cluster + if (yarnCluster != null) { + LOG.info("shutdown MiniYarn cluster"); + yarnCluster.stop(); + yarnCluster = null; + } + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-yarn-tests/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/resources/log4j-test.properties b/flink-yarn-tests/src/test/resources/log4j-test.properties new file mode 100644 index 0000000..26d6a12 --- /dev/null +++ b/flink-yarn-tests/src/test/resources/log4j-test.properties @@ -0,0 +1,25 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + +log4j.rootLogger=INFO, file + +# Log all infos to the console (note: the appender is named 'file' but is a ConsoleAppender) +log4j.appender.file=org.apache.log4j.ConsoleAppender +log4j.appender.file.append=false +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-yarn/pom.xml ---------------------------------------------------------------------- diff --git a/flink-yarn/pom.xml b/flink-yarn/pom.xml new file mode 100644 index 0000000..1569f15 --- /dev/null +++ b/flink-yarn/pom.xml @@ -0,0 +1,228 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License.
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-parent</artifactId> + <version>0.9-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-yarn</artifactId> + <name>flink-yarn</name> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <artifactId>hadoop-core</artifactId> + <groupId>org.apache.hadoop</groupId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-clients</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>com.typesafe.akka</groupId> + <artifactId>akka-actor_2.10</artifactId> + </dependency> + + <dependency> + <groupId>com.typesafe.akka</groupId> + <artifactId>akka-remote_2.10</artifactId> + </dependency> + + <dependency> + <groupId>com.typesafe.akka</groupId> + <artifactId>akka-camel_2.10</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-stream</artifactId> + <version>2.14.0</version> + </dependency> + + <!-- guava needs to be in "provided" scope, to make sure it is not included into the jars by the shading --> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>${guava.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-client</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </dependency> + + <dependency> + 
<groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + </dependency> + </dependencies> + + <build> + <plugins> + <!-- Scala Compiler --> + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + <version>3.1.4</version> + <executions> + <!-- Run scala compiler in the process-resources phase, so that dependencies on + scala classes can be resolved later in the (Java) compile phase --> + <execution> + <id>scala-compile-first</id> + <phase>process-resources</phase> + <goals> + <goal>compile</goal> + </goals> + </execution> + + <!-- Run scala compiler in the process-test-resources phase, so that dependencies on + scala classes can be resolved later in the (Java) test-compile phase --> + <execution> + <id>scala-test-compile</id> + <phase>process-test-resources</phase> + <goals> + <goal>testCompile</goal> + </goals> + </execution> + </executions> + <configuration> + <jvmArgs> + <jvmArg>-Xms128m</jvmArg> + <jvmArg>-Xmx512m</jvmArg> + </jvmArgs> + </configuration> + </plugin> + + <!-- Eclipse Integration --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-eclipse-plugin</artifactId> + <version>2.8</version> + <configuration> + <downloadSources>true</downloadSources> + <projectnatures> + <projectnature>org.scala-ide.sdt.core.scalanature</projectnature> + <projectnature>org.eclipse.jdt.core.javanature</projectnature> + </projectnatures> + <buildcommands> + <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand> + </buildcommands> + <classpathContainers> + <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer> + <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer> + </classpathContainers> + <excludes> + <exclude>org.scala-lang:scala-library</exclude> + 
<exclude>org.scala-lang:scala-compiler</exclude> + </excludes> + <sourceIncludes> + <sourceInclude>**/*.scala</sourceInclude> + <sourceInclude>**/*.java</sourceInclude> + </sourceIncludes> + </configuration> + </plugin> + + <!-- Adding scala source directories to build path --> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <version>1.7</version> + <executions> + <!-- Add src/main/scala to eclipse build path --> + <execution> + <id>add-source</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>src/main/scala</source> + </sources> + </configuration> + </execution> + <!-- Add src/test/scala to eclipse build path --> + <execution> + <id>add-test-source</id> + <phase>generate-test-sources</phase> + <goals> + <goal>add-test-source</goal> + </goals> + <configuration> + <sources> + <source>src/test/scala</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.scalastyle</groupId> + <artifactId>scalastyle-maven-plugin</artifactId> + <version>0.5.0</version> + <executions> + <execution> + <goals> + <goal>check</goal> + </goals> + </execution> + </executions> + <configuration> + <verbose>false</verbose> + <failOnViolation>true</failOnViolation> + <includeTestSourceDirectory>true</includeTestSourceDirectory> + <failOnWarning>false</failOnWarning> + <sourceDirectory>${basedir}/src/main/scala</sourceDirectory> + <testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory> + <configLocation>${project.basedir}/../tools/maven/scalastyle-config.xml</configLocation> + <outputFile>${project.basedir}/scalastyle-output.xml</outputFile> + <outputEncoding>UTF-8</outputEncoding> + </configuration> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java 
---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java new file mode 100644 index 0000000..c922963 --- /dev/null +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java @@ -0,0 +1,653 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.yarn; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.flink.client.FlinkYarnSessionCli; +import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient; +import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.QueueInfo; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.client.api.YarnClientApplication; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.util.Records; + +/** + * All classes 
in this package contain code taken from + * https://github.com/apache/hadoop-common/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java?source=cc + * and + * https://github.com/hortonworks/simple-yarn-app + * and + * https://github.com/yahoo/storm-yarn/blob/master/src/main/java/com/yahoo/storm/yarn/StormOnYarn.java + * + * The Flink jar is uploaded to HDFS by this client. + * The application master and all the TaskManager containers get the jar file downloaded + * by YARN into their local fs. + * + */ +public class FlinkYarnClient extends AbstractFlinkYarnClient { + private static final Logger LOG = LoggerFactory.getLogger(FlinkYarnClient.class); + + /** + * Constants, + * all starting with ENV_ are used as environment variables to pass values from the Client + * to the Application Master. + */ + public final static String ENV_TM_MEMORY = "_CLIENT_TM_MEMORY"; + public final static String ENV_TM_COUNT = "_CLIENT_TM_COUNT"; + public final static String ENV_APP_ID = "_APP_ID"; + public final static String FLINK_JAR_PATH = "_FLINK_JAR_PATH"; // the Flink jar resource location (in HDFS). + public static final String ENV_CLIENT_HOME_DIR = "_CLIENT_HOME_DIR"; + public static final String ENV_CLIENT_SHIP_FILES = "_CLIENT_SHIP_FILES"; + public static final String ENV_CLIENT_USERNAME = "_CLIENT_USERNAME"; + public static final String ENV_SLOTS = "_SLOTS"; + public static final String ENV_DYNAMIC_PROPERTIES = "_DYNAMIC_PROPERTIES"; + + private static final String DEFAULT_QUEUE_NAME = "default"; + + + /** + * Minimum memory requirements, checked by the Client. 
+ */ + private static final int MIN_JM_MEMORY = 128; + private static final int MIN_TM_MEMORY = 128; + + private Configuration conf; + private YarnClient yarnClient; + private YarnClientApplication yarnApplication; + + + /** + * Files (usually in a distributed file system) used for the YARN session of Flink. + * Contains configuration files and jar files. + */ + private Path sessionFilesDir; + + /** + * If the user has specified a different number of slots, we store them here + */ + private int slots = -1; + + private int jobManagerMemoryMb = 512; + + private int taskManagerMemoryMb = 512; + + private int taskManagerCount = 1; + + private String yarnQueue = DEFAULT_QUEUE_NAME; + + private String configurationDirectory; + + private Path flinkConfigurationPath; + + private Path flinkLoggingConfigurationPath; // optional + + private Path flinkJarPath; + + private String dynamicPropertiesEncoded; + + private List<File> shipFiles = new ArrayList<File>(); + + + public FlinkYarnClient() { + // Check if security is enabled + if(UserGroupInformation.isSecurityEnabled()) { + throw new RuntimeException("Flink YARN client does not have security support right now." 
+ + "File a bug, we will fix it asap"); + } + conf = Utils.initializeYarnConfiguration(); + if(this.yarnClient == null) { + // Create yarnClient + yarnClient = YarnClient.createYarnClient(); + yarnClient.init(conf); + yarnClient.start(); + } + } + + @Override + public void setJobManagerMemory(int memoryMb) { + if(memoryMb < MIN_JM_MEMORY) { + throw new IllegalArgumentException("The JobManager memory is below the minimum required memory amount " + + "of "+MIN_JM_MEMORY+" MB"); + } + this.jobManagerMemoryMb = memoryMb; + } + + @Override + public void setTaskManagerMemory(int memoryMb) { + if(memoryMb < MIN_TM_MEMORY) { + throw new IllegalArgumentException("The TaskManager memory is below the minimum required memory amount " + + "of "+MIN_TM_MEMORY+" MB"); + } + this.taskManagerMemoryMb = memoryMb; + } + + @Override + public void setTaskManagerSlots(int slots) { + if(slots <= 0) { + throw new IllegalArgumentException("Number of TaskManager slots must be positive"); + } + this.slots = slots; + } + + @Override + public int getTaskManagerSlots() { + return this.slots; + } + + @Override + public void setQueue(String queue) { + this.yarnQueue = queue; + } + + @Override + public void setLocalJarPath(Path localJarPath) { + if(!localJarPath.toString().endsWith("jar")) { + throw new IllegalArgumentException("The passed jar path ('"+localJarPath+"') does not end with the 'jar' extension"); + } + this.flinkJarPath = localJarPath; + } + + @Override + public void setConfigurationFilePath(Path confPath) { + flinkConfigurationPath = confPath; + } + + public void setConfigurationDirectory(String configurationDirectory) { + this.configurationDirectory = configurationDirectory; + } + + @Override + public void setFlinkLoggingConfigurationPath(Path logConfPath) { + flinkLoggingConfigurationPath = logConfPath; + } + + @Override + public Path getFlinkLoggingConfigurationPath() { + return flinkLoggingConfigurationPath; + } + + @Override + public void setTaskManagerCount(int tmCount) { + 
if(tmCount < 1) { + throw new IllegalArgumentException("The TaskManager count has to be at least 1."); + } + this.taskManagerCount = tmCount; + } + + @Override + public int getTaskManagerCount() { + return this.taskManagerCount; + } + + @Override + public void setShipFiles(List<File> shipFiles) { + this.shipFiles.addAll(shipFiles); + } + + public void setDynamicPropertiesEncoded(String dynamicPropertiesEncoded) { + this.dynamicPropertiesEncoded = dynamicPropertiesEncoded; + } + + @Override + public String getDynamicPropertiesEncoded() { + return this.dynamicPropertiesEncoded; + } + + + public void isReadyForDepoyment() throws YarnDeploymentException { + if(taskManagerCount <= 0) { + throw new YarnDeploymentException("Taskmanager count must be positive"); + } + if(this.flinkJarPath == null) { + throw new YarnDeploymentException("The Flink jar path is null"); + } + if(this.configurationDirectory == null) { + throw new YarnDeploymentException("Configuration directory not set"); + } + if(this.flinkConfigurationPath == null) { + throw new YarnDeploymentException("Configuration path not set"); + } + + } + + public static boolean allocateResource(int[] nodeManagers, int toAllocate) { + for(int i = 0; i < nodeManagers.length; i++) { + if(nodeManagers[i] >= toAllocate) { + nodeManagers[i] -= toAllocate; + return true; + } + } + return false; + } + + /** + * This method will block until the ApplicationMaster/JobManager have been + * deployed on YARN. 
+ */ + @Override + public AbstractFlinkYarnCluster deploy(String clusterName) throws Exception { + isReadyForDepoyment(); + + LOG.info("Using values:"); + LOG.info("\tTaskManager count = " + taskManagerCount); + LOG.info("\tJobManager memory = " + jobManagerMemoryMb); + LOG.info("\tTaskManager memory = " + taskManagerMemoryMb); + + // Create application via yarnClient + yarnApplication = yarnClient.createApplication(); + GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse(); + + // ------------------ Check if the specified queue exists -------------- + + List<QueueInfo> queues = yarnClient.getAllQueues(); + if(queues.size() > 0) { // check only if there are queues configured. + boolean queueFound = false; + for (QueueInfo queue : queues) { + if (queue.getQueueName().equals(this.yarnQueue)) { + queueFound = true; + break; + } + } + if (!queueFound) { + String queueNames = ""; + for (QueueInfo queue : queues) { + queueNames += queue.getQueueName() + ", "; + } + throw new YarnDeploymentException("The specified queue '" + this.yarnQueue + "' does not exist. " + + "Available queues: " + queueNames); + } + } else { + LOG.debug("The YARN cluster does not have any queues configured"); + } + + // ------------------ Check if the YARN Cluster has the requested resources -------------- + + // the yarnMinAllocationMB specifies the smallest possible container allocation size. + // all allocations below this value are automatically set to this value. + final int yarnMinAllocationMB = conf.getInt("yarn.scheduler.minimum-allocation-mb", 0); + if(jobManagerMemoryMb < yarnMinAllocationMB || taskManagerMemoryMb < yarnMinAllocationMB) { + LOG.warn("The JobManager or TaskManager memory is below the smallest possible YARN Container size. " + + "The value of 'yarn.scheduler.minimum-allocation-mb' is '"+yarnMinAllocationMB+"'. Please increase the memory size." 
+ + "YARN will allocate the smaller containers but the scheduler will account for the minimum-allocation-mb, maybe not all instances " + + "you requested will start."); + } + + // set the memory to minAllocationMB to do the next checks correctly + if(jobManagerMemoryMb < yarnMinAllocationMB) { + jobManagerMemoryMb = yarnMinAllocationMB; + } + if(taskManagerMemoryMb < yarnMinAllocationMB) { + taskManagerMemoryMb = yarnMinAllocationMB; + } + + Resource maxRes = appResponse.getMaximumResourceCapability(); + final String NOTE = "Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values\n"; + if(jobManagerMemoryMb > maxRes.getMemory() ) { + failSessionDuringDeployment(); + throw new YarnDeploymentException("The cluster does not have the requested resources for the JobManager available!\n" + + "Maximum Memory: "+maxRes.getMemory() + "MB Requested: "+jobManagerMemoryMb+"MB. " + NOTE); + } + + if(taskManagerMemoryMb > maxRes.getMemory() ) { + failSessionDuringDeployment(); + throw new YarnDeploymentException("The cluster does not have the requested resources for the TaskManagers available!\n" + + "Maximum Memory: " + maxRes.getMemory() + " Requested: "+taskManagerMemoryMb + "MB. " + NOTE); + } + + + int totalMemoryRequired = jobManagerMemoryMb + taskManagerMemoryMb * taskManagerCount; + ClusterResourceDescription freeClusterMem = getCurrentFreeClusterResources(yarnClient); + if(freeClusterMem.totalFreeMemory < totalMemoryRequired) { + failSessionDuringDeployment(); + throw new YarnDeploymentException("This YARN session requires " + totalMemoryRequired + "MB of memory in the cluster. 
" + + "There are currently only " + freeClusterMem.totalFreeMemory+"MB available."); + + } + if( taskManagerMemoryMb > freeClusterMem.containerLimit) { + failSessionDuringDeployment(); + throw new YarnDeploymentException("The requested amount of memory for the TaskManagers ("+taskManagerMemoryMb+"MB) is more than " + + "the largest possible YARN container: "+freeClusterMem.containerLimit); + } + if( jobManagerMemoryMb > freeClusterMem.containerLimit) { + failSessionDuringDeployment(); + throw new YarnDeploymentException("The requested amount of memory for the JobManager ("+jobManagerMemoryMb+"MB) is more than " + + "the largest possible YARN container: "+freeClusterMem.containerLimit); + } + + // ----------------- check if the requested containers fit into the cluster. + + int[] nmFree = Arrays.copyOf(freeClusterMem.nodeManagersFree, freeClusterMem.nodeManagersFree.length); + // first, allocate the jobManager somewhere. + if(!allocateResource(nmFree, jobManagerMemoryMb)) { + failSessionDuringDeployment(); + throw new YarnDeploymentException("Unable to find a NodeManager that can fit the JobManager/Application master. " + + "The JobManager requires " + jobManagerMemoryMb + "MB. NodeManagers available: "+Arrays.toString(freeClusterMem.nodeManagersFree)); + } + // allocate TaskManagers + for(int i = 0; i < taskManagerCount; i++) { + if(!allocateResource(nmFree, taskManagerMemoryMb)) { + failSessionDuringDeployment(); + throw new YarnDeploymentException("There is not enough memory available in the YARN cluster. " + + "The TaskManager(s) require " + taskManagerMemoryMb + "MB each. 
" + + "NodeManagers available: "+Arrays.toString(freeClusterMem.nodeManagersFree) + "\n" + + "After allocating the JobManager (" + jobManagerMemoryMb + "MB) and (" + i + "/" + taskManagerCount + ") TaskManagers, " + + "the following NodeManagers are available: " + Arrays.toString(nmFree) ); + } + } + + // ------------------ Prepare Application Master Container ------------------------------ + + // respect custom JVM options in the YAML file + final String javaOpts = GlobalConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, ""); + + String logbackFile = configurationDirectory + File.separator + FlinkYarnSessionCli.CONFIG_FILE_LOGBACK_NAME; + boolean hasLogback = new File(logbackFile).exists(); + String log4jFile = configurationDirectory + File.separator + FlinkYarnSessionCli.CONFIG_FILE_LOG4J_NAME; + + boolean hasLog4j = new File(log4jFile).exists(); + if(hasLogback) { + shipFiles.add(new File(logbackFile)); + } + if(hasLog4j) { + shipFiles.add(new File(log4jFile)); + } + + // Set up the container launch context for the application master + ContainerLaunchContext amContainer = Records + .newRecord(ContainerLaunchContext.class); + + String amCommand = "$JAVA_HOME/bin/java" + + " -Xmx"+Utils.calculateHeapSize(jobManagerMemoryMb)+"M " +javaOpts; + + if(hasLogback || hasLog4j) { + amCommand += " -Dlog.file=\""+ApplicationConstants.LOG_DIR_EXPANSION_VAR +"/jobmanager-main.log\""; + } + + if(hasLogback) { + amCommand += " -Dlogback.configurationFile=file:" + FlinkYarnSessionCli.CONFIG_FILE_LOGBACK_NAME; + } + if(hasLog4j) { + amCommand += " -Dlog4j.configuration=file:" + FlinkYarnSessionCli.CONFIG_FILE_LOG4J_NAME; + } + + amCommand += " "+ApplicationMaster.class.getName()+" " + + " 1>" + + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager-stdout.log" + + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager-stderr.log"; + amContainer.setCommands(Collections.singletonList(amCommand)); + + LOG.debug("Application Master start command: "+amCommand); 
+ + // intialize HDFS + // Copy the application master jar to the filesystem + // Create a local resource to point to the destination jar path + final FileSystem fs = FileSystem.get(conf); + + // hard coded check for the GoogleHDFS client because its not overriding the getScheme() method. + if( !fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem") && + fs.getScheme().startsWith("file")) { + LOG.warn("The file system scheme is '" + fs.getScheme() + "'. This indicates that the " + + "specified Hadoop configuration path is wrong and the sytem is using the default Hadoop configuration values." + + "The Flink YARN client needs to store its files in a distributed file system"); + } + + // Set-up ApplicationSubmissionContext for the application + ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext(); + final ApplicationId appId = appContext.getApplicationId(); + + // Setup jar for ApplicationMaster + LocalResource appMasterJar = Records.newRecord(LocalResource.class); + LocalResource flinkConf = Records.newRecord(LocalResource.class); + Path remotePathJar = Utils.setupLocalResource(conf, fs, appId.toString(), flinkJarPath, appMasterJar, fs.getHomeDirectory()); + Path remotePathConf = Utils.setupLocalResource(conf, fs, appId.toString(), flinkConfigurationPath, flinkConf, fs.getHomeDirectory()); + Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(2); + localResources.put("flink.jar", appMasterJar); + localResources.put("flink-conf.yaml", flinkConf); + + + // setup security tokens (code from apache storm) + final Path[] paths = new Path[3 + shipFiles.size()]; + StringBuffer envShipFileList = new StringBuffer(); + // upload ship files + for (int i = 0; i < shipFiles.size(); i++) { + File shipFile = shipFiles.get(i); + LocalResource shipResources = Records.newRecord(LocalResource.class); + Path shipLocalPath = new Path("file://" + shipFile.getAbsolutePath()); + paths[3 + i] = 
Utils.setupLocalResource(conf, fs, appId.toString(), + shipLocalPath, shipResources, fs.getHomeDirectory()); + localResources.put(shipFile.getName(), shipResources); + + envShipFileList.append(paths[3 + i]); + if(i+1 < shipFiles.size()) { + envShipFileList.append(','); + } + } + + paths[0] = remotePathJar; + paths[1] = remotePathConf; + sessionFilesDir = new Path(fs.getHomeDirectory(), ".flink/" + appId.toString() + "/"); + FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL); + fs.setPermission(sessionFilesDir, permission); // set permission for path. + Utils.setTokensFor(amContainer, paths, this.conf); + + + amContainer.setLocalResources(localResources); + fs.close(); + + // Setup CLASSPATH for ApplicationMaster + Map<String, String> appMasterEnv = new HashMap<String, String>(); + Utils.setupEnv(conf, appMasterEnv); + // set configuration values + appMasterEnv.put(FlinkYarnClient.ENV_TM_COUNT, String.valueOf(taskManagerCount)); + appMasterEnv.put(FlinkYarnClient.ENV_TM_MEMORY, String.valueOf(taskManagerMemoryMb)); + appMasterEnv.put(FlinkYarnClient.FLINK_JAR_PATH, remotePathJar.toString() ); + appMasterEnv.put(FlinkYarnClient.ENV_APP_ID, appId.toString()); + appMasterEnv.put(FlinkYarnClient.ENV_CLIENT_HOME_DIR, fs.getHomeDirectory().toString()); + appMasterEnv.put(FlinkYarnClient.ENV_CLIENT_SHIP_FILES, envShipFileList.toString() ); + appMasterEnv.put(FlinkYarnClient.ENV_CLIENT_USERNAME, UserGroupInformation.getCurrentUser().getShortUserName()); + appMasterEnv.put(FlinkYarnClient.ENV_SLOTS, String.valueOf(slots)); + if(dynamicPropertiesEncoded != null) { + appMasterEnv.put(FlinkYarnClient.ENV_DYNAMIC_PROPERTIES, dynamicPropertiesEncoded); + } + + amContainer.setEnvironment(appMasterEnv); + + // Set up resource type requirements for ApplicationMaster + Resource capability = Records.newRecord(Resource.class); + capability.setMemory(jobManagerMemoryMb); + capability.setVirtualCores(1); + + if(clusterName == null) { + clusterName = 
"Flink session with "+taskManagerCount+" TaskManagers"; + } + + appContext.setApplicationName(clusterName); // application name + appContext.setApplicationType("Apache Flink"); + appContext.setAMContainerSpec(amContainer); + appContext.setResource(capability); + appContext.setQueue(yarnQueue); + + + LOG.info("Submitting application master " + appId); + yarnClient.submitApplication(appContext); + + LOG.info("Waiting for the cluster to be allocated"); + int waittime = 0; + loop: while( true ) { + ApplicationReport report = yarnClient.getApplicationReport(appId); + YarnApplicationState appState = report.getYarnApplicationState(); + switch(appState) { + case FAILED: + case FINISHED: + case KILLED: + throw new YarnDeploymentException("The YARN application unexpectedly switched to state " + + appState +" during deployment. \n" + + "Diagnostics from YARN: "+report.getDiagnostics() + "\n" + + "If log aggregation is enabled on your cluster, use this command to further invesitage the issue:\n" + + "yarn logs -applicationId "+appId); + //break .. + case RUNNING: + LOG.info("YARN application has been deployed successfully."); + break loop; + default: + LOG.info("Deploying cluster, current state "+appState); + if(waittime > 60000) { + LOG.info("Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster"); + } + + } + waittime += 1000; + Thread.sleep(1000); + } + // the Flink cluster is deployed in YARN. Represent cluster + return new FlinkYarnCluster(yarnClient, appId, conf, sessionFilesDir); + } + + /** + * Kills YARN application and stops YARN client. 
+ * + * Use this method to kill the App before it has been properly deployed + */ + private void failSessionDuringDeployment() { + LOG.info("Killing YARN application"); + + try { + yarnClient.killApplication(yarnApplication.getNewApplicationResponse().getApplicationId()); + } catch (Exception e) { + // we only log a debug message here because the "killApplication" call is a best-effort + // call (we don't know if the application has been deployed when the error occured). + LOG.debug("Error while killing YARN application", e); + } + yarnClient.stop(); + } + + + private static class ClusterResourceDescription { + final public int totalFreeMemory; + final public int containerLimit; + final public int[] nodeManagersFree; + + public ClusterResourceDescription(int totalFreeMemory, int containerLimit, int[] nodeManagersFree) { + this.totalFreeMemory = totalFreeMemory; + this.containerLimit = containerLimit; + this.nodeManagersFree = nodeManagersFree; + } + } + + private ClusterResourceDescription getCurrentFreeClusterResources(YarnClient yarnClient) throws YarnException, IOException { + List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING); + + int totalFreeMemory = 0; + int containerLimit = 0; + int[] nodeManagersFree = new int[nodes.size()]; + + for(int i = 0; i < nodes.size(); i++) { + NodeReport rep = nodes.get(i); + int free = rep.getCapability().getMemory() - (rep.getUsed() != null ? 
rep.getUsed().getMemory() : 0 ); + nodeManagersFree[i] = free; + totalFreeMemory += free; + if(free > containerLimit) { + containerLimit = free; + } + } + return new ClusterResourceDescription(totalFreeMemory, containerLimit, nodeManagersFree); + } + + + + public String getClusterDescription() throws Exception { + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PrintStream ps = new PrintStream(baos); + + YarnClusterMetrics metrics = yarnClient.getYarnClusterMetrics(); + + ps.append("NodeManagers in the Cluster " + metrics.getNumNodeManagers()); + List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING); + final String format = "|%-16s |%-16s %n"; + ps.printf("|Property |Value %n"); + ps.println("+---------------------------------------+"); + int totalMemory = 0; + int totalCores = 0; + for(NodeReport rep : nodes) { + final Resource res = rep.getCapability(); + totalMemory += res.getMemory(); + totalCores += res.getVirtualCores(); + ps.format(format, "NodeID", rep.getNodeId()); + ps.format(format, "Memory", res.getMemory()+" MB"); + ps.format(format, "vCores", res.getVirtualCores()); + ps.format(format, "HealthReport", rep.getHealthReport()); + ps.format(format, "Containers", rep.getNumContainers()); + ps.println("+---------------------------------------+"); + } + ps.println("Summary: totalMemory "+totalMemory+" totalCores "+totalCores); + List<QueueInfo> qInfo = yarnClient.getAllQueues(); + for(QueueInfo q : qInfo) { + ps.println("Queue: "+q.getQueueName()+", Current Capacity: "+q.getCurrentCapacity()+" Max Capacity: "+q.getMaximumCapacity()+" Applications: "+q.getApplications().size()); + } + yarnClient.stop(); + return baos.toString(); + } + + public static class YarnDeploymentException extends RuntimeException { + public YarnDeploymentException() { + } + + public YarnDeploymentException(String message) { + super(message); + } + + public YarnDeploymentException(String message, Throwable cause) { + super(message, cause); + } + } + +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java new file mode 100644 index 0000000..98abd5e --- /dev/null +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java @@ -0,0 +1,363 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.yarn; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; + +import static akka.pattern.Patterns.ask; + +import akka.actor.Props; +import akka.util.Timeout; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.runtime.net.NetUtils; +import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster; +import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.None$; +import scala.Some; +import scala.concurrent.Await; +import scala.concurrent.Awaitable; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + + +public class FlinkYarnCluster extends AbstractFlinkYarnCluster { + private static final Logger LOG = LoggerFactory.getLogger(FlinkYarnCluster.class); + + private static final int POLLING_THREAD_INTERVAL_MS = 1000; + + private YarnClient yarnClient; + private Thread actorRunner; + private Thread clientShutdownHook = new ClientShutdownHook(); + private PollingThread pollingRunner; + private Configuration hadoopConfig; + // (HDFS) location of the files required to run on YARN. 
Needed here to delete them on shutdown. + private Path sessionFilesDir; + private InetSocketAddress jobManagerAddress; + + //---------- Class internal fields ------------------- + + private ActorSystem actorSystem; + private ActorRef applicationClient; + private ApplicationReport intialAppReport; + private static FiniteDuration akkaDuration = Duration.apply(5, TimeUnit.SECONDS); + private static Timeout akkaTimeout = Timeout.durationToTimeout(akkaDuration); + + public FlinkYarnCluster(final YarnClient yarnClient, final ApplicationId appId, + Configuration hadoopConfig, Path sessionFilesDir) throws IOException, YarnException { + this.yarnClient = yarnClient; + this.hadoopConfig = hadoopConfig; + this.sessionFilesDir = sessionFilesDir; + + // get one application report manually + intialAppReport = yarnClient.getApplicationReport(appId); + String jobManagerHost = intialAppReport.getHost(); + int jobManagerPort = intialAppReport.getRpcPort(); + this.jobManagerAddress = new InetSocketAddress(jobManagerHost, jobManagerPort); + + // start actor system + LOG.info("Start actor system."); + InetAddress ownHostname = NetUtils.resolveAddress(jobManagerAddress); // find name of own public interface, able to connect to the JM + actorSystem = YarnUtils.createActorSystem(ownHostname.getCanonicalHostName(), 0, GlobalConfiguration.getConfiguration()); // set port automatically. 
+ + // start application client + LOG.info("Start application client."); + + applicationClient = actorSystem.actorOf(Props.create(ApplicationClient.class)); + + // instruct ApplicationClient to start a periodical status polling + applicationClient.tell(new Messages.LocalRegisterClient(jobManagerHost + ":" + jobManagerPort), applicationClient); + + + // add hook to ensure proper shutdown + Runtime.getRuntime().addShutdownHook(clientShutdownHook); + + actorRunner = new Thread(new Runnable() { + @Override + public void run() { + // blocks until ApplicationMaster has been stopped + actorSystem.awaitTermination(); + + // get final application report + try { + ApplicationReport appReport = yarnClient.getApplicationReport(appId); + + LOG.info("Application " + appId + " finished with state " + appReport + .getYarnApplicationState() + " and final state " + appReport + .getFinalApplicationStatus() + " at " + appReport.getFinishTime()); + + if(appReport.getYarnApplicationState() == YarnApplicationState.FAILED || appReport.getYarnApplicationState() + == YarnApplicationState.KILLED ) { + LOG.warn("Application failed. 
Diagnostics "+appReport.getDiagnostics()); + LOG.warn("If log aggregation is activated in the Hadoop cluster, we recommend to retrieve " + + "the full application log using this command:\n" + + "\tyarn logs -applicationId "+appReport.getApplicationId()+"\n" + + "(It sometimes takes a few seconds until the logs are aggregated)"); + } + } catch(Exception e) { + LOG.warn("Error while getting final application report", e); + } + } + }); + actorRunner.setDaemon(true); + actorRunner.start(); + + pollingRunner = new PollingThread(yarnClient, appId); + pollingRunner.setDaemon(true); + pollingRunner.start(); + } + + // -------------------------- Interaction with the cluster ------------------------ + + @Override + public InetSocketAddress getJobManagerAddress() { + return jobManagerAddress; + } + + @Override + public String getWebInterfaceURL() { + return this.intialAppReport.getTrackingUrl(); + } + + + @Override + public FlinkYarnClusterStatus getClusterStatus() { + if(hasBeenStopped()) { + throw new RuntimeException("The FlinkYarnCluster has alread been stopped"); + } + Future<Object> clusterStatusOption = ask(applicationClient, Messages.LocalGetYarnClusterStatus$.MODULE$, akkaTimeout); + Object clusterStatus = awaitUtil(clusterStatusOption, "Unable to get Cluster status from Application Client"); + if(clusterStatus instanceof None$) { + return null; + } else if(clusterStatus instanceof Some) { + return (FlinkYarnClusterStatus) (((Some) clusterStatus).get()); + } else { + throw new RuntimeException("Unexpected type: "+clusterStatus.getClass().getCanonicalName()); + } + } + + @Override + public boolean hasFailed() { + if(pollingRunner == null) { + LOG.warn("FlinkYarnCluster.hasFailed() has been called on an uninitialized cluster." + + "The system might be in an erroneous state"); + } + ApplicationReport lastReport = pollingRunner.getLastReport(); + if(lastReport == null) { + LOG.warn("FlinkYarnCluster.hasFailed() has been called on a cluster. 
that didn't receive a status so far." + + "The system might be in an erroneous state"); + return false; + } else { + return (lastReport.getYarnApplicationState() == YarnApplicationState.FAILED || + lastReport.getYarnApplicationState() == YarnApplicationState.KILLED); + } + } + + @Override + public String getDiagnostics() { + if (!hasFailed()) { + LOG.warn("getDiagnostics() called for cluster which is not in failed state"); + } + ApplicationReport lastReport = pollingRunner.getLastReport(); + if (lastReport == null) { + LOG.warn("Last report is null"); + return null; + } else { + return lastReport.getDiagnostics(); + } + } + + @Override + public List<String> getNewMessages() { + if(hasBeenStopped()) { + throw new RuntimeException("The FlinkYarnCluster has alread been stopped"); + } + List<String> ret = new ArrayList<String>(); + // get messages from ApplicationClient (locally) + while(true) { + Future<Object> messageOptionFuture = ask(applicationClient, Messages.LocalGetYarnMessage$.MODULE$, akkaTimeout); + Object messageOption = awaitUtil(messageOptionFuture, "Error getting new messages from Appliation Client"); + if(messageOption instanceof None$) { + break; + } else if(messageOption instanceof org.apache.flink.yarn.Messages.YarnMessage) { + Messages.YarnMessage msg = (Messages.YarnMessage) messageOption; + ret.add("["+msg.date()+"] "+msg.message()); + } else { + LOG.warn("LocalGetYarnMessage returned unexpected type: "+messageOption); + } + } + return ret; + } + + private static <T> T awaitUtil(Awaitable<T> awaitable, String message) { + try { + return Await.result(awaitable, akkaDuration); + } catch (Exception e) { + throw new RuntimeException(message, e); + } + } + + // -------------------------- Shutdown handling ------------------------ + + private AtomicBoolean hasBeenShutDown = new AtomicBoolean(false); + @Override + public void shutdown() { + shutdownInternal(true); + } + + private void shutdownInternal(boolean removeShutdownHook) { + 
if(hasBeenShutDown.getAndSet(true)) { + return; + } + // the session is being stopped explicitly. + if(removeShutdownHook) { + Runtime.getRuntime().removeShutdownHook(clientShutdownHook); + } + if(actorSystem != null){ + LOG.info("Sending shutdown request to the Application Master"); + if(applicationClient != ActorRef.noSender()) { + Future<Object> future = ask(applicationClient, new Messages.StopYarnSession(FinalApplicationStatus.SUCCEEDED), akkaTimeout); + awaitUtil(future, "Error while stopping YARN Application Client"); + } + + actorSystem.shutdown(); + actorSystem.awaitTermination(); + + actorSystem = null; + } + + LOG.info("Deleting files in "+sessionFilesDir ); + try { + FileSystem shutFS = FileSystem.get(hadoopConfig); + shutFS.delete(sessionFilesDir, true); // delete conf and jar file. + shutFS.close(); + }catch(IOException e){ + LOG.error("Could not delete the Flink jar and configuration files in HDFS..", e); + } + + try { + actorRunner.join(1000); // wait for 1 second + } catch (InterruptedException e) { + LOG.warn("Shutdown of the actor runner was interrupted", e); + Thread.currentThread().interrupt(); + } + try { + pollingRunner.stopRunner(); + pollingRunner.join(1000); + } catch(InterruptedException e) { + LOG.warn("Shutdown of the polling runner was interrupted", e); + Thread.currentThread().interrupt(); + } + + LOG.info("YARN Client is shutting down"); + yarnClient.stop(); // actorRunner is using the yarnClient. + yarnClient = null; // set null to clearly see if somebody wants to access it afterwards. 
+ } + + @Override + public boolean hasBeenStopped() { + return hasBeenShutDown.get(); + } + + + public class ClientShutdownHook extends Thread { + @Override + public void run() { + shutdownInternal(false); + } + } + + // -------------------------- Polling ------------------------ + + public static class PollingThread extends Thread { + + AtomicBoolean running = new AtomicBoolean(true); + private YarnClient yarnClient; + private ApplicationId appId; + + // ------- status information stored in the polling thread + private Object lock = new Object(); + private ApplicationReport lastReport; + + + public PollingThread(YarnClient yarnClient, ApplicationId appId) { + this.yarnClient = yarnClient; + this.appId = appId; + } + + public void stopRunner() { + if(!running.get()) { + LOG.warn("Polling thread was already stopped"); + } + running.set(false); + } + + public ApplicationReport getLastReport() { + synchronized (lock) { + return lastReport; + } + } + + @Override + public void run() { + while (running.get() && yarnClient.isInState(Service.STATE.STARTED)) { + try { + ApplicationReport report = yarnClient.getApplicationReport(appId); + synchronized (lock) { + lastReport = report; + } + } catch (Exception e) { + LOG.warn("Error while getting application report", e); + // TODO: do more here. + } + try { + Thread.sleep(FlinkYarnCluster.POLLING_THREAD_INTERVAL_MS); + } catch (InterruptedException e) { + LOG.error("Polling thread got interrupted", e); + Thread.currentThread().interrupt(); // pass interrupt. + } + } + if(running.get() && !yarnClient.isInState(Service.STATE.STARTED)) { + // == if the polling thread is still running but the yarn client is stopped. 
+ LOG.warn("YARN client is unexpected in state "+yarnClient.getServiceState()); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java new file mode 100644 index 0000000..8bb2668 --- /dev/null +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.yarn; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.security.TokenCache; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.util.Shell; +import org.apache.hadoop.util.StringInterner; +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.util.ConverterUtils; + +public class Utils { + + private static final Logger LOG = LoggerFactory.getLogger(Utils.class); + + private static final int DEFAULT_HEAP_LIMIT_CAP = 500; + private static final float DEFAULT_YARN_HEAP_CUTOFF_RATIO = 0.8f; + + /** + * Calculate the heap size for the JVMs to start in the containers. + * Since JVMs are allocating more than just the heap space, and YARN is very + * fast at killing processes that use memory beyond their limit, we have to come + * up with a good heapsize. 
+ * This code takes 85% of the given amount of memory (in MB). If the amount we removed by these 85% + * more than 500MB (the current HEAP_LIMIT_CAP), we'll just subtract 500 MB. + * + */ + public static int calculateHeapSize(int memory) { + float memoryCutoffRatio = GlobalConfiguration.getFloat(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, DEFAULT_YARN_HEAP_CUTOFF_RATIO); + int heapLimitCap = GlobalConfiguration.getInteger(ConfigConstants.YARN_HEAP_LIMIT_CAP, DEFAULT_HEAP_LIMIT_CAP); + + int heapLimit = (int)((float)memory * memoryCutoffRatio); + if( (memory - heapLimit) > heapLimitCap) { + heapLimit = memory-heapLimitCap; + } + return heapLimit; + } + + private static void addPathToConfig(Configuration conf, File path) { + // chain-in a new classloader + URL fileUrl = null; + try { + fileUrl = path.toURI().toURL(); + } catch (MalformedURLException e) { + throw new RuntimeException("Erroneous config file path", e); + } + URL[] urls = {fileUrl}; + ClassLoader cl = new URLClassLoader(urls, conf.getClassLoader()); + conf.setClassLoader(cl); + } + + private static void setDefaultConfValues(Configuration conf) { + if(conf.get("fs.hdfs.impl",null) == null) { + conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem"); + } + if(conf.get("fs.file.impl",null) == null) { + conf.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem"); + } + } + + public static Configuration initializeYarnConfiguration() { + Configuration conf = new YarnConfiguration(); + String configuredHadoopConfig = GlobalConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null); + if(configuredHadoopConfig != null) { + LOG.info("Using hadoop configuration path from " + ConfigConstants.PATH_HADOOP_CONFIG + " setting."); + addPathToConfig(conf, new File(configuredHadoopConfig)); + setDefaultConfValues(conf); + return conf; + } + String[] envs = { "YARN_CONF_DIR", "HADOOP_CONF_DIR", "HADOOP_CONF_PATH" }; + for(int i = 0; i < envs.length; ++i) { + String confPath = 
System.getenv(envs[i]); + if (confPath != null) { + LOG.info("Found "+envs[i]+", adding it to configuration"); + addPathToConfig(conf, new File(confPath)); + setDefaultConfValues(conf); + return conf; + } + } + LOG.info("Could not find HADOOP_CONF_PATH, using HADOOP_HOME."); + String hadoopHome = null; + try { + hadoopHome = Shell.getHadoopHome(); + } catch (IOException e) { + throw new RuntimeException("Unable to get hadoop home. Please set HADOOP_HOME variable!", e); + } + File tryConf = new File(hadoopHome+"/etc/hadoop"); + if(tryConf.exists()) { + LOG.info("Found configuration using hadoop home."); + addPathToConfig(conf, tryConf); + } else { + tryConf = new File(hadoopHome+"/conf"); + if(tryConf.exists()) { + addPathToConfig(conf, tryConf); + } + } + setDefaultConfValues(conf); + return conf; + } + + public static void setupEnv(Configuration conf, Map<String, String> appMasterEnv) { + addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(), Environment.PWD.$() + File.separator + "*"); + for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) { + addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(), c.trim()); + } + } + + + /** + * + * @return Path to remote file (usually hdfs) + * @throws IOException + */ + public static Path setupLocalResource(Configuration conf, FileSystem fs, String appId, Path localRsrcPath, LocalResource appMasterJar, Path homedir) + throws IOException { + // copy to HDFS + String suffix = ".flink/" + appId + "/" + localRsrcPath.getName(); + + Path dst = new Path(homedir, suffix); + + LOG.info("Copying from "+localRsrcPath+" to "+dst ); + fs.copyFromLocalFile(localRsrcPath, dst); + registerLocalResource(fs, dst, appMasterJar); + return dst; + } + + public static void registerLocalResource(FileSystem fs, Path remoteRsrcPath, LocalResource localResource) throws IOException { + FileStatus jarStat = fs.getFileStatus(remoteRsrcPath); + 
localResource.setResource(ConverterUtils.getYarnUrlFromURI(remoteRsrcPath.toUri())); + localResource.setSize(jarStat.getLen()); + localResource.setTimestamp(jarStat.getModificationTime()); + localResource.setType(LocalResourceType.FILE); + localResource.setVisibility(LocalResourceVisibility.APPLICATION); + } + + public static void setTokensFor(ContainerLaunchContext amContainer, Path[] paths, Configuration conf) throws IOException { + Credentials credentials = new Credentials(); + // for HDFS + TokenCache.obtainTokensForNamenodes(credentials, paths, conf); + // for user + UserGroupInformation currUsr = UserGroupInformation.getCurrentUser(); + + Collection<Token<? extends TokenIdentifier>> usrTok = currUsr.getTokens(); + for(Token<? extends TokenIdentifier> token : usrTok) { + final Text id = new Text(token.getIdentifier()); + LOG.info("Adding user token "+id+" with "+token); + credentials.addToken(id, token); + } + DataOutputBuffer dob = new DataOutputBuffer(); + credentials.writeTokenStorageToStream(dob); + + if(LOG.isDebugEnabled()) { + LOG.debug("Wrote tokens. 
Credentials buffer length: " + dob.getLength()); + } + + ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + amContainer.setTokens(securityTokens); + } + + public static void logFilesInCurrentDirectory(final Logger logger) { + new File(".").list(new FilenameFilter() { + + @Override + public boolean accept(File dir, String name) { + logger.info(dir.getAbsolutePath()+"/"+name); + return true; + } + }); + } + + /** + * Copied method from org.apache.hadoop.yarn.util.Apps + * It was broken by YARN-1824 (2.4.0) and fixed for 2.4.1 + * by https://issues.apache.org/jira/browse/YARN-1931 + */ + public static void addToEnvironment(Map<String, String> environment, + String variable, String value) { + String val = environment.get(variable); + if (val == null) { + val = value; + } else { + val = val + File.pathSeparator + value; + } + environment.put(StringInterner.weakIntern(variable), + StringInterner.weakIntern(val)); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java new file mode 100644 index 0000000..3f1cc23 --- /dev/null +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn.appMaster; + +import java.io.IOException; +import java.security.PrivilegedAction; +import java.util.Arrays; +import java.util.Map; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import org.apache.flink.yarn.YarnUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.flink.yarn.FlinkYarnClient; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; +import scala.Tuple2; + + +public class YarnTaskManagerRunner { + + private static final Logger LOG = LoggerFactory.getLogger(YarnTaskManagerRunner.class); + + public static void main(final String[] args) throws IOException { + Map<String, String> envs = System.getenv(); + final String yarnClientUsername = envs.get(FlinkYarnClient.ENV_CLIENT_USERNAME); + final String localDirs = envs.get(Environment.LOCAL_DIRS.key()); + + // configure local directory + final String[] newArgs = Arrays.copyOf(args, args.length + 2); + newArgs[newArgs.length-2] = "--tempDir"; + newArgs[newArgs.length-1] = localDirs; + LOG.info("Setting log path "+localDirs); + LOG.info("YARN daemon runs as '"+UserGroupInformation.getCurrentUser().getShortUserName()+"' setting" + + " user to execute Flink TaskManager to '"+yarnClientUsername+"'"); + UserGroupInformation ugi = UserGroupInformation.createRemoteUser(yarnClientUsername); + for(Token<? 
extends TokenIdentifier> toks : UserGroupInformation.getCurrentUser().getTokens()) { + ugi.addToken(toks); + } + ugi.doAs(new PrivilegedAction<Object>() { + @Override + public Object run() { + try { + Tuple2<ActorSystem, ActorRef> tuple = YarnUtils + .startActorSystemAndTaskManager(newArgs); + + tuple._1().awaitTermination(); + } catch (Exception e) { + LOG.error("Error while running the TaskManager", e); + } + return null; + } + }); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala new file mode 100644 index 0000000..22f4c02 --- /dev/null +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.yarn + +import java.util.concurrent.TimeUnit + +import akka.actor._ +import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration} +import org.apache.flink.runtime.ActorLogMessages +import org.apache.flink.runtime.akka.AkkaUtils +import org.apache.flink.runtime.jobmanager.JobManager +import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus +import org.apache.flink.yarn.Messages._ +import scala.collection.mutable +import scala.concurrent.duration._ + +class ApplicationClient + + extends Actor with ActorLogMessages with ActorLogging { + import context._ + + val INITIAL_POLLING_DELAY = 0 seconds + val WAIT_FOR_YARN_INTERVAL = 2 seconds + val POLLING_INTERVAL = 3 seconds + + var yarnJobManager: Option[ActorRef] = None + var pollingTimer: Option[Cancellable] = None + implicit var timeout: FiniteDuration = 0 seconds + var running = false + var messagesQueue : mutable.Queue[YarnMessage] = mutable.Queue[YarnMessage]() + var latestClusterStatus : Option[FlinkYarnClusterStatus] = None + var stopMessageReceiver : Option[ActorRef] = None + + override def preStart(): Unit = { + super.preStart() + + timeout = new FiniteDuration(GlobalConfiguration.getInteger(ConfigConstants.AKKA_ASK_TIMEOUT, + ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS) + } + + override def postStop(): Unit = { + log.info("Stopped Application client.") + pollingTimer foreach { + _.cancel() + } + + pollingTimer = None + } + + override def receiveWithLogMessages: Receive = { + // ----------------------------- Registration -> Status updates -> shutdown ---------------- + case LocalRegisterClient(address: String) => { + val jmAkkaUrl = JobManager.getRemoteAkkaURL(address) + + yarnJobManager = Some(AkkaUtils.getReference(jmAkkaUrl)(system, timeout)) + yarnJobManager match { + case Some(jm) => { + // the message came from the FlinkYarnCluster. We send the message to the JobManager. 
+ // it is important not to forward the message because the JobManager is storing the + // sender as the Application Client (this class). + jm ! RegisterClient + + // schedule a periodic status report from the JobManager + // request the number of task managers and slots from the job manager + pollingTimer = Some(context.system.scheduler.schedule(INITIAL_POLLING_DELAY, + WAIT_FOR_YARN_INTERVAL, yarnJobManager.get, PollYarnClusterStatus)) + } + case None => throw new RuntimeException("Registration at JobManager/ApplicationMaster " + + "failed. Job Manager RPC connection has not properly been initialized"); + } + } + case msg: StopYarnSession => { + log.info("Stop yarn session.") + stopMessageReceiver = Some(sender()) + yarnJobManager foreach { + _ forward msg + } + } + case JobManagerStopped => { + log.info("Remote JobManager has been stopped successfully. " + + "Stopping local application client") + stopMessageReceiver foreach { + _ ! JobManagerStopped + } + // stop ourselves + context.system.shutdown() + } + + // handle the responses from the PollYarnClusterStatus messages to the yarn job mgr + case status: FlinkYarnClusterStatus => { + latestClusterStatus = Some(status) + } + + // locally get cluster status + case LocalGetYarnClusterStatus => { + sender() ! latestClusterStatus + } + + // ----------------- handle messages from the cluster ------------------- + // receive remote messages + case msg: YarnMessage => { + messagesQueue.enqueue(msg) + } + // locally forward messages + case LocalGetYarnMessage => { + sender() ! 
(if( messagesQueue.size == 0) None else messagesQueue.dequeue) + } + case _ => + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala new file mode 100644 index 0000000..fd67b01 --- /dev/null +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.yarn + +import java.io.{PrintWriter, FileWriter, BufferedWriter} +import java.security.PrivilegedAction + +import akka.actor._ +import org.apache.flink.client.CliFrontend +import org.apache.flink.configuration.{GlobalConfiguration, ConfigConstants} +import org.apache.flink.runtime.jobmanager.{WithWebServer, JobManager} +import org.apache.flink.yarn.Messages.StartYarnSession +import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment +import org.slf4j.LoggerFactory + +import scala.io.Source + +object ApplicationMaster { + import scala.collection.JavaConversions._ + + val LOG = LoggerFactory.getLogger(this.getClass) + + val CONF_FILE = "flink-conf.yaml" + val MODIFIED_CONF_FILE = "flink-conf-modified.yaml" + + def main(args: Array[String]): Unit ={ + val yarnClientUsername = System.getenv(FlinkYarnClient.ENV_CLIENT_USERNAME) + LOG.info(s"YARN daemon runs as ${UserGroupInformation.getCurrentUser.getShortUserName}" + + s" setting user to execute Flink ApplicationMaster/JobManager to ${yarnClientUsername}") + + val ugi = UserGroupInformation.createRemoteUser(yarnClientUsername) + + for(token <- UserGroupInformation.getCurrentUser.getTokens){ + ugi.addToken(token) + } + + ugi.doAs(new PrivilegedAction[Object] { + override def run(): Object = { + var actorSystem: ActorSystem = null + var jobManager: ActorRef = ActorRef.noSender + + try { + val conf = Utils.initializeYarnConfiguration() + + val env = System.getenv() + + if(LOG.isDebugEnabled) { + LOG.debug("All environment variables: " + env.toString) + } + + val currDir = env.get(Environment.PWD.key()) + require(currDir != null, "Current directory unknown.") + + val logDirs = env.get(Environment.LOG_DIRS.key()) + + // Note that we use the "ownHostname" given by YARN here, to make sure + // we use the hostnames given by YARN consitently throuout akka. + // for akka "localhost" and "localhost.localdomain" are different actors. 
+ val ownHostname = env.get(Environment.NM_HOST.key()) + require(ownHostname != null, s"Own hostname not set.") + + val taskManagerCount = env.get(FlinkYarnClient.ENV_TM_COUNT).toInt + val slots = env.get(FlinkYarnClient.ENV_SLOTS).toInt + val dynamicPropertiesEncodedString = env.get(FlinkYarnClient.ENV_DYNAMIC_PROPERTIES) + + val jobManagerWebPort = 0 // automatic assignment. + + val (system, actor) = startJobManager(currDir, ownHostname,dynamicPropertiesEncodedString) + + actorSystem = system + jobManager = actor + val extActor = system.asInstanceOf[ExtendedActorSystem] + val jobManagerPort = extActor.provider.getDefaultAddress.port.get + + // generate configuration file for TaskManagers + generateConfigurationFile(s"$currDir/$MODIFIED_CONF_FILE", currDir, ownHostname, + jobManagerPort, jobManagerWebPort, logDirs, slots, taskManagerCount, + dynamicPropertiesEncodedString) + + + // send "start yarn session" message to YarnJobManager. + LOG.info("Start yarn session on job manager.") + jobManager ! StartYarnSession(conf, jobManagerPort) + + LOG.info("Application Master properly initiated. 
Await termination of actor system.") + actorSystem.awaitTermination() + }catch{ + case t: Throwable => + LOG.error("Error while running the application master.", t) + + if(actorSystem != null){ + actorSystem.shutdown() + actorSystem.awaitTermination() + + actorSystem = null + } + } + + null + } + }) + + } + + def generateConfigurationFile(fileName: String, currDir: String, ownHostname: String, + jobManagerPort: Int, + jobManagerWebPort: Int, logDirs: String, slots: Int, + taskManagerCount: Int, dynamicPropertiesEncodedString: String) + : Unit = { + LOG.info("Generate configuration file for application master.") + val output = new PrintWriter(new BufferedWriter( + new FileWriter(fileName)) + ) + + for (line <- Source.fromFile(s"$currDir/$CONF_FILE").getLines() if !(line.contains + (ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY))) { + output.println(line) + } + + output.println(s"${ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY}: $ownHostname") + output.println(s"${ConfigConstants.JOB_MANAGER_IPC_PORT_KEY}: $jobManagerPort") + + output.println(s"${ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY}: $logDirs") + output.println(s"${ConfigConstants.JOB_MANAGER_WEB_PORT_KEY}: $jobManagerWebPort") + + + if(slots != -1){ + output.println(s"${ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS}: $slots") + output.println( + s"${ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY}: ${slots*taskManagerCount}") + } + + // add dynamic properties + val dynamicProperties = CliFrontend.getDynamicProperties(dynamicPropertiesEncodedString) + + import scala.collection.JavaConverters._ + + for(property <- dynamicProperties.asScala){ + output.println(s"${property.f0}: ${property.f1}") + } + + output.close() + } + + def startJobManager(currDir: String, hostname: String, dynamicPropertiesEncodedString: String): + (ActorSystem, ActorRef) = { + LOG.info("Start job manager for yarn") + val args = Array[String]("--configDir", currDir) + + LOG.info(s"Config path: ${currDir}.") + val (_, _, configuration, _) = 
JobManager.parseArgs(args) + + // add dynamic properties to JobManager configuration. + val dynamicProperties = CliFrontend.getDynamicProperties(dynamicPropertiesEncodedString) + import scala.collection.JavaConverters._ + for(property <- dynamicProperties.asScala){ + configuration.setString(property.f0, property.f1) + } + GlobalConfiguration.getConfiguration.addAll(configuration) // make part of globalConf. + + // set port to 0 to let Akka automatically determine the port. + implicit val jobManagerSystem = YarnUtils.createActorSystem(hostname, port = 0, configuration) + + LOG.info("Start job manager actor.") + (jobManagerSystem, JobManager.startActor(Props(new JobManager(configuration) with + WithWebServer with YarnJobManager))) + } +}