Repository: flink Updated Branches: refs/heads/master 2522f028b -> 84e76f4d3
http://git-wip-us.apache.org/repos/asf/flink/blob/84e76f4d/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java new file mode 100644 index 0000000..5976799 --- /dev/null +++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java @@ -0,0 +1,307 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.yarn; + +import org.apache.flink.client.FlinkYarnSessionCli; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient; +import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster; +import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; +import org.apache.log4j.AppenderSkeleton; +import org.apache.log4j.spi.LoggingEvent; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; + + +/** + * This test starts a MiniYARNCluster with a FIFO scheudler. + * There are no queues for that scheduler. + */ +public class YARNSessionFIFOITCase extends YarnTestBase { + private static final Logger LOG = LoggerFactory.getLogger(YARNSessionFIFOITCase.class); + + /* + Override init with FIFO scheduler. + */ + @BeforeClass + public static void setup() { + yarnConfiguration.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, ResourceScheduler.class); + yarnConfiguration.setInt(YarnConfiguration.NM_PMEM_MB, 768); + yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512); + startYARNWithConfig(yarnConfiguration); + } + /** + * Test regular operation, including command line parameter parsing. + */ + @Test + public void testClientStartup() { + LOG.info("Starting testClientStartup()"); + runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(), + "-n", "1", + "-jm", "512", + "-tm", "1024"}, + "Number of connected TaskManagers changed to 1. Slots available: 1", RunTypes.YARN_SESSION); + LOG.info("Finished testClientStartup()"); + ensureNoExceptionsInLogFiles(); + } + + + /** + * Test querying the YARN cluster. + * + * This test validates through 666*2 cores in the "cluster". + */ + @Test + public void testQueryCluster() { + LOG.info("Starting testQueryCluster()"); + runWithArgs(new String[] {"-q"}, "Summary: totalMemory 8192 totalCores 1332", RunTypes.YARN_SESSION); // we have 666*2 cores. + LOG.info("Finished testQueryCluster()"); + ensureNoExceptionsInLogFiles(); + } + + /** + * Test deployment to non-existing queue. (user-reported error) + * Deployment to the queue is possible because there are no queues, so we don't check. + */ + @Test + public void testNonexistingQueue() { + LOG.info("Starting testNonexistingQueue()"); + runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), + "-n", "1", + "-jm", "512", + "-tm", "1024", + "-qu", "doesntExist"}, "Number of connected TaskManagers changed to 1. Slots available: 1", RunTypes.YARN_SESSION); + LOG.info("Finished testNonexistingQueue()"); + ensureNoExceptionsInLogFiles(); + } + + /** + * Test requesting more resources than available. + */ + @Test + public void testMoreNodesThanAvailable() { + if(ignoreOnTravis()) { + return; + } + addTestAppender(); + LOG.info("Starting testMoreNodesThanAvailable()"); + runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(), + "-n", "10", + "-jm", "512", + "-tm", "1024"}, "Number of connected TaskManagers changed to", RunTypes.YARN_SESSION); // the number of TMs depends on the speed of the test hardware + LOG.info("Finished testMoreNodesThanAvailable()"); + checkForLogString("This YARN session requires 10752MB of memory in the cluster. There are currently only 8192MB available."); + ensureNoExceptionsInLogFiles(); + } + + /** + * The test cluster has the following resources: + * - 2 Nodes with 4096 MB each. + * - RM_SCHEDULER_MINIMUM_ALLOCATION_MB is 512 + * + * We allocate: + * 1 JobManager with 256 MB (will be automatically upgraded to 512 due to min alloc mb) + * 5 TaskManagers with 1585 MB + * + * user sees a total request of: 8181 MB (fits) + * system sees a total request of: 8437 (doesn't fit due to min alloc mb) + */ + @Test + public void testResourceComputation() { + if(ignoreOnTravis()) { + return; + } + addTestAppender(); + LOG.info("Starting testResourceComputation()"); + runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(), + "-n", "5", + "-jm", "256", + "-tm", "1585"}, "Number of connected TaskManagers changed to", RunTypes.YARN_SESSION); + LOG.info("Finished testResourceComputation()"); + checkForLogString("This YARN session requires 8437MB of memory in the cluster. There are currently only 8192MB available."); + } + + /** + * The test cluster has the following resources: + * - 2 Nodes with 4096 MB each. + * - RM_SCHEDULER_MINIMUM_ALLOCATION_MB is 512 + * + * We allocate: + * 1 JobManager with 256 MB (will be automatically upgraded to 512 due to min alloc mb) + * 2 TaskManagers with 3840 MB + * + * the user sees a total request of: 7936 MB (fits) + * the system sees a request of: 8192 MB (fits) + * HOWEVER: one machine is going to need 3840 + 512 = 4352 MB, which doesn't fit. + * + * --> check if the system properly rejects allocating this session. + */ + @Test + public void testfullAlloc() { + if(ignoreOnTravis()) { + return; + } + addTestAppender(); + LOG.info("Starting testfullAlloc()"); + runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(), + "-n", "2", + "-jm", "256", + "-tm", "3840"}, "Number of connected TaskManagers changed to", RunTypes.YARN_SESSION); + LOG.info("Finished testfullAlloc()"); + checkForLogString("There is not enough memory available in the YARN cluster. The TaskManager(s) require 3840MB each. NodeManagers available: [4096, 4096]\n" + + "After allocating the JobManager (512MB) and (1/2) TaskManagers, the following NodeManagers are available: [3584, 256]"); + ensureNoExceptionsInLogFiles(); + } + + /** + * Test per-job yarn cluster + * + * This also tests the prefixed CliFrontend options for the YARN case + */ + @Test + public void perJobYarnCluster() { + LOG.info("Starting perJobYarnCluster()"); + File exampleJarLocation = YarnTestBase.findFile("..", new ContainsName("-WordCount.jar", "streaming")); // exclude streaming wordcount here. + Assert.assertNotNull("Could not find wordcount jar", exampleJarLocation); + runWithArgs(new String[] {"run", "-m", "yarn-cluster", + "-yj", flinkUberjar.getAbsolutePath(), + "-yn", "1", + "-yjm", "512", + "-ytm", "1024", exampleJarLocation.getAbsolutePath()}, "Job execution switched to status FINISHED.", RunTypes.CLI_FRONTEND); + LOG.info("Finished perJobYarnCluster()"); + ensureNoExceptionsInLogFiles(); + } + + /** + * Test the YARN Java API + */ + @Test + public void testJavaAPI() { + final int WAIT_TIME = 15; + LOG.info("Starting testJavaAPI()"); + + AbstractFlinkYarnClient flinkYarnClient = FlinkYarnSessionCli.getFlinkYarnClient(); + flinkYarnClient.setTaskManagerCount(1); + flinkYarnClient.setJobManagerMemory(512); + flinkYarnClient.setTaskManagerMemory(512); + flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath())); + String confDirPath = System.getenv("FLINK_CONF_DIR"); + flinkYarnClient.setConfigurationDirectory(confDirPath); + flinkYarnClient.setFlinkConfigurationObject(GlobalConfiguration.getConfiguration()); + flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml")); + + // deploy + AbstractFlinkYarnCluster yarnCluster = null; + try { + yarnCluster = flinkYarnClient.deploy(null); + } catch (Exception e) { + System.err.println("Error while deploying YARN cluster: "+e.getMessage()); + e.printStackTrace(System.err); + Assert.fail(); + } + FlinkYarnClusterStatus expectedStatus = new FlinkYarnClusterStatus(1, 1); + for(int second = 0; second < WAIT_TIME * 2; second++) { // run "forever" + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + LOG.warn("Interrupted", e); + Thread.interrupted(); + } + FlinkYarnClusterStatus status = yarnCluster.getClusterStatus(); + if(status != null && status.equals(expectedStatus)) { + LOG.info("Cluster reached status " + status); + break; // all good, cluster started + } + if(second > WAIT_TIME) { + // we waited for 15 seconds. cluster didn't come up correctly + Assert.fail("The custer didn't start after " + WAIT_TIME + " seconds"); + } + } + + // use the cluster + Assert.assertNotNull(yarnCluster.getJobManagerAddress()); + Assert.assertNotNull(yarnCluster.getWebInterfaceURL()); + + LOG.info("Shutting down cluster. All tests passed"); + // shutdown cluster + yarnCluster.shutdown(); + LOG.info("Finished testJavaAPI()"); + + ensureNoExceptionsInLogFiles(); + } + + public boolean ignoreOnTravis() { + if(System.getenv("TRAVIS") != null && System.getenv("TRAVIS").equals("true")) { + // we skip the test until we are able to start a smaller yarn clsuter + // right now, the miniyarncluster has the size of the nodemanagers fixed on 4 GBs. + LOG.warn("Skipping test on travis for now"); + return true; + } + return false; + } + + // + // --------------- Tools to test if a certain string has been logged with Log4j. ------------- + // See : http://stackoverflow.com/questions/3717402/how-to-test-w-junit-that-warning-was-logged-w-log4j + // + private static TestAppender testAppender; + public static void addTestAppender() { + testAppender = new TestAppender(); + org.apache.log4j.Logger.getRootLogger().addAppender(testAppender); + } + + public static void checkForLogString(String expected) { + if(testAppender == null) { + throw new NullPointerException("Initialize it first"); + } + LoggingEvent found = null; + for(LoggingEvent event: testAppender.events) { + if(event.getMessage().toString().contains(expected)) { + found = event; + break; + } + } + if(found != null) { + LOG.info("Found expected string '"+expected+"' in log message "+found); + return; + } + Assert.fail("Unable to find expected string '"+expected+"' in log messages"); + } + + public static class TestAppender extends AppenderSkeleton { + public List<LoggingEvent> events = new ArrayList<LoggingEvent>(); + public void close() {} + public boolean requiresLayout() {return false;} + @Override + protected void append(LoggingEvent event) { + events.add(event); + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/84e76f4d/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java new file mode 100644 index 0000000..200205d --- /dev/null +++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java @@ -0,0 +1,435 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.flink.client.CliFrontend; +import org.apache.flink.client.FlinkYarnSessionCli; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileWriter; +import java.io.FilenameFilter; +import java.io.IOException; +import java.io.PrintStream; +import java.lang.reflect.Field; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Scanner; + + +/** + * This base class allows to use the MiniYARNCluster. + * The cluster is re-used for all tests. + * + * This class is located in a different package which is build after flink-dist. This way, + * we can use the YARN uberjar of flink to start a Flink YARN session. + */ +public abstract class YarnTestBase { + private static final Logger LOG = LoggerFactory.getLogger(YarnTestBase.class); + + private final static PrintStream originalStdout = System.out; + private final static PrintStream originalStderr = System.err; + + private final static String TEST_CLUSTER_NAME = "flink-yarn-tests"; + + + // Temp directory which is deleted after the unit test. + private static TemporaryFolder tmp = new TemporaryFolder(); + + protected static MiniYARNCluster yarnCluster = null; + + protected static File flinkUberjar; + private static File yarnConfFile; + + protected static final Configuration yarnConfiguration; + static { + yarnConfiguration = new YarnConfiguration(); + yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512); + yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 4096); // 4096 is the available memory anyways + yarnConfiguration.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true); + yarnConfiguration.setBoolean(YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, true); + yarnConfiguration.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); + yarnConfiguration.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 2); + yarnConfiguration.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600); + yarnConfiguration.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); + yarnConfiguration.setInt(YarnConfiguration.NM_VCORES, 666); // memory is overwritten in the MiniYARNCluster. + // so we have to change the number of cores for testing. + } + + // This code is taken from: http://stackoverflow.com/a/7201825/568695 + // it changes the environment variables of this JVM. Use only for testing purposes! + @SuppressWarnings("unchecked") + private static void setEnv(Map<String, String> newenv) { + try { + Class<?> processEnvironmentClass = Class.forName("java.lang.ProcessEnvironment"); + Field theEnvironmentField = processEnvironmentClass.getDeclaredField("theEnvironment"); + theEnvironmentField.setAccessible(true); + Map<String, String> env = (Map<String, String>) theEnvironmentField.get(null); + env.putAll(newenv); + Field theCaseInsensitiveEnvironmentField = processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment"); + theCaseInsensitiveEnvironmentField.setAccessible(true); + Map<String, String> cienv = (Map<String, String>) theCaseInsensitiveEnvironmentField.get(null); + cienv.putAll(newenv); + } catch (NoSuchFieldException e) { + try { + Class[] classes = Collections.class.getDeclaredClasses(); + Map<String, String> env = System.getenv(); + for (Class cl : classes) { + if ("java.util.Collections$UnmodifiableMap".equals(cl.getName())) { + Field field = cl.getDeclaredField("m"); + field.setAccessible(true); + Object obj = field.get(env); + Map<String, String> map = (Map<String, String>) obj; + map.clear(); + map.putAll(newenv); + } + } + } catch (Exception e2) { + throw new RuntimeException(e2); + } + } catch (Exception e1) { + throw new RuntimeException(e1); + } + } + + /** + * Sleep a bit between the tests (we are re-using the YARN cluster for the tests) + */ + @After + public void sleep() { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + Assert.fail("Should not happen"); + } + } + + @Before + public void checkClusterEmpty() throws IOException, YarnException { + YarnClient yarnClient = YarnClient.createYarnClient(); + yarnClient.init(yarnConfiguration); + yarnClient.start(); + List<ApplicationReport> apps = yarnClient.getApplications(); + for(ApplicationReport app : apps) { + if(app.getYarnApplicationState() != YarnApplicationState.FINISHED) { + Assert.fail("There is at least one application on the cluster is not finished"); + } + } + } + + /** + * Locate a file or diretory directory + */ + public static File findFile(String startAt, FilenameFilter fnf) { + File root = new File(startAt); + String[] files = root.list(); + if(files == null) { + return null; + } + for(String file : files) { + + File f = new File(startAt + File.separator + file); + if(f.isDirectory()) { + File r = findFile(f.getAbsolutePath(), fnf); + if(r != null) { + return r; + } + } else if (fnf.accept(f.getParentFile(), f.getName())) { + return f; + } + + } + return null; + } + + /** + * Filter to find root dir of the flink-yarn dist. + */ + public static class RootDirFilenameFilter implements FilenameFilter { + @Override + public boolean accept(File dir, String name) { + return name.startsWith("flink-dist") && name.endsWith(".jar") && dir.toString().contains("/lib"); + } + } + public static class ContainsName implements FilenameFilter { + private String name; + private String excludeInPath = null; + + public ContainsName(String name) { + this.name = name; + } + + public ContainsName(String name, String excludeInPath) { + this.name = name; + this.excludeInPath = excludeInPath; + } + + @Override + public boolean accept(File dir, String name) { + if(excludeInPath == null) { + return name.contains(this.name); + } else { + return name.contains(this.name) && !dir.toString().contains(excludeInPath); + } + } + } + + public static File writeYarnSiteConfigXML(Configuration yarnConf) throws IOException { + tmp.create(); + File yarnSiteXML = new File(tmp.newFolder().getAbsolutePath() + "/yarn-site.xml"); + + FileWriter writer = new FileWriter(yarnSiteXML); + yarnConf.writeXml(writer); + writer.flush(); + writer.close(); + return yarnSiteXML; + } + + /** + * This method checks the written TaskManager and JobManager log files + * for exceptions. + */ + public static void ensureNoExceptionsInLogFiles() { + File cwd = new File("target/"+TEST_CLUSTER_NAME); + Assert.assertTrue("Expecting directory "+cwd.getAbsolutePath()+" to exist", cwd.exists()); + Assert.assertTrue("Expecting directory "+cwd.getAbsolutePath()+" to be a directory", cwd.isDirectory()); + System.out.println("cwd = "+cwd.getAbsolutePath()); + File foundFile = findFile(cwd.getAbsolutePath(), new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + File f = new File(dir.getAbsolutePath()+ "/" + name); + // scan each file for 'Exception'. + Scanner scanner = null; + try { + scanner = new Scanner(f); + } catch (FileNotFoundException e) { + LOG.warn("Unable to locate file: "+e.getMessage()+" file: "+f.getAbsolutePath()); + } + while (scanner.hasNextLine()) { + final String lineFromFile = scanner.nextLine(); + if(lineFromFile.contains("Exception")) { + return true; + } + } + return false; + } + }); + if(foundFile != null) { + Scanner scanner = null; + try { + scanner = new Scanner(foundFile); + } catch (FileNotFoundException e) { + Assert.fail("Unable to locate file: "+e.getMessage()+" file: "+foundFile.getAbsolutePath()); + } + LOG.warn("Found a file with an exception. Printing contents:"); + while (scanner.hasNextLine()) { + LOG.warn("LINE: "+scanner.nextLine()); + } + Assert.fail("Found a file "+foundFile+" with an exception"); + } + } + + public static void startYARNWithConfig(Configuration conf) { + flinkUberjar = findFile("..", new RootDirFilenameFilter()); + Assert.assertNotNull(flinkUberjar); + String flinkDistRootDir = flinkUberjar.getParentFile().getParent(); + + if (!flinkUberjar.exists()) { + Assert.fail("Unable to locate yarn-uberjar.jar"); + } + + try { + LOG.info("Starting up MiniYARN cluster"); + if (yarnCluster == null) { + yarnCluster = new MiniYARNCluster(TEST_CLUSTER_NAME, 2, 1, 1); + + yarnCluster.init(conf); + yarnCluster.start(); + } + + Map<String, String> map = new HashMap<String, String>(System.getenv()); + File flinkConfFilePath = findFile(flinkDistRootDir, new ContainsName("flink-conf.yaml")); + Assert.assertNotNull(flinkConfFilePath); + map.put("FLINK_CONF_DIR", flinkConfFilePath.getParent()); + yarnConfFile = writeYarnSiteConfigXML(conf); + map.put("YARN_CONF_DIR", yarnConfFile.getParentFile().getAbsolutePath()); + map.put("IN_TESTS", "yes we are in tests"); // see FlinkYarnClient() for more infos + setEnv(map); + + Assert.assertTrue(yarnCluster.getServiceState() == Service.STATE.STARTED); + } catch (Exception ex) { + ex.printStackTrace(); + LOG.error("setup failure", ex); + Assert.fail(); + } + } + + /** + * Default @BeforeClass impl. Overwrite this for passing a different configuration + */ + @BeforeClass + public static void setup() { + startYARNWithConfig(yarnConfiguration); + } + + // -------------------------- Runner -------------------------- // + + private static ByteArrayOutputStream outContent; + private static ByteArrayOutputStream errContent; + enum RunTypes { + YARN_SESSION, CLI_FRONTEND + } + + protected void runWithArgs(String[] args, String expect, RunTypes type) { + LOG.info("Running with args {}", Arrays.toString(args)); + + outContent = new ByteArrayOutputStream(); + errContent = new ByteArrayOutputStream(); + System.setOut(new PrintStream(outContent)); + System.setErr(new PrintStream(errContent)); + + + final int START_TIMEOUT_SECONDS = 60; + + Runner runner = new Runner(args, type); + runner.start(); + + boolean expectedStringSeen = false; + for(int second = 0; second < START_TIMEOUT_SECONDS; second++) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + Assert.fail("Interruption not expected"); + } + // check output for correct TaskManager startup. + if(outContent.toString().contains(expect) + || errContent.toString().contains(expect) ) { + expectedStringSeen = true; + LOG.info("Found expected output in redirected streams"); + // send "stop" command to command line interface + runner.sendStop(); + // wait for the thread to stop + try { + runner.join(1000); + } catch (InterruptedException e) { + LOG.warn("Interrupted while stopping runner", e); + } + LOG.warn("stopped"); + break; + } + // check if thread died + if(!runner.isAlive()) { + sendOutput(); + Assert.fail("Runner thread died before the test was finished. Return value = "+runner.getReturnValue()); + } + } + + sendOutput(); + Assert.assertTrue("During the timeout period of " + START_TIMEOUT_SECONDS + " seconds the " + + "expected string did not show up", expectedStringSeen); + LOG.info("Test was successful"); + } + + private static void sendOutput() { + System.setOut(originalStdout); + System.setErr(originalStderr); + + LOG.info("Sending stdout content through logger: \n\n{}\n\n", outContent.toString()); + LOG.info("Sending stderr content through logger: \n\n{}\n\n", errContent.toString()); + } + + public static class Runner extends Thread { + private final String[] args; + private int returnValue; + private RunTypes type; + private FlinkYarnSessionCli yCli; + + public Runner(String[] args, RunTypes type) { + this.args = args; + this.type = type; + } + + public int getReturnValue() { + return returnValue; + } + + @Override + public void run() { + switch(type) { + case YARN_SESSION: + yCli = new FlinkYarnSessionCli("", ""); + returnValue = yCli.run(args); + break; + case CLI_FRONTEND: + try { + CliFrontend cli = new CliFrontend(); + returnValue = cli.parseParameters(args); + } catch (Exception e) { + throw new RuntimeException(e); + } + break; + default: + throw new RuntimeException("Unknown type " + type); + } + + if(returnValue != 0) { + Assert.fail("The YARN session returned with non-null value="+returnValue); + } + } + + public void sendStop() { + if(yCli != null) { + yCli.stop(); + } + } + } + + // -------------------------- Tear down -------------------------- // + + @AfterClass + public static void tearDown() { + //shutdown YARN cluster + if (yarnCluster != null) { + LOG.info("shutdown MiniYarn cluster"); + yarnCluster.stop(); + yarnCluster = null; + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/84e76f4d/flink-yarn-tests/src/main/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/main/resources/log4j-test.properties b/flink-yarn-tests/src/main/resources/log4j-test.properties new file mode 100644 index 0000000..b4dbbe0 --- /dev/null +++ b/flink-yarn-tests/src/main/resources/log4j-test.properties @@ -0,0 +1,28 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +log4j.rootLogger=WARN, file + +# Log all infos in the given file +log4j.appender.file=org.apache.log4j.ConsoleAppender +log4j.appender.file.append=false +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n + +# suppress the irrelevant (wrong) warnings from the netty channel handler +log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/84e76f4d/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java deleted file mode 100644 index 9fd2541..0000000 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.yarn; - -import org.junit.Assert; -import org.junit.Test; - -import java.io.File; -import java.util.Arrays; -import java.util.List; - -public class UtilsTest { - - @Test - public void testUberjarLocator() { - File dir = YarnTestBase.findFile(".", new YarnTestBase.RootDirFilenameFilter()); - Assert.assertNotNull(dir); - dir = dir.getParentFile().getParentFile(); // from uberjar to lib to root - Assert.assertTrue(dir.exists()); - Assert.assertTrue(dir.isDirectory()); - Assert.assertTrue(dir.toString().contains("flink-dist")); - List<String> files = Arrays.asList(dir.list()); - Assert.assertTrue(files.contains("lib")); - Assert.assertTrue(files.contains("bin")); - Assert.assertTrue(files.contains("conf")); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/84e76f4d/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java deleted file mode 100644 index 7da355b..0000000 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java +++ /dev/null @@ -1,79 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.yarn; - -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; -import org.junit.BeforeClass; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.apache.flink.yarn.YARNSessionFIFOITCase.addTestAppender; -import static org.apache.flink.yarn.YARNSessionFIFOITCase.checkForLogString; - - -/** - * This test starts a MiniYARNCluster with a CapacityScheduler. - * Is has, by default a queue called "default". The configuration here adds another queue: "qa-team". - */ -public class YARNSessionCapacitySchedulerITCase extends YarnTestBase { - private static final Logger LOG = LoggerFactory.getLogger(YARNSessionCapacitySchedulerITCase.class); - - @BeforeClass - public static void setup() { - yarnConfiguration.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class); - yarnConfiguration.set("yarn.scheduler.capacity.root.queues", "default,qa-team"); - yarnConfiguration.setInt("yarn.scheduler.capacity.root.default.capacity", 40); - yarnConfiguration.setInt("yarn.scheduler.capacity.root.qa-team.capacity", 60); - startYARNWithConfig(yarnConfiguration); - } - - /** - * Test regular operation, including command line parameter parsing. - */ - @Test - public void testClientStartup() { - runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(), - "-n", "1", - "-jm", "512", - "-tm", "1024", "-qu", "qa-team"}, - "Number of connected TaskManagers changed to 1. Slots available: 1", RunTypes.YARN_SESSION); - - ensureNoExceptionsInLogFiles(); - } - - - /** - * Test deployment to non-existing queue. (user-reported error) - * Deployment to the queue is possible because there are no queues, so we don't check. - */ - @Test - public void testNonexistingQueue() { - addTestAppender(); - runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(), - "-n", "1", - "-jm", "512", - "-tm", "1024", - "-qu", "doesntExist"}, "to unknown queue: doesntExist", RunTypes.YARN_SESSION); - checkForLogString("The specified queue 'doesntExist' does not exist. Available queues: default, qa-team"); - - ensureNoExceptionsInLogFiles(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/84e76f4d/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java deleted file mode 100644 index d5f301b..0000000 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java +++ /dev/null @@ -1,306 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.yarn; - -import org.apache.flink.client.FlinkYarnSessionCli; -import org.apache.flink.configuration.GlobalConfiguration; -import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient; -import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster; -import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; -import org.apache.log4j.AppenderSkeleton; -import org.apache.log4j.spi.LoggingEvent; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.util.ArrayList; -import java.util.List; - - -/** - * This test starts a MiniYARNCluster with a FIFO scheudler. - * There are no queues for that scheduler. - */ -public class YARNSessionFIFOITCase extends YarnTestBase { - private static final Logger LOG = LoggerFactory.getLogger(YARNSessionFIFOITCase.class); - - /* - Override init with FIFO scheduler. - */ - @BeforeClass - public static void setup() { - yarnConfiguration.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, ResourceScheduler.class); - yarnConfiguration.setInt(YarnConfiguration.NM_PMEM_MB, 768); - yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512); - startYARNWithConfig(yarnConfiguration); - } - /** - * Test regular operation, including command line parameter parsing. - */ - @Test - public void testClientStartup() { - LOG.info("Starting testClientStartup()"); - runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(), - "-n", "1", - "-jm", "512", - "-tm", "1024"}, - "Number of connected TaskManagers changed to 1. Slots available: 1", RunTypes.YARN_SESSION); - LOG.info("Finished testClientStartup()"); - ensureNoExceptionsInLogFiles(); - } - - - /** - * Test querying the YARN cluster. - * - * This test validates through 666*2 cores in the "cluster". - */ - @Test - public void testQueryCluster() { - LOG.info("Starting testQueryCluster()"); - runWithArgs(new String[] {"-q"}, "Summary: totalMemory 8192 totalCores 1332", RunTypes.YARN_SESSION); // we have 666*2 cores. - LOG.info("Finished testQueryCluster()"); - ensureNoExceptionsInLogFiles(); - } - - /** - * Test deployment to non-existing queue. (user-reported error) - * Deployment to the queue is possible because there are no queues, so we don't check. - */ - @Test - public void testNonexistingQueue() { - LOG.info("Starting testNonexistingQueue()"); - runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), - "-n", "1", - "-jm", "512", - "-tm", "1024", - "-qu", "doesntExist"}, "Number of connected TaskManagers changed to 1. Slots available: 1", RunTypes.YARN_SESSION); - LOG.info("Finished testNonexistingQueue()"); - ensureNoExceptionsInLogFiles(); - } - - /** - * Test requesting more resources than available. - */ - @Test - public void testMoreNodesThanAvailable() { - if(ignoreOnTravis()) { - return; - } - addTestAppender(); - LOG.info("Starting testMoreNodesThanAvailable()"); - runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(), - "-n", "10", - "-jm", "512", - "-tm", "1024"}, "Number of connected TaskManagers changed to", RunTypes.YARN_SESSION); // the number of TMs depends on the speed of the test hardware - LOG.info("Finished testMoreNodesThanAvailable()"); - checkForLogString("This YARN session requires 10752MB of memory in the cluster. There are currently only 8192MB available."); - ensureNoExceptionsInLogFiles(); - } - - /** - * The test cluster has the following resources: - * - 2 Nodes with 4096 MB each. - * - RM_SCHEDULER_MINIMUM_ALLOCATION_MB is 512 - * - * We allocate: - * 1 JobManager with 256 MB (will be automatically upgraded to 512 due to min alloc mb) - * 5 TaskManagers with 1585 MB - * - * user sees a total request of: 8181 MB (fits) - * system sees a total request of: 8437 (doesn't fit due to min alloc mb) - */ - @Test - public void testResourceComputation() { - if(ignoreOnTravis()) { - return; - } - addTestAppender(); - LOG.info("Starting testResourceComputation()"); - runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(), - "-n", "5", - "-jm", "256", - "-tm", "1585"}, "Number of connected TaskManagers changed to", RunTypes.YARN_SESSION); - LOG.info("Finished testResourceComputation()"); - checkForLogString("This YARN session requires 8437MB of memory in the cluster. There are currently only 8192MB available."); - } - - /** - * The test cluster has the following resources: - * - 2 Nodes with 4096 MB each. - * - RM_SCHEDULER_MINIMUM_ALLOCATION_MB is 512 - * - * We allocate: - * 1 JobManager with 256 MB (will be automatically upgraded to 512 due to min alloc mb) - * 2 TaskManagers with 3840 MB - * - * the user sees a total request of: 7936 MB (fits) - * the system sees a request of: 8192 MB (fits) - * HOWEVER: one machine is going to need 3840 + 512 = 4352 MB, which doesn't fit. - * - * --> check if the system properly rejects allocating this session. - */ - @Test - public void testfullAlloc() { - if(ignoreOnTravis()) { - return; - } - addTestAppender(); - LOG.info("Starting testfullAlloc()"); - runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(), - "-n", "2", - "-jm", "256", - "-tm", "3840"}, "Number of connected TaskManagers changed to", RunTypes.YARN_SESSION); - LOG.info("Finished testfullAlloc()"); - checkForLogString("There is not enough memory available in the YARN cluster. The TaskManager(s) require 3840MB each. NodeManagers available: [4096, 4096]\n" + - "After allocating the JobManager (512MB) and (1/2) TaskManagers, the following NodeManagers are available: [3584, 256]"); - ensureNoExceptionsInLogFiles(); - } - - /** - * Test per-job yarn cluster - * - * This also tests the prefixed CliFrontend options for the YARN case - */ - @Test - public void perJobYarnCluster() { - LOG.info("Starting perJobYarnCluster()"); - File exampleJarLocation = YarnTestBase.findFile(".", new ContainsName("-WordCount.jar", "streaming")); // exclude streaming wordcount here. - runWithArgs(new String[] {"run", "-m", "yarn-cluster", - "-yj", flinkUberjar.getAbsolutePath(), - "-yn", "1", - "-yjm", "512", - "-ytm", "1024", exampleJarLocation.getAbsolutePath()}, "Job execution switched to status FINISHED.", RunTypes.CLI_FRONTEND); - LOG.info("Finished perJobYarnCluster()"); - ensureNoExceptionsInLogFiles(); - } - - /** - * Test the YARN Java API - */ - @Test - public void testJavaAPI() { - final int WAIT_TIME = 15; - LOG.info("Starting testJavaAPI()"); - - AbstractFlinkYarnClient flinkYarnClient = FlinkYarnSessionCli.getFlinkYarnClient(); - flinkYarnClient.setTaskManagerCount(1); - flinkYarnClient.setJobManagerMemory(512); - flinkYarnClient.setTaskManagerMemory(512); - flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath())); - String confDirPath = System.getenv("FLINK_CONF_DIR"); - flinkYarnClient.setConfigurationDirectory(confDirPath); - flinkYarnClient.setFlinkConfigurationObject(GlobalConfiguration.getConfiguration()); - flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml")); - - // deploy - AbstractFlinkYarnCluster yarnCluster = null; - try { - yarnCluster = flinkYarnClient.deploy(null); - } catch (Exception e) { - System.err.println("Error while deploying YARN cluster: "+e.getMessage()); - e.printStackTrace(System.err); - Assert.fail(); - } - FlinkYarnClusterStatus expectedStatus = new FlinkYarnClusterStatus(1, 1); - for(int second = 0; second < WAIT_TIME * 2; second++) { // run "forever" - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - LOG.warn("Interrupted", e); - Thread.interrupted(); - } - FlinkYarnClusterStatus status = yarnCluster.getClusterStatus(); - if(status != null && status.equals(expectedStatus)) { - LOG.info("Cluster reached status " + status); - break; // all good, cluster started - } - if(second > WAIT_TIME) { - // we waited for 15 seconds. cluster didn't come up correctly - Assert.fail("The custer didn't start after " + WAIT_TIME + " seconds"); - } - } - - // use the cluster - Assert.assertNotNull(yarnCluster.getJobManagerAddress()); - Assert.assertNotNull(yarnCluster.getWebInterfaceURL()); - - LOG.info("Shutting down cluster. All tests passed"); - // shutdown cluster - yarnCluster.shutdown(); - LOG.info("Finished testJavaAPI()"); - - ensureNoExceptionsInLogFiles(); - } - - public boolean ignoreOnTravis() { - if(System.getenv("TRAVIS") != null && System.getenv("TRAVIS").equals("true")) { - // we skip the test until we are able to start a smaller yarn clsuter - // right now, the miniyarncluster has the size of the nodemanagers fixed on 4 GBs. - LOG.warn("Skipping test on travis for now"); - return true; - } - return false; - } - - // - // --------------- Tools to test if a certain string has been logged with Log4j. ------------- - // See : http://stackoverflow.com/questions/3717402/how-to-test-w-junit-that-warning-was-logged-w-log4j - // - private static TestAppender testAppender; - public static void addTestAppender() { - testAppender = new TestAppender(); - org.apache.log4j.Logger.getRootLogger().addAppender(testAppender); - } - - public static void checkForLogString(String expected) { - if(testAppender == null) { - throw new NullPointerException("Initialize it first"); - } - LoggingEvent found = null; - for(LoggingEvent event: testAppender.events) { - if(event.getMessage().toString().contains(expected)) { - found = event; - break; - } - } - if(found != null) { - LOG.info("Found expected string '"+expected+"' in log message "+found); - return; - } - Assert.fail("Unable to find expected string '"+expected+"' in log messages"); - } - - public static class TestAppender extends AppenderSkeleton { - public List<LoggingEvent> events = new ArrayList<LoggingEvent>(); - public void close() {} - public boolean requiresLayout() {return false;} - @Override - protected void append(LoggingEvent event) { - events.add(event); - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/84e76f4d/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java deleted file mode 100644 index 65517d3..0000000 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java +++ /dev/null @@ -1,439 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.yarn; - -import org.apache.flink.client.CliFrontend; -import org.apache.flink.client.FlinkYarnSessionCli; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.service.Service; -import org.apache.hadoop.yarn.api.records.ApplicationReport; -import org.apache.hadoop.yarn.api.records.YarnApplicationState; -import org.apache.hadoop.yarn.client.api.YarnClient; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.server.MiniYARNCluster; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.rules.TemporaryFolder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileWriter; -import java.io.FilenameFilter; -import java.io.IOException; -import java.io.PrintStream; -import java.lang.reflect.Field; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Scanner; - - -/** - * This base class allows to use the MiniYARNCluster. - * The cluster is re-used for all tests. - * - * This class is located in a different package which is build after flink-dist. This way, - * we can use the YARN uberjar of flink to start a Flink YARN session. - */ -public abstract class YarnTestBase { - private static final Logger LOG = LoggerFactory.getLogger(YarnTestBase.class); - - private final static PrintStream originalStdout = System.out; - private final static PrintStream originalStderr = System.err; - - private final static String TEST_CLUSTER_NAME = "flink-yarn-tests"; - - - // Temp directory which is deleted after the unit test. - private static TemporaryFolder tmp = new TemporaryFolder(); - - protected static MiniYARNCluster yarnCluster = null; - - protected static File flinkUberjar; - private static File yarnConfFile; - - protected static final Configuration yarnConfiguration; - static { - yarnConfiguration = new YarnConfiguration(); - yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512); - yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 4096); // 4096 is the available memory anyways - yarnConfiguration.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true); - yarnConfiguration.setBoolean(YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, true); - yarnConfiguration.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); - yarnConfiguration.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 2); - yarnConfiguration.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600); - yarnConfiguration.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); - yarnConfiguration.setInt(YarnConfiguration.NM_VCORES, 666); // memory is overwritten in the MiniYARNCluster. - // so we have to change the number of cores for testing. - } - - // This code is taken from: http://stackoverflow.com/a/7201825/568695 - // it changes the environment variables of this JVM. Use only for testing purposes! - @SuppressWarnings("unchecked") - private static void setEnv(Map<String, String> newenv) { - try { - Class<?> processEnvironmentClass = Class.forName("java.lang.ProcessEnvironment"); - Field theEnvironmentField = processEnvironmentClass.getDeclaredField("theEnvironment"); - theEnvironmentField.setAccessible(true); - Map<String, String> env = (Map<String, String>) theEnvironmentField.get(null); - env.putAll(newenv); - Field theCaseInsensitiveEnvironmentField = processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment"); - theCaseInsensitiveEnvironmentField.setAccessible(true); - Map<String, String> cienv = (Map<String, String>) theCaseInsensitiveEnvironmentField.get(null); - cienv.putAll(newenv); - } catch (NoSuchFieldException e) { - try { - Class[] classes = Collections.class.getDeclaredClasses(); - Map<String, String> env = System.getenv(); - for (Class cl : classes) { - if ("java.util.Collections$UnmodifiableMap".equals(cl.getName())) { - Field field = cl.getDeclaredField("m"); - field.setAccessible(true); - Object obj = field.get(env); - Map<String, String> map = (Map<String, String>) obj; - map.clear(); - map.putAll(newenv); - } - } - } catch (Exception e2) { - throw new RuntimeException(e2); - } - } catch (Exception e1) { - throw new RuntimeException(e1); - } - } - - /** - * Sleep a bit between the tests (we are re-using the YARN cluster for the tests) - */ - @After - public void sleep() { - try { - Thread.sleep(500); - } catch (InterruptedException e) { - Assert.fail("Should not happen"); - } - } - - @Before - public void checkClusterEmpty() throws IOException, YarnException { - YarnClient yarnClient = YarnClient.createYarnClient(); - yarnClient.init(yarnConfiguration); - yarnClient.start(); - List<ApplicationReport> apps = yarnClient.getApplications(); - for(ApplicationReport app : apps) { - if(app.getYarnApplicationState() != YarnApplicationState.FINISHED) { - Assert.fail("There is at least one application on the cluster is not finished"); - } - } - } - - /** - * Locate a file or diretory directory - */ - public static File findFile(String startAt, FilenameFilter fnf) { - File root = new File(startAt); - String[] files = root.list(); - if(files == null) { - return null; - } - for(String file : files) { - - File f = new File(startAt + File.separator + file); - if(f.isDirectory()) { - File r = findFile(f.getAbsolutePath(), fnf); - if(r != null) { - return r; - } - } else if (fnf.accept(f.getParentFile(), f.getName())) { - return f; - } - - } - return null; - } - - /** - * Filter to find root dir of the flink-yarn dist. - */ - public static class RootDirFilenameFilter implements FilenameFilter { - @Override - public boolean accept(File dir, String name) { - return name.endsWith("yarn-uberjar.jar") && dir.toString().contains("/lib"); - } - } - public static class ContainsName implements FilenameFilter { - private String name; - private String excludeInPath = null; - - public ContainsName(String name) { - this.name = name; - } - - public ContainsName(String name, String excludeInPath) { - this.name = name; - this.excludeInPath = excludeInPath; - } - - @Override - public boolean accept(File dir, String name) { - if(excludeInPath == null) { - return name.contains(this.name); - } else { - return name.contains(this.name) && !dir.toString().contains(excludeInPath); - } - } - } - - public static File writeYarnSiteConfigXML(Configuration yarnConf) throws IOException { - tmp.create(); - File yarnSiteXML = new File(tmp.newFolder().getAbsolutePath() + "/yarn-site.xml"); - - FileWriter writer = new FileWriter(yarnSiteXML); - yarnConf.writeXml(writer); - writer.flush(); - writer.close(); - return yarnSiteXML; - } - - /** - * This method checks the written TaskManager and JobManager log files - * for exceptions. - */ - public static void ensureNoExceptionsInLogFiles() { - File cwd = new File("target/"+TEST_CLUSTER_NAME); - Assert.assertTrue("Expecting directory "+cwd.getAbsolutePath()+" to exist", cwd.exists()); - Assert.assertTrue("Expecting directory "+cwd.getAbsolutePath()+" to be a directory", cwd.isDirectory()); - System.out.println("cwd = "+cwd.getAbsolutePath()); - File foundFile = findFile(cwd.getAbsolutePath(), new FilenameFilter() { - @Override - public boolean accept(File dir, String name) { - File f = new File(dir.getAbsolutePath()+ "/" + name); - // scan each file for 'Exception'. - Scanner scanner = null; - try { - scanner = new Scanner(f); - } catch (FileNotFoundException e) { - Assert.fail("Unable to locate file: "+e.getMessage()+" file: "+f.getAbsolutePath()); - } - while (scanner.hasNextLine()) { - final String lineFromFile = scanner.nextLine(); - if(lineFromFile.contains("Exception")) { - return true; - } - } - return false; - } - }); - if(foundFile != null) { - Scanner scanner = null; - try { - scanner = new Scanner(foundFile); - } catch (FileNotFoundException e) { - Assert.fail("Unable to locate file: "+e.getMessage()+" file: "+foundFile.getAbsolutePath()); - } - LOG.warn("Found a file with an exception. Printing contents:"); - while (scanner.hasNextLine()) { - LOG.warn("LINE: "+scanner.nextLine()); - } - Assert.fail("Found a file "+foundFile+" with an exception"); - } - } - - public static void main(String[] args) { - ensureNoExceptionsInLogFiles(); - } - - public static void startYARNWithConfig(Configuration conf) { - flinkUberjar = findFile(".", new RootDirFilenameFilter()); - Assert.assertNotNull(flinkUberjar); - String flinkDistRootDir = flinkUberjar.getParentFile().getParent(); - - if (!flinkUberjar.exists()) { - Assert.fail("Unable to locate yarn-uberjar.jar"); - } - - try { - LOG.info("Starting up MiniYARN cluster"); - if (yarnCluster == null) { - yarnCluster = new MiniYARNCluster(TEST_CLUSTER_NAME, 2, 1, 1); - - yarnCluster.init(conf); - yarnCluster.start(); - } - - Map<String, String> map = new HashMap<String, String>(System.getenv()); - File flinkConfFilePath = findFile(flinkDistRootDir, new ContainsName("flink-conf.yaml")); - Assert.assertNotNull(flinkConfFilePath); - map.put("FLINK_CONF_DIR", flinkConfFilePath.getParent()); - yarnConfFile = writeYarnSiteConfigXML(conf); - map.put("YARN_CONF_DIR", yarnConfFile.getParentFile().getAbsolutePath()); - map.put("IN_TESTS", "yes we are in tests"); // see FlinkYarnClient() for more infos - setEnv(map); - - Assert.assertTrue(yarnCluster.getServiceState() == Service.STATE.STARTED); - } catch (Exception ex) { - ex.printStackTrace(); - LOG.error("setup failure", ex); - Assert.fail(); - } - } - - /** - * Default @BeforeClass impl. Overwrite this for passing a different configuration - */ - @BeforeClass - public static void setup() { - startYARNWithConfig(yarnConfiguration); - } - - // -------------------------- Runner -------------------------- // - - private static ByteArrayOutputStream outContent; - private static ByteArrayOutputStream errContent; - enum RunTypes { - YARN_SESSION, CLI_FRONTEND - } - - protected void runWithArgs(String[] args, String expect, RunTypes type) { - LOG.info("Running with args {}", Arrays.toString(args)); - - outContent = new ByteArrayOutputStream(); - errContent = new ByteArrayOutputStream(); - System.setOut(new PrintStream(outContent)); - System.setErr(new PrintStream(errContent)); - - - final int START_TIMEOUT_SECONDS = 60; - - Runner runner = new Runner(args, type); - runner.start(); - - boolean expectedStringSeen = false; - for(int second = 0; second < START_TIMEOUT_SECONDS; second++) { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - Assert.fail("Interruption not expected"); - } - // check output for correct TaskManager startup. - if(outContent.toString().contains(expect) - || errContent.toString().contains(expect) ) { - expectedStringSeen = true; - LOG.info("Found expected output in redirected streams"); - // send "stop" command to command line interface - runner.sendStop(); - // wait for the thread to stop - try { - runner.join(1000); - } catch (InterruptedException e) { - LOG.warn("Interrupted while stopping runner", e); - } - LOG.warn("stopped"); - break; - } - // check if thread died - if(!runner.isAlive()) { - sendOutput(); - Assert.fail("Runner thread died before the test was finished. Return value = "+runner.getReturnValue()); - } - } - - sendOutput(); - Assert.assertTrue("During the timeout period of " + START_TIMEOUT_SECONDS + " seconds the " + - "expected string did not show up", expectedStringSeen); - LOG.info("Test was successful"); - } - - private static void sendOutput() { - System.setOut(originalStdout); - System.setErr(originalStderr); - - LOG.info("Sending stdout content through logger: \n\n{}\n\n", outContent.toString()); - LOG.info("Sending stderr content through logger: \n\n{}\n\n", errContent.toString()); - } - - public static class Runner extends Thread { - private final String[] args; - private int returnValue; - private RunTypes type; - private FlinkYarnSessionCli yCli; - - public Runner(String[] args, RunTypes type) { - this.args = args; - this.type = type; - } - - public int getReturnValue() { - return returnValue; - } - - @Override - public void run() { - switch(type) { - case YARN_SESSION: - yCli = new FlinkYarnSessionCli("", ""); - returnValue = yCli.run(args); - break; - case CLI_FRONTEND: - try { - CliFrontend cli = new CliFrontend(); - returnValue = cli.parseParameters(args); - } catch (Exception e) { - throw new RuntimeException(e); - } - break; - default: - throw new RuntimeException("Unknown type " + type); - } - - if(returnValue != 0) { - Assert.fail("The YARN session returned with non-null value="+returnValue); - } - } - - public void sendStop() { - if(yCli != null) { - yCli.stop(); - } - } - } - - // -------------------------- Tear down -------------------------- // - - @AfterClass - public static void tearDown() { - //shutdown YARN cluster - if (yarnCluster != null) { - LOG.info("shutdown MiniYarn cluster"); - yarnCluster.stop(); - yarnCluster = null; - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/84e76f4d/flink-yarn-tests/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/resources/log4j-test.properties b/flink-yarn-tests/src/test/resources/log4j-test.properties deleted file mode 100644 index b4dbbe0..0000000 --- a/flink-yarn-tests/src/test/resources/log4j-test.properties +++ /dev/null @@ -1,28 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - -log4j.rootLogger=WARN, file - -# Log all infos in the given file -log4j.appender.file=org.apache.log4j.ConsoleAppender -log4j.appender.file.append=false -log4j.appender.file.layout=org.apache.log4j.PatternLayout -log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n - -# suppress the irrelevant (wrong) warnings from the netty channel handler -log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/84e76f4d/flink-yarn/pom.xml ---------------------------------------------------------------------- diff --git a/flink-yarn/pom.xml b/flink-yarn/pom.xml index 1569f15..805543e 100644 --- a/flink-yarn/pom.xml +++ b/flink-yarn/pom.xml @@ -51,6 +51,12 @@ under the License. </dependency> <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>${shading-artifact.name}</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-actor_2.10</artifactId> </dependency> @@ -66,38 +72,12 @@ under the License. </dependency> <dependency> - <groupId>org.apache.camel</groupId> - <artifactId>camel-stream</artifactId> - <version>2.14.0</version> - </dependency> - - <!-- guava needs to be in "provided" scope, to make sure it is not included into the jars by the shading --> - <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>${guava.version}</version> - <scope>provided</scope> </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-client</artifactId> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdfs</artifactId> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapreduce-client-core</artifactId> - </dependency> + </dependencies> <build> http://git-wip-us.apache.org/repos/asf/flink/blob/84e76f4d/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java index 5c57292..16cb345 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; @@ -238,6 +239,15 @@ public class FlinkYarnClient extends AbstractFlinkYarnClient { @Override public void setShipFiles(List<File> shipFiles) { + File shipFile; + for(Iterator<File> it = shipFiles.iterator(); it.hasNext(); ) { + shipFile = it.next(); + // remove uberjar from ship list (by default everything in the lib/ folder is added to + // the list of files to ship, but we handle the uberjar separately. + if(shipFile.getName().startsWith("flink-dist-") && shipFile.getName().endsWith("jar")) { + it.remove(); + } + } this.shipFiles.addAll(shipFiles); } http://git-wip-us.apache.org/repos/asf/flink/blob/84e76f4d/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 1d09d02..cf042ed 100644 --- a/pom.xml +++ b/pom.xml @@ -52,7 +52,7 @@ under the License. </scm> <modules> - <module>flink-shaded</module> + <module>flink-shaded-hadoop</module> <module>flink-core</module> <module>flink-java</module> <module>flink-scala</module> @@ -71,6 +71,7 @@ under the License. <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> + <shading-artifact.name>error</shading-artifact.name> <hadoop-one.version>1.2.1</hadoop-one.version> <hadoop-two.version>2.2.0</hadoop-two.version> <scala.version>2.10.4</scala.version> @@ -81,13 +82,15 @@ under the License. <flink.reuseForks>true</flink.reuseForks> <log4j.configuration>log4j-test.properties</log4j.configuration> <slf4j.version>1.7.7</slf4j.version> - <guava.version>17.0</guava.version> + <guava.version>18.0</guava.version> <scala.version>2.10.4</scala.version> <akka.version>2.3.7</akka.version> <scala.binary.version>2.10</scala.binary.version> <scala.macros.version>2.0.1</scala.macros.version> <kryoserialization.version>0.3.2</kryoserialization.version> <protobuf.version>2.5.0</protobuf.version> + <chill.version>0.5.2</chill.version> + <asm.version>4.0</asm.version> </properties> <dependencies> @@ -158,14 +161,16 @@ under the License. <!-- this section defines the module versions that are used if nothing else is specified. --> <dependencyManagement> + <!-- WARN: + DO NOT put guava, + protobuf, + asm, + netty + here. It will overwrite Hadoop's guava dependency (even though we handle it + separatly in the flink-shaded-hadoop module). + We can use all guava versions everywhere by adding it directly as a dependency to each project. + --> <dependencies> - - <!-- ASM is used by us, Kryo, Hadoop, ... --> - <dependency> - <groupId>org.ow2.asm</groupId> - <artifactId>asm</artifactId> - <version>4.0</version> - </dependency> <!-- Make sure we use a consistent jetty version throughout the project --> <dependency> @@ -207,6 +212,12 @@ under the License. <artifactId>commons-cli</artifactId> <version>1.2</version> </dependency> + + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + <version>2.4</version> + </dependency> <!-- common-collections is used by us and by hadoop, so we need to define a common version --> <dependency> @@ -310,290 +321,6 @@ under the License. </exclusion> </exclusions> </dependency> - - - <!-- "Old" stable Hadoop = MapReduce v1 --> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-core</artifactId> - <version>${hadoop.version}</version> - <exclusions> - <exclusion> - <groupId>asm</groupId> - <artifactId>asm</artifactId> - </exclusion> - <exclusion> - <groupId>tomcat</groupId> - <artifactId>jasper-compiler</artifactId> - </exclusion> - <exclusion> - <groupId>tomcat</groupId> - <artifactId>jasper-runtime</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jetty</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jsp-api-2.1</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jsp-2.1</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jetty-util</artifactId> - </exclusion> - <exclusion> - <groupId>org.eclipse.jdt</groupId> - <artifactId>core</artifactId> - </exclusion> - </exclusions> - </dependency> - - <!-- Hadoop 2 Dependencies --> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <version>${hadoop.version}</version> - <exclusions> - <exclusion> - <groupId>asm</groupId> - <artifactId>asm</artifactId> - </exclusion> - <exclusion> - <groupId>tomcat</groupId> - <artifactId>jasper-compiler</artifactId> - </exclusion> - <exclusion> - <groupId>tomcat</groupId> - <artifactId>jasper-runtime</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jetty</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jsp-api-2.1</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jsp-2.1</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jetty-util</artifactId> - </exclusion> - <exclusion> - <groupId>org.eclipse.jdt</groupId> - <artifactId>core</artifactId> - </exclusion> - <exclusion> - <groupId>javax.servlet</groupId> - <artifactId>servlet-api</artifactId> - </exclusion> - <exclusion> - <groupId>javax.servlet.jsp</groupId> - <artifactId>jsp-api</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdfs</artifactId> - <version>${hadoop.version}</version> - <exclusions> - <exclusion> - <groupId>asm</groupId> - <artifactId>asm</artifactId> - </exclusion> - <exclusion> - <groupId>tomcat</groupId> - <artifactId>jasper-compiler</artifactId> - </exclusion> - <exclusion> - <groupId>tomcat</groupId> - <artifactId>jasper-runtime</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jetty</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jsp-api-2.1</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jsp-2.1</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jetty-util</artifactId> - </exclusion> - <exclusion> - <groupId>org.eclipse.jdt</groupId> - <artifactId>core</artifactId> - </exclusion> - <exclusion> - <groupId>javax.servlet</groupId> - <artifactId>servlet-api</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - <version>${hadoop.version}</version> - <exclusions> - <exclusion> - <groupId>asm</groupId> - <artifactId>asm</artifactId> - </exclusion> - <exclusion> - <groupId>tomcat</groupId> - <artifactId>jasper-compiler</artifactId> - </exclusion> - <exclusion> - <groupId>tomcat</groupId> - <artifactId>jasper-runtime</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jetty</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jsp-api-2.1</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jsp-2.1</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jetty-util</artifactId> - </exclusion> - <exclusion> - <groupId>org.eclipse.jdt</groupId> - <artifactId>core</artifactId> - </exclusion> - <exclusion> - <groupId>javax.servlet</groupId> - <artifactId>servlet-api</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapreduce-client-core</artifactId> - <version>${hadoop.version}</version> - <exclusions> - <exclusion> - <groupId>asm</groupId> - <artifactId>asm</artifactId> - </exclusion> - <exclusion> - <groupId>tomcat</groupId> - <artifactId>jasper-compiler</artifactId> - </exclusion> - <exclusion> - <groupId>tomcat</groupId> - <artifactId>jasper-runtime</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jetty</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jsp-api-2.1</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jsp-2.1</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jetty-util</artifactId> - </exclusion> - <exclusion> - <groupId>org.eclipse.jdt</groupId> - <artifactId>core</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-common</artifactId> - </exclusion> - <exclusion> - <groupId>com.google.inject.extensions</groupId> - <artifactId>guice-servlet</artifactId> - </exclusion> - <exclusion> - <groupId>io.netty</groupId> - <artifactId>netty</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-client</artifactId> - <version>${hadoop.version}</version> - <exclusions> - <exclusion> - <groupId>asm</groupId> - <artifactId>asm</artifactId> - </exclusion> - <exclusion> - <groupId>tomcat</groupId> - <artifactId>jasper-compiler</artifactId> - </exclusion> - <exclusion> - <groupId>tomcat</groupId> - <artifactId>jasper-runtime</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jetty</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jsp-api-2.1</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jsp-2.1</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jetty-util</artifactId> - </exclusion> - <exclusion> - <groupId>org.eclipse.jdt</groupId> - <artifactId>core</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-common</artifactId> - <version>${hadoop.version}</version> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-server-tests</artifactId> - <scope>test</scope> - <version>${hadoop.version}</version> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-minicluster</artifactId> - <scope>test</scope> - <version>${hadoop.version}</version> - </dependency> </dependencies> </dependencyManagement> @@ -608,6 +335,7 @@ under the License. </activation> <properties> <hadoop.version>${hadoop-one.version}</hadoop.version> + <shading-artifact.name>flink-shaded-hadoop1</shading-artifact.name> </properties> </profile> <profile> @@ -620,6 +348,7 @@ under the License. </activation> <properties> <hadoop.version>${hadoop-two.version}</hadoop.version> + <shading-artifact.name>flink-shaded-hadoop2</shading-artifact.name> </properties> </profile> @@ -631,121 +360,14 @@ under the License. <!--hadoop2--><name>!hadoop.profile</name> </property> </activation> + <properties> + <shading-artifact.name>flink-shaded-include-yarn</shading-artifact.name> + </properties> <modules> <module>flink-yarn</module> <module>flink-yarn-tests</module> </modules> </profile> - <profile> - <id>hadoop-2.0.0-alpha</id> - <activation> - <property> - <name>hadoop.version</name> - <value>2.0.0-alpha</value> - </property> - </activation> - <properties> - <akka.version>2.2.1</akka.version> - <kryoserialization.version>0.3.1</kryoserialization.version> - <protobuf.version>2.4.1</protobuf.version> - </properties> - <dependencyManagement> - <dependencies> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <version>${hadoop.version}</version> - <exclusions> - <!-- This is an additional exclusion (Netty) --> - <exclusion> - <groupId>org.jboss.netty</groupId> - <artifactId>netty</artifactId> - </exclusion> - <exclusion> - <groupId>asm</groupId> - <artifactId>asm</artifactId> - </exclusion> - <exclusion> - <groupId>tomcat</groupId> - <artifactId>jasper-compiler</artifactId> - </exclusion> - <exclusion> - <groupId>tomcat</groupId> - <artifactId>jasper-runtime</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jetty</artifactId> - </exclusion> - <exclusion> - <groupId>javax.servlet</groupId> - <artifactId>servlet-api</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jsp-api-2.1</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jsp-2.1</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jetty-util</artifactId> - </exclusion> - <exclusion> - <groupId>org.eclipse.jdt</groupId> - <artifactId>core</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapreduce-client-core</artifactId> - <version>${hadoop.version}</version> - <exclusions> - <!-- This is an additional exclusion (Netty) --> - <exclusion> - <groupId>org.jboss.netty</groupId> - <artifactId>netty</artifactId> - </exclusion> - <exclusion> - <groupId>asm</groupId> - <artifactId>asm</artifactId> - </exclusion> - <exclusion> - <groupId>tomcat</groupId> - <artifactId>jasper-compiler</artifactId> - </exclusion> - <exclusion> - <groupId>tomcat</groupId> - <artifactId>jasper-runtime</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jetty</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jsp-api-2.1</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jsp-2.1</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jetty-util</artifactId> - </exclusion> - <exclusion> - <groupId>org.eclipse.jdt</groupId> - <artifactId>core</artifactId> - </exclusion> - </exclusions> - </dependency> - </dependencies> - </dependencyManagement> - </profile> <profile> <id>vendor-repos</id> @@ -896,29 +518,6 @@ under the License. </profile> </profiles> - <reporting> - <plugins> - <!-- execution of Unit Tests --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-surefire-report-plugin</artifactId> - <version>2.17</version> - </plugin> - - <!-- test coverage reports --> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>cobertura-maven-plugin</artifactId> - <version>2.6</version> - <configuration> - <formats> - <format>html</format> - </formats> - </configuration> - </plugin> - </plugins> - </reporting> - <build> <plugins> <plugin> @@ -935,38 +534,6 @@ under the License. </configuration> </plugin> - <!-- Relocate references to Google Guava classes into a different namespace --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-shade-plugin</artifactId> - <version>2.3</version> - <executions> - <execution> - <phase>package</phase> - <goals> - <goal>shade</goal> - </goals> - <configuration> - <shadedArtifactAttached>false</shadedArtifactAttached> - <createDependencyReducedPom>false</createDependencyReducedPom> - <artifactSet> - <includes> - <include>org.apache.flink:${project.artifact}</include> - </includes> - </artifactSet> - <relocations> - <relocation> - <pattern>com.google</pattern> - <shadedPattern>org.apache.flink.shaded.com.google</shadedPattern> - <excludes> - <exclude>com.google.protobuf.**</exclude> - </excludes> - </relocation> - </relocations> - </configuration> - </execution> - </executions> - </plugin> <plugin> <groupId>org.apache.rat</groupId> <artifactId>apache-rat-plugin</artifactId> @@ -1159,6 +726,54 @@ under the License. </execution> </executions> </plugin> + + <!-- We use shading in all packages for relocating some classes, such as + Guava and ASM. + By doing so, users adding Flink as a dependency won't run into conflicts. + (For example users can use whatever guava version they want, because we don't + expose our guava dependency) + --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>2.3</version> + <executions> + <execution> + <id>shade-flink</id> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <shadeTestJar>true</shadeTestJar> + <shadedArtifactAttached>false</shadedArtifactAttached> + <createDependencyReducedPom>true</createDependencyReducedPom> + <dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation> + <artifactSet> + <includes> + <include>com.google.guava:*</include> + <include>org.ow2.asm:*</include> + </includes> + </artifactSet> + <relocations> + <relocation> + <pattern>com.google</pattern> + <shadedPattern>org.apache.flink.shaded.com.google</shadedPattern> + <excludes> + <exclude>com.google.protobuf.**</exclude> + <exclude>com.google.inject.**</exclude> + </excludes> + </relocation> + <relocation> + <pattern>org.objectweb.asm</pattern> + <shadedPattern>org.apache.flink.shaded.org.objectweb.asm</shadedPattern> + </relocation> + </relocations> + </configuration> + </execution> + </executions> + </plugin> + </plugins> <!-- http://git-wip-us.apache.org/repos/asf/flink/blob/84e76f4d/tools/travis_mvn_watchdog.sh ---------------------------------------------------------------------- diff --git a/tools/travis_mvn_watchdog.sh b/tools/travis_mvn_watchdog.sh index d94f977..0875a7e 100755 --- a/tools/travis_mvn_watchdog.sh +++ b/tools/travis_mvn_watchdog.sh @@ -45,7 +45,7 @@ LOG4J_PROPERTIES=${HERE}/log4j-travis.properties # Maven command to run. We set the forkCount manually, because otherwise Maven sees too many cores # on the Travis VMs. -MVN="mvn -Dflink.forkCount=2 -B $PROFILE -Dlog.dir=${ARTIFACTS_DIR} -Dlog4j.configuration=file://$LOG4J_PROPERTIES clean install verify" +MVN="mvn -Dflink.forkCount=2 -B $PROFILE -Dlog.dir=${ARTIFACTS_DIR} -Dlog4j.configuration=file://$LOG4J_PROPERTIES clean install" MVN_PID="${ARTIFACTS_DIR}/watchdog.mvn.pid" MVN_EXIT="${ARTIFACTS_DIR}/watchdog.mvn.exit"